mysqlsmom(mysql 实时同步至elasticsearch)


1、安装mysqlsmom

个人建议使用 pyenv 创建 Python 虚拟环境,参考:http://www.65535.fun/article/2019/12/13/12.html 。环境安装完成后执行:

pip install mysqlsmom
# My Elasticsearch server is version 7.0.1, so pin the Python elasticsearch client to the same version
pip install --upgrade elasticsearch==7.0.1

2、全量同步

# Create the working directory (-p: no error if it already exists), enter it,
# and generate the config template for the full ("init") sync.
# The steps are chained with && so that "mom new ... --force" never runs in
# the wrong directory when mkdir or cd fails.
mkdir -p /usr/local/mysqlsmom \
  && cd /usr/local/mysqlsmom \
  && mom new v4_user_db/init_config.py -t init --force

cat /usr/local/mysqlsmom/v4_user_db/init_config.py

# coding=utf-8
# Configuration for a mysqlsmom full ("INIT") sync from MySQL to Elasticsearch.

STREAM = "INIT"

# MySQL connection settings -- edit to match your database
CONNECTION = {
    'host': '10.0.0.1',
    'port': 3306,
    'user': 'xxxxxxx',
    'passwd': 'xxxxxx'
}

# Number of rows synced to Elasticsearch per batch; defaults to 1 when this setting is omitted
BULK_SIZE = 1

# Elasticsearch node(s) -- edit to match your cluster
NODES = [{"host": "10.0.0.2", "port": 9200}]

TASKS = [
    {
        "stream": {
            "database": "v4_user_db",  # database in which the SQL statement is executed
            "sql": "select * from login_log_202002",  # rows selected by this statement are synced to Elasticsearch
            "pk": {"field": "log_id", "type": "int"}  # primary-key column and its type; NOTE(review): the original note described the string-key case, but the type set here is "int" -- confirm
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"set_id": {"field": "log_id"}}  # use this field's value as the Elasticsearch document id; the field name depends on each table's id column
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "v4_user_db_login_log_202002",   # target index
                        "type": "login_log",          # target document type
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

# Optional custom handler/filter modules (disabled here)
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
# 以上是配置文件内容,下面开始全量同步
# Run the full sync from the working directory. The config path is relative,
# so the run is chained with && and skipped if the cd fails.
cd /usr/local/mysqlsmom && mom run -c ./v4_user_db/init_config.py

3、增量同步

mom new v4_user_db/binlog_config.py -t binlog --force