明日方舟苹果版
3.7G · 2025-10-14
在大数据处理领域,批处理和流处理曾经被视为两种截然不同的范式。然而,随着Apache Flink的出现,这种界限正在逐渐模糊。Flink的一个核心特性是其批流一体的架构设计,允许用户使用统一的API和执行引擎处理有界数据(批处理)和无界数据(流处理)。本文将深入探讨Flink的执行模式(Execution Mode),特别是在Flink 1.20.1版本中对批处理和流处理模式的支持和优化。
Flink的执行模式决定了作业如何被调度和执行。在Flink 1.12及以后的版本中,引入了统一的流批处理执行模式,主要包括以下三种模式:
这三种模式的引入使得Flink能够在同一套API上提供最佳的批处理和流处理性能。
Flink的执行模式经历了以下几个关键阶段:
虽然Flink使用相同的API和代码结构,但BATCH和STREAMING模式在内部执行方式上存在显著差异:
特性 | STREAMING模式 | BATCH模式 |
---|---|---|
调度策略 | 连续流式调度 | 批处理调度,类似于MapReduce |
资源利用 | 持续占用资源 | 任务完成后释放资源 |
优化技术 | 流式优化 | 批处理优化,如查询优化、物化视图 |
处理延迟 | 毫秒级延迟 | 较高延迟,但吞吐量更大 |
适用场景 | 实时数据处理 | 离线数据分析 |
Flink的批流一体架构基于以下核心理念:
首先,确保你已经设置了正确的依赖:
dependencies { // Flink核心依赖 implementation 'org.apache.flink:flink_core:1.20.1' implementation 'org.apache.flink:flink-streaming-java:1.20.1' implementation 'org.apache.flink:flink-clients:1.20.1' implementation 'org.apache.flink:flink-connector-files:1.20.1' implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'}
在Flink 1.20.1中,可以通过以下方式设置执行模式:
import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ExecutionModeExample { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置执行模式为BATCH env.setRuntimeMode(RuntimeExecutionMode.BATCH); // 或者设置为STREAMING // env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // 或者设置为AUTOMATIC(根据数据源自动选择) // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 后续代码... }}
也可以通过命令行参数覆盖代码中的设置:
bin/flink run -Dexecution.runtime-mode=BATCH -c com.example.ExecutionModeExample your-jar-file.jar
以下是使用BATCH模式处理文件数据的完整示例:
package com.cn.daimajiangxin.flink;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.connector.file.src.FileSource;import org.apache.flink.connector.file.src.reader.StreamFormat;import org.apache.flink.connector.file.src.reader.TextLineInputFormat;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Arrays;public class BatchWordCount { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 明确设置为批处理模式 env.setRuntimeMode(RuntimeExecutionMode.BATCH); // 从文件读取数据(有界数据源) String inputPath = "path\flink-learning\data\input.txt"; // 1. 创建文件源构建器 Path filePath = new Path(inputPath); // 2. 配置文件读取格式 StreamFormat<String> format =new TextLineInputFormat("UTF-8"); // 3. 构建 FileSource FileSource<String> fileSource = FileSource .forRecordStreamFormat(format, filePath) .build(); // 4. 添加 Watermark 策略(批处理中可使用默认策略) WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(10)); DataStream<String> text = env.fromSource(fileSource,watermarkStrategy,"FileSource"); // 数据处理逻辑 DataStream<Tuple2<String, Integer>> counts = text .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); // 输出结果 counts.print(); // 执行作业 env.execute("Batch Word Count"); } public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // 分词并为每个单词生成(word, 1)的元组 Arrays.stream(value.toLowerCase().split("\W+")) .filter(word -> word.length() > 0) .forEach(word -> out.collect(new Tuple2<>(word, 1))); } }}
以下是使用STREAMING模式处理Kafka数据流的示例:
package com.cn.daimajiangxin.flink;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Arrays;public class StreamingWordCount { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 明确设置为流处理模式 env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // 启用检查点 env.enableCheckpointing(5000); // 创建Kafka源(无界数据源) KafkaSource<String> source = KafkaSource.<String> builder() .setBootstrapServers("localhost:9092") .setTopics("word-count-topic") .setGroupId("flink-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); // 从Kafka读取数据 DataStream<String> text = env.fromSource( source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)), "Kafka Source" ); // 数据处理逻辑 DataStream<Tuple2<String, Integer>> counts = text .flatMap(new Tokenizer()) .keyBy(value -> value.f0) .sum(1); // 输出结果 counts.print(); // 执行作业 env.execute("Streaming Word Count"); } public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { Arrays.stream(value.toLowerCase().split("\W+")) .filter(word -> word.length() > 0) .forEach(word -> out.collect(new Tuple2<>(word, 1))); } }}
AUTOMATIC模式是Flink 1.20.1中的一个强大特性,它能够根据作业的数据源类型自动选择最合适的执行模式:
// 设置为自动模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
在某些复杂场景下,AUTOMATIC模式的选择可能不完全符合预期:
BATCH模式针对有界数据处理提供了多项性能优化:
使用相同的WordCount逻辑,分别在BATCH和STREAMING模式下处理1GB文本数据的性能对比:
模式 | 执行时间 | CPU使用率 | 内存消耗 |
---|---|---|---|
STREAMING | 38秒 | 稳定在70% | 2.4GB |
BATCH | 22秒 | 峰值95%,完成后释放 | 1.8GB |
Flink 1.20.1在执行模式方面带来了多项改进:
在使用Flink 1.20.1的执行模式时,需要注意以下兼容性问题:
场景 | 推荐模式 | 原因 |
---|---|---|
离线数据处理 | BATCH | 性能更好,资源利用率更高 |
实时数据处理 | STREAMING | 低延迟,持续处理能力 |
ETL作业 | BATCH | 更适合处理有界数据集 |
实时分析 | STREAMING | 满足实时性要求 |
不确定数据源类型 | AUTOMATIC | 自动适配不同数据源 |
在实际项目中,可以采用以下策略来管理执行模式:
Flink的批流一体执行模式是大数据处理领域的一次重要创新,它消除了批处理和流处理之间的界限,为开发者提供了统一、灵活的编程模型。通过Execution Mode的合理选择和配置,我们可以在不同场景下获得最佳的性能表现。
随着Flink 1.20.1的发布,批流一体架构进一步成熟,执行模式的自动选择更加智能,性能优化更加到位。未来,Flink将继续完善其批流一体架构,为大数据处理提供更加强大和灵活的解决方案。
通过本文的学习,相信你已经对Flink的执行模式有了深入的理解。在实际应用中,建议根据具体的数据特征和处理需求,选择合适的执行模式,充分发挥Flink批流一体的优势。
原文来自:http://blog.daimajiangxin.com.cn
源码地址:https://gitee.com/daimajiangxin/flink-learning