阿里巴巴开源的 Canal 是一个基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的中间件。Canal 的主要工作原理是伪装成一个 MySQL 的 Slave(从库),去监听 MySQL 实主库的 Binary Log(二进制日志文件)。当有数据变更时,它能够将这些变更捕获,并以一种易于消费的方式提供给下游系统,如数据库、消息队列等。 github地址
Canal 于2013年在阿里内部诞生,主要用于解决跨机房的数据同步问题,后于2018年对外开源,受到了广泛的关注和应用。其应用场景非常广泛,包括但不限于:
Canal 具有高可用、低延迟的特点,并且提供了灵活的部署方案和丰富的配置选项,可以根据实际需求调整。此外,Canal 社区活跃,拥有良好的文档和支持,使得开发者能够快速上手并解决遇到的问题。
确保 MySQL 已开启 binlog,并创建具有 REPLICATION SLAVE 权限的用户。
SQL-- 开启 binlog(在 MySQL 配置文件 `my.cnf` 中添加):
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
-- 创建 Canal 用户并授权:
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
docker pull canal/canal-server
docker run --name canal-temp -d canal/canal-server docker cp canal-temp:/home/admin/canal-server/conf/canal.properties ./canal.properties docker cp canal-temp:/home/admin/canal-server/conf/example/instance.properties ./instance.properties docker stop canal-temp && docker rm canal-temp
canal.port = 11111 canal.zkServers = 127.0.0.1:2181 # 如果需要连接 Zookeeper
canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal_password canal.instance.filter.regex=.*\\..* # 同步所
docker run --name canal \ -p 11111:11111 \ -v $(pwd)/canal.properties:/home/admin/canal-server/conf/canal.properties \ -v $(pwd)/instance.properties:/home/admin/canal-server/conf/example/instance.properties \ -v $(pwd)/logs:/home/admin/canal-server/logs \ -d canal/canal-server
docker logs canal
xml<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>  <!-- 使用与 Canal 服务器兼容的版本 -->
</dependency>
javaimport com.alibaba.otter.canal.client.CanalClient;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalConsumer {
    public static void main(String[] args) {
        // 连接到 Canal 服务器
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111),  // Canal 服务器地址
            "example",                                  // Canal 实例名称(与 instance.properties 中的 instance 名称一致)
            "canal", "canal_password");                 // 用户名和密码(可选)
        try {
            connector.connect();
            connector.subscribe(".*\\..*");  // 订阅所有表
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100);  // 每次最多获取 100 条事件
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                for (CanalEntry.Entry entry : entries) {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            System.out.println("操作类型: " + rowChange.getEventType());
                            System.out.println("表名: " + entry.getHeader().getTableName());
                            System.out.println("旧数据: " + rowData.getBeforeColumnsList());
                            System.out.println("新数据: " + rowData.getAfterColumnsList());
                        }
                    }
                }
                connector.ack(batchId);  // 确认已处理消息
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}
请自行引入mysql依赖
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MysqlSyncExample {
    private static final String URL = "jdbc:mysql://localhost:3306/target_db";
    private static final String USER = "root";
    private static final String PASSWORD = "123456";
    public void syncToMysql(CanalEntry.RowChange rowChange, String tableName) {
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                    insertData(conn, tableName, rowData.getAfterColumnsList());
                } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                    updateData(conn, tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                    deleteData(conn, tableName, rowData.getBeforeColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // 插入操作
    private void insertData(Connection conn, String table, List<CanalEntry.Column> columns) throws Exception {
        StringBuilder cols = new StringBuilder();
        StringBuilder values = new StringBuilder();
        for (CanalEntry.Column col : columns) {
            cols.append(col.getName()).append(",");
            values.append("?,");
        }
        cols.deleteCharAt(cols.length() - 1);
        values.deleteCharAt(values.length() - 1);
        String sql = "INSERT INTO " + table + "(" + cols + ") VALUES (" + values + ")";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            int index = 1;
            for (CanalEntry.Column col : columns) {
                ps.setObject(index++, col.getValue());
            }
            ps.executeUpdate();
        }
    }
    // 更新操作
    private void updateData(Connection conn, String table, List<CanalEntry.Column> beforeCols, List<CanalEntry.Column> afterCols) throws Exception {
        StringBuilder sets = new StringBuilder();
        for (CanalEntry.Column col : afterCols) {
            sets.append(col.getName()).append("=?,");
        }
        sets.deleteCharAt(sets.length() - 1);
        String where = "id=?";
        String sql = "UPDATE " + table + " SET " + sets + " WHERE " + where;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            int index = 1;
            for (CanalEntry.Column col : afterCols) {
                ps.setObject(index++, col.getValue());
            }
            for (CanalEntry.Column col : beforeCols) {
                if (col.getName().equals("id")) {
                    ps.setObject(index++, col.getValue());
                }
            }
            ps.executeUpdate();
        }
    }
    // 删除操作
    private void deleteData(Connection conn, String table, List<CanalEntry.Column> cols) throws Exception {
        String where = "id=?";
        String sql = "DELETE FROM " + table + " WHERE " + where;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (CanalEntry.Column col : cols) {
                if (col.getName().equals("id")) {
                    ps.setObject(1, col.getValue());
                    break;
                }
            }
            ps.executeUpdate();
        }
    }
}
依赖
xml<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-high-level-rest-client</artifactId>
    <version>7.17.0</version>
</dependency>
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryResponse;
import org.elasticsearch.action.update.*;
import org.elasticsearch.index.query.QueryBuilders;
import java.util.HashMap;
import java.util.Map;
public class EsSyncExample {
    public void syncToElasticsearch(CanalEntry.RowChange rowChange, String indexName) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            Map<String, Object> data = new HashMap<>();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                data.put(column.getName(), column.getValue());
            }
            String id = getPrimaryKey(rowData.getAfterColumnsList());
            if (rowChange.getEventType() == CanalEntry.EventType.INSERT ||
                rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                // 插入或更新文档
                UpdateRequest request = new UpdateRequest(indexName, id)
                        .doc(data, XContentType.JSON)
                        .upsert(data, XContentType.JSON);
                // client.update(request, RequestOptions.DEFAULT);
            } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                // 删除文档
                DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
                // client.delete(deleteRequest, RequestOptions.DEFAULT);
            }
        }
    }
    private String getPrimaryKey(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column col : columns) {
            if (col.getName().equalsIgnoreCase("id")) {
                return col.getValue();
            }
        }
        return null;
    }
}
xml<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
    <version>4.9.0</version>
</dependency>
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import static com.mongodb.client.model.Filters.eq;
import java.util.HashMap;
import java.util.Map;
public class MongoSyncExample {
    private MongoClient client = MongoClients.create("mongodb://localhost:27017");
    private MongoDatabase database = client.getDatabase("target_db");
    public void syncToMongo(CanalEntry.RowChange rowChange, String collectionName) {
        MongoCollection<Map> collection = database.getCollection(collectionName, Map.class);
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            Map<String, Object> document = new HashMap<>();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                document.put(column.getName(), column.getValue());
            }
            String id = getPrimaryKey(rowData.getAfterColumnsList());
            if (rowChange.getEventType() == CanalEntry.EventType.INSERT ||
                rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                collection.replaceOne(eq("_id", id), document, new ReplaceOptions().upsert(true));
            } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                collection.deleteOne(eq("_id", id));
            }
        }
    }
    private String getPrimaryKey(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column col : columns) {
            if (col.getName().equalsIgnoreCase("id")) {
                return col.getValue();
            }
        }
        return null;
    }
}
xml<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
</dependency>
javaimport com.alibaba.otter.canal.protocol.CanalEntry;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class PostgresSyncExample {
    private static final String URL = "jdbc:postgresql://localhost:5432/target_db";
    private static final String USER = "postgres";
    private static final String PASSWORD = "123456";
    public void syncToPostgres(CanalEntry.RowChange rowChange, String tableName) {
        try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                    insertData(conn, tableName, rowData.getAfterColumnsList());
                } else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                    updateData(conn, tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                } else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                    deleteData(conn, tableName, rowData.getBeforeColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private void insertData(Connection conn, String table, List<CanalEntry.Column> columns) throws Exception {
        StringBuilder cols = new StringBuilder();
        StringBuilder values = new StringBuilder();
        for (CanalEntry.Column col : columns) {
            cols.append("\"").append(col.getName()).append("\",");
            values.append("?,");
        }
        cols.deleteCharAt(cols.length() - 1);
        values.deleteCharAt(values.length() - 1);
        String sql = "INSERT INTO \"" + table + "\" (" + cols + ") VALUES (" + values + ")";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            int index = 1;
            for (CanalEntry.Column col : columns) {
                ps.setObject(index++, col.getValue());
            }
            ps.executeUpdate();
        }
    }
    private void updateData(Connection conn, String table, List<CanalEntry.Column> beforeCols, List<CanalEntry.Column> afterCols) throws Exception {
        StringBuilder sets = new StringBuilder();
        for (CanalEntry.Column col : afterCols) {
            sets.append("\"").append(col.getName()).append("\"=?,");
        }
        sets.deleteCharAt(sets.length() - 1);
        String where = "\"id\"=?";
        String sql = "UPDATE \"" + table + "\" SET " + sets + " WHERE " + where;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            int index = 1;
            for (CanalEntry.Column col : afterCols) {
                ps.setObject(index++, col.getValue());
            }
            for (CanalEntry.Column col : beforeCols) {
                if (col.getName().equalsIgnoreCase("id")) {
                    ps.setObject(index++, col.getValue());
                }
            }
            ps.executeUpdate();
        }
    }
    private void deleteData(Connection conn, String table, List<CanalEntry.Column> cols) throws Exception {
        String where = "\"id\"=?";
        String sql = "DELETE FROM \"" + table + "\" WHERE " + where;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (CanalEntry.Column col : cols) {
                if (col.getName().equalsIgnoreCase("id")) {
                    ps.setObject(1, col.getValue());
                    break;
                }
            }
            ps.executeUpdate();
        }
    }
}
本文作者:Weee
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!