零代码同步方案
使用 DataX 和 ECNU-Datasync-Cli 实现零代码同步数据,使用crontab实现自动同步数据。
ecnu-datasync-cli 是一个开源的轻量级工具,用于华东师范大学数据开放平台的数据接口获取。
它提供了一种灵活、快速、方便的方式,允许在不用编写代码的情况下,将数据直接同步至 csv 或 xlsx 文件。
DataX 是阿里开源的离线数据同步工具/平台,实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS等各种异构数据源之间高效的数据同步功能。
crond 是linux下用来周期性地执行某种任务或等待处理某些事件的一个守护进程,使用该进程实现定时自动同步数据。
优势
无需代码编写,只需要编写相关配置文件,即可通过命令行命令实现同步数据至文件。
结合 datax 等第三方 ETL 工具,可以在无需编写代码的情况下实现各种同步任务。
命令行导出工具
获取ecnu-datasync-cli安装程序可执行文件
- 获取可执行文件 下载 ecnu-datasync-cli 可执行文件
通过 ecnu-datasync-cli 同步数据到 csv 或 xlsx 文件
不使用配置文件进行同步
不使用配置文件进行同步时,直接在命令行中指定参数。
命令
请确保处于校园网环境或使用校园网VPN(win 环境下请自行添加 .exe 后缀)
./ecnu-datasync-cli -c {client_id} -s {client_secret} -a {api_path} -o {output_file}
-o 的值既可以填写 csv 文件也可以填写 xlsx 文件,会自动根据文件的后缀名来判断导出的类型
例子
使用示例密钥访问接口 https://api.ecnu.edu.cn/api/v1/sync/fakewithts?ts=0 ,同步数据到 path_to_csv/test.csv 文件
shell./ecnu-datasync-cli -c=123456 -s=abcdef -a='/api/v1/sync/fakewithts?ts=0' -o='path_to_csv/test.csv'
使用示例密钥访问接口 https://api.ecnu.edu.cn/api/v1/sync/fakewithts?ts=0 ,同步数据到 path_to_xlsx/test.xlsx 文件
shell./ecnu-datasync-cli -c=123456 -s=abcdef -a='/api/v1/sync/fakewithts?ts=0' -o='path_to_xlsx/test.xlsx'
使用配置文件进行同步
您还可以使用指定配置文件的方式进行数据同步。
如果同时指定了 -config 和其他参数,那么会忽略其他参数,仅以配置文件中的配置进行接口调用。
命令
./ecnu-datasync-cli -config {config_file}
例子
- 首先,在某目录 path_to_json 下创建一个示例配置文件 cfg.json ,并根据您的环境修改配置文件 cfg.json
{
"oauth2_config":{
"client_id":"client_id",
"client_secret":"client_secret",
"base_url":"https://api.ecnu.edu.cn",
"scopes":["ECNU-Basic"],
"timeout":10,
"debug":false
},
"api_config":{
"api_path":"/api/v1/sync/fakewithts?ts=0",
"page_size":2000
},
"output_file":"./test.csv"
}
配置文件中,output_file 的值既可以填写 csv 文件也可以填写 xlsx 文件
- 然后,使用配置文件 path_to_json/cfg.json 中的配置进行数据同步。
./ecnu-datasync-cli -config path_to_json/cfg.json
使用 DataX 进行数据同步
获取 DataX 安装程序可执行文件
- 环境要求
- Linux
- JDK: 1.8+ 推荐1.8
- Python: 2/3
- 下载 DataX 压缩包shell
curl -O https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
- 将压缩包解压shell
tar -xzvf datax.tar.gz
通过 DataX 将CSV文件中的数据同步到数据库
基础使用
首先,确保数据库中建立了待导入的表,并设置主键(如果不设置主键,则重复执行同步任务时会数据重复)
然后,查看配置样例,以同步到 mysql 为例
shellcd {YOUR_DATAX_HOME}/bin python datax.py -r txtfilereader -w mysqlwriter
其次,编写自己的的 json 配置文件 text2mysql.json
json{ "job": { "content": [ { "reader": { "name": "txtfilereader", "parameter": { "column": [ { "index":0, "type":"string" }, { "index":1, "type":"string" }, { "index":2, "type":"string" }, { "index":3, "type":"string" }, { "index":4, "type":"string" }, { "index":5, "type":"string" } ], "encoding": "UTF-8", "fieldDelimiter": ",", "path": ["{your_csv_path}"], "skipHeader": true } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["col1","col2","col3","col4","col5","col6"], "connection": [ { "jdbcUrl": "jdbc:mysql://{your_mysql_address}:{your_mysql_port}/{your_database_name}", "table": ["{your_table_name}"] } ], "password": "{your_password}", "preSql": [], "session": [], "username": "{your_mysql_username}", "writeMode": "{your_write_mode}" } } } ], "setting": { "speed": { "channel": 1 } } } }
配置文件中,mysql 支持 writeMode 设置,可选项为 insert / replace / update,分别对应写入数据到目标表时采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句
最后,启动 DataX
shellcd {YOUR_DATAX_DIR_BIN} python datax.py path_to_json/text2mysql.json
同步结束,显示日志如下:
... 2015-12-17 11:20:25.263 [job-0] INFO JobContainer - 任务启动时刻 : 2015-12-17 11:20:15 任务结束时刻 : 2015-12-17 11:20:25 任务总计耗时 : 10s 任务平均流量 : 205B/s 记录写入速度 : 5rec/s 读出记录总数 : 50 读写失败总数 : 0
增量同步
通过 DataX 同步数据时,并非所有的数据库都支持 update or insert 模式,如 Mysql 支持该模式,而 SqlServer、PostgreSql、Oracle 则不支持该模式,下面分是否支持两种情况介绍增量同步的方法。
DataX 支持 update or insert 模式的数据库增量同步
使用 DataX 增量同步到 mysql ,只需要将 基础使用 中的writeMode设置为 update 即可,其余步骤不变。
DataX 不支持 update or insert 模式的数据库增量同步
使用 DataX 增量同步到 SqlServer、PostgreSql 和 Oracle,不支持配置writeMode参数,可以通过两种方法实现目的。
方法一:
通过 presql 参数,向表更新数据前删除所有数据。步骤如下。
首先,确保数据库中建立了待导入的表,并设置主键(如果不设置主键,则重复执行同步任务时会数据重复)
然后,查看配置样例,以同步到 PostgreSql 为例
shellcd {YOUR_DATAX_HOME}/bin python datax.py -r txtfilereader -w postgresqlwriter
其次,编写自己的的 json 配置文件 text2postgresql.json
json{ "job": { "content": [ { "reader": { "name": "txtfilereader", "parameter": { "column": [ { "index":0, "type":"string" }, { "index":1, "type":"string" }, { "index":2, "type":"string" } ], "encoding": "UTF-8", "fieldDelimiter": ",", "path": ["{your_csv_path}"], "skipHeader": true } }, "writer": { "name": "postgresqlwriter", "parameter": { "column": ["col1","col2","col3"], "connection": [ { "jdbcUrl": "jdbc:postgresql://{your_postgresql_address}:{your_postgresql_port}/{your_database_name}", "table": ["{your_table_name}"] } ], "preSql": ["truncate table @table"], "session": [], "username": "{your_postgresql_username}", "password": "{your_password}" } } } ], "setting": { "speed": { "channel": 1 } } } }
preSql中的 @table 会自动转化为writer.connection.table中的值 preSql设置为["truncate table @table"],这会在导入数据前先删除参数中的表的所有数据,然后再进行导入,从而实现增量同步 但该方法不可避免地存在同步过程中表为空的短暂时期,如果想避免空表问题,请用方法二。
最后,启动 DataX
shellcd {YOUR_DATAX_DIR_BIN} python datax.py path_to_json/text2mysql.json
方法二:
如果同步任务频率较高,不能够容忍同步过程中短暂的空表问题,那么此时应该将数据先同步到临时表,然后通过 postsql 参数从临时表向正式表更新数据。步骤如下。
- 首先,确保数据库中建立了待导入的表 {your_table_name} ,并设置主键,假设为 col1(如果不设置主键,则重复执行同步任务时会数据重复)
- 然后,在数据库中再建立临时表 {your_tmp_table} ,该临时表应和待导入表具有相同的表结构
- 然后,查看配置样例,以同步到 PostgreSql 为例shell
cd {YOUR_DATAX_HOME}/bin python datax.py -r txtfilereader -w postgresqlwriter
- 其次,编写自己的的 json 配置文件
- 同步到 PostgreSql 的 text2postgresql.jsonjson
{ "job": { "content": [ { "reader": { "name": "txtfilereader", "parameter": { "column": [ { "index":0, "type":"string" }, { "index":1, "type":"string" }, { "index":2, "type":"string" } ], "encoding": "UTF-8", "fieldDelimiter": ",", "path": ["{your_csv_path}"], "skipHeader": true } }, "writer": { "name": "postgresqlwriter", "parameter": { "column": ["col1","col2","col3"], "connection": [ { "jdbcUrl": "jdbc:postgresql://{your_postgresql_address}:{your_postgresql_port}/{your_database_name}", "table": ["{your_tmp_table}"] } ], "preSql": ["truncate table @table"], "postSql": [ "INSERT INTO {your_table_name} (col1, col2, col3) SELECT s.col1, s.col2, s.col3 FROM @table s ON CONFLICT (col1) DO UPDATE SET col2 = EXCLUDED.col2, col3 = EXCLUDED.col3;" ], "session": [], "username": "{your_postgresql_username}", "password": "{your_password}" } } } ], "setting": { "speed": { "channel": 1 } } } }
- 同步到 Oracle 的 text2oracle.jsonjson
{ "job": { "content": [ { "reader": { "name": "txtfilereader", "parameter": { "column": [ { "index":0, "type":"string" }, { "index":1, "type":"string" }, { "index":2, "type":"string" } ], "encoding": "UTF-8", "fieldDelimiter": ",", "path": ["{your_csv_path}"], "skipHeader": true } }, "writer": { "name": "oraclewriter", "parameter": { "column": ["col1","col2","col3"], "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@{your_oracle_address}:{your_oracle_port}:{your_oracle_database_name}", "table": ["{your_tmp_table}"] } ], "password": "{your_password}", "preSql": ["truncate table @table"], "postSql": [ "MERGE INTO {your_table_name} target USING @table source ON (target.col1 = source.col1) WHEN MATCHED THEN UPDATE SET target.col2 = source.col2, target.col3 = source.col3 WHEN NOT MATCHED THEN INSERT (col1, col2, col3) VALUES (source.col1, source.col2, source.col3);" ], "session": [], "username": "{your_postgresql_username}" } } } ], "setting": { "speed": { "channel": 1 } } } }
- 同步到 Oracle 的 text2oracle.json
- 同步到 PostgreSql 的 text2postgresql.json
- 同步到 SqlServer 的 text2sqlserver.jsonjson
{ "job": { "content": [ { "reader": { "name": "txtfilereader", "parameter": { "column": [ { "index":0, "type":"string" }, { "index":1, "type":"string" }, { "index":2, "type":"string" } ], "encoding": "UTF-8", "fieldDelimiter": ",", "path": ["{your_csv_path}"], "skipHeader": true } }, "writer": { "name": "oraclewriter", "parameter": { "column": ["col1","col2","col3"], "connection": [ { "jdbcUrl": "jdbc:sqlserver:{your_sqlserver_address}:{your_sqlserver_port};DatabaseName={your_sqlserver_database_name}", "table": ["{your_tmp_table}"] } ], "preSql": ["truncate table @table"], "postSql": [ "MERGE INTO {your_table_name} target USING @table source ON (target.col1 = source.col1) WHEN MATCHED THEN UPDATE SET target.col2 = source.col2, target.col3 = source.col3 WHEN NOT MATCHED THEN INSERT (col1, col2, col3) VALUES (source.col1, source.col2, source.col3);" ], "session": [], "username": "{your_sqlserver_username}", "password": "{your_password}" } } } ], "setting": { "speed": { "channel": 1 } } } }
preSql 和 postSql 中的 @table 会自动转化为 writer.connection.table 中的值 preSql设置为["truncate table @table"],这会在导入数据前先删除临时表的所有数据,然后再进行导入 postSql中的语句实现从临时表到目标表的增量导入
- 最后,启动 DataXshell
cd {YOUR_DATAX_DIR_BIN} python datax.py path_to_json/text2mysql.json
获取 crontab 安装程序可执行文件
- 环境要求
- Linux
检查crontab是否可用
绝大多数Linux机器中内置了crontab工具,使用以下命令检查是否可用:
shellcrontab -l
如果 [输出 no crontab] 或 [显示当前任务列表] 或 [不报错] 则已安装
安装 crontab
如果 crontab 已经可用,则不需要进行此步骤
- 使用以下命令安装 crontab
shell# Centos yum -y install vixie-cron crontabs # Ubuntu apt-get install corn
- 使用以下命令启动、关闭、重启crond服务或查看服务状态
shell# Centos service crond status service crond start service crond stop service crond restart # Ubuntu service cron status service cron start service cron stop service cron restart
使用 crond 实现定时自动同步数据任务
使用以下命令查看当前用户日程内容
shellcrontab -l
使用以下命令进入当前用户的日程文件编辑
crontab -e
输入i进入编辑模式
添加一行日程
如每周一到周五每天6点到18点,每隔1小时,使用 ecnu-datasync 和 datax 增量同步数据至 mysql 仓库。
shell0 6-18/1 * * 1-5 {YOUR_PATH_TO_BIN}/ecnu-datasync -config path_to_json/cfg.json && python {YOUR_DATAX_DIR_BIN}/datax.py path_to_json/text2database.json
text2database.json 中的输入文件应为 cfg.json 中的 output 指定的文件
text2database.json 应使用增量更新方案的写法
crondtab日程文件中不会自动引入环境变量,需要时请在日程文件开头自行设置环境变量
按 Esc 键,输入 :wq 并回车以保存并关闭文件
命令行提示以下内容代表日程创建成功,自动同步日程开始
shellcrontab: installing new crontab