开源数据同步工具DataX

发布时间:2025-12-09 16:02:09 浏览次数:4

1. DataX

1.1. 产品特性

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

官方提供的datax框架图:

最终把不同数据源和目标源组成的网状结构,变成了星型结构:

1.2. 支持场景

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADS
OSS读 、写
OCS读 、写
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Hive读 、写
Cassandra读 、写
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写

1.3. 迁移场景解决方案

1.3.1. 迁移测试场景

当前测试均选用阿里云RDS Mysql5.6作为测试源端以及目标端资源,DataX数据源可支持范围很大,具体支持场景可以参考本文档3段落部分

1.3.1.1. 待迁移源端数据库

数据库类型数据源数据库版本资源大小部署方式
关系型数据库阿里云RDSMysql 5.62C-4G-50G单主库

1.3.1.2. 待同步目标端数据库

数据库类型数据源数据库版本资源大小部署方式
关系型数据库阿里云RDSMysql 5.62C-4G-50G单主库

1.3.1.3. 迁移程序

  • 环境要求:linux(windows也可以)、JDK1.8级以上、 python 2.x
  • 本次测试环境:CentOS7.5、JDK1.8、Python2.7.5

1.3.2. 安装部署

部署两种方式

  • 使用官方编译好的 工具 包(datax.tar.gz),解压即用
  • 下载Datax源码,使用Maven进行编译。编译时间会有点长。

本次测试使用官方编译好的工具包(datax.tar.gz)下载并解压后使用

1.3.2.1. 安装JDK环境

yum install -y java-1.8.0 # 使用默认的CentOS7.5 Yum源即可

1.3.2.2. 安装apache-maven环境

yum install -y maven # 使用默认的CentOS7.5 Yum源即可

1.3.2.3. 安装Python环境

CentOS7.5 操作系统默认自带python 2.7.5,无需进行安装,可以直接进行使用

1.3.2.4. 下载DataX安装包

DataX安装包链接:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

curl -O http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

1.3.2.5. 解压缩datax程序包

cd /root/ # 登陆到datax.tar.gz的下载目录tar -zxvf datax.tar.gz -C /root/

1.3.3. 迁移使用

1.3.3.1. 生成迁移数据源样例模版

按照迁移需求使用DataX指令生成迁移数据源配置样例模版,然后可以根据数据源样例模版进行修改保存使用
执行以下指令生成数据源json文件,此自动生成的配置样例只是模版,还需要将输出json文件内容按照提示保存后进行修改(其他异构数据源模版,按照需求自主生成)

python /root/datax/bin/datax.py -r mysqlreader -w mysqlwriter DataX (DATAX-OPENSOURCE-3.0), From Alibaba !Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.Please refer to the mysqlreader document:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.mdPlease refer to the mysqlwriter document:https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.mdPlease save the following configuration as a json file and usepython {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.jsonto run the job.# 复制以下json内容并保存成xxxx.json文件{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": [],"connection": [{"jdbcUrl": [],"table": []}],"password": "","username": "","where": ""}},"writer": {"name": "mysqlwriter","parameter": {"column": [],"connection": [{"jdbcUrl": "","table": []}],"password": "","preSql": [],"session": [],"username": "","writeMode": ""}}}],"setting": {"speed": {"channel": ""}}}}

1.3.3.2. 修改数据源样例模版

此处按照我们的测试样例,源为mysql,目标也为mysql,mysql2mysql.json文件样例内容如下:

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["*"],"connection": [{"jdbcUrl": ["jdbc:mysql://39.103.21.61:3306/coredb"],"table": ["seepcore_table"]}],"password": "Abc999@1","username": "xuxingzhuang",}},"writer": {"name": "mysqlwriter","parameter": {"column": ["*"],"connection": [{"jdbcUrl": "jdbc:mysql://106.15.31.131:3306/coredb","table": ["seepcore_table"]}],"password": "Abc999@1","preSql": [],"session": [],"username": "xuxingzhuang","writeMode": "insert"}}}],"setting": {"speed": {"channel": "2"}}}}

注意⚠️:以上内容,要确保启动datax实例可以有权限访问远程数据库,源和目标库相关配置信息已经创建完成,具体参数详细配置信息,参考以下内容

配置样例mysqlreader参数详细说明,查看链接:DataX MysqlReader
配置样例mysqlwriter参数详细说明,查看链接:DataX MysqlWriter

1.3.3.3. 启动DataX程序

python /root/datax/bin/datax.py /root/datax/job/mysql2mysql.json
  • 程序执行输出LOG
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.2020-07-16 06:25:32.889 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl2020-07-16 06:25:32.902 [main] INFO Engine - the machine info =>osInfo:Oracle Corporation 1.8 25.252-b09jvmInfo:Linux amd64 3.10.0-862.3.2.el7.x86_64cpu num:4totalPhysicalMemory:-0.00GfreePhysicalMemory:-0.00GmaxFileDescriptorCount:-1currentOpenFileDescriptorCount:-1GC Names[PS MarkSweep, PS Scavenge]MEMORY_NAME | allocation_size | init_sizePS Eden Space | 256.00MB | 256.00MBCode Cache | 240.00MB | 2.44MBCompressed Class Space | 1,024.00MB | 0.00MBPS Survivor Space | 42.50MB | 42.50MBPS Old Gen | 683.00MB | 683.00MBMetaspace | -0.00MB | 0.00MB2020-07-16 06:25:32.927 [main] INFO Engine -{"content":[{"reader":{"name":"mysqlreader","parameter":{"column":["*"],"connection":[{"jdbcUrl":["jdbc:mysql://39.103.21.61:3306/coredb"],"table":["seepcore_table"]}],"password":"********","username":"xuxingzhuang"}},"writer":{"name":"mysqlwriter","parameter":{"column":["*"],"connection":[{"jdbcUrl":"jdbc:mysql://106.15.31.131:3306/coredb","table":["seepcore_table"]}],"password":"********","preSql":[],"session":[],"username":"xuxingzhuang","writeMode":"insert"}}}],"setting":{"speed":{"channel":"2"}}}2020-07-16 06:25:32.956 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null2020-07-16 06:25:32.959 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=02020-07-16 06:25:32.959 [main] INFO JobContainer - DataX jobContainer starts job.2020-07-16 06:25:32.962 [main] INFO JobContainer - Set jobId = 02020-07-16 06:25:33.419 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://39.103.21.61:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.2020-07-16 06:25:33.421 [job-0] WARN OriginalConfPretreatmentUtil - 您的配置文件中的列配置存在一定的风险. 因为您未配置读取数据库表的列,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.2020-07-16 06:25:33.929 [job-0] INFO OriginalConfPretreatmentUtil - table:[seepcore_table] all columns:[id,date_time,line_3,line_4,line_10,line_11,line_12,line_13,line_14,line_15,line_16,line_17,line_18,line_19,line_20].2020-07-16 06:25:33.930 [job-0] WARN OriginalConfPretreatmentUtil - 您的配置文件中的列配置信息存在风险. 因为您配置的写入数据库表的列为*,当您的表字段个数、类型有变动时,可能影响任务正确性甚至会运行出错。请检查您的配置并作出修改.2020-07-16 06:25:33.933 [job-0] INFO OriginalConfPretreatmentUtil - Write data [insert INTO %s (id,date_time,line_3,line_4,line_10,line_11,line_12,line_13,line_14,line_15,line_16,line_17,line_18,line_19,line_20) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)], which jdbcUrl like:[jdbc:mysql://106.15.31.131:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]2020-07-16 06:25:33.935 [job-0] INFO JobContainer - jobContainer starts to do prepare ...2020-07-16 06:25:33.936 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .2020-07-16 06:25:33.936 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do prepare work .2020-07-16 06:25:33.938 [job-0] INFO JobContainer - jobContainer starts to do split ...2020-07-16 06:25:33.938 [job-0] INFO JobContainer - Job set Channel-Number to 2 channels.2020-07-16 06:25:33.946 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.2020-07-16 06:25:33.947 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] splits to [1] tasks.2020-07-16 06:25:33.978 [job-0] INFO JobContainer - jobContainer starts to do schedule ...2020-07-16 06:25:33.983 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.2020-07-16 06:25:33.986 [job-0] INFO JobContainer - Running by standalone Mode.2020-07-16 06:25:34.002 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.2020-07-16 06:25:34.010 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.2020-07-16 06:25:34.011 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.2020-07-16 06:25:34.023 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started2020-07-16 06:25:34.032 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select * from seepcore_table] jdbcUrl:[jdbc:mysql://39.103.21.61:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].2020-07-16 06:25:44.018 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00%2020-07-16 06:25:54.023 [job-0] INFO StandAloneJobContainerCommunicator - Total 109056 records, 23305935 bytes | Speed 2.22MB/s, 10905 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 8.666s | All Task WaitReaderTime 0.801s | Percentage 0.00%2020-07-16 06:26:04.026 [job-0] INFO StandAloneJobContainerCommunicator - Total 231936 records, 49667141 bytes | Speed 2.51MB/s, 12288 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 17.829s | All Task WaitReaderTime 1.541s | Percentage 0.00%2020-07-16 06:26:14.029 [job-0] INFO StandAloneJobContainerCommunicator - Total 354816 records, 75963461 bytes | Speed 2.51MB/s, 12288 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 26.983s | All Task WaitReaderTime 2.221s | Percentage 0.00%2020-07-16 06:26:24.032 [job-0] INFO StandAloneJobContainerCommunicator - Total 467456 records, 100068421 bytes | Speed 2.30MB/s, 11264 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 36.103s | All Task WaitReaderTime 2.919s | Percentage 0.00%2020-07-16 06:26:34.034 [job-0] INFO StandAloneJobContainerCommunicator - Total 584192 records, 125049925 bytes | Speed 2.38MB/s, 11673 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 45.179s | All Task WaitReaderTime 3.551s | Percentage 0.00%.....2020-07-16 06:30:34.113 [job-0] INFO VMInfo -[delta cpu info] =>curDeltaCpu | averageCpu | maxDeltaCpu | minDeltaCpu-1.00% | -1.00% | -1.00% | -1.00%[delta memory info] =>NAME | used_size | used_percent | max_used_size | max_percentPS Eden Space | 309.99MB | 95.09% | 309.99MB | 95.09%Code Cache | 6.02MB | 79.63% | 6.02MB | 79.63%Compressed Class Space | 1.81MB | 90.66% | 1.81MB | 90.66%PS Survivor Space | 5.53MB | 73.75% | 5.53MB | 73.75%PS Old Gen | 6.35MB | 0.93% | 6.35MB | 0.93%Metaspace | 18.57MB | 97.72% | 18.57MB | 97.72%[delta gc info] =>NAME | curDeltaGCCount | totalGCCount | maxDeltaGCCount | minDeltaGCCount | curDeltaGCTime | totalGCTime | maxDeltaGCTime | minDeltaGCTimePS MarkSweep | 0 | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s | 0.000sPS Scavenge | 100 | 100 | 100 | 100 | 0.854s | 0.854s | 0.854s | 0.854s2020-07-16 06:30:44.115 [job-0] INFO StandAloneJobContainerCommunicator - Total 3350592 records, 719410097 bytes | Speed 1.88MB/s, 9171 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 274.631s | All Task WaitReaderTime 19.540s | Percentage 0.00%.....2020-07-16 06:34:21.281 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select * from seepcore_table] jdbcUrl:[jdbc:mysql://39.103.21.61:3306/coredb?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].2020-07-16 06:34:21.633 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[527612]ms2020-07-16 06:34:21.634 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.2020-07-16 06:34:24.169 [job-0] INFO StandAloneJobContainerCommunicator - Total 5585206 records, 1197238078 bytes | Speed 3.49MB/s, 17183 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 484.287s | All Task WaitReaderTime 32.298s | Percentage 100.00%2020-07-16 06:34:24.169 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.2020-07-16 06:34:24.170 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work.2020-07-16 06:34:24.171 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.2020-07-16 06:34:24.172 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.2020-07-16 06:34:24.177 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /root/datax/hook2020-07-16 06:34:24.178 [job-0] INFO JobContainer -[total cpu info] =>averageCpu | maxDeltaCpu | minDeltaCpu-1.00% | -1.00% | -1.00%[total gc info] =>NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTimePS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000sPS Scavenge | 166 | 100 | 66 | 1.335s | 0.854s | 0.481s2020-07-16 06:34:24.178 [job-0] INFO JobContainer - PerfTrace not enable!2020-07-16 06:34:24.178 [job-0] INFO StandAloneJobContainerCommunicator - Total 5585206 records, 1197238078 bytes | Speed 2.15MB/s, 10538 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 484.287s | All Task WaitReaderTime 32.298s | Percentage 100.00%2020-07-16 06:34:24.181 [job-0] INFO JobContainer -任务启动时刻 : 2020-07-16 06:25:32任务结束时刻 : 2020-07-16 06:34:24任务总计耗时 : 531s任务平均流量 : 2.15MB/s记录写入速度 : 10538rec/s读出记录总数 : 5585206读写失败总数 : 0

注意⚠️:当前测试为一张表进行迁移,如果有多张表,请按照需求进行配置多个json文件进行拷贝,datax实例可以cpu可以配置高一些,将json文件的speed.channel调整大一些,并发效果会更好一些

1.4. 迁移测试总结

1.4.1. DataX优势

  • DataX较适合跨数据库表级的数据一次性迁移。
  • 可跨异构数据库,支持多数据源

1.4.2. DataX缺点

  • 无法做增量数据同步,每一次同步都需要清空目标端表格资料。
  • 无法支持实时同步。
需要做网站?需要网络推广?欢迎咨询客户经理 13272073477