Flink 入门笔记(1)
好友投稿,整理自慕课网及同事分享
初识Flink
- 什么是Flink:基于有界和无界数据流的有状态的计算,分布式计算引擎
- 无界数据流:有开始没有结尾,流处理实时处理
- 有界数据流:有开始有结尾,用批处理处理有界数据流
- Flink提供的三个层级的API:
- SQL/Table API(使用sql语句)
- DataStream API
- ProcessFunction
- Flink运行模式:YARN、Mesos、K8S
- Flink、Spark Streaming、Storm:Spark Streaming以批处理为主,流式处理是批处理的一个特例(将流数据拆成mini batch进行计算),微批处理框架;Flink:以流式为主,批处理是流式处理的一个特例;Storm:流式处理,仅支持流
- 使用场景:事件驱动应用、数据分析应用、数据管道应用
- 什么是Flink:基于有界和无界数据流的有状态的计算,分布式计算引擎
应用场景:实时数仓、etl 数据清洗、特征处理等。
四大基础部分:
- checkpoint;
- state(如点击次数等)
- time(处理时间等)
- window(包括自带窗口和自定义窗口)
DataSet API编程(批处理)
- flink 中使用缓冲块进行批处理,如果超时时间设置为 0,就会变成流处理方式。
DataStream API编程(流处理)
- 流处理:实时地处理一些实时数据、实时地产生数据的合集
- Flink应用程序开发流程(编程模型):
- set up execution environment
- read
- transform operations
- sink
- execute
- Word Count:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public static void main(String[] agrs) throws Exception{
// 获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// read
DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
// transfer
dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
env.execute("job");
}- 自定义数据源、转换函数(map、filter、split等iterators)、自定义sink(实现SinkFunction)
Flink Table & SQL
1
2
3
4
5
6
7
8
9
10
11
12def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val path = "file://..."
val csv = env.readCsvFile[SalesLog](path, ignoreFirstLine = true)
csv.print()
val salesTable = tableEnv.fromDataSet(csv)
tableEnv.registerTable("sales", salesTable)
val resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
tableEnv.toDataSet[Row](resultTable).print()
}
case class SalesLog(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)Window和Time操作
- Event Time
- Ingestion Time
- Processing Time
- Window:滑动窗口(数据可能会重叠)、滚动窗口(数据不会重叠)、会话窗口
Flink Connectors
- Kafka(source/sink):FlinkKafkaConsumer和FlinkKafkaProducer
- ElasticSearch(sink)
- HDFS(sink)
Flink部署及作业提交
- 单机部署:下载Flink源码进行编译:mvn clean install -DskipTests -Dhadoop.version=2.6.1
- Standalone
- on YARN
Flink监控及调优
状态容错, 无感知从失败中恢复:
- flink 使用单数据源时,使用 checkpoint 流转所有算子,然后记录状态(即 barrier),用来充当作业的恢复机制(flink 自动进行)。当出现 failed 时,会把整个 jobmanager 停掉,然后一次性恢复到 checkpoint 处。
- flink 使用多数据源时,需要 barrier 对齐,可能出现延迟或数据丢失,可以选择关闭 barrier 对齐。
- flink 使用savepoints,是通过用户手动配置的,充当手动备份的作用。
窗口计算:窗口内的消息的计算。
窗口:
- session window:如果出现 session gap 时间没有新的到达的情况,那么就触发上一个窗口的计算
- tumbling count window:满足多少元素数量,就可以触发计算;
- sliding time window
- tumbling time window:可能存在数据重叠,比如每隔 3 秒计算 5 秒的数据,有数据重叠需求时采用此模型。
- raw stream data
water mark:(blog.csdn.net/lmalds/article/details/52704170)
- window触发的条件:watermark时间 >= window_end_time;在[window_start_time,window_end_time]中有数据存在
- flink处理乱序:watermark+window机制
- 默认配置好的,断言性质的(认为比 watermarks 小的都已经排好序了)。一般 5s 发一个,触发窗口计算,然后做排序。
其他:flink 实现了与 kafka 的内部连接,可以做到对 kafka 的幂等写。
water mark、本地跑flink、产品、es