ToziuhaNight:德古拉的复仇免安装绿色版
543M · 2025-09-28
在实时数据处理场景中,数据源(Source)是整个数据处理流程的起点。Flink作为流批一体的计算框架,提供了丰富的Source接口支持,其中通过Kafka获取实时数据是最常见的场景之一。本文将以Flink DataStream API为核心,带你从0到1实现“从Kafka消费数据并输出到日志”的完整流程,掌握Flink Source的核心用法。
Kafka作为分布式流处理平台,具备高吞吐量、低延迟、持久化存储等特性,是实时数据管道的首选。Flink与Kafka的集成方案经过多年优化,支持:
本文基于以下版本实现(需保持版本兼容):
在gradle添加Flink核心依赖及Kafka Connector依赖,build.gradle配置可以是如下:
plugins { id 'java' // Java项目插件 id 'application' // 支持main方法运行 } // 设置主类(可选,用于application插件) application { mainClass.set('com.cn.daimajiangxin.flink.source.KafkaSourceDemo') // 替换为你的主类全限定名 } // 依赖仓库(Maven中央仓库) repositories { mavenCentral() } // 依赖配置 dependencies { // Flink核心依赖(生产环境通常标记为provided,由Flink运行时提供) implementation 'org.apache.flink:flink-java:1.20.1' implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1' // Flink Kafka Connector(新版API,兼容Kafka 2.8+) implementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1' // SLF4J日志门面 + Log4j实现(避免日志警告) implementation 'org.apache.logging.log4j:log4j-api:2.17.1' implementation 'org.apache.logging.log4j:log4j-core:2.17.1' implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1' } // 编译配置(可选,根据需要调整) tasks.withType(JavaCompile) { options.encoding = 'UTF-8' // 指定编码 sourceCompatibility = JavaVersion.VERSION_17 // 兼容Java 8 targetCompatibility = JavaVersion.VERSION_17 }
在深入代码前,需理解Flink Kafka Source的核心组件:
整个流程分为5步:
以下是完整的示例代码,包含详细注释:
package com.cn.daimajiangxin.flink.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.nio.charset.StandardCharsets;public class KafkaSourceDemo { private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceDemo.class);public static void main(String[] args) throws Exception { // 1. 创建Flink流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 可选:启用检查点(生产环境必选,保证Exactly-Once语义) env.enableCheckpointing(5000); // 每5秒做一次检查点 // 启用检查点 env.enableCheckpointing(5000); // 每5秒做一次检查点// 设置检查点超时时间 env.getCheckpointConfig().setCheckpointTimeout(60000);// 2. 配置Kafka参数 String kafkaBootstrapServers = "172.30.244.152:9092"; // Kafka Broker地址 String topic = "test_topic"; // 目标主题 String consumerGroup = "flink-consumer-group"; // 消费者组IDLOG.info("Connecting to Kafka at " + kafkaBootstrapServers); LOG.info("Consuming topic: " + topic); LOG.info("Consumer group: " + consumerGroup);// 3. 定义Kafka Source(新版API) KafkaSource`<String>` kafkaSourceDemo = KafkaSource.`<String>`builder() .setBootstrapServers(kafkaBootstrapServers) // Kafka Broker地址 .setTopics(topic) // 订阅的主题 .setGroupId(consumerGroup) // 消费者组 .setProperty("enable.auto.commit", "true") .setProperty("auto.commit.interval.ms", "1000") .setProperty("session.timeout.ms", "30000") .setProperty("retry.backoff.ms", "1000") .setProperty("reconnect.backoff.max.ms", "10000") .setDeserializer(new KafkaRecordDeserializationSchema `<String>`() { @Override public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector `<String>` out) throws IOException { // 从ConsumerRecord中提取值(字节数组),并转为字符串 String value = new String(record.value(), StandardCharsets.UTF_8); LOG.info("Received message: " + value); out.collect(value); // 将反序列化后的数据收集到Flink流中 }@Override public TypeInformation`<String>` getProducedType() { return TypeInformation.of(String.class); } }) // 从最早偏移量开始消费(这样即使没有新消息也会读取历史数据) .setStartingOffsets(OffsetsInitializer.earliest()) .build();// 4. 将Kafka Source添加到Flink流环境,并处理数据 DataStream`<String>` kafkaStream = env.fromSource( kafkaSourceDemo, WatermarkStrategy.noWatermarks(), // 无水印(适用于无序数据场景) "Kafka Source" // Source名称(用于监控) );LOG.info("Kafka source created successfully");// 5. 处理数据:将每条数据打印到日志(实际生产中可替换为写入数据库、消息队列等) kafkaStream.print("KafkaData"); LOG.info("Flink Kafka Source Demo started."); // 6. 触发任务执行 env.execute("Flink Kafka Source Demo");} }
在WSL2的Ubuntu 环境中安装Kafka。
下载Kafka二进制包
访问Apache Kafka官网,选择最新稳定版(如3.9.0),使用wget下载:
wget https://mirrors.aliyun.com/apache/kafka/3.9.0/kafka_2.12-3.9.0.tgz
解压并配置环境变量
# 解压到/opt/kafka(全局可访问)sudo mkdir -p /opt/kafkatar -zxvf kafka_2.12-3.9.0.tgz -C /opt/kafka --strip-components=1# 永久生效(编辑~/.bashrc)echo 'export KAFKA_HOME=/opt/kafka' >> /etc/profileecho 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profilesource /etc/profile
Kafka的核心配置文件位于$KAFKA_HOME/config目录,需修改以下两个文件:
配置Kafka Broker(server.properties)
修改以下关键参数以适配WSL2环境:
# ==================== 核心角色与ID配置 ==================== # 启用KRaft模式(默认已启用) # 单节点同时担任Broker和控制器 process.roles=broker,controller # 节点唯一ID(单节点必须设为0) node.id=0 # 控制器ID(与node.id一致,单节点唯一) controller.id=0 # ==================== 监听端口配置 ==================== # 全局监听端口(客户端读写请求)和控制器监听端口 # 多个监听器使用逗号分隔,每个监听器都需要指定安全协议 listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 # 对外暴露的地址(Windows主机通过localhost访问) # 多个公布的监听器使用逗号分隔 advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 # 指定CONTROLLER监听器的安全协议 listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT # 定义控制器监听器的名称(KRaft模式必需) controller.listener.names=CONTROLLER # ==================== ZooKeeper兼容配置(可选) ==================== # 若需兼容旧客户端,可保留ZooKeeper配置(但KRaft模式无需ZooKeeper) # zookeeper.connect=localhost:2181 # ==================== 日志与分区配置 ==================== # 数据存储目录配置(Kafka的核心配置参数) # Kafka将主题数据、索引文件等存储在该目录下 log.dirs=/opt/kafka/data num.partitions=1 # 副本数(单节点设为1) default.replication.factor=1 # 最小同步副本数(单节点设为1) min.insync.replicas=1 # ==================== 日志存储高级配置 ==================== # 日志保留时间(默认7天,生产环境根据存储容量和需求调整) # log.retention.hours=168 # 或按大小限制保留(单位:字节) # log.retention.bytes=107374182400 # 100GB # 单个分区日志段大小(默认1GB,可根据实际需求调整) # log.segment.bytes=1073741824 # 日志段检查和清理的时间间隔(默认300000ms=5分钟) # log.retention.check.interval.ms=300000 # 控制是否自动创建主题(生产环境建议禁用,改为手动创建) # auto.create.topics.enable=false # ==================== 控制器引导配置 ==================== # 控制器引导服务器(单节点指向自己,格式:host:port) # 与控制器监听端口一致 controller.quorum.bootstrap.servers=localhost:9093 # 控制器投票者配置(单节点设为0@localhost:9093) controller.quorum.voters=0@localhost:9093
在KRaft模式下,需要先初始化元数据存储:
# 生成集群ID并保存到变量CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)echo "生成的集群ID: $CLUSTER_ID"# 使用生成的集群ID格式化存储目录$KAFKA_HOME/bin/kafka-storage.sh format -t $CLUSTER_ID -c $KAFKA_HOME/config/server.properties
注意: 如果手动运行命令,请确保先执行生成集群ID的命令,然后使用实际生成的ID替换"$CLUSTER_ID"。
# 启动Broker(日志输出到$KAFKA_HOME/logs/server.log) $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
检查Kafka Broker进程: ps -ef | grep kafka # 应看到Kafka进程
确保Kafka服务已启动,并创建测试主题 test_topic
:
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic
使用Kafka内置的生产者工具发送测试消息到 test_topic
:
# 启动Kafka生产者控制台 $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic # 输入几条测试消息(每行一条) > hello flink > flink kafka integration > real-time data processing
在IDE中直接运行 KafkaSourceDemo
类的 main
方法,或通过Gradle构建并运行:
# 构建项目 ./gradlew clean build # 运行Flink作业 ./gradlew run
成功运行后,你应该能在控制台看到类似如下输出:
在生产环境中,为了确保数据一致性,需要配置Flink的检查点机制和Kafka偏移量提交策略:
// 1. 启用检查点 env.enableCheckpointing(5000); // 每5秒做一次检查点 // 2. 获取检查点配置对象(Flink 1.20.1及以上版本推荐方式) CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 3. 配置检查点模式为EXACTLY_ONCE(精确一次语义) checkpointConfig.setMode(CheckpointingMode.EXACTLY_ONCE); // 4. 设置检查点超时时间 checkpointConfig.setCheckpointTimeout(Duration.ofSeconds(60)); // 4. 配置从上次提交的偏移量继续消费(生产环境推荐) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
合理设置并行度可充分利用集群资源并提高吞吐量:
// 设置Flink作业的全局并行度 env.setParallelism(3); // 与Kafka主题分区数匹配 // 或单独设置Source的并行度 KafkaSource`<String>` kafkaSource = KafkaSource.`<String>`builder() // ... 其他配置 ... .build(); DataStream`<String>` stream = env.fromSource( kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source") .setParallelism(3); // Source并行度
除了基础的字符串反序列化,还可以使用更灵活的反序列化方式:
// 使用Flink提供的String反序列化器 .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
如果Kafka消息是JSON格式,可以使用Jackson等库将其反序列化为POJO对象:
public class User { private String id; private String name; private int age; // getters, setters, constructors... } // 自定义POJO反序列化器 .setDeserializer(new KafkaRecordDeserializationSchema`<User>`() { private final ObjectMapper mapper = new ObjectMapper(); @Override public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector`<User>` out) throws IOException { User user = mapper.readValue(record.value(), User.class); out.collect(user); } @Override public TypeInformation`<User>` getProducedType() { return TypeInformation.of(User.class); } })
问题现象:程序启动后报 org.apache.kafka.common.errors.TimeoutException
解决方案:
ps -ef | grep kafka
bootstrap.servers
配置正确,特别是在WSL2环境中确保端口映射正确问题现象:部分Kafka消息未被Flink消费
解决方案:
setStartingOffsets
配置正确,生产环境建议使用 OffsetsInitializer.committedOffsets()
对于高吞吐量场景,可以通过以下方式优化性能:
fetch.max.bytes
和 max.partition.fetch.bytes
参数,增加单次拉取的数据量setUnboundedUsePreviousEventTimeWatermark()
优化水印生成本文详细介绍了如何使用Flink从Kafka读取数据,包括环境准备、代码实现、运行测试以及进阶配置。通过本文的学习,你应该能够掌握Flink数据源的核心用法,为构建企业级实时数据处理应用打下坚实基础。
在实际应用中,Flink还支持多种其他数据源,如:
SourceFunction
接口)后续文章将继续深入探讨Flink的数据转换、窗口计算、状态管理等核心概念,敬请关注!
543M · 2025-09-28
76.3M · 2025-09-28
241M · 2025-09-28