官方文档

https://github.com/alibaba/canal

使用场景

学习一件东西前,要知道为什么使用它。

1、同步mysql数据redis

常规情况下,产生数据方法可能很多地方,那么就需要多个地方中,都去做mysql数据同步到redis处理相对麻烦很多。
可以使用canal,对mysql进行集中统一处理

概述

canal [kə’næl],译意为水道/管道/沟渠,主要用途基于 MySQL 数据库增量日志解析,提供增量数据订阅消费
当前canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

原理

MySQL主备复制原理

image.png

canal 工作原理

架构

image.png

安装准备

Centos7安装Canal

1、Mysql配置

开启binlog日志

如果是使用Linux安装的话,则直接my.cnf直接修改内容即可

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

docker安装

1、安装my.cnf文件

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、修改dockercompose.yaml内容
配置挂载前面路径my.cnf路径,/etc/mysql/conf.d路径
image.png

3、查询是否成功

show variables like "%server_id%";

image.png

show variables like 'log_bin';

image.png

获取bin_log当前位置

show master status;

image.png
获取后,记录下来,然后不要动数据库

创建canal数据库用户
这里可以使用
mysql -uroot -p登录进入设置
或者直接可视化页面

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、Canal下载

下载
方式一:关注公众号 I am Walker 回复canal

方式二:https://github.com/alibaba/canal/releases?page=2
image.png

# 创建文件夹
mkdir /opt/env/canal
# 解压
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/env/canal

3、配置文件修改

进入canal/conf/example/instance.properties
主要修改下列相关参数

# 数据库
canal.instance.master.address=127.0.0.1:3306
# bin log日志
canal.instance.master.journal.name=mysql-bin.000001
# bin log写入位置
canal.instance.master.position=157
 #数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

之后进入 canal/bin 执行 ./startup.sh
查看是否启动
image.png
有CanalLauncher代表ok或者日志ok

场景

springboot整合

简单整合

1、依赖
<dependency>
            &lt;groupId&gt;com.alibaba.otter</groupId&gt;
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
2、配置文件
canal:
	# 服务地址
  serverAddress: localhost
  # 端口
  serverPort: 11111
  # 订阅 库 表
  subscrie: ".*\..*"
  # 
  batchSize: 100
  # 实例
  instance:
    - example

subscrie配置


全库全表	
connector.subscribe(".*\..*")
指定库全表	
connector.subscribe("test\..*")
单表
connector.subscribe("test.user")规则组合使用
connector.subscribe("test\..*,test2.user1,test3.user2")
3、properties
package com.walker.mybatisplus.canal;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

@Data
@Component
@ConfigurationProperties(value = "canal")
public class CanalProperties {

    private String serverAddress;
    private Integer serverPort;
    private String subcribe;
    private Integer batchSize;
    private List<String> instance;


}

4、监听编写
package com.walker.mybatisplus.canal;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class CanalListener {

    @Autowired
    private CanalProperties canalProperties;

    public static Map<String,Integer> NUM_MAP=new HashMap<>();


    /**
    * 可能会有多业务不同业务应该多个处理类,不要使用if等
    */

    @PostConstruct
    public void run() throws InterruptedException, InvalidProtocolBufferException {

        //创建Canal连接对象
        CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getServerAddress(),
                        canalProperties.getServerPort()),
                canalProperties.getInstance().get(0),
                null, null);

        while(true){
            //连接
            conn.connect();
//            监听数据库和表
            conn.subscribe(canalProperties.getSubcribe());
            //回滚操作
            conn.rollback();
            //获取信息
            Message message = conn.getWithoutAck(canalProperties.getBatchSize());
            long id = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            if(id!=-1&amp;&entries.size()>0){
                //处理数据
                messageProcess(entries);
            }else{
                //防止重复链接数据库
                Thread.sleep(1000);
            }
            //确认消费信息
            conn.ack(id);
            //释放连接
            conn.disconnect();
        }


    }

    private void messageProcess(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entries) {
            log.info("接收Entry:{}", entry);
            
            CanalEntry.Header header = entry.getHeader();
            //数据库
            String schemaName = header.getSchemaName();
            //表名
            String tableName = header.getTableName();
            //事件类型
            CanalEntry.EventType eventType = header.getEventType();

            //这里可以对数据库和表进行一个重新判断 虽然在subscribe已经定义,但是一般可以配置一个库,然后表的可能可以是全部表
//            对库进行判断
            if(!"walker_share".equals(schemaName)){
                continue;
            }

            //对表进行判断
            //这里只是一个案例,如果是实际场景,可以使用工厂模式编写,不然会有很多的if
            if("dish".equals(tableName)){
                //获取修改数据
                List<CanalEntry.RowData> rowDataList = getRowDataList(entry);
     
                //新增
                if(eventType.getNumber()==CanalEntry.EventType.INSERT_VALUE){
                    log.info("新增事件");
                    if(CollUtil.isEmpty(rowDataList)) continue;
                    //模拟场景获取新增的数据,并存储redis中,这里直接存储到Map
                    for (CanalEntry.RowData rowData : rowDataList) {
                        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                        for (CanalEntry.Column column : afterColumnsList) {
                            //获取name的类型
                            if("type".equals(column.getName())){
                                //模拟redis  根据类型进行分类
                                String key = column.getValue();
                                NUM_MAP.put(key,NUM_MAP.getOrDefault(key,0)+1);
                                log.info("NUM_MAP {}",NUM_MAP);
                                continue;
                            }
                        }
                    }
                }
                if(eventType.getNumber()==CanalEntry.EventType.UPDATE_VALUE){
                    log.info("修改事件");
                }
                if(eventType.getNumber()==CanalEntry.EventType.DELETE_VALUE){
                    log.info("删除事件");
                }

            }

        }
    }

    //获取row数据
    private List<CanalEntry.RowData> getRowDataList(CanalEntry.Entry entry) {
        CanalEntry.RowChange rowChange=null;
        try {
            //解析数据
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
        }
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        return rowDatasList;
    }
}

相关类和配置

CanalConnector

image.png

CanalEntry

image.png

EntryType

image.png

Header

image.png

EventType

事件类型,可以根据事件类型去做不一样的操作
image.png

RowChange

image.png

获取数据

CanalEntry.RowChange rowChange=null;
try {
    //解析数据
    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
    throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
log.info("rowDatasList:{}",rowDatasList);
RowData

image.png

CanalEntry.Column

属性
image.png

问题

IOException: caching_sha2_password Auth failed

因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

参考文档

Java开发 – Canal的基本用法_canal java-CSDN博客
15分钟学会Canal安装与部署-CSDN博客
SpringBoot整合Canal1.1.6并同步数据到Redis(超详细和很多踩坑点)_canal同步数据到redis-CSDN博客

原文地址:https://blog.csdn.net/Think_and_work/article/details/134812037

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_47236.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注