Skip to content

零代码同步方案

使用 DataXECNU-Datasync-Cli 实现零代码同步数据,使用crontab实现自动同步数据。

ecnu-datasync-cli 是一个开源的轻量级工具,用于华东师范大学数据开放平台的数据接口获取。

它提供了一种灵活、快速、方便的方式,允许在不用编写代码的情况下,将数据直接同步至 csv 或 xlsx 文件。

DataX 是阿里开源的离线数据同步工具/平台,实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS等各种异构数据源之间高效的数据同步功能。

crond 是linux下用来周期性地执行某种任务或等待处理某些事件的一个守护进程,使用该进程实现定时自动同步数据。

优势

无需代码编写,只需要编写相关配置文件,即可通过命令行命令实现同步数据至文件。

结合 datax 等第三方 ETL 工具,可以在无需编写代码的情况下实现各种同步任务。

命令行导出工具

获取ecnu-datasync-cli安装程序可执行文件

  1. 获取可执行文件 下载 ecnu-datasync-cli 可执行文件

通过 ecnu-datasync-cli 同步数据到 csv 或 xlsx 文件

不使用配置文件进行同步

不使用配置文件进行同步时,直接在命令行中指定参数。

命令

请确保处于校园网环境或使用校园网VPN(win 环境下请自行添加 .exe 后缀)

shell
./ecnu-datasync-cli -c {client_id} -s {client_secret} -a {api_path} -o {output_file}

-o 的值既可以填写 csv 文件也可以填写 xlsx 文件,会自动根据文件的后缀名来判断导出的类型

例子

使用配置文件进行同步

您还可以使用指定配置文件的方式进行数据同步。

如果同时指定了 -config 和其他参数,那么会忽略其他参数,仅以配置文件中的配置进行接口调用。

命令
shell
./ecnu-datasync-cli -config {config_file}
例子
  1. 首先,在某目录 path_to_json 下创建一个示例配置文件 cfg.json ,并根据您的环境修改配置文件 cfg.json
json
    {
      "oauth2_config":{
        "client_id":"client_id",
        "client_secret":"client_secret",
        "base_url":"https://api.ecnu.edu.cn",
        "scopes":["ECNU-Basic"],
        "timeout":10,
        "debug":false
      },
      "api_config":{
        "api_path":"/api/v1/sync/fakewithts?ts=0",
        "page_size":2000
      },
      "output_file":"./test.csv"    
    }

配置文件中,output_file 的值既可以填写 csv 文件也可以填写 xlsx 文件

  1. 然后,使用配置文件 path_to_json/cfg.json 中的配置进行数据同步。
shell
   ./ecnu-datasync-cli -config path_to_json/cfg.json

使用 DataX 进行数据同步

获取 DataX 安装程序可执行文件

  1. 环境要求
  • Linux
  • JDK: 1.8+ 推荐1.8
  • Python: 2/3
  1. 下载 DataX 压缩包
    shell
    curl -O https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
  2. 将压缩包解压
    shell
    tar -xzvf datax.tar.gz

通过 DataX 将CSV文件中的数据同步到数据库

基础使用

  1. 首先,确保数据库中建立了待导入的表,并设置主键(如果不设置主键,则重复执行同步任务时会数据重复)

  2. 然后,查看配置样例,以同步到 mysql 为例

    shell
    cd {YOUR_DATAX_HOME}/bin
    python datax.py -r txtfilereader -w mysqlwriter
  3. 其次,编写自己的的 json 配置文件 text2mysql.json

    json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "txtfilereader",
                        "parameter": {
                            "column": [
                               {
                                   "index":0,
                                   "type":"string"
                               },
                               {
                                   "index":1,
                                   "type":"string"
                               },
                               {
                                   "index":2,
                                   "type":"string"
                               },
                               {
                                   "index":3,
                                   "type":"string"
                               },
                               {
                                   "index":4,
                                   "type":"string"
                               },
                               {
                                   "index":5,
                                   "type":"string"
                               }
                            ],
                            "encoding": "UTF-8",
                            "fieldDelimiter": ",",
                            "path": ["{your_csv_path}"],
                                  "skipHeader": true
                        }
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "column": ["col1","col2","col3","col4","col5","col6"],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://{your_mysql_address}:{your_mysql_port}/{your_database_name}",
                                    "table": ["{your_table_name}"]
                                }
                            ],
                            "password": "{your_password}",
                            "preSql": [],
                            "session": [],
                            "username": "{your_mysql_username}",
                            "writeMode": "{your_write_mode}"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }

    配置文件中,mysql 支持 writeMode 设置,可选项为 insert / replace / update,分别对应写入数据到目标表时采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句

  4. 最后,启动 DataX

    shell
    cd {YOUR_DATAX_DIR_BIN}
    python datax.py path_to_json/text2mysql.json
  5. 同步结束,显示日志如下:

    ...
    2015-12-17 11:20:25.263 [job-0] INFO  JobContainer -
    任务启动时刻                    : 2015-12-17 11:20:15
    任务结束时刻                    : 2015-12-17 11:20:25
    任务总计耗时                    :                 10s
    任务平均流量                    :              205B/s
    记录写入速度                    :              5rec/s
    读出记录总数                    :                  50
    读写失败总数                    :                   0

增量同步

通过 DataX 同步数据时,并非所有的数据库都支持 update or insert 模式,如 Mysql 支持该模式,而 SqlServer、PostgreSql、Oracle 则不支持该模式,下面分是否支持两种情况介绍增量同步的方法。

DataX 支持 update or insert 模式的数据库增量同步

使用 DataX 增量同步到 mysql ,只需要将 基础使用 中的writeMode设置为 update 即可,其余步骤不变。

DataX 不支持 update or insert 模式的数据库增量同步

使用 DataX 增量同步到 SqlServer、PostgreSql 和 Oracle,不支持配置writeMode参数,可以通过两种方法实现目的。

方法一:

通过 presql 参数,向表更新数据前删除所有数据。步骤如下。

  1. 首先,确保数据库中建立了待导入的表,并设置主键(如果不设置主键,则重复执行同步任务时会数据重复)

  2. 然后,查看配置样例,以同步到 PostgreSql 为例

    shell
    cd {YOUR_DATAX_HOME}/bin
    python datax.py -r txtfilereader -w postgresqlwriter
  3. 其次,编写自己的的 json 配置文件 text2postgresql.json

    json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "txtfilereader",
                        "parameter": {
                            "column": [
                               {
                                   "index":0,
                                   "type":"string"
                               },
                               {
                                   "index":1,
                                   "type":"string"
                               },
                               {
                                   "index":2,
                                   "type":"string"
                               }
                            ],
                            "encoding": "UTF-8",
                            "fieldDelimiter": ",",
                            "path": ["{your_csv_path}"],
                                  "skipHeader": true
                        }
                    },
                    "writer": {
                        "name": "postgresqlwriter",
                        "parameter": {
                            "column": ["col1","col2","col3"],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:postgresql://{your_postgresql_address}:{your_postgresql_port}/{your_database_name}",
                                    "table": ["{your_table_name}"]
                                }
                            ],
                            "preSql": ["truncate table @table"],
                            "session": [],
                            "username": "{your_postgresql_username}",
                            "password": "{your_password}"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }

    preSql中的 @table 会自动转化为writer.connection.table中的值 preSql设置为["truncate table @table"],这会在导入数据前先删除参数中的表的所有数据,然后再进行导入,从而实现增量同步 但该方法不可避免地存在同步过程中表为空的短暂时期,如果想避免空表问题,请用方法二。

  4. 最后,启动 DataX

    shell
    cd {YOUR_DATAX_DIR_BIN}
    python datax.py path_to_json/text2mysql.json
方法二:

如果同步任务频率较高,不能够容忍同步过程中短暂的空表问题,那么此时应该将数据先同步到临时表,然后通过 postsql 参数从临时表向正式表更新数据。步骤如下。

  1. 首先,确保数据库中建立了待导入的表 {your_table_name} ,并设置主键,假设为 col1(如果不设置主键,则重复执行同步任务时会数据重复)
  2. 然后,在数据库中再建立临时表 {your_tmp_table} ,该临时表应和待导入表具有相同的表结构
  3. 然后,查看配置样例,以同步到 PostgreSql 为例
    shell
    cd {YOUR_DATAX_HOME}/bin
    python datax.py -r txtfilereader -w postgresqlwriter
  4. 其次,编写自己的的 json 配置文件
    • 同步到 PostgreSql 的 text2postgresql.json
      json
      {
          "job": {
              "content": [
                  {
                      "reader": {
                          "name": "txtfilereader",
                          "parameter": {
                              "column": [
                                 {
                                     "index":0,
                                     "type":"string"
                                 },
                                 {
                                     "index":1,
                                     "type":"string"
                                 },
                                 {
                                     "index":2,
                                     "type":"string"
                                 }
                              ],
                              "encoding": "UTF-8",
                              "fieldDelimiter": ",",
                              "path": ["{your_csv_path}"],
                                    "skipHeader": true
                          }
                      },
                      "writer": {
                          "name": "postgresqlwriter",
                          "parameter": {
                              "column": ["col1","col2","col3"],
                              "connection": [
                                  {
                                      "jdbcUrl": "jdbc:postgresql://{your_postgresql_address}:{your_postgresql_port}/{your_database_name}",
                                      "table": ["{your_tmp_table}"]
                                  }
                              ],
                              "preSql": ["truncate table @table"],
                              "postSql": [
                                 "INSERT INTO {your_table_name} (col1, col2, col3) SELECT s.col1, s.col2, s.col3 FROM @table s ON CONFLICT (col1) DO UPDATE SET col2 = EXCLUDED.col2, col3 = EXCLUDED.col3;"
                              ],
                              "session": [],
                              "username": "{your_postgresql_username}",
                              "password": "{your_password}"
                          }
                      }
                  }
              ],
              "setting": {
                  "speed": {
                      "channel": 1
                  }
              }
          }
      }
      • 同步到 Oracle 的 text2oracle.json
        json
           {
               "job": {
                   "content": [
                       {
                           "reader": {
                               "name": "txtfilereader",
                               "parameter": {
                                   "column": [
                                      {
                                          "index":0,
                                          "type":"string"
                                      },
                                      {
                                          "index":1,
                                          "type":"string"
                                      },
                                      {
                                          "index":2,
                                          "type":"string"
                                      }
                                   ],
                                   "encoding": "UTF-8",
                                   "fieldDelimiter": ",",
                                   "path": ["{your_csv_path}"],
                                         "skipHeader": true
                               }
                           },
                           "writer": {
                               "name": "oraclewriter",
                               "parameter": {
                                   "column": ["col1","col2","col3"],
                                   "connection": [
                                       {
                                           "jdbcUrl": "jdbc:oracle:thin:@{your_oracle_address}:{your_oracle_port}:{your_oracle_database_name}",
                                           "table": ["{your_tmp_table}"]
                                       }
                                   ],
                                   "password": "{your_password}",
                                   "preSql": ["truncate table @table"],
                                   "postSql": [
                                      "MERGE INTO {your_table_name} target USING @table source ON (target.col1 = source.col1) WHEN MATCHED THEN UPDATE SET target.col2 = source.col2, target.col3 = source.col3 WHEN NOT MATCHED THEN INSERT (col1, col2, col3) VALUES (source.col1, source.col2, source.col3);"
                                   ],
                                   "session": [],
                                   "username": "{your_postgresql_username}"
                               }
                           }
                       }
                   ],
                   "setting": {
                       "speed": {
                           "channel": 1
                       }
                   }
               }
           }
  • 同步到 SqlServer 的 text2sqlserver.json
    json
            {
                "job": {
                    "content": [
                        {
                            "reader": {
                                "name": "txtfilereader",
                                "parameter": {
                                    "column": [
                                       {
                                           "index":0,
                                           "type":"string"
                                       },
                                       {
                                           "index":1,
                                           "type":"string"
                                       },
                                       {
                                           "index":2,
                                           "type":"string"
                                       }
                                    ],
                                    "encoding": "UTF-8",
                                    "fieldDelimiter": ",",
                                    "path": ["{your_csv_path}"],
                                          "skipHeader": true
                                }
                            },
                            "writer": {
                                "name": "oraclewriter",
                                "parameter": {
                                    "column": ["col1","col2","col3"],
                                    "connection": [
                                        {
                                            "jdbcUrl": "jdbc:sqlserver:{your_sqlserver_address}:{your_sqlserver_port};DatabaseName={your_sqlserver_database_name}",
                                            "table": ["{your_tmp_table}"]
                                        }
                                    ],
                                    "preSql": ["truncate table @table"],
                                    "postSql": [
                                       "MERGE INTO {your_table_name} target USING @table source ON (target.col1 = source.col1) WHEN MATCHED THEN UPDATE SET target.col2 = source.col2, target.col3 = source.col3 WHEN NOT MATCHED THEN INSERT (col1, col2, col3) VALUES (source.col1, source.col2, source.col3);"
                                    ],
                                    "session": [],
                                    "username": "{your_sqlserver_username}",
                                    "password": "{your_password}"
                                }
                            }
                        }
                    ],
                    "setting": {
                        "speed": {
                            "channel": 1
                        }
                    }
                }
            }

    preSql 和 postSql 中的 @table 会自动转化为 writer.connection.table 中的值 preSql设置为["truncate table @table"],这会在导入数据前先删除临时表的所有数据,然后再进行导入 postSql中的语句实现从临时表到目标表的增量导入

  1. 最后,启动 DataX
    shell
    cd {YOUR_DATAX_DIR_BIN}
    python datax.py path_to_json/text2mysql.json

获取 crontab 安装程序可执行文件

  1. 环境要求
  • Linux
  1. 检查crontab是否可用

    绝大多数Linux机器中内置了crontab工具,使用以下命令检查是否可用:

    shell
    crontab -l

    如果 [输出 no crontab] 或 [显示当前任务列表] 或 [不报错] 则已安装

  2. 安装 crontab

    如果 crontab 已经可用,则不需要进行此步骤

    1. 使用以下命令安装 crontab
    shell
    # Centos
    yum -y install vixie-cron crontabs
    
    # Ubuntu
    apt-get install corn
    1. 使用以下命令启动、关闭、重启crond服务或查看服务状态
    shell
    # Centos
    service crond status
    service crond start
    service crond stop
    service crond restart
    
    # Ubuntu
    service cron status
    service cron start
    service cron stop
    service cron restart

使用 crond 实现定时自动同步数据任务

  1. 使用以下命令查看当前用户日程内容

    shell
    crontab -l
  2. 使用以下命令进入当前用户的日程文件编辑

    crontab -e
  3. 输入i进入编辑模式

  4. 添加一行日程

    如每周一到周五每天6点到18点,每隔1小时,使用 ecnu-datasync 和 datax 增量同步数据至 mysql 仓库。

    shell
    0 6-18/1 * * 1-5 {YOUR_PATH_TO_BIN}/ecnu-datasync -config path_to_json/cfg.json && python {YOUR_DATAX_DIR_BIN}/datax.py path_to_json/text2database.json

    text2database.json 中的输入文件应为 cfg.json 中的 output 指定的文件

    text2database.json 应使用增量更新方案的写法

    crondtab日程文件中不会自动引入环境变量,需要时请在日程文件开头自行设置环境变量

  5. 按 Esc 键,输入 :wq 并回车以保存并关闭文件

  6. 命令行提示以下内容代表日程创建成功,自动同步日程开始

    shell
    crontab: installing new crontab