侧边栏壁纸
  • 累计撰写 22 篇文章
  • 累计创建 10 个标签
  • 累计收到 1 条评论
隐藏侧边栏

基于Canal中间件同步MySQL数据到Elasticsearch(V7.6.2)

阿歪(iy)
2021-11-25 / 0 评论 / 0 点赞 / 1,208 阅读 / 4,503 字 / 正在检测是否收录...

基于canal中间件同步MySQL数据到Elasticsearch(V7.6.2)

  • 本文软件版本(不提版本都是耍流氓)

OS: MacOS

mysql(mac server): 8.0

canal: 1.1.5发布版

elasticsearch: 7.6.2

kibana: 7.6.2

1.遇到的一些坑:

  1. adapter 订阅不到 mysql 的日志
    • mysql是否开启了binlog
    • canal.adapter-1.1.5/lib 下默认是没有5.x的jar包
    • 阿里云的RDS我实操时候是通过的,但是睡一觉起来,数据又订阅不到了,就放弃了
  2. 启动adapter时报:com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
    • 下载v1.1.5-alpha-2 版本下的client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换原client-adapter.es7x-1.1.5-jar-with-dependencies.jar
    • 不要信改ES配置 hosts: 127.0.0.1:9200 -> hosts: http://127.0.0.1:9200 改了会提示你 hostName not found!

2.撸起袖子开始干:

  • 开启mysql binlog
vi /etc/my.cnf
# 新增如下配置
log-bin = mysql-bin #开启binlog
binlog-format = ROW #选择row模式
server_id = 1 #配置mysql replication需要定义,不能和canal的slaveId重复

canal.deployer-1.1.5/conf/canal.properties 配置基本不用动

修改:/canal-1.1.5/canal.deployer-1.1.5/conf/example/instance.properties

canal.instance.master.address=127.0.0.1:3306 #数据库
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

canal.instance.dbUsername=root #订阅的数据库账号
canal.instance.dbPassword=root #订阅的数据库密码

# table regex
canal.instance.filter.regex=.*\\..* #表筛选
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
  • 配置canal-adapter客户端同步给ES的相关配置
    canal-1.1.5/canal.adapter-1.1.5/conf/application.yml
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger # 日志的
      - name: es7 #ES 相关
        hosts: 127.0.0.1:9200 # 不用填http://
        properties:
          mode: rest #  9200填写rest,transport:9300用
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
  • 配置canal-adapter客户端ES和MySQL字段的映射
    canal-1.1.5/canal.adapter-1.1.5/conf/es7/demo.yml
dataSourceKey: defaultDS
destination: example
groupId: g1 #
esMapping:
  _index: demo_index
  _id: id #这个字段和sql语句的查询别名一致
#  upsert: true
#  pk: id
  sql: "select id as id,name from demo"
#  objFields:
#    _labels: array:;
  commitBatch: 3000
  • canal.deployer完整配置
    canal-1.1.5/canal.deployer-1.1.5/conf/example/instance.properties
# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*

# mq config
canal.mq.topic=example
canal.mq.partition=0
#################################################

  • canal-adapter服务端完整配置
    canal-1.1.5/canal.adapter-1.1.5/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
   defaultDS:
     url: jdbc:mysql://127.0.0.1:3306/数据库名?useUnicode=true
     username: root
     password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch

Last but not least : 再遇到异常那就上git 搜你的对应异常Issue,你踩得坑别人一定都踩过了!

0

评论