快速,持续,稳定,傻瓜式
支持Mysql,Sqlserver数据同步

连续SQL流发送和处理系统的简介(第2部分:MySQL)

在线QQ客服:1922638

专业的SQL Server、MySQL数据库同步软件

介绍

大多数客户端服务器数据库系统仅通过使用阻塞套接字和某些聊天协议来支持客户端和后端数据库之间的同步通信,该协议需要客户端或服务器在发送新数据块之前等待确认。等待时间(也称为等待时间)可能从局域网(LAN)的十分之几秒到广域网(WAN)的数百毫秒开始。等待时间长会大大降低应用程序的质量。

幸运的是,UDAParts开发了一个功能强大且安全的通信框架,称为SocketPro,该框架通过使用异步数据传输和并行计算,具有连续的内联请求/结果批处理和实时流处理功能,以实现最佳的网络效率,开发简便性,性能,可伸缩性,以及该站点(https://github.com/udaparts/socketpro)的许多出色甚至独特的功能。

此外,UDAParts通过ODBC驱动程序将强大的SocketPro框架应用于大量流行的数据库,例如SQLite,MySQL和MS SQL Server以及其他数据库,以支持连续的SQL流发送和处理。此外,这些数据库组件中的大多数对公众完全是永远免费的。为了降低学习的复杂性,我建议您在播放这些MySQL示例项目之前先研究SQLite的SQL流示例(第1部分:SQLite),因为SQLite和MySQL示例共享相同的客户端API函数。

MySQL是当前最受欢迎的开源客户端服务器分布式数据库管理系统。在研究了MySQL服务器插件功能之后,UDAParts将SocketPro SQL流技术应用到MySQL上,并开发了一个插件来支持服务器端连续SQL语句的发送和处理,以实现最佳性能和可伸缩性。此外,UDAParts已将SQL流技术与MySQL Connector / Net进行了性能比较。我们的性能研究表明,SQL流技术可以比WAN上的MySQL Connector / Net快一千倍。

源代码和示例

所有相关的源代码和示例都位于https://github.com/udaparts/socketpro。克隆到您的计算机通过GIT,注意子目录后的mysql目录内socketpro / stream_sql。SocketPro MySQL服务器插件源代码位于目录socketpro / stream_sql / smysql中。此外,您可以看到这些示例是从.NET,C / C ++,Java和Python开发环境创建的。但是,在本文中,我们使用C#代码(socketpro / stream_sql / mysql / test_csahrp)进行客户端开发以进行解释。

除上述示例外,您还可以在目录socketpro / stream_sql / mysql / DBPerf上使用MySQL示例数据库sakila查找性能研究示例该子目录包含三个性能研究项目cppperfnetperf并且mysqlperf,这是写在C ++ / SocketPro SQL流,.NET / SQL SocketPro流和ADO.NET提供技术,分别。

此外,SocketPro MySQL服务器插件支持数据表更新事件(DELETEINSERTUPDATE通过触发器)。您可以使用此功能将所选表的更新事件推送到客户端。该示例项目位于目录socketpro / stream_sql / mysql / test_cache中。

在运行这些示例应用程序之前,应将socketpro / bin目录内的系统库分发到系统目录中。

关于SocketPro通信框架,您还可以在socketpro / doc / SocketPro开发guide.pdf上参考其开发指南文档。

注册SocketPro MySQL SQL流媒体插件及其配置数据库

如本站点中所述,通过INSTALL PLUGIN UDAParts_SQL_Streaming SONAME ‘libsmysql.so’从应用程序中调用语句来注册MySQL SQL-stream插件mysql。如果成功,您应该看到sp_streaming_db创建了一个新数据库,如下图1所示。

MySQL SQL流配置数据库

图1:SocketPro SQL流配置数据库sp_streaming_db和表配置

配置数据库具有三个简单的表格,configservicepermission如图中图1上述预期SocketPro MySQL的SQL-流插件支持的工业安全标准,SSL3 / TLSv1.x到客户端和服务器之间的安全通信。默认情况下,SocketPro客户端可以使用IP v4或v6在端口号20902上访问MySQL数据库。请注意该记录cached_tables。如果你正确设置它的值,所有连接的SocketPro客户端可以看到这些表中的数据的变化(例如,表actorcountrycategorylanguage数据库内sakila)的实时性。请参考test_cache目录socketpro / stream_sql / mysql中的示例,您可以使用实时缓存功能通过减少中间层和数据库之间的数据传输来提高中间层的性能和可伸缩性。

一台SocketPro服务器能够使用一个TCP端口同时支持多种服务。如果愿意,可以将SocketPro MySQL SQL流媒体插件的值设置为’ 1‘为record 来启用websocket enable_http_websocket。此外,还可以通过正确设置记录的值来嵌入其他服务,services如上图1所示。更改表中的任何一个或多个值后config,应重新启动MySQL。否则,更改将无法正常运行。

关于表permission,SocketPro MySQL SQL流技术使用其记录来验证嵌入式服务的客户端,如下图2所示。MySQL SQL流插件使用这两个表mysql.user并对sp_streaming_db.permission所有服务的所有客户端进行身份验证。但是,其SQL流服务不使用表中的记录sp_streaming_db.permission进行身份验证。

MySQL SQL流权限表

图2:允许SocketPro异步持久消息队列服务使用3个用户(root,user_one和user_two)(服务id = 257)

在大多数情况下,您无需触摸表格服务。注意,SocketPro MySQL服务器插件支持MySQL服务器版本8.0.11或更高版本。

主功能

SocketPro从底部开始进行写操作,以通过使用一个或多个非阻塞套接字池来支持并行计算。每个池可以由一个或多个线程组成,并且每个线程在客户端托管一个或多个非阻塞套接字。但是,我们仅使用一个池在客户端对此示例进行清晰的演示,如下面的代码片段1所示。

static void Main(string[] args)
{
    Console.WriteLine("Remote host: ");
    string host = Console.ReadLine();
    CConnectionContext cc = new CConnectionContext(host, 20902, "root", "Smash123");
    using (CSocketPool<cmysql> spMysql = new CSocketPool<cmysql>()) {
        //start one socket pool having 1 worker thread hosting 1 non-block socket
        if (!spMysql.StartSocketPool(cc, 1, 1)) {
            Console.WriteLine("Failed in connecting to remote async mysql server");
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
            return;
        }
        CMysql mysql = spMysql.Seek(); //get an async handler

        //start to stream all types of requests including SQL statements
        bool ok = mysql.Open("", dr); //open a default MySQL database

        //create a container to receive all queries data
        List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>> ra = 
                       new List<KeyValuePair<CDBColumnInfoArray, CDBVariantArray>>();

        CMysql.DRows r = (handler, rowData) => {
            //rowset data come here
            int last = ra.Count - 1;
            KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
            item.Value.AddRange(rowData); //populate record data into receiving container ra
        };

        CMysql.DRowsetHeader rh = (handler) => {
            //rowset header comes here
            KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item =  
                       new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>
                                    (handler.ColumnInfo, new CDBVariantArray());
            ra.Add(item); //populate query column meta into receiving container ra
        };
            
        TestCreateTables(mysql); //there are 2 DDL requests inside the call
        ok = mysql.Execute("delete from employee;delete from company", er);
        TestPreparedStatements(mysql); //there are 2 requests (1 prepare + 
                                       //1 parameterized statements) inside the call
        InsertBLOBByPreparedStatement(mysql); //there are 2 requests (1 prepare + 
                                              //1 parameterized statements) inside the call
        ok = mysql.Execute("SELECT * from company;select * from employee;select curtime()", er, r, rh);
        CDBVariantArray vPData = new CDBVariantArray();
        //first set
        vPData.Add(1); //input
        vPData.Add(1.4); //inputoutput
        vPData.Add(0); //output

        //second set
        vPData.Add(2); //input
        vPData.Add(2.5); //inputoutput
        vPData.Add(0); //output

        //Test MySQL stored procedure
        TestStoredProcedure(mysql, ra, vPData); //there are 2 requests (1 prepare + 
                                                //1 parameterized statements) inside the call

        //end streaming all types of requests

        ok = mysql.WaitAll(); //make sure all streamed requests are processed and returned

        Console.WriteLine();
        Console.WriteLine("There are {0} output data returned", mysql.Outputs * 2);

        int index = 0;
        Console.WriteLine();
        Console.WriteLine("+++++ Start rowsets +++");
        foreach (KeyValuePair<CDBColumnInfoArray, CDBVariantArray> it in ra) {
            Console.Write("Statement index = {0}", index);
            if (it.Key.Count > 0)
                Console.WriteLine(", rowset with columns = {0}, records = {1}.", 
                                  it.Key.Count, it.Value.Count / it.Key.Count);
            else
                Console.WriteLine(", no rowset received.");
            ++index;
        }
        Console.WriteLine("+++++ End rowsets +++");
        Console.WriteLine();
        Console.WriteLine("Press any key to close the application ......");
        Console.Read();
    }
}
代码段1:用于在客户端演示SocketPro MySQL SQL流系统的主要功能

启动一个套接字池:上面的代码片段1启动了一个套接字池,该套接字池仅具有一个工作线程,该线程仅托管一个非阻塞套接字,以通过使用连接上下文的一个实例进行演示。但是,请注意,如有必要,您可以在一个客户端应用程序中创建多个池。然后,我们得到一个异步MySQL处理程序。

打开数据库:我们可以发送打开MySQL服务器数据库的请求。如果第一个输入为空或null string如本示例所示,例如,我们正在为连接的用户打开一个默认数据库。如果您想打开指定的数据库,则只需提供一个非空的有效数据库名称string。另外,如果需要,您需要设置一个回调或Lambda表达式来跟踪从服务器端返回的错误消息。请注意,SocketPro仅支持客户端和服务器之间的异步数据传输,因此可以使用一个或多个回调来输入请求,以处理返回的数据。这与同步数据传输完全不同。另外,我们创建一个容器实例ra 用于接收即将到来的查询中的所有记录集。

流式处理SQL语句:请记住,SocketPro可以通过设计毫不费力地在一个非阻塞套接字会话上流式传输所有类型的任意数量的请求。当然,我们可以轻松地流式传输所有其他SQL语句。所有SocketPro SQL流服务均支持此独特功能,以实现最佳网络效率,这将显着提高数据访问性能。据我们所知,您无法从其他技术中找到如此出色的功能。如果找到一个,请告诉我们。像普通的数据库访问API一样,SocketProSQL流技术也支持手动事务,如上一篇文章中所示。

等待直到所有处理完毕:由于SocketPro仅支持异步数据传输,因此SocketPro必须有一种方法可以等待所有请求和返回结果发送,处理和返回。SocketPro确实WaitAll在客户端使用此方法来实现此目的。如果愿意,可以使用此方法将所有异步请求转换为同步请求。

TestCreateTables,TestPreparedStatement和InsertBLOBByPreparedStatement

上面的代码片段有三个函数调用,TestCreateTablesTestPreparedStatementsInsertBLOBByPreparedStatement,但我们不希望再重新解释他们,因为他们是真正同那些中以前的文章。让我们集中精力执行带有输入输出和输出参数的MySQL存储过程。

TestStoredProcedure

MySQL完全支持存储过程。SocketPro SQL流技术也是如此。此外,SocketPro SQL流技术支持在一次调用中执行具有输入输出和输出参数的多组MySQL存储过程,如上面的代码片段1所示。下面的代码片段2显示了如何调用可能具有以下功能的MySQL存储过程:输入,输入/输出和输出参数,并返回多组记录。

static void TestStoredProcedure(CMysql mysql, List<KeyValuePair<CDBColumnInfoArray, 
                                CDBVariantArray>> ra, CDBVariantArray vPData) {
    bool ok = mysql.Prepare("call sp_TestProc(?,?,?)", dr);
    CMysql.DRows r = (handler, rowData) => {
        //rowset data come here if available
        int last = ra.Count - 1;
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
        item.Value.AddRange(rowData); //populate record data into receiving container ra
    };
    CMysql.DRowsetHeader rh = (handler) => {
        //rowset header comes here if available
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = 
        new KeyValuePair<CDBColumnInfoArray, CDBVariantArray>(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item); //populate query column meta into receiving container ra
    };
    ok = mysql.Execute(vPData, er, r, rh);
}
代码段2:调用MySQL存储过程,该存储过程返回多组记录和输出参数

如上面的代码片段2所示,通过SocketPro SQL-stream技术最终调用存储过程非常简单。请注意,所有输出参数数据都将直接复制到传递参数数据数组中vPDatarh当记录集元数据可用时,将调用该回调。每当记录数据数组出现时,r都会调用回调。您可以填充所有查询的meta并将数据记录到任意容器中ra,例如,通过两个回调。

绩效研究

SocketPro SQL流技术在查询和更新的数据库数据访问中均具有出色的性能。您可以在socketpro / stream_sql / mysql / DBPerf /中看到两个MySQL性能测试项目(cppperfnetperf)。第一个示例由C ++编写,另一个由C#编写。提供了C#的示例项目编写器,以将SocketPro SQL流技术与MySQL .NET提供程序的性能进行比较。mysqlperf

请参见下面的图3的性能研究数据,该数据是从三个带有固态驱动器的廉价Google Cloud虚拟机中获得的,可以免费评估。所有数据都是执行10,000个查询和50,000个插入操作所需的时间(以毫秒为单位)。性能研究还集中在网络延迟对MySQL访问速度的影响上。

图片3

图3:在三个廉价的Google云虚拟机上的SocketPro SQL-stream技术的MySQL流性能研究数据

我们的性能研究表明,很容易以每秒6,500(10,000 / 1.54)次和套接字连接的速度执行查询。对于插入记录,对于局域网(LAN,跨计算机,0.2 ms / 2.0 Gbps)上的MySQL,您可以轻松获得每秒43,000(50,000 / 1.17)次插入的速度。在LAN上,与传统的非流查询方法(SocketPro + Sync)相比,SocketPro流可以将性能提高150%。对于SQL插入,改进将超过7倍(10,400 / 1,170 = 8.9)。SocketPro流和内联批处理功能使网络效率极高,与现有的MySQL套接字通信方法相比,这带来了显着改善。

让我们考虑一下广域网(WAN,跨区域,34 ms / 40 Mbps)。SocketPro SQL流查询的速度可能是每秒5,000(10,000 / 2.00)次,并且是套接字连接。对于插入记录,速度可以轻松达到每秒17,600条记录(50,000 / 2.84)。相反,如果客户端由于高延迟而使用传统的通信方式(SocketPro + Sync / MySQL.NET Provider)进行数据库访问,则在WAN上的查询速度将低至每秒30个查询,如上图8所示。SocketPro SQL假设在高延迟WAN(跨区域)上数据库后端处理时间可忽略,则流查询速度比非流技术快170倍(349000/2000 = 174.5)。如果考虑使用SQL插入,则改进可能超过600倍(1,726,000 / 2840 = 607)。

在分析了图3中的性能数据之后,您会发现SocketPro流技术确实非常适用于加速本地访问和远程数据库访问。其次,如果测试WAN具有更好的网络带宽,则WAN的性能数据会更好。此外,SocketPro支持内联压缩,但是此测试研究未使用它。如果使用SocketPro在线压缩功能,则其流测试数据将在WAN上得到进一步改善。最后,仅在具有一个或两个CPU的廉价虚拟机上完成性能研究。如果使用专用机器进行测试,性能数据会更好。

与故障自动恢复并行执行SQL

并行计算:在研究了前面的两个简单示例之后,是时候在套接字propro / samples / auto_recovery /(test_cplusplus | test_java | test_python | test_sharp)目录下研究第三个示例了。SocketPro是从底部创建的,以支持并行计算。您可以将多个SQL语句分发到不同的后端数据库上以进行并行处理。此功能旨在提高应用程序的可伸缩性,如下面的代码片段3所示。

using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
    static void Main(string[] args) {
        const int sessions_per_host = 2;
        string[] vHost = { "localhost", "192.168.2.172" };
        const int cycles = 10000;
        using (CSocketPool<CMysql> sp = new CSocketPool<CMysql>()) {
            //set a local message queue to backup requests for auto fault recovery
            sp.QueueName = "ar_sharp";
            
            //one thread enough
            CConnectionContext[,] ppCc = new CConnectionContext[1, vHost.Length * sessions_per_host];
            for (int n = 0; n < vHost.Length; ++n) {
                for (int j = 0; j < sessions_per_host; ++j) {
                    ppCc[0, n * sessions_per_host + j] = 
                         new CConnectionContext(vHost[n], 20902, "root", "Smash123");
                }
            }
            bool ok = sp.StartSocketPool(ppCc);
            if (!ok) {
                Console.WriteLine("No connection and press any key to close the application ......");
                Console.Read(); return;
            }
            string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
            Console.WriteLine("Input a filter for payment_id"); string filter = Console.ReadLine();
            if (filter.Length > 0) sql += (" WHERE " + filter); var v = sp.AsyncHandlers;
            foreach (var h in v) {
                ok = h.Open("sakila", (hsqlite, res, errMsg) => {
                    if (res != 0) Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                });
            }
            int returned = 0;
            double dmax = 0.0, dmin = 0.0, davg = 0.0;
            SocketProAdapter.UDB.CDBVariantArray row = new SocketProAdapter.UDB.CDBVariantArray();
            CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {
                if (res != 0)
                    Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                else {
                    dmax += double.Parse(row[0].ToString());
                    dmin += double.Parse(row[1].ToString());
                    davg += double.Parse(row[2].ToString());
                }
                ++returned;
            };
            CAsyncDBHandler.DRows r = (h, vData) => {
                row.Clear(); row.AddRange(vData);
            };
            CMysql mysql = sp.SeekByQueue(); //get one handler for querying one record
            ok = mysql.Execute(sql, er, r);
            ok = mysql.WaitAll();
            Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
            returned = 0; dmax = 0.0; dmin = 0.0; davg = 0.0;
            Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
            for (int n = 0; n < cycles; ++n) {
                mysql = sp.SeekByQueue();
                ok = mysql.Execute(sql, er, r);
            }
            foreach (var h in v) {
                ok = h.WaitAll();
            }
            Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", 
                               returned, dmax, dmin, davg);
            Console.WriteLine("Press any key to close the application ......"); Console.Read();
        }
    }
}
代码段3:演示SocketPro并行计算和故障自动恢复功能

如上面的代码片段3所示,我们可以将多个非阻塞套接字启动到不同的机器(localhost, 192.168.2.172),并且两个数据库机器中的每一个都连接了两个套接字。该代码sakila为每个连接(foreach (var h in v) {......})打开一个默认数据库。首先,代码SELECT max(amount), min(amount), avg(amount) FROM payment …对一条记录执行一个查询’ ‘。最后,代码将查询发送10,000次到两台计算机上以进行并行处理(for (int n = 0; n < cycles; ++n) {......})。每个记录将在Lambda表达式(CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {......};)中求和,作为方法的回调Execute。注意,您可以为不同机器上托管的不同服务创建多个池。如您所见,SocketPro套接字池可用于显着提高应用程序可伸缩性。

自动故障恢复:SocketPro可以在本地打开文件,并将所有请求数据保存到该文件中,然后再通过网络将这些请求发送到服务器上。该文件称为本地消息队列或客户端消息队列。备份所有自动故障恢复请求很简单。要使用此功能,必须设置本地消息队列名称(sp.QueueName = "ar_sharp";),如上面的代码片段3所示。在开发实际应用程序时,通常会编写很多代码来正确处理各种通信错误。实际上,这通常是对软件开发人员的挑战。SocketPro客户端消息队列使通信错误处理非常简单。假设由于诸如机器关机,未处理的异常,软件/硬件维护和网络拔出等原因之一而无法访问机器192.168.2.172,则将立即或稍后通知套接字关闭事件。一旦套接字池发现套接字已关闭,SocketPro就会自动将与该套接字连接关联的所有请求合并到另一个尚未关闭的套接字中,以进行处理。

为了验证此功能,您可以在执行上述查询期间残酷地关闭其中一台MySQL服务器,并查看最终结果是否正确。

注意,UDAParts已将此功能应用于所有SocketPro SQL流服务,异步持久消息队列服务和远程文件交换服务,以简化开发。

兴趣点

最后,SocketPro MySQL SQL流插件完全不支持游标,但是它提供了所有必需的基本客户端/服务器数据库功能。此外,SQL流插件确实具有以下独特功能。

  1. 连续的内联请求/结果批处理和实时SQL流处理,以实现最佳的网络效率,尤其是在WAN上
  2. 默认情况下,客户端和服务器之间双向异步数据传输,但是如果需要,所有异步请求都可以转换为同步请求
  3. 强大的SocketPro通信体系结构带来卓越的性能和可扩展性
  4. 用于表更新,插入和删除的实时缓存,如示例项目所示,test_cache位于目录socketpro / stream_sql / mysql / test_cache
  5. 通过在客户端执行class 方法可以取消所有请求CancelCClientSocket
  6. Windows和Linux均受支持
  7. 所有支持的开发语言的简单开发
  8. 客户端和服务器组件都是线程安全的。它们可以在您的多线程应用程序中轻松重用,而与线程相关的问题要少得多
  9. 所有请求都可以在客户端备份,并在服务器因任何原因停机时自动重新发送到另一台服务器进行处理- 故障自动恢复

历史

  • 2017年9月20日==>初始版本
  • 2018年2月28日==>添加两个新部分,即性能研究与故障自动恢复并行执行SQL
  • 2018年5月27日==>更新MySQL服务器SQL流插件以支持MySQL 8.0.11或更高版本

相关推荐

咨询软件
 
QQ在线咨询
售前咨询热线
QQ1922638