本笔记来源于:尚硅谷Redis零基础到进阶,最强redis7教程,阳哥亲自带练(附redis面试题)
b站视频
文章来自:
https://github.com/Romantic-Lei/Learning-in-practice
复习+面试

canal介绍
是什么
官网地址:https://github.com/alibaba/canal/wiki
canal [ka’nael],中文翻译为水道/管道/沟渠/运河,主要用途是于MySQL数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;
历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务trigger (触发器)获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;
能干嘛
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
去哪下
下载地址: https://github.com/alibaba/canal/releases/tag/canal-1.1.6
工作原理,面试回答

MySQL的主从复制将经过如下步骤:
1、当master主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
2、salve 从服务器会在一定时间间隔内对master主服务器上的二进制日志进行探测,探测其是否发生过改变;
如果探测到master主服务器的二进制事件日志发生了改变,则开始一个I/O Thread请求master二进制事件日志;
3、同时master主服务器为每个I/0 Thread启动一个dump Thread, 用于向其发送二进制事件日志;
4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
5、salve 从服务器将启动SQL Thread从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
6、后I/O Thread和SQL Thread将进入睡眠状态,等待下一次被唤醒。

工作原理
- canal 模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
- MySQL master收到dump请求,开始推送binary log给slave (即canal )canal 解析binary log对象(原始为byte流)
双写一致性-MySQL
Java案例,来源出处
https://github.com/alibaba/canal/wiki/ClientExample
1 2 3
| log-bin=mysql-bin binlog-format=Row server_id=1
|

1 2 3 4 5
| DROP USER IF EXISTS 'canal'@'%' ; CREATE USER 'canal'@'%' IDENTIFIED BY 'canal' ; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ; FLUSH PRIVILEGES; SELECT * FROM mysql .user;
|

canal服务端
下载
官网地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.6

解压
解压后整体放入到/mycanal路径下
配置
修改 /mycanal/conf/example路径下instance.properties文件
换成自己的MySQL主机master的IP地址

换成自己的在MySQL新建的canal账户

启动
在/mycanal/bin路径下执行 -> ./startup.sh
查看
判断canal是否启动成功
查看server日志

查看样例 example的日志

双写一致性 coding-canal客户端(Java编写业务程序)
SQL脚本
选中一个库,执行下面建表语句
1 2 3 4 5
| CREATE TABLE `t_user`( `id` bigint(20)NOT NULL AUTO_INCREMENT, `userName` varchar(100)NOT NULL, PRIMARY KEY (`id`) )ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
|
建module
改POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.14</version> <relativePath/> </parent> <groupId>com.luojia</groupId> <artifactId>canaldemo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>canaldemo</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <junit.version>4.12</junit.version> <log4j.version>1.2.17</log4j.version> <lombok.version>1.16.18</lombok.version> <mysql.version>5.1.47</mysql.version> <druid.version>1.1.16</druid.version> <mapper.version>4.1.5</mapper.version> <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version> </properties>
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis.spring.boot.version}</version> </dependency> <dependency> <groupId>javax.persistence</groupId> <artifactId>persistence-api</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper</artifactId> <version>${mapper.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.8.0</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency>
<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.2.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
|
写YML
1 2 3 4 5 6 7 8 9 10
| server.port=8000
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://127.0.0.1:3306/jmall?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC spring.datasource.username=root spring.datasource.password=root spring.datasource.druid.test-while-idle=fasle
|
业务类
RedisUtils
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.luojia.canaldemo.utils;
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig;
public class RedisUtils {
public static final String REDIS_IP_ADDR = "127.0.0.1";
public static final String REDIS_PWD = "123456";
public static JedisPool jedisPool;
static { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(20); jedisPoolConfig.setMaxIdle(10); jedisPool = new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD); }
public static Jedis getJedis() throws Exception { if (null != jedisPool) { return jedisPool.getResource(); } throw new Exception("Jedispoll is not ok"); } }
|
RedisCanalClientExample
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
| package com.luojia.canaldemo.biz;
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.luojia.canaldemo.utils.RedisUtils; import io.lettuce.core.RedisClient; import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress; import java.util.List; import java.util.UUID;
public class RedisCanalClientExample {
public static final Integer _60SECONDS = 60; public static final String REDIS_IP_ADDR = "127.0.0.1";
public static void redisInsert(List<Column> columns) {
JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + ": " + column.getValue() + " insert = " + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); }
if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(), jsonObject.toJSONString()); } catch (Exception e) { e.printStackTrace(); } } }
public static void redisDelete(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + ": " + column.getValue() + " delete = " + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); }
if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0).getValue()); } catch (Exception e) { e.printStackTrace(); } } }
public static void redisUpdate(List<Column> columns) {
JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + ": " + column.getValue() + " update = " + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); }
if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(), jsonObject.toJSONString()); } catch (Exception e) { e.printStackTrace(); } } }
public static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }
RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); }
EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); redisUpdate(rowData.getBeforeColumnsList()); System.out.println("-------> after"); } } } }
public static void main(String[] args) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe("jmall.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { System.out.println("我是canal,每秒监听一次:" + UUID.randomUUID().toString()); Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; printEntry(message.getEntries()); }
connector.ack(batchId); }
System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } }
|
题外话
java程序下 connector.subscribe配置的过滤正则
