编辑
2025-07-14
实用工具
00

目录

简介
Docker部署Canal
MySQL 已开启 binlog
拉取 Canal 镜像
创建并挂载配置文件
修改配置文件
canal.properties:调整网络和日志配置。
instance.properties:配置 MySQL 连接信息。
启动 Canal 容器
验证
java使用
引入依赖
例子-监听变更并打印
例子-同步到另外的数据库
例子-同步数据到 Elasticsearch
例子-同步数据到 MongoDB
例子-同步数据到 PostgreSQL

简介

阿里巴巴开源的 Canal 是一个基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的中间件。Canal 的主要工作原理是伪装成一个 MySQL 的 Slave(从库),去监听 MySQL 实主库的 Binary Log(二进制日志文件)。当有数据变更时,它能够将这些变更捕获,并以一种易于消费的方式提供给下游系统,如数据库、消息队列等。 github地址

Canal 于2013年在阿里内部诞生,主要用于解决跨机房的数据同步问题,后于2018年对外开源,受到了广泛的关注和应用。其应用场景非常广泛,包括但不限于:

  • 数据库同步:支持不同数据库之间的数据同步,比如 MySQL 到 MySQL,MySQL 到其他数据库。
  • 缓存更新:监听数据库变化,实时更新缓存中的数据,保证数据的一致性。
  • 搜索构建:可以用来监听数据库变化,实现实时搜索功能的数据更新。
  • 复杂网络环境下的数据同步:特别是在跨数据中心的情况下,利用 Canal 可以有效进行数据同步。

Canal 具有高可用、低延迟的特点,并且提供了灵活的部署方案和丰富的配置选项,可以根据实际需求调整。此外,Canal 社区活跃,拥有良好的文档和支持,使得开发者能够快速上手并解决遇到的问题。

Docker部署Canal

MySQL 已开启 binlog

确保 MySQL 已开启 binlog,并创建具有 REPLICATION SLAVE 权限的用户。

SQL
-- 开启 binlog(在 MySQL 配置文件 `my.cnf` 中添加): [mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1 -- 创建 Canal 用户并授权: CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal_password'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;

拉取 Canal 镜像

docker pull canal/canal-server

创建并挂载配置文件

docker run --name canal-temp -d canal/canal-server docker cp canal-temp:/home/admin/canal-server/conf/canal.properties ./canal.properties docker cp canal-temp:/home/admin/canal-server/conf/example/instance.properties ./instance.properties docker stop canal-temp && docker rm canal-temp

修改配置文件

canal.properties:调整网络和日志配置。

canal.port = 11111 canal.zkServers = 127.0.0.1:2181 # 如果需要连接 Zookeeper

instance.properties:配置 MySQL 连接信息。

canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal_password canal.instance.filter.regex=.*\\..* # 同步所

启动 Canal 容器

docker run --name canal \ -p 11111:11111 \ -v $(pwd)/canal.properties:/home/admin/canal-server/conf/canal.properties \ -v $(pwd)/instance.properties:/home/admin/canal-server/conf/example/instance.properties \ -v $(pwd)/logs:/home/admin/canal-server/logs \ -d canal/canal-server

验证

docker logs canal

java使用

引入依赖

xml
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version> <!-- 使用与 Canal 服务器兼容的版本 --> </dependency>

例子-监听变更并打印

java
import com.alibaba.otter.canal.client.CanalClient; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class CanalConsumer { public static void main(String[] args) { // 连接到 Canal 服务器 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), // Canal 服务器地址 "example", // Canal 实例名称(与 instance.properties 中的 instance 名称一致) "canal", "canal_password"); // 用户名和密码(可选) try { connector.connect(); connector.subscribe(".*\\..*"); // 订阅所有表 connector.rollback(); while (true) { Message message = connector.getWithoutAck(100); // 每次最多获取 100 条事件 long batchId = message.getId(); List<CanalEntry.Entry> entries = message.getEntries(); for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { System.out.println("操作类型: " + rowChange.getEventType()); System.out.println("表名: " + entry.getHeader().getTableName()); System.out.println("旧数据: " + rowData.getBeforeColumnsList()); System.out.println("新数据: " + rowData.getAfterColumnsList()); } } } connector.ack(batchId); // 确认已处理消息 } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } } }

例子-同步到另外的数据库

请自行引入mysql依赖

java
import com.alibaba.otter.canal.protocol.CanalEntry; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class MysqlSyncExample { private static final String URL = "jdbc:mysql://localhost:3306/target_db"; private static final String USER = "root"; private static final String PASSWORD = "123456"; public void syncToMysql(CanalEntry.RowChange rowChange, String tableName) { try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) { for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (rowChange.getEventType() == CanalEntry.EventType.INSERT) { insertData(conn, tableName, rowData.getAfterColumnsList()); } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) { updateData(conn, tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList()); } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) { deleteData(conn, tableName, rowData.getBeforeColumnsList()); } } } catch (Exception e) { e.printStackTrace(); } } // 插入操作 private void insertData(Connection conn, String table, List<CanalEntry.Column> columns) throws Exception { StringBuilder cols = new StringBuilder(); StringBuilder values = new StringBuilder(); for (CanalEntry.Column col : columns) { cols.append(col.getName()).append(","); values.append("?,"); } cols.deleteCharAt(cols.length() - 1); values.deleteCharAt(values.length() - 1); String sql = "INSERT INTO " + table + "(" + cols + ") VALUES (" + values + ")"; try (PreparedStatement ps = conn.prepareStatement(sql)) { int index = 1; for (CanalEntry.Column col : columns) { ps.setObject(index++, col.getValue()); } ps.executeUpdate(); } } // 更新操作 private void updateData(Connection conn, String table, List<CanalEntry.Column> beforeCols, List<CanalEntry.Column> afterCols) throws Exception { StringBuilder sets = new StringBuilder(); for (CanalEntry.Column col : afterCols) { sets.append(col.getName()).append("=?,"); } sets.deleteCharAt(sets.length() - 1); String where = "id=?"; String sql = "UPDATE " + table + " SET " + sets + " WHERE " + where; try (PreparedStatement ps = conn.prepareStatement(sql)) { int index = 1; for (CanalEntry.Column col : afterCols) { ps.setObject(index++, col.getValue()); } for (CanalEntry.Column col : beforeCols) { if (col.getName().equals("id")) { ps.setObject(index++, col.getValue()); } } ps.executeUpdate(); } } // 删除操作 private void deleteData(Connection conn, String table, List<CanalEntry.Column> cols) throws Exception { String where = "id=?"; String sql = "DELETE FROM " + table + " WHERE " + where; try (PreparedStatement ps = conn.prepareStatement(sql)) { for (CanalEntry.Column col : cols) { if (col.getName().equals("id")) { ps.setObject(1, col.getValue()); break; } } ps.executeUpdate(); } } }

例子-同步数据到 Elasticsearch

依赖

xml
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-high-level-rest-client</artifactId> <version>7.17.0</version> </dependency>
java
import com.alibaba.otter.canal.protocol.CanalEntry; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryResponse; import org.elasticsearch.action.update.*; import org.elasticsearch.index.query.QueryBuilders; import java.util.HashMap; import java.util.Map; public class EsSyncExample { public void syncToElasticsearch(CanalEntry.RowChange rowChange, String indexName) { for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { Map<String, Object> data = new HashMap<>(); for (CanalEntry.Column column : rowData.getAfterColumnsList()) { data.put(column.getName(), column.getValue()); } String id = getPrimaryKey(rowData.getAfterColumnsList()); if (rowChange.getEventType() == CanalEntry.EventType.INSERT || rowChange.getEventType() == CanalEntry.EventType.UPDATE) { // 插入或更新文档 UpdateRequest request = new UpdateRequest(indexName, id) .doc(data, XContentType.JSON) .upsert(data, XContentType.JSON); // client.update(request, RequestOptions.DEFAULT); } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) { // 删除文档 DeleteRequest deleteRequest = new DeleteRequest(indexName, id); // client.delete(deleteRequest, RequestOptions.DEFAULT); } } } private String getPrimaryKey(List<CanalEntry.Column> columns) { for (CanalEntry.Column col : columns) { if (col.getName().equalsIgnoreCase("id")) { return col.getValue(); } } return null; } }

例子-同步数据到 MongoDB

xml
<dependency> <groupId>org.mongodb</groupId> <artifactId>mongodb-driver-sync</artifactId> <version>4.9.0</version> </dependency>
java
import com.alibaba.otter.canal.protocol.CanalEntry; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import static com.mongodb.client.model.Filters.eq; import java.util.HashMap; import java.util.Map; public class MongoSyncExample { private MongoClient client = MongoClients.create("mongodb://localhost:27017"); private MongoDatabase database = client.getDatabase("target_db"); public void syncToMongo(CanalEntry.RowChange rowChange, String collectionName) { MongoCollection<Map> collection = database.getCollection(collectionName, Map.class); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { Map<String, Object> document = new HashMap<>(); for (CanalEntry.Column column : rowData.getAfterColumnsList()) { document.put(column.getName(), column.getValue()); } String id = getPrimaryKey(rowData.getAfterColumnsList()); if (rowChange.getEventType() == CanalEntry.EventType.INSERT || rowChange.getEventType() == CanalEntry.EventType.UPDATE) { collection.replaceOne(eq("_id", id), document, new ReplaceOptions().upsert(true)); } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) { collection.deleteOne(eq("_id", id)); } } } private String getPrimaryKey(List<CanalEntry.Column> columns) { for (CanalEntry.Column col : columns) { if (col.getName().equalsIgnoreCase("id")) { return col.getValue(); } } return null; } }

例子-同步数据到 PostgreSQL

xml
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.6.0</version> </dependency>
java
import com.alibaba.otter.canal.protocol.CanalEntry; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class PostgresSyncExample { private static final String URL = "jdbc:postgresql://localhost:5432/target_db"; private static final String USER = "postgres"; private static final String PASSWORD = "123456"; public void syncToPostgres(CanalEntry.RowChange rowChange, String tableName) { try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) { for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (rowChange.getEventType() == CanalEntry.EventType.INSERT) { insertData(conn, tableName, rowData.getAfterColumnsList()); } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) { updateData(conn, tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList()); } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) { deleteData(conn, tableName, rowData.getBeforeColumnsList()); } } } catch (Exception e) { e.printStackTrace(); } } private void insertData(Connection conn, String table, List<CanalEntry.Column> columns) throws Exception { StringBuilder cols = new StringBuilder(); StringBuilder values = new StringBuilder(); for (CanalEntry.Column col : columns) { cols.append("\"").append(col.getName()).append("\","); values.append("?,"); } cols.deleteCharAt(cols.length() - 1); values.deleteCharAt(values.length() - 1); String sql = "INSERT INTO \"" + table + "\" (" + cols + ") VALUES (" + values + ")"; try (PreparedStatement ps = conn.prepareStatement(sql)) { int index = 1; for (CanalEntry.Column col : columns) { ps.setObject(index++, col.getValue()); } ps.executeUpdate(); } } private void updateData(Connection conn, String table, List<CanalEntry.Column> beforeCols, List<CanalEntry.Column> afterCols) throws Exception { StringBuilder sets = new StringBuilder(); for (CanalEntry.Column col : afterCols) { sets.append("\"").append(col.getName()).append("\"=?,"); } sets.deleteCharAt(sets.length() - 1); String where = "\"id\"=?"; String sql = "UPDATE \"" + table + "\" SET " + sets + " WHERE " + where; try (PreparedStatement ps = conn.prepareStatement(sql)) { int index = 1; for (CanalEntry.Column col : afterCols) { ps.setObject(index++, col.getValue()); } for (CanalEntry.Column col : beforeCols) { if (col.getName().equalsIgnoreCase("id")) { ps.setObject(index++, col.getValue()); } } ps.executeUpdate(); } } private void deleteData(Connection conn, String table, List<CanalEntry.Column> cols) throws Exception { String where = "\"id\"=?"; String sql = "DELETE FROM \"" + table + "\" WHERE " + where; try (PreparedStatement ps = conn.prepareStatement(sql)) { for (CanalEntry.Column col : cols) { if (col.getName().equalsIgnoreCase("id")) { ps.setObject(1, col.getValue()); break; } } ps.executeUpdate(); } } }

本文作者:Weee

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!