阿里巴巴开源的 Canal 是一个基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的中间件。Canal 的主要工作原理是伪装成一个 MySQL 的 Slave(从库),去监听 MySQL 实主库的 Binary Log(二进制日志文件)。当有数据变更时,它能够将这些变更捕获,并以一种易于消费的方式提供给下游系统,如数据库、消息队列等。 github地址
Canal 于2013年在阿里内部诞生,主要用于解决跨机房的数据同步问题,后于2018年对外开源,受到了广泛的关注和应用。其应用场景非常广泛,包括但不限于:
Canal 具有高可用、低延迟的特点,并且提供了灵活的部署方案和丰富的配置选项,可以根据实际需求调整。此外,Canal 社区活跃,拥有良好的文档和支持,使得开发者能够快速上手并解决遇到的问题。
确保 MySQL 已开启 binlog,并创建具有 REPLICATION SLAVE 权限的用户。
SQL-- 开启 binlog(在 MySQL 配置文件 `my.cnf` 中添加):
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
-- 创建 Canal 用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
docker pull canal/canal-server
docker run --name canal-temp -d canal/canal-server docker cp canal-temp:/home/admin/canal-server/conf/canal.properties ./canal.properties docker cp canal-temp:/home/admin/canal-server/conf/example/instance.properties ./instance.properties docker stop canal-temp && docker rm canal-temp
canal.port = 11111 canal.zkServers = 127.0.0.1:2181 # 如果需要连接 Zookeeper
canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal_password canal.instance.filter.regex=.*\\..* # 同步所
docker run --name canal \ -p 11111:11111 \ -v $(pwd)/canal.properties:/home/admin/canal-server/conf/canal.properties \ -v $(pwd)/instance.properties:/home/admin/canal-server/conf/example/instance.properties \ -v $(pwd)/logs:/home/admin/canal-server/logs \ -d canal/canal-server
docker logs canal
xml<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version> <!-- 使用与 Canal 服务器兼容的版本 -->
</dependency>
javaimport com.alibaba.otter.canal.client.CanalClient;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalConsumer {
public static void main(String[] args) {
// 连接到 Canal 服务器
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), // Canal 服务器地址
"example", // Canal 实例名称(与 instance.properties 中的 instance 名称一致)
"canal", "canal_password"); // 用户名和密码(可选)
try {
connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有表
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 每次最多获取 100 条事件
long batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
System.out.println("操作类型: " + rowChange.getEventType());
System.out.println("表名: " + entry.getHeader().getTableName());
System.out.println("旧数据: " + rowData.getBeforeColumnsList());
System.out.println("新数据: " + rowData.getAfterColumnsList());
}
}
}
connector.ack(batchId); // 确认已处理消息
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
}
请自行引入mysql依赖
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MysqlSyncExample {
private static final String URL = "jdbc:mysql://localhost:3306/target_db";
private static final String USER = "root";
private static final String PASSWORD = "123456";
public void syncToMysql(CanalEntry.RowChange rowChange, String tableName) {
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
insertData(conn, tableName, rowData.getAfterColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
updateData(conn, tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
deleteData(conn, tableName, rowData.getBeforeColumnsList());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 插入操作
private void insertData(Connection conn, String table, List<CanalEntry.Column> columns) throws Exception {
StringBuilder cols = new StringBuilder();
StringBuilder values = new StringBuilder();
for (CanalEntry.Column col : columns) {
cols.append(col.getName()).append(",");
values.append("?,");
}
cols.deleteCharAt(cols.length() - 1);
values.deleteCharAt(values.length() - 1);
String sql = "INSERT INTO " + table + "(" + cols + ") VALUES (" + values + ")";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
int index = 1;
for (CanalEntry.Column col : columns) {
ps.setObject(index++, col.getValue());
}
ps.executeUpdate();
}
}
// 更新操作
private void updateData(Connection conn, String table, List<CanalEntry.Column> beforeCols, List<CanalEntry.Column> afterCols) throws Exception {
StringBuilder sets = new StringBuilder();
for (CanalEntry.Column col : afterCols) {
sets.append(col.getName()).append("=?,");
}
sets.deleteCharAt(sets.length() - 1);
String where = "id=?";
String sql = "UPDATE " + table + " SET " + sets + " WHERE " + where;
try (PreparedStatement ps = conn.prepareStatement(sql)) {
int index = 1;
for (CanalEntry.Column col : afterCols) {
ps.setObject(index++, col.getValue());
}
for (CanalEntry.Column col : beforeCols) {
if (col.getName().equals("id")) {
ps.setObject(index++, col.getValue());
}
}
ps.executeUpdate();
}
}
// 删除操作
private void deleteData(Connection conn, String table, List<CanalEntry.Column> cols) throws Exception {
String where = "id=?";
String sql = "DELETE FROM " + table + " WHERE " + where;
try (PreparedStatement ps = conn.prepareStatement(sql)) {
for (CanalEntry.Column col : cols) {
if (col.getName().equals("id")) {
ps.setObject(1, col.getValue());
break;
}
}
ps.executeUpdate();
}
}
}
依赖
xml<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-high-level-rest-client</artifactId>
<version>7.17.0</version>
</dependency>
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryResponse;
import org.elasticsearch.action.update.*;
import org.elasticsearch.index.query.QueryBuilders;
import java.util.HashMap;
import java.util.Map;
public class EsSyncExample {
public void syncToElasticsearch(CanalEntry.RowChange rowChange, String indexName) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
Map<String, Object> data = new HashMap<>();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
data.put(column.getName(), column.getValue());
}
String id = getPrimaryKey(rowData.getAfterColumnsList());
if (rowChange.getEventType() == CanalEntry.EventType.INSERT ||
rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
// 插入或更新文档
UpdateRequest request = new UpdateRequest(indexName, id)
.doc(data, XContentType.JSON)
.upsert(data, XContentType.JSON);
// client.update(request, RequestOptions.DEFAULT);
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
// 删除文档
DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
// client.delete(deleteRequest, RequestOptions.DEFAULT);
}
}
}
private String getPrimaryKey(List<CanalEntry.Column> columns) {
for (CanalEntry.Column col : columns) {
if (col.getName().equalsIgnoreCase("id")) {
return col.getValue();
}
}
return null;
}
}
xml<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.9.0</version>
</dependency>
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import static com.mongodb.client.model.Filters.eq;
import java.util.HashMap;
import java.util.Map;
public class MongoSyncExample {
private MongoClient client = MongoClients.create("mongodb://localhost:27017");
private MongoDatabase database = client.getDatabase("target_db");
public void syncToMongo(CanalEntry.RowChange rowChange, String collectionName) {
MongoCollection<Map> collection = database.getCollection(collectionName, Map.class);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
Map<String, Object> document = new HashMap<>();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
document.put(column.getName(), column.getValue());
}
String id = getPrimaryKey(rowData.getAfterColumnsList());
if (rowChange.getEventType() == CanalEntry.EventType.INSERT ||
rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
collection.replaceOne(eq("_id", id), document, new ReplaceOptions().upsert(true));
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
collection.deleteOne(eq("_id", id));
}
}
}
private String getPrimaryKey(List<CanalEntry.Column> columns) {
for (CanalEntry.Column col : columns) {
if (col.getName().equalsIgnoreCase("id")) {
return col.getValue();
}
}
return null;
}
}
xml<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class PostgresSyncExample {
private static final String URL = "jdbc:postgresql://localhost:5432/target_db";
private static final String USER = "postgres";
private static final String PASSWORD = "123456";
public void syncToPostgres(CanalEntry.RowChange rowChange, String tableName) {
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
insertData(conn, tableName, rowData.getAfterColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
updateData(conn, tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
deleteData(conn, tableName, rowData.getBeforeColumnsList());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void insertData(Connection conn, String table, List<CanalEntry.Column> columns) throws Exception {
StringBuilder cols = new StringBuilder();
StringBuilder values = new StringBuilder();
for (CanalEntry.Column col : columns) {
cols.append("\"").append(col.getName()).append("\",");
values.append("?,");
}
cols.deleteCharAt(cols.length() - 1);
values.deleteCharAt(values.length() - 1);
String sql = "INSERT INTO \"" + table + "\" (" + cols + ") VALUES (" + values + ")";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
int index = 1;
for (CanalEntry.Column col : columns) {
ps.setObject(index++, col.getValue());
}
ps.executeUpdate();
}
}
private void updateData(Connection conn, String table, List<CanalEntry.Column> beforeCols, List<CanalEntry.Column> afterCols) throws Exception {
StringBuilder sets = new StringBuilder();
for (CanalEntry.Column col : afterCols) {
sets.append("\"").append(col.getName()).append("\"=?,");
}
sets.deleteCharAt(sets.length() - 1);
String where = "\"id\"=?";
String sql = "UPDATE \"" + table + "\" SET " + sets + " WHERE " + where;
try (PreparedStatement ps = conn.prepareStatement(sql)) {
int index = 1;
for (CanalEntry.Column col : afterCols) {
ps.setObject(index++, col.getValue());
}
for (CanalEntry.Column col : beforeCols) {
if (col.getName().equalsIgnoreCase("id")) {
ps.setObject(index++, col.getValue());
}
}
ps.executeUpdate();
}
}
private void deleteData(Connection conn, String table, List<CanalEntry.Column> cols) throws Exception {
String where = "\"id\"=?";
String sql = "DELETE FROM \"" + table + "\" WHERE " + where;
try (PreparedStatement ps = conn.prepareStatement(sql)) {
for (CanalEntry.Column col : cols) {
if (col.getName().equalsIgnoreCase("id")) {
ps.setObject(1, col.getValue());
break;
}
}
ps.executeUpdate();
}
}
}
本文作者:Weee
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!