1、安装mysqlsmom
个人建议用 pyenv 创建 Python 虚拟环境,参考:http://www.65535.fun/article/2019/12/13/12.html 。环境安装完成后执行:
pip install mysqlsmom
# 我自己的 elasticsearch 是 7.0.1 版本,所以把 Python 客户端也固定为同一版本
pip install --upgrade elasticsearch==7.0.1
2、全量同步
mkdir /usr/local/mysqlsmom
cd /usr/local/mysqlsmom
mom new v4_user_db/init_config.py -t init --force
cat /usr/local/mysqlsmom/v4_user_db/init_config.py
# coding=utf-8
# mysqlsmom configuration for a full ("init") synchronisation: the rows
# selected by each task's SQL statement are synced to elasticsearch.
STREAM = "INIT"
# Database connection settings -- edit to match your server.
CONNECTION = {
'host': '10.0.0.1',
'port': 3306,
'user': 'xxxxxxx',
'passwd': 'xxxxxx'
}
# Number of rows synced to elasticsearch per batch; when this setting is
# omitted the default is 1.
BULK_SIZE = 1
# elasticsearch nodes -- edit to match your cluster.
NODES = [{"host": "10.0.0.2", "port": 9200}]
# Each task pairs a source query ("stream") with the "jobs" applied to its rows.
TASKS = [
{
"stream": {
"database": "v4_user_db", # database in which the SQL statement is executed
"sql": "select * from login_log_202002", # rows selected by this statement are synced to elasticsearch
# NOTE(review): the original note on "pk" described the case where the
# primary key is a string, but the type declared here is "int" -- confirm
# against the mysqlsmom documentation that this entry is needed/valid.
"pk": {"field": "log_id", "type": "int"} # primary-key column and its declared type
},
"jobs": [
{
# presumably the row actions this job handles -- confirm against mysqlsmom docs
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "log_id"}} # use this field's value as the elasticsearch document id; the field name depends on each table's id column
],
"dest": {
"es": {
"action": "upsert",
"index": "v4_user_db_login_log_202002", # target index
"type": "login_log", # target type
"nodes": NODES
}
}
}
]
}
]
# Optional custom row handlers / filters, left disabled:
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
# 开始全量同步
cd /usr/local/mysqlsmom
mom run -c ./v4_user_db/init_config.py
3、增量同步
mom new v4_user_db/binlog_config.py -t binlog --force