本文介绍: 项目业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数查询

1. 背景

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数查询

2. 同步原理

优点: 可以完全和业务代码解耦,增量日志订阅

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步canal。

3. 环境信息

名称 版本
MySQL 8.0.35
elasticsearch 7.17.9
canal 1.1.7
jdk 1.8

本文以MySQL中的表ibds2.proof_data_history_202311为例进行数据同步实验

4. 部署配置

本文的的重点是讲述canal的配置使用,不再详细描述MySQL与ES的部署配置

4.1. MySQL

MySQL的部署与配置相对较简单需要主要的是需要开启binlog,且配置binlogformatROW模式。此处不再做详细描述

4.2. Elasticsearch

ES的部署与配置比较简单,此处不再做详细描述

建议安装版本分词器插件

4.3. canal

版本下载地址

deployer:

https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

adapter:

https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz

4.3.1. deployer
4.3.1.1. 配置

canal.deployer-1.1.7.tar.gz上传/data/canal/Server,并进行解压

cd /data/canal/Server
tar -zxvf canal.deployer-1.1.7.tar.gz

修改配置文件conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置:

## mysql serverId , v1.0.26+ will autoGen 不要与MySQL的id重复
canal.instance.mysql.slaveId=10
​
# enable gtid use true/false 是否使用gtid
canal.instance.gtidon=true
​
# position info MySQL数据库信息
canal.instance.master.address=192.168.10.101:3307
​
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=X87_5w2Anxp3
canal.instance.connectionCharset = UTF-8
​
# table regex 需要订阅binlog的表的过滤正则表达式
canal.instance.filter.regex=.*\..*
4.3.1.2. 启动

进入bin目录启动deployer:

cd /data/canal/Server/bin
./startup.sh
4.3.2. adapter

adapter的配置中,一定要先在ES中创建索引然后启动adapter。

4.3.2.1. 配置

canal.adapter-1.1.7.tar.gz上传/data/canal/Adapter,并进行解压

cd /data/canal/Adapter
tar -zxvf canal.adapter-1.1.7.tar.gz

修改配置文件conf/application.yml,按如下配置即可,主要是修改canal-server配置、数据源配置和客户端适配器配置:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
​
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.10.101:11111 # 需要修改
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
​
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.10.101:3307/ibds2?useUnicode=true   # 需要修改
      username: root    # 需要修改
      password: X87_5w2Anxp3    # 需要修改
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7   # 需要修改
        hosts: 192.168.10.101:9300 # 127.0.0.1:9200 for rest mode # 需要修改
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: my-es   # 需要修改
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

配置重点

添加配置文件canal-adapter/conf/es7/proof_202311.yml用于配置MySQL中的表与Elasticsearch中索引的映射关系:

dataSourceKey: defaultDS    # 与application.yml中的srcDataSources对应
destination: example    # canal的instance
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: proof_data_history_202311 # 要在es中创建的索引名称
  _type: _doc
  _id: id   # 与sql中的id一致
  upsert: true
  sql: "SELECT
        t.id as id,
        t.biz_id as biz_id,
        t.biz_type as biz_type,
        t.encrypt_mode as encrypt_mode,
        t.sign_user as sign_user,
        t.tx_hash as tx_hash,
        t.from_addr as from_addr,
        t.data as data,
        t.channel_id as channel_id,
        t.block_height as block_height,
        t.tx_time as tx_time
        FROM
        proof_data_history_202311 t"
  commitBatch: 3000
4.3.2.2. 创建索引

根据MySQL的表结构信息,在ES中创建对应的索引信息

在ES中创建索引的方法有多种,推荐两种方法

  • kinbana:

PUT /proof_data_history_202311
  {
    "mappings":{
        "properties":{
          "id": {
            "type": "long"
          },
          "biz_id": {
            "type": "text"
          },
          "biz_type": {
            "type": "long"
          },
          "encrypt_mode": {
            "type": "long"
          },
          "sign_user": {
            "type": "text"
          },
          "tx_hash": {
            "type": "text"
          },
          "from_addr": {
            "type": "text"
          },
          "data": {
            "type": "text"
          },
          "channel_id": {
            "type": "text"
          },
          "block_height": {
            "type": "long"
          },
          "tx_time": {
            "type": "date"
        }
      }
    }
  }
curl -XPUT "http://192.168.10.101:9200/proof_data_history_202311" -H 'Content-Type: application/json' -d'
{
    "mappings":{
        "properties":{
          "id": {
            "type": "long"
          },
          "biz_id": {
            "type": "text"
          },
          "biz_type": {
            "type": "long"
          },
          "encrypt_mode": {
            "type": "long"
          },
          "sign_user": {
            "type": "text"
          },
          "tx_hash": {
            "type": "text"
          },
          "from_addr": {
            "type": "text"
          },
          "data": {
            "type": "text"
          },
          "channel_id": {
            "type": "text"
          },
          "block_height": {
            "type": "long"
          },
          "tx_time": {
            "type": "date"
        }
      }
    }
  }'
4.3.2.3. 启动adapter
cd /data/canal/Adapter/bin
./startup.sh
4.3.3. 数据初始化

如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用步骤

canal-adapter提供一个REST接口全量同步数据到ES:

# 全量同步proof_data_history_202311表数据到es的proof_data_history_202311索引中,路径中的es7是上一步的文件夹curl  -X POST http://192.168.10.101:8081/etl/es7/proof_202311.yml

以120万行实验数据为例,分别进行ES中没有数据和已有数据的两种不同情况下的全量同步实验。

ES中没有数据,同步时间86秒:

2023年 11月 28日 星期二 09:17:55 CST开始执行全量同步
{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}
2023年 11月 28日 星期二 09:19:21 CST完成执行全量同步

ES中已有数据,同步时间87秒:

2023年 11月 28日 星期二 09:27:46 CST开始执行全量同步
{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}
2023年 11月 28日 星期二 09:29:03 CST完成执行全量同步

5. 补充资料

在实际生产中,server和adapter往往部署在不同的网络区域这里就涉及到网络安全访问策略问题,下面梳理出了需要的网络安全访问策略

源端 目标端
Adapter端IP 数据源端MySQL地址端口
Adapter端IP 数据目的地ES地址和端口(9200、9300)
Adapter端IP Server端地址和11110、11111、11112端口
Server端IP 数据源端MySQL地址和端口

原文地址:https://blog.csdn.net/m0_38004228/article/details/134688017

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_10529.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注