1. 背景
项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。
2. 同步原理
缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。
3. 环境信息
名称 | 版本 |
---|---|
MySQL | 8.0.35 |
elasticsearch | 7.17.9 |
canal | 1.1.7 |
jdk | 1.8 |
本文以MySQL中的表ibds2.proof_data_history_202311为例进行数据同步实验。
4. 部署配置
本文的的重点是讲述canal的配置与使用,不再详细描述MySQL与ES的部署和配置。
4.1. MySQL
MySQL的部署与配置相对较简单,需要主要的是需要开启binlog,且配置binlog–format 为ROW
模式。此处不再做详细描述。
4.2. Elasticsearch
4.3. canal
deployer:
https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
4.3.1. deployer
4.3.1.1. 配置
将canal.deployer-1.1.7.tar.gz
上传到/data/canal/Server
,并进行解压:
cd /data/canal/Server
tar -zxvf canal.deployer-1.1.7.tar.gz
修改配置文件conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置:
## mysql serverId , v1.0.26+ will autoGen 不要与MySQL的id重复
canal.instance.mysql.slaveId=10
# enable gtid use true/false 是否使用gtid
canal.instance.gtidon=true
# position info MySQL数据库信息
canal.instance.master.address=192.168.10.101:3307
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=X87_5w2Anxp3
canal.instance.connectionCharset = UTF-8
# table regex 需要订阅binlog的表的过滤正则表达式
canal.instance.filter.regex=.*\..*
4.3.1.2. 启动
cd /data/canal/Server/bin
./startup.sh
4.3.2. adapter
在adapter的配置中,一定要先在ES中创建索引,然后在启动adapter。
4.3.2.1. 配置
将canal.adapter-1.1.7.tar.gz
上传到/data/canal/Adapter
,并进行解压:
cd /data/canal/Adapter
tar -zxvf canal.adapter-1.1.7.tar.gz
修改配置文件conf/application.yml,按如下配置即可,主要是修改canal-server配置、数据源配置和客户端适配器配置:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 192.168.10.101:11111 # 需要修改
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.10.101:3307/ibds2?useUnicode=true # 需要修改
username: root # 需要修改
password: X87_5w2Anxp3 # 需要修改
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# druid.stat.enable: false
# druid.stat.slowSqlMillis: 1000
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es7 # 需要修改
hosts: 192.168.10.101:9300 # 127.0.0.1:9200 for rest mode # 需要修改
properties:
mode: transport # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: my-es # 需要修改
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
# - name: phoenix
# key: phoenix
# properties:
# jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
# jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
# jdbc.username:
# jdbc.password:
配置重点
添加配置文件canal-adapter/conf/es7/proof_202311.yml,用于配置MySQL中的表与Elasticsearch中索引的映射关系:
dataSourceKey: defaultDS # 与application.yml中的srcDataSources对应
destination: example # canal的instance
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: proof_data_history_202311 # 要在es中创建的索引名称
_type: _doc
_id: id # 与sql中的id一致
upsert: true
sql: "SELECT
t.id as id,
t.biz_id as biz_id,
t.biz_type as biz_type,
t.encrypt_mode as encrypt_mode,
t.sign_user as sign_user,
t.tx_hash as tx_hash,
t.from_addr as from_addr,
t.data as data,
t.channel_id as channel_id,
t.block_height as block_height,
t.tx_time as tx_time
FROM
proof_data_history_202311 t"
commitBatch: 3000
4.3.2.2. 创建索引
-
kinbana:
PUT /proof_data_history_202311
{
"mappings":{
"properties":{
"id": {
"type": "long"
},
"biz_id": {
"type": "text"
},
"biz_type": {
"type": "long"
},
"encrypt_mode": {
"type": "long"
},
"sign_user": {
"type": "text"
},
"tx_hash": {
"type": "text"
},
"from_addr": {
"type": "text"
},
"data": {
"type": "text"
},
"channel_id": {
"type": "text"
},
"block_height": {
"type": "long"
},
"tx_time": {
"type": "date"
}
}
}
}
-
命令行:
curl -XPUT "http://192.168.10.101:9200/proof_data_history_202311" -H 'Content-Type: application/json' -d'
{
"mappings":{
"properties":{
"id": {
"type": "long"
},
"biz_id": {
"type": "text"
},
"biz_type": {
"type": "long"
},
"encrypt_mode": {
"type": "long"
},
"sign_user": {
"type": "text"
},
"tx_hash": {
"type": "text"
},
"from_addr": {
"type": "text"
},
"data": {
"type": "text"
},
"channel_id": {
"type": "text"
},
"block_height": {
"type": "long"
},
"tx_time": {
"type": "date"
}
}
}
}'
4.3.2.3. 启动adapter
cd /data/canal/Adapter/bin
./startup.sh
4.3.3. 数据初始化
如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用此步骤。
canal-adapter提供一个REST接口可全量同步数据到ES:
# 全量同步proof_data_history_202311表数据到es的proof_data_history_202311索引中,路径中的es7是上一步的文件夹名
curl -X POST http://192.168.10.101:8081/etl/es7/proof_202311.yml
以120万行实验数据为例,分别进行ES中没有数据和已有数据的两种不同情况下的全量同步实验。
2023年 11月 28日 星期二 09:17:55 CST开始执行全量同步
{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}
2023年 11月 28日 星期二 09:19:21 CST完成执行全量同步
ES中已有数据,同步时间87秒:
2023年 11月 28日 星期二 09:27:46 CST开始执行全量同步
{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}
2023年 11月 28日 星期二 09:29:03 CST完成执行全量同步
5. 补充资料
在实际生产中,server和adapter往往部署在不同的网络区域,这里就涉及到网络安全访问策略的问题,下面梳理出了需要的网络安全访问策略。
源端 | 目标端 |
---|---|
Adapter端IP | 数据源端MySQL地址和端口 |
Adapter端IP | 数据目的地ES地址和端口(9200、9300) |
Adapter端IP | Server端地址和11110、11111、11112端口 |
Server端IP | 数据源端MySQL地址和端口 |
原文地址:https://blog.csdn.net/m0_38004228/article/details/134688017
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_10529.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!