快速,持续,稳定,傻瓜式
支持Mysql,Sqlserver数据同步

[基于python的数据库同步]基于Python的SQL Server数据库对象同步轻量级实现

在线QQ客服:1922638

专业的SQL Server、MySQL数据库同步软件

  -*-编码:utf-8-*- 

!/usr/bin/env python3

__作者__ = " MSSQL123 "

__日期__ = " 2019-06-07 09:36 "

"""

20190706:

1,部分代码重构

2,优化插入效率,插入table_name值(***),(***)== \插入table_name值(***);插入table_name值(***)

"""

导入 操作系统

导入 sys

导入 时间

导入 datetime

导入 pymssql

来自十进制 import 小数

用法 = """

-----参数说明-----

源数据库参数

-s_h:数据库主机主机-----必须要求参数

-s_i:源数据库实例名称-----默认实例名称MSSQL

-s_d:源数据库名称-----必须要求参数

-s_u:确保数据库登录-----默认的Windows标识符

-s_p:安全数据库登录密码-----s_u不为null时必须输入

-s_P:正常数据库实例端口-----默认端口1433

目标数据库参数

-t_h:目标数据库主机-----必须要求参数

-t_i:目标数据库实例名称-----默认实例名称MSSQL

-t_d:目标数据库名称-----必须要求参数

-t_u:目标数据库登录-----默认的Windows标识符

-t_p:目标数据库登录密码-----s_u不为null时必须输入

-t_P:目标数据库实例端口-----默认端口1433

同步对象参数

-obj_type:表或sp或函数或其他数据库对象-----tab或sp或fn或tp

-obj:表| sp |功能|类型名称-----鞭表或sp同步

覆盖参数

-f:强制覆盖目标数据库对象-----F或N

--help:帮助文档

例:

python DataTransfer.py-s_h = 127.0.0.1-s_P = 1433-s_i =" MSSQL"-s_d =" DB01"-obj_type =" tab"-obj =" dbo.t1,dbo.t2"-t_h = 127.0.0.1-t_P = 1433-t_i =" MSSQL"-t_d =" DB02"-f =" Y"

python DataTransfer.py-s_h = 127.0.0.1-s_P = 1433-s_i =" MSSQL"-s_d =" DB01"-obj_type =" sp"-obj =" dbo.sp1,dbo.sp2"-t_h = 127.0.0.1-t_P = 1433-t_i =" MSSQL"-t_d =" DB02"-f =" Y"

"""

SyncDatabaseObject(对象):

源数据库

s_h =

s_i =

s_P =

s_u =

s_p =

s_d =

obj类型

s_obj_type = <跨度样式="颜色:#000000;">无

同步对象

s_obj =

目标数据库

t_h =

t_i =

t_P =

t_u =

t_p =

t_d =

f =

file_path =

def __ init __ (self,* args,** kwargs):

表示 k,v in kwargs.items():

setattr(self,k,v)

"""

这应该是有效的对象名称,就像xxx_name或dbo.xxx_name或[schema].xxx_name或架构一样。 [xxx_name]

然后将这种有效的对象名称传输为格式的对象名称,例如[dbo]。 [xxx_name](在没有架构名称时提供默认的dbo架构名称)

其他格式的对象名称被认为是无效的,将在处理中出现rasie错误

格式对象名称

1,xxx_name ======> [dbo]。 [Xxx_name]

2,dbo.xxx_name ======> [dbo]。 [Xxx_name]

3,[schema].xxx_name ====== \ gt; [dbo]。 [Xxx_name]

3,schema.xxx_name ====== [模式]。 [Xxx_name]

4,[模式]。 [Xxx_name] ====== \ gt; [模式]。 [Xxx_name]

5,[模式]。 [Xxx_name ====== \ gt; rasie错误格式消息

"""

@staticmethod

def format_object_name(名称):

format_name = ""

如果" " in 名称) :

schema_name =名称[0:name.find(" " )]]

object_name = name [name.find(" " )+ 1 :]

如果 不是" [ " in schema_name):

schema_name = " [ " + schema_name + " ] "

如果 不是" [ " object_name):

object_name = " [ " + object_name + " ] "

format_name = schema_name + " " + object_name

其他 :

如果" [ " 名称):

format_name = " [dbo]。 " + 名称

其他 :

format_name = " [dbo]。 " + " [ " +名称+ " ] "

返回 format_name

连接到sqlserver

def get_connect(自身,_h,_i,_P,_u,_p,_d):

光标 = 错误

试试 :

如果(_u) (_p):

conn = pymssql.connect(host = _ h,

服务器 = _ i,

端口 = _ P,

用户 = _ u,

密码 = _ p,

数据库 = _ d)

其他 :

conn = pymssql.connect(host = _ h,

服务器 = _ i,

端口 = _ P,

数据库 = _ d)

如果 (conn):

返回 conn

除外 :

提高

返回 conn

检查连接

def validated_connect(自身,_h,_i,_P,_u,_p,_d):

如果 不是 (self.get_connect(_h,_i,_P,_u,_p,_d)):

打印" 连接到 " + str(_h)+ " 失败,请检查您的参数 "

出口(0)

"""

检查用户输入对象是否为有效对象

"""

def exits_object(自身,conn,名称):

conn = conn

cursor_source = conn.cursor()

从源数据库按名称获取对象

sql_script = r """ 从中选择前1 1

从sys.objects中选择concat(QUOTENAME(schema_name(schema_id)),"。",QUOTENAME(name))作为obj_name

全部合并

从sys.types中选择concat(QUOTENAME(schema_name(schema_id)),"。",QUOTENAME(name))作为obj_name

)t其中obj_name =" {0}"

""" 。格式(self.format_object_name(名称))

cursor_source.execute(sql_script)

结果 = cursor_source.fetchall()

如果 不是 结果:

返回 0

其他 :

返回 1

conn.cursor.close()

conn.close()

表变量sync

def sync_table_variable(自身,tab_name,is_reference):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

如果(self.exits_object(conn_source,self.format_object_name(tab_name)))\ gt; 0:

通过

其他 :

打印" -----------------------警告消息----------------------- "

打印" --------警告:对象 " + tab_name + " 源数据库中不存在------------ "

打印" -----------------------警告消息----------------------- "

打印 ()

返回

exist_in_target = 0

sql_script = r """ 选择顶部1 1

来自sys.table_types tp

其中is_user_defined = 1

和concat(QUOTENAME(schema_name(tp.schema_id)),"。",QUOTENAME(tp.name))=" {0}" """ \\\\

.format(((self.format_object_name(tab_name))))

如果目标服务器中存在表架构,请跳过

cursor_target.execute(sql_script)

exist_in_target = cursor_target.fetchone()

目标服务器数据库中存在天气

如果(self.f == " Y " ):

如果(is_reference!= " Y " ):

已跳过,sp使用时表类型不能删除

sql_script = r """

如果OBJECT_ID(" {0}")不为null

放置类型{0}

""" 。格式(self.format_object_name(tab_name))

cursor_target.execute(sql_script)

conn_target.commit()

其他 :

如果 exist_in_target:

打印" -----------------------警告消息----------------------- "

打印" 目标表类型 " + tab_name + " 存在,已从源中跳过同步表类型 "

打印" -----------------------警告消息----------------------- "

打印 ()

返回

sql_script = r """

宣告@SQL NVARCHAR(MAX)=""

SELECT @ SQL ="创建类型" +" {0}" +"按表格" + CHAR(13)+"(" + CHAR(13)+

东西 ((

SELECT CHAR(13)+",[" + c.name +"]" +

c.is_computed = 1时的情况

然后" AS" + OBJECT_DEFINITION(c。[Object_id],c.column_id)

其他

案例c.system_type_id! = C.user_type_id

然后" [" + SCHEMA_NAME(tp。[Schema_id])+"]。[" + Tp.name +"]"

否则为" [" + UPPER(y.name)+"]"

结束+

案件

何时y.name IN(" varchar"," char"," varbinary"," binary")

然后在c.max_length =-1的情况下添加"(" +大小写

然后" MAX"

ELSE CAST(c.max_length AS VARCHAR(5))

END +")"

y.name IN时(" nvarchar"," nchar")

然后在c.max_length =-1的情况下添加"(" +大小写

然后" MAX"

ELSE CAST(c.max_length/2 AS VARCHAR(5))

END +")"

y.name IN的时间(" datetime2"," time2"," datetimeoffset")

然后"((" + CAST(c.scale AS VARCHAR(5))+")"

当y.name =" decimal"时

THEN"(" + CAST(c。[Precision] AS VARCHAR(5))+"," + CAST(c.scale AS VARCHAR(5))+")"

其他""

结束+

c.collation_name不为空且c.system_type_id = c.user_type_id时的情况

然后"收集" + c.collation_name

其他""

结束+

c.is_nullable = 1时的情况

然后" NULL"

否则为" NOT NULL"

结束+

c.default_object_id的情况! = 0

然后" CONSTRAINT [" + OBJECT_NAME(c.default_object_id)+"]" +

" DEFAULT" + OBJECT_DEFINITION(c.default_object_id)

其他""

结束

结束

从sys.table_types tp

c.object_id = tp.type_table_object_id上的内部联接sys.columns c

内部联接sys.types y ON y.system_type_id = c.system_type_id

其中tp.is_user_defined = 1且y.name \ lt; \ gt; "系统名称"

和concat(QUOTENAME(schema_name(tp.schema_id)),"。",QUOTENAME(tp.name))=" {0}"

订单由c.column_id

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)"),1,7,"")

+");"

选择@SQL作为脚本

""" 。格式(self.format_object_name(self.format_object_name((tab_name))))

cursor_target = conn_target.cursor()

cursor_source.execute(sql_script)

行 = cursor_source.fetchone()

试试 :

如果 不是 exist_in_target:

在目标服务器上执行脚本

cursor_target.execute(str(row [0])) 如果存在的话,删除当前的storage_procudre

conn_target.commit()

打印" *************表类型 " + self.format_object_name(tab_name)+ " 已同步********************* "

打印() 完成后输入空白行

:

打印" -----------------------错误消息----------------------- "

打印" -----------表格类型 " + self.format_object_name(tab_name)+ " 同步错误--------------- "

打印" -----------------------错误消息----------------------- "

打印 ()

提高

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

def get_table_column(自己,conn,tab_name):

column_names = ""

conn = conn

cursor_source = conn.cursor()

从源数据库按名称获取对象

sql_script = r """ 从sys.columns选择名称

其中object_id = object_id(" {0}")和is_computed = 0由object_id排序

""" 。格式(self.format_object_name(tab_name))

cursor_source.execute(sql_script)

结果 = cursor_source.fetchall()

用于 in 结果:

column_names = column_names +第[0]行" "

return column_names [0:len(column_names)-1 ]

conn.cursor.close()

conn.close()

模式同步

def sync_schema(自己):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

arr_schema = []

在未定义表名时获取数据库中的所有表

schema_result = cursor_source.execute(r """

从sys.schemas中选择名称,其中schema_id \\ u> 4和schema_id \\ lt00; 16384

"""

用于 in cursor_source.fetchall():

cursor_target.execute(r """ 如果不存在(请从sys.schemas中选择*其中name =" {0}")

开始

exec("创建模式[{0}]")

结束

""" 。格式(str(第[0]行)))

conn_target.commit()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

def sync_table_schema_byname(自己,tab_name,is_reference):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

如果(self.exits_object(conn_source,self.format_object_name(tab_name))== 0 ):

打印" -----------------------警告消息----------------------- "

打印" ---------------警告:对象 " + tab_name + " 在源数据库中不存在---------------- "

打印" -----------------------警告消息----------------------- "

打印 ()

返回

如果存在sp的参考表,则不重新同步该表

如果(self.exits_object(conn_target,self.format_object_name(tab_name))\ gt; 0):

如果(self.f!= " Y " ):

打印" -----------------------警告消息----------------------- "

打印" ---------------警告:对象 " + tab_name + " 已存在于目标数据库中---------------- "

打印" -----------------------警告消息----------------------- "

打印 ()

返回

sql_script = r """ 从sys.tables中选择前1个1

其中type_desc =" USER_TABLE"

和concat(QUOTENAME(schema_name(schema_id)),"。",QUOTENAME(name))=" {0}"

""" 。格式(((self.format_object_name(tab_name))))

如果目标服务器中存在表架构,请跳过

cursor_target.execute(sql_script)

exist_in_target = cursor_target.fetchone()

如果 exist_in_target:

如果(self.f == " Y " ):

如果(is_reference!= " Y " ):

cursor_target.execute( " 删除表{0} " 。format(tab_name))

其他 :

打印" -----------------------警告消息----------------------- "

打印" 目标表 " + tab_name + " 存在,跳过了syn源中的c表模式 "

打印" -----------------------警告消息----------------------- "

打印 ()

返回

sql_script = r """ 声明

@object_name SYSNAME

,@object_id INT

选择

@object_name =" [" + s.name +"]。[" + o.name +"]"

,@ object_id = o。 [object_id]

FROM sys.objects o WITH(NOWAIT)

将SYS.schemas s与(NOWAIT)一起打开o。 [Schema_id] = s。 [Schema_id]

WHERE QUOTENAME(s.name)+"。" + QUOTENAME(o.name)=" {0}"

和o。 [类型] =" U"

AND o.is_ms_shipped = 0

宣告@SQL NVARCHAR(MAX)=""

; WITH index_column AS

选择

我知道了。 [object_id]

ic.index_id

,ic.is_descending_key

ic.is_included_column

,c.name

FROM sys.index_columns ic WITH(NOWAIT)

在ic上加入(NOWAIT)加入sys.columns。 [Object_id] = c。 [Object_id] AND ic.column_id = c.column_id

ic。 [Object_id] = @ object_id

),

fk_columns AS

选择

k.constraint_object_id

,cname = c.name

,rcname = rc.name

来自sys.foreign_key_columns k WITH(NOWAIT)

在rc上加入(NOWAIT)加入sys.columns rc。 [Object_id] = k.referenced_object_id和rc.column_id = k.referenced_column_id

在(不等待)的情况下加入sys.columnsc。 [object_id] = k.parent_object_id和c.column_id = k.parent_column_id

在哪里k.parent_object_id=@object_id

SELECT @ SQL ="创建表" + @object_name +"" +"(" +"" + STUFF((

SELECT"" +",[" + c.name +"]" +

c.is_computed = 1时的情况

然后" AS" +抄送。 [定义]

ELSE UPPER(tp.name)+

tp.name IN时的情况(" varchar"," char"," varbinary"," binary"," text")

然后,"(" + c.max_length =-1时的情况,然后" MAX" ELSE CAST(c.max_length AS VARCHAR(5))END +")"

当tp.name IN(" nvarchar"," nchar")时

然后"(" + c.max_length =-1时的情况,然后" MAX" ELSE CAST(c.max_length/2 AS VARCHAR(5))END +")"

tp.name输入时(" datetime2"," time2"," datetimeoffset")

然后"((" + CAST(c.scale AS VARCHAR(5))+")"

当tp.name =" decimal"时

THEN"(" + CAST(c。[Precision] AS VARCHAR(5))+"," + CAST(c.scale AS VARCHAR(5))+")"

其他""

结束+

c.collation_name不为空的情况下,然后" COLLATE" + c.collation_name ELSE"" END +

c.is_nullable = 1时的情况然后" NULL"否则" NOT NULL"结束+

案例直流。 [定义]是非空,然后"默认" + dc。 [定义] ELSE"" END +

ic.is_identity = 1时的情况然后," IDENTITY(" + CAST(ISNULL(1," 0")AS CHAR(1)))+"," + CAST(ISNULL(ic.increment_value," 1")AS CHAR(1 ))+")" ELSE""结束

结束+""

FROM sys.columns c WITH(NOWAIT)

使用(NOWAIT)ON加入sys.types tp c.user_type_id = tp.user_type_id

左联接sys.computed_columns cc启用(NOWAIT)c。 [Object_id] =抄送。 [Object_id] AND c.column_id = cc.column_id

左联接sys.default_constraints dc带(NOWAIT)ON c.default_object_id! = 0并且c。 [Object_id] = dc.parent_object_id和c.column_id = dc.parent_column_id

左联接sys.identity_columns ic,且c.is_identity = 1并且c。 [Object_id] =集成电路。 [Object_id] AND c.column_id = ic.column_id

在哪里c。 [Object_id] = @ object_id

订单由c.column_id

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)"),1,2,"" +"")

+ ISNULL((SELECT"" +",CONSTRAINT [" + k.name +"]主键(" +

(选择的东西((

ic.is_descending_key = 1时,选择",[" + c.name +"]"" +大小写然后" DESC"否则" ASC"结束

FROM sys.index_columns ic WITH(NOWAIT)

加入sys.columns c打开(NOWAIT)c。 [Object_id] =集成电路。 [Object_id] AND c.column_id = ic.column_id

ic.is_included_column = 0

和ic。 [Object_id] = k.parent_object_id

AND ic.index_id = k.unique_index_id

FOR XML PATH(N""),TYPE).value("。"," NVARCHAR(MAX)"),1,2,""))

+")" +""

从sys.key_constraints k WITH(NOWAIT)

在哪里k.parent_object_id=@object_id

和k。 [Type] =" PK"),"")+")" +""

+ ISNULL((选择(

选择"" +

" ALTER TABLE" + @object_name +" WITH"

+当fk.is_not_trusted = 1时

然后" NOCHECK"

其他"检查"

结束+

" ADD CONSTRAINT [" + fk.name +"] FOREIGN KEY("

+东西((

SELECT",[" + k.cname +"]"

从fk_columns k

其中k.constraint_object_id = fk。 [Object_id]和1 = 2

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)"),1,2,"")

+")" +

" REFERENCES [" + SCHEMA_NAME(ro。[Schema_id])+"]。[" + Ro.name +"]("

+东西((

SELECT",[" + k.rcname +"]"

从fk_columns k

其中k.constraint_object_id = fk。 [Object_id]

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)"),1,2,"")

+")"

+案例

当fk.delete_referential_action = 1时" ON DELETE CASCADE"

当fk.delete_referential_action = 2时" ON DELETE SET NULL"

当fk.delete_referential_action = 3时" ON DELETE SET DEFAULT"

其他""

结束

+案例

当fk.update_referential_action = 1时" ON UPDATE CASCADE"

当fk.update_referential_action = 2时" ON UPDATE SET NULL"

当fk.update_referential_action = 3时" ON UPDATE SET DEFAULT"

其他""

结束

+"" +" ALTER TABLE" + @object_name +" CHECK CONSTRAINT [" + fk.name +"]" +""

从sys.foreign_keys fk WITH(NOWAIT)

将sys.objects ro与(NOWAIT)一起加入ro。 [Object_id] = fk.referenced_object_id

在哪里fk.parent_object_id=@object_id

FOR XML PATH(N""),TYPE).value("。"," NVARCHAR(MAX)")),"")

+ ISNULL((((选择

"" +"创建" + i.is_unique = 1时的情况然后" UNIQUE" ELSE"" END

+"非聚集索引[" + i.name +"] ON" + @object_name +"(" +

东西 ((

当c.is_descending_key = 1时,选择",[" + c.name +"]"" +大小写然后" DESC"否则" ASC"

FROM index_column c

在哪里c.is_included_column = 0

AND c.index_id = i.index_id

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)"),1,2,"")+")"

+ ISNULL("" +" INCLUDE(" +

东西 ((

SELECT",[" + c.name +"]"

FROM index_column c

其中c.is_included_column = 1

AND c.index_id = i.index_id

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)"),1,2,"")+")","")+""

FROM sys.indexes i WITH(NOWAIT)

在哪里[Object_id] = @ object_id

AND i.is_primary_key = 0

和我。 [类型] = 2

FOR XML PATH(""),TYPE).value("。"," NVARCHAR(MAX)")

),"")

选择@SQL作为脚本 """ 。格式(self.format_object_name(tab_name))

cursor_target = conn_target.cursor()

cursor_source.execute(sql_script)

行 = cursor_source.fetchone()

如果 不是 [0]行:

返回

尝试 :

cursor_target.execute(第[0]行) 删除当前表架构(如果存在)

conn_target.commit()

打印" *************模式 " + self.format_object_name(tab_name)+ " 已同步************* "

打印() 完成后输入空白行

:

打印" -----------------------警告消息----------------------- "

打印" -----------模式 " + self.format_object_name(tab_name)+ " 同步失败--------------- "

打印" -----------------------警告消息----------------------- "

打印 ()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

def sync_table_schema(自己):

默认情况下不通过引用的其他对象进行同步

is_reference = " N "

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

arr_table = []

如果 (self.s_obj):

for tab_name self.s_obj.split(" " ):

如果(tab_name)(self.exits_object(conn_source,tab_name) \ gt; 0):

self.sync_table_schema_byname(tab_name,is_reference)

其他 :

打印" -----------------------警告消息----------------------- "

打印" -----------模式 " + self.format_object_name(tab_name)+ " 源数据库中不存在--------------- "

打印" -----------------------警告消息----------------------- "

打印 ()

继续

其他 :

同步所有表

在未定义表名时获取数据库中的所有表

sql_script = """ SELECT QUOTENAME(s.name)+"。 " + QUOTENAME(o.name)

FROM sys.objects o WITH(NOWAIT)

将SYS.schemas s与(NOWAIT)一起打开o。 [Schema_id] = s。 [Schema_id]

在哪里[Type] =" U" AND o.is_ms_shipped = 0

"""

cursor_source.execute(sql_script)

用于 in cursor_source.fetchall():

self.sync_table_schema_byname(str(行[0]),is_reference)

将数据从源表同步到目标表

def sync_table_data_byname_bak(自己,tab_name):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

insert_columns = ""

insert_columns = self.get_table_column(conn_source,tab_name)

insert_prefix = ""

天气具有标识列

cursor_source.execute(r """ 从sys.columns中选择1

其中object_id = OBJECT_ID(" {0}")和is_identity = 1

""" 。格式(tab_name))

exist_identity =

exist_identity = cursor_source.fetchone()

如果 (exists_identity):

insert_prefix = " 设置identity_insert {0}为开; " 。格式(tab_name)

数据源

insert_sql = ""

values_sql = ""

current_row = ""

计数器 = 0

sql_script = r """ 从{1}中选择顶部288888 {0} """ 。格式(insert_columns,tab_name)

cursor_source.execute(sql_script)

创建插入列

"""

对于cursor_source.description中的字段:

insert_columns = insert_columns + str(字段[0])+","

insert_columns = insert_columns [0:len(insert_columns)-1]

"""

insert_prefix = insert_prefix + " 插入{0}({1})值中。 " 。格式(tab_name,insert_columns)

用于 in cursor_source.fetchall():

计数器 =计数器+ 1

for in 行:

如果(str(key)== " " ):

current_row = current_row + r """ null, """

其他 :

如果(类型(键) datetime.datetime):

current_row = current_row + r """ " {0}", """ .format(str(key)[0:23 ])

elif (类型(键) is str):

我已插入广告! ! ! ,这里还有另一个坑:https://blog.csdn.net/dadaowuque/article/details/81016127

current_row = current_row + r """ " {0}", """ 。格式(

key.replace( " " " " "" " )。replace(" \\\\ u0000 " "" )。replace(" \\\\ x00 " "" ))

elif (类型(键) is 十进制):

d = 小数(键)

s = " {0:f} " 。格式(d)

current_row = current_row + r """ " {0}", """ 。格式

elif (类型(键) is 字节):

print(十六进制(int.from_bytes(键,"大",已签名= True )))

current_row = current_row + r """ {0}, """ .format(十六进制(int.from_bytes(key," big " ,已签名= 错误)))

其他 :

current_row = current_row + r """ " {0}", """ 。格式(键)

current_row = current_row [0:len(current_row)-2] 删除最后一个一个字符","

values_sql = values_sql + " " + current_row + " ), "

current_row = ""

时执行一批

如果(计数器== 1000 ):

insert_sql = insert_prefix + values_sql

insert_sql = insert_sql [0:len(insert_sql)-1] 删除最后一个一个字符","

如果 (exists_identity):

insert_sql = insert_sql + " ;将identity_insert {0}设置为关闭; " 。格式(tab_name)

试试 :

cursor_target.execute(插入SQL)

除外 :

打印" ----------------------错误 " + tab_name + " 数据同步失败------------------------- "

提高

conn_target.commit()

insert_sql = ""

values_sql = ""

current_row = ""

计数器 = 0

打印(time.strftime(" %Y-%m-%d%H:%M:%S " ,time.localtime())+ " *************** " + self.format_object_name(tab_name)+ " " + str(1000)+ " 行已同步****** ******* "

同步最后剩余的批处理数据

如果 (values_sql):

insert_sql = insert_prefix + values_sql

insert_sql = insert_sql [0:len(insert_sql)-1] 删除最后一个一个字符","

如果 (exists_identity):

insert_sql = insert_sql + " ;将identity_insert {0}设置为关闭; " 。格式(tab_name)

执行最后一批

尝试 :

cursor_target.execute(插入SQL)

除外 :

打印" ------------------错误 " + tab_name + " 数据同步失败------------------------ "

提高

conn_target.commit()

insert_sql = ""

values_sql = ""

current_row = ""

打印(time.strftime(" %Y-%m-%d%H:%M:%S "

time.localtime()) + " ********** ***** " + self.format_object_name(

tab_name) + " " + str(

计数器) + " 已同步的行************ * "

打印(time.strftime(" %Y-%m-%d%H:%M:%S "

time.localtime()) + " ----------------已同步 " + self.format_object_name(

tab_name) + " 数据已完成--------------- "

打印 ()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

将数据从源表同步到目标表

def sync_table_data_byname(自己,tab_name):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

insert_columns = self.get_table_column(conn_source,tab_name)

如果(self.f!= " Y " ):

sql_script = " 从{1}中选择前1个{0} " 。格式(insert_columns,tab_name)

如果目标表中存在数据,请中断

cursor_target.execute(sql_script)

存在 = cursor_target.fetchone()

如果 存在:

打印" -----------------------警告消息----------------------- "

打印" 目标表 " + tab_name + " 存在数据,已从源中跳过同步数据 "

打印" -----------------------警告消息----------------------- "

打印 ()

返回

其他 :

sql_script = " 截断表{0} " 。格式(tab_name)

如果目标表中存在数据,请中断

cursor_target.execute(sql_script)

conn_target.commit()

天气具有标识列

cursor_source.execute(r """ 从sys.columns中选择1

其中object_id = OBJECT_ID(" {0}")和is_identity = 1

""" 。格式(tab_name))

exist_identity =

exist_identity = cursor_source.fetchone()

如果 (exists_identity):

cursor_target.execute( " 设置identity_insert {0} on; " 。format(tab_name))

conn_target.commit()

数据源

insert_sql = ""

计数器 = 0

sql_script = r """ 从{1}中选择{0} """ 。格式(insert_columns,tab_name)

cursor_source.execute(sql_script)

创建插入列

"""

对于cursor_source.description中的字段:

insert_columns = insert_columns + str(字段[0])+","

insert_columns = insert_columns [0:len(insert_columns)-1]

"""

current_row = " 插入{0}({1})值中( " 。格式(tab_name,insert_columns)

用于 in cursor_source.fetchall():

计数器 =计数器+ 1

for in 行:

如果(str(key)== " " ):

current_row + = r """ null, """

其他 :

如果(类型(键) datetime.datetime):

current_row + = r """ " {0}", """ .format(str(key)[0:23 ])

elif (类型(键) is str):

我已插入广告! ! ! ,这里还有另一个坑:https://blog.csdn.net/dadaowuque/article/details/81016127

current_row + = r """ " {0}", """ .format(key.replace(" " " " "" " )。替换(" \\\\ u0000 " "" ).replace( " \\\\ x00 " "" ))

elif (类型(键) is 十进制):

d = 小数(键)

s = " {0:f} " 。格式(d)

current_row + = r """ " {0}", """ 。格式

elif (类型(键) is 字节):

print(十六进制(int.from_bytes(键,"大",已签名= True )))

current_row + = r """ {0}, """ .format(十六进制(int.from_bytes(key," big " ,已签名= 错误)))

其他 :

current_row + = r """ " {0}", """ 。格式(键)

current_row = current_row [0:len(current_row)-2] 删除最后一个一个字符","

current_row + = " ); "

insert_sql + = current_row

current_row = " 插入{0}({1})值中( " 。格式(tab_name,insert_columns)

时执行一批

如果(计数器== 3000 ):

试试 :

cursor_target.execute(插入SQL)

conn_target.commit()

除外 :

打印" ----------------------错误 " + tab_name + " 数据同步失败------------------------- "

提高

insert_sql = ""

current_row = " 插入{0}({1})值中( " 。格式(tab_name,insert_columns)

计数器 = 0

打印(time.strftime(" %Y-%m-%d%H:%M:%S " ,time.localtime())+ " *************** " + self.format_object_name(tab_name)+ " " + str(3000)+ " 已同步行****** ******* "

同步最后剩余的批处理数据

如果(计数器\ gt; 0):

执行最后一批

尝试 :

cursor_target.execute(插入SQL)

conn_target.commit()

除外 :

打印" ------------------错误 " + tab_name + " 数据同步失败------------------------ "

提高

insert_sql = ""

current_row = ""

打印(time.strftime(" %Y-%m-%d%H:%M:%S " ,time.localtime())+ " *************** " + self.format_object_name(tab_name)+ " " + str(计数器)+ " 已同步行********** *** "

打印(time.strftime(" %Y-%m-%d%H:%M:%S " ,time.localtime())+ " ----------------已同步 " + self.format_object_name(tab_name)+ " 数据完成--------------- "

打印 ()

如果 (exists_identity):

cursor_target.execute( " ;将identity_insert {0}设置为off; " 。format(tab_name))

conn_target.commit()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

将数据从源表同步到目标表

def sync_table_data(自己):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

arr_table = []

如果 (self.s_obj):

arr_table = self.s_obj.split(" "

tab_name in arr_table:

self.sync_table_data_byname(self.format_object_name(tab_name))

def sync_dependent_object(自己,obj_name):

强制覆盖,如果它属于从属对象,则无需对其生效对象如果已同步,则首先检查它是否存在于目标中,如果不继续同步,这是要实现的标记

is_refernece = " Y "

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

"""

查找依赖对象

如果存在依赖对象,请提前同步依赖对象

"""

sql_check_dependent = r """

选择*从

选择

不同的rtrim(低位(s.type))将Chinese_PRC_CI_AS收集为obj_type,

QUOTENAME(d.referenced_schema_name)+"。 + QUOTENAME(d.referenced_entity_name)收集Chinese_PRC_CI_AS为obj

从sys.dm_sql_referenced_entities(" {0}"," OBJECT")作为d

内部连接sys.sysobjects在s.id = d.referenced_id上

全部合并

选择

不同的rtrim(下部(d.referenced_class_desc))将Chinese_PRC_CI_AS收集为obj_type,

QUOTENAME(d.referenced_schema_name)+"。 + QUOTENAME(d.referenced_entity_name)收集Chinese_PRC_CI_AS为obj

从sys.dm_sql_referenced_entities(" {0}"," OBJECT")作为d

s.user_type_id = d.referenced_id上的内部联接sys.types

)吨

""" 。格式(self.format_object_name(obj_name))

cursor_source.execute(sql_check_dependent)

结果 = cursor_source.fetchall()

用于 in 结果:

如果行[1 ]:

如果(行[0] == " u " ):

如果(行[1 ]):

self.sync_table_schema_byname(行[ 1 ],is_refernece)

elif (行[0] == " fn " 第[0] == " 如果 " ):

如果(行[1 ]):

self.sync_procudre_byname( " f " ,第[1 ]行,is_refernece)

elif (行[0] == " 类型 " ):

如果(行[1 ]):

self.sync_table_variable(行[ 1 ],is_refernece)

def sync_procudre_byname(自身,类型,obj_name,is_reference):

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

如果(self.exits_object(conn_source,self.format_object_name(obj_name))== 0 ):

打印" ---------------警告消息---------------- "

打印" ---------------警告:对象 " + obj_name + " 在源数据库中不存在---------------- "

打印" ---------------警告消息---------------- "

打印 ()

返回

elif (self.exits_object(conn_target,self.format_object_name(obj_name))\ gt; 0):

如果(self.f!= " Y " ):

打印" ---------------警告消息---------------- "

打印" ---------------警告:对象 " + obj_name + " 已存在于目标数据库中---------------- "

打印" ---------------警告消息---------------- "

打印 ()

返回

"""

最初想直接生成delete语句:

这里有一个该死的逃生,很难做到正确,中午我们去吃饭,

下午回来考虑一下,换一种方式,逃避这个问题

sql_script =选择

" if object_id(" + """" + QUOTENAME(schema_name(uid)) + "" + QUOTENAME(名称)+ """ +")不为空"

+" drop proc" + QUOTENAME(schema_name(uid))+"。 + QUOTENAME(名称),

OBJECT_DEFINITION(ID)

来自sys.sysobjects,其中xtype =" P"且uid不在(16,19)

"""

sql_script = r """

选择

QUOTENAME(schema_name(uid))+"。 + QUOTENAME(名称),

OBJECT_DEFINITION(ID)

来自sys.sysobjects,其中xtype位于(" P"," IF"," FN")和uid不位于(16,19)

"""

如果 (obj_name):

sql_script = sql_script + " 和QUOTENAME(schema_name(uid))+"。 " + QUOTENAME(name)=" {0}" " 。格式(

self.format_object_name(obj_name))

cursor_source.execute(sql_script)

行 = cursor_source.fetchone()

试试 :

如果 type == " f " :

sql_script = r """

如果object_id(" {0}")不为null

放置功能{0}

""" 。格式(self.format_object_name(第[0]行))

elif type == " p " :

sql_script = r """

如果object_id(" {0}")不为null

删除进程{0}

""" 。格式(self.format_object_name(第[0]行))

cursor_target.execute(sql_script) 如果存在,则删除当前的storage_procudre

conn_target.commit()

同步依赖对象

如果(is_reference!= " N " ):

self.sync_dependent_object(self.format_object_name(第[0]行))

同步对象自身

cursor_target.execute(str(row [1])) 执行create created_procudre脚本

conn_target.commit()

打印" ************* sync sp: " + self.format_object_name(第[0]行)+ " 完成***************** "

打印 ()

除外 :

打印" ---------------错误消息---------------- "

打印" ------------------同步 " +行[0] + " sp错误-------------------------- "

打印" ---------------错误消息---------------- "

打印 ()

cursor_source.close()

conn_source.close()

cursor_target.close()

conn_target.close()

def sync_procudre(自身,类型):

is_reference = " N "

conn_source = self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)

conn_target = self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)

cursor_source = conn_source.cursor()

cursor_target = conn_target.cursor()

如果 (self.s_obj):

for proc_name in self.s_obj.split(" " ):

self.sync_dependent_object(proc_name)

self.sync_procudre_byname(类型,proc_name,is_reference)

同步所有sp和函数

其他 :

sql_script = r """

选择

QUOTENAME(schema_name(uid))+"。 + QUOTENAME(名称),

OBJECT_DEFINITION(ID)

来自sys.sysobjects,其中xtype = upper(" {0}")和uid不在(16,19)

""" 。格式(类型)

cursor_source.execute(sql_script)

用于 in cursor_source.fetchall():

self.sync_dependent_object(第[0]行)

self.sync_procudre_by_name(类型,行[0],is_reference)

如果 __名称__ == " __ main __ " :

"""

sync = SyncDatabaseObject(s_h =" 127.0.0.1",

s_i =" sql2017",

s_P = 49744,

s_d =" DB01",

t_h =" 127.0.0.1",

t_i =" sql2017",

t_P = 49744,

t_d =" DB02",

s_obj_type =" sp",

s_obj =" dbo.sp_test01",

f =" Y")

sync.sync_procudre(" p")

"""

p_s_h = ""

p_s_i = " MSSQL "

p_s_P = 1433

p_s_d = ""

p_s_u =

p_s_p =

p_s_obj = ""

p_type = ""

p_t_s = ""

p_t_i = " MSSQL "

p_t_P = " 1433 "

p_t_d = ""

p_t_u =

p_t_p =

强制转换目标数据库对象,默认不强制覆盖目标数据库对象

p_f = " N "

同步obj类型表| sp

p_obj_type =

同步whick数据库对象

p_obj =

如果 len(sys.argv)== 1 :

打印 (使用情况)

sys.exit( 1

elif sys.argv [1] == " -帮助 " :

打印 (使用情况)

sys.exit()

elif len(sys.argv)\ gt; = 2 :

for i sys.argv [1 :] ::

_argv = i.split(" = "

源服务器名称

如果 _argv [0] == " -s_h " :

p_s_h = _ argv [1 ]

源服务器实例名称

如果 _argv [0] == " -s_i " :

如果(_argv [1 ]):

p_s_i = _ argv [1 ]

源服务器实例PORT

如果 _argv [0] == " -s_P " :

如果(_argv [1 ]):

p_s_P = _ argv [1 ]

源数据库名称

如果 _argv [0] == " -s_d " :

p_s_d = _ argv [1 ]

如果 _argv [0] == " -s_u " :

p_s_u = _ argv [1 ]

如果 _argv [0] == " -s_p " :

p_s_p = _ argv [1 ]

如果 _argv [0] == " -t_h " :

p_t_h = _ argv [1 ]

if _argv [0] == " -t_i " :

如果(_argv [1 ]):

p_t_i = _ argv [1 ]

如果 _argv [0] == " -t_P " :

如果(_argv [1 ]):

p_t_P = _ argv [1 ]

如果 _argv [0] == " -t_d " :

p_t_d = _ argv [1 ]

如果 _argv [0] == " -t_u " :

p_t_u = _ argv [1 ]

如果 _argv [0] == " -t_p " :

p_t_p = _ argv [1 ]

如果 _argv [0] == " -f " :

如果(_argv [1 ]):

p_f = _ argv [1 ]

对象类型

如果 _argv [0] == " -obj_type " :

如果 不是(_argv [1 ]):

打印" -obj_type不能为空(-obj =选项卡|-obj = sp |-obj = fn |-obj =类型) "

出口(0)

其他 :

p_obj_type = _ argv [1 ]

对象名称

如果 _argv [0] == " -obj " :

如果(_argv [1 ]):

p_obj = _ argv [1 ]

require para

如果 p_s_h.strip()== "" :

打印" 源服务器主机不能为空 "

出口(0)

如果 p_s_d.strip()== "" :

打印" 源服务器主机数据库名称不能为空 "

出口(0)

如果 p_t_h.strip()== "" :

打印" 目标服务器主机不能为空 "

出口(0)

如果 p_t_d.strip()== "" :

打印" 目标服务器主机数据库名称不能为空 "

出口(0)

sync = SyncDatabaseObject(s_h = p_s_h,

s_i = p_s_i,

s_P = p_s_P,

s_d = p_s_d,

s_u = p_s_u,

s_p = p_s_p,

s_obj = p_obj,

t_h = p_t_h,

t_i = p_t_i,

t_P = p_t_P,

t_d = p_t_d,

t_u = p_t_u,

t_p = p_t_p,

f = p_f)

sync.validated_connect(p_s_h,p_s_i,p_s_P,p_s_d,p_s_u,p_s_p)

sync.validated_connect(p_t_h,p_t_i,p_t_P,p_t_d,p_t_u,p_t_p)

如果(p_f.upper()== " Y " ):

确认 =输入(" 确认要覆盖目标对象吗? "

如果 Confirm.upper()! = " Y " :

出口(0)

打印" --------------------------同步开始---------------------------------- "

打印 ()

如果(p_obj_type == " 选项卡 " ):

同步模式

sync.sync_schema()

同步表架构

sync.sync_table_schema()

同步数据

sync.sync_table_data()

elif (p_obj_type == " sp " ):

同步模式

sync.sync_schema()

sync sp

sync.sync_procudre(" p "

elif (p_obj_type == " fn " ):

同步模式

sync.sync_schema()

sync sp

sync.sync_procudre(" fn "

elif (p_obj_type == " tp " ):

同步模式

sync.sync_schema()

sync sp

sync.sync_table_variable()

其他 :

打印" -obj_type未通过验证 "

打印 ()

打印" --------------------------同步完成---------------------------------- "

相关推荐

咨询软件
 
QQ在线咨询
售前咨询热线
QQ1922638