Fork me on GitHub

Flink 入门笔记(1)

Flink 入门笔记(1)

好友投稿,整理自慕课网及同事分享


  • 初识Flink

    • 什么是Flink:基于有界和无界数据流的有状态的计算,分布式计算引擎
      • 无界数据流:有开始没有结尾,流处理实时处理
      • 有界数据流:有开始有结尾,用批处理处理有界数据流
    • Flink提供的三个层级的API:
      • SQL/Table API(使用sql语句)
      • DataStream API
      • ProcessFunction
    • Flink运行模式:YARN、Mesos、K8S
    • Flink、Spark Streaming、Storm:Spark Streaming以批处理为主,流式处理是批处理的一个特例(将流数据拆成mini batch进行计算),微批处理框架;Flink:以流式为主,批处理是流式处理的一个特例;Storm:流式处理,仅支持流
    • 使用场景:事件驱动应用、数据分析应用、数据管道应用
  • 应用场景:实时数仓、etl 数据清洗、特征处理等。

  • 四大基础部分:

    • checkpoint;
    • state(如点击次数等)
    • time(处理时间等)
    • window(包括自带窗口和自定义窗口)
  • DataSet API编程(批处理)

    • flink 中使用缓冲块进行批处理,如果超时时间设置为 0,就会变成流处理方式。
  • DataStream API编程(流处理)

    • 流处理:实时地处理一些实时数据、实时地产生数据的合集
    • Flink应用程序开发流程(编程模型):
      • set up execution environment
      • read
      • transform operations
      • sink
      • execute
    • Word Count:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public static void main(String[] agrs) throws Exception{
    // 获取环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // read
    DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
    // transfer
    dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
    String[] tokens = value.split(",");
    for (String token : tokens) {
    if (token.length() > 0) {
    out.collect(new Tuple2<>(token, 1));
    }
    }
    }
    }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
    env.execute("job");
    }
    • 自定义数据源、转换函数(map、filter、split等iterators)、自定义sink(实现SinkFunction)
  • Flink Table & SQL

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val path = "file://..."
    val csv = env.readCsvFile[SalesLog](path, ignoreFirstLine = true)
    csv.print()
    val salesTable = tableEnv.fromDataSet(csv)
    tableEnv.registerTable("sales", salesTable)
    val resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
    tableEnv.toDataSet[Row](resultTable).print()
    }
    case class SalesLog(transactionId: Int, customerId: Int, itemId: Int, amountPaid: Double)
  • Window和Time操作

    • Event Time
    • Ingestion Time
    • Processing Time
    • Window:滑动窗口(数据可能会重叠)、滚动窗口(数据不会重叠)、会话窗口
  • Flink Connectors

    • Kafka(source/sink):FlinkKafkaConsumer和FlinkKafkaProducer
    • ElasticSearch(sink)
    • HDFS(sink)
  • Flink部署及作业提交

    • 单机部署:下载Flink源码进行编译:mvn clean install -DskipTests -Dhadoop.version=2.6.1
    • Standalone
    • on YARN
  • Flink监控及调优

  • 状态容错, 无感知从失败中恢复:

    1. flink 使用单数据源时,使用 checkpoint 流转所有算子,然后记录状态(即 barrier),用来充当作业的恢复机制(flink 自动进行)。当出现 failed 时,会把整个 jobmanager 停掉,然后一次性恢复到 checkpoint 处。
    2. flink 使用多数据源时,需要 barrier 对齐,可能出现延迟或数据丢失,可以选择关闭 barrier 对齐。
    3. flink 使用savepoints,是通过用户手动配置的,充当手动备份的作用。
  • 窗口计算:窗口内的消息的计算。

  • 窗口:

    • session window:如果出现 session gap 时间没有新的到达的情况,那么就触发上一个窗口的计算
    • tumbling count window:满足多少元素数量,就可以触发计算;
    • sliding time window
    • tumbling time window:可能存在数据重叠,比如每隔 3 秒计算 5 秒的数据,有数据重叠需求时采用此模型。
    • raw stream data
  • water mark:(blog.csdn.net/lmalds/article/details/52704170)

    • window触发的条件:watermark时间 >= window_end_time;在[window_start_time,window_end_time]中有数据存在
    • flink处理乱序:watermark+window机制
    • 默认配置好的,断言性质的(认为比 watermarks 小的都已经排好序了)。一般 5s 发一个,触发窗口计算,然后做排序。

其他:flink 实现了与 kafka 的内部连接,可以做到对 kafka 的幂等写。

water mark、本地跑flink、产品、es

-------------The End-------------