简介
Canal 是阿里巴巴为了满足跨机房数据同步的需求而开发的。Canal 基于 MySQL 的 Binlog 解析技术,能够捕捉到 MySQL 数据库的增删改操作,将这些变更以事件的形式发布出去,供其他系统订阅和消费。
使用
使用步骤
1. 准备工作
环境要求:确保你的服务器已安装 Java 运行环境,因为 Canal 是用 Java 编写的。
下载 Canal:从 Canal GitHub 或官方仓库下载 Canal 的最新发行版,并解压到指定目录。
2. 配置 MySQL
开启 Binlog:在 MySQL 配置文件(如 /etc/my.cnf)中,确保开启了 Binlog 并设置为 ROW 模式:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1 # 需要唯一,避免与现有复制链路冲突
重启 MySQL:修改配置后,重启 MySQL 服务使配置生效,并使用命令确认 Binlog 是否启用:
show variables like 'log_bin';
3. 配置 Canal
编辑 Canal 配置文件:通常位于 conf/example/instance.properties,根据实际情况修改 MySQL 连接信息、Zookeeper 地址(如果使用)、以及 Canal 实例名称等。
canal.instance.mysql.slaveId=1234 # 需要与 MySQL 的 server_id 不同
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=password
4. 启动 Canal
启动服务:在 Canal 解压目录下,使用以下命令启动 Canal:
sh bin/startup.sh example # example 是 instance.properties 文件所在目录的名称
检查运行状态:通过 Canal 的日志文件检查是否成功启动并连接到 MySQL。
5. 使用 Canal 客户端订阅数据
编写客户端程序:Canal 提供了 Java 客户端 SDK,你可以编写 Java 应用程序来订阅 Canal 发布的数据变更事件。示例代码可能包含如下步骤:
创建 CanalConnector 实例连接到 Canal 服务端。
调用 connect() 方法建立连接。
使用 subscribe() 方法订阅感兴趣的数据库和表。
在循环中调用 getWithoutAck() 获取数据变更事件,并处理这些事件。
处理完事件后,调用 ack() 或 rollback() 方法确认或回滚。
6. 测试与监控
测试数据订阅:在 MySQL 中对已订阅的表进行增删改操作,观察客户端是否能正确接收到变更事件。
监控与日志:定期检查 Canal 服务的日志,确保其正常运行无异常。
springboot集成cannal
1、添加依赖,由于 Canal 没有直接的 Spring Boot Starter,你需要直接引入 Canal 的客户端库。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>{canal.version}</version>
</dependency>
2. 配置 Canal
在 application.yml 或 application.properties 文件中,配置 Canal 的相关参数,包括地址、端口、用户名、密码等。例如:
canal:
server: localhost:11111 # Canal Server 的地址和端口
destination: example # Canal 实例名称
username: canal # 连接 Canal Server 的用户名
password: canal # 密码
3. 创建 Canal 连接配置类
创建一个配置类来初始化 Canal 连接,这个类可以使用 @Configuration 注解标记,并定义一个 CanalConnector 的 Bean。
@Configuration
public class CanalConfig {
@Value("${canal.server}")
private String canalServer;
@Value("${canal.destination}")
private String destination;
@Value("${canal.username}")
private String username;
@Value("${canal.password}")
private String password;
@Bean
public CanalConnector canalConnector() {
// 初始化 CanalConnector
return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServer.split(":")[0], Integer.parseInt(canalServer.split(":")[1])),
destination, username, password);
}
}
4. 实现数据监听处理器
创建一个 Canal 事件处理器类,实现数据的订阅和处理逻辑。你可以通过实现 CanalClientEventListener 接口或者直接在服务类中定义方法来处理数据。
@Component
public class CanalEventListener implements CanalClientEventListener {
@Override
public void onEvent(CanalEvent event) {
// 根据event类型处理数据,例如:
if (event instanceof EntryEvent) {
EntryEvent entryEvent = (EntryEvent) event;
Entry entry = entryEvent.getEntry();
if (entry.getEntryType() == EntryType.ROWDATA) {
// 处理RowData类型的Entry
// ...
}
}
}
@Override
public void rollback() {
// 可选的回滚逻辑
}
}
5. 启动 Canal 客户端
最后,你需要在 Spring Boot 应用启动时启动 Canal 客户端,订阅数据变更。这通常在某个初始化类或者 CommandLineRunner 实现类中完成。
@Component
public class CanalClientStarter implements CommandLineRunner {
@Autowired
private CanalConnector canalConnector;
@Autowired
private CanalEventListener canalEventListener;
@Override
public void run(String... args) throws Exception {
canalConnector.connect();
canalConnector.subscribe(".*\..*"); // 订阅所有数据库和表
canalConnector.rollback(); // 初始位置设为最新
while (true) { // 实际应用中应考虑更健壮的处理逻辑
Message message = canalConnector.getWithoutAck(100); // 获取数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId != -1 && size > 0) {
canalEventListener.onEvent(new CanalEvent(message));
canalConnector.ack(batchId); // 确认消息已消费
} else {
Thread.sleep(1000L); // 暂停一下,防止空转占用CPU
}
}
}
}