自动生成flinksink,自动生成目录怎么设置

http://www.itjxue.com  2023-01-20 08:42  来源:未知  点击次数: 

4.一文搞定:Flink与Kafka之间的精准一次性

在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。

本次的聊法,还是要通过以kafka(source)-Flink,Flink(source)-Kafka来分别展开讨论。

kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。

在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。

其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:

这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。

这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。

1.必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。

2.在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:

3.配置Kafka读取数据的隔离级别

在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:

4.事务超时时间

这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。

截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。

基于Flink的实时计算平台的构建

一、系统架构

1. 接入层

Canal、Flume、Kafka

针对业务系统数据,Canal监控Binlog日志,发送至kafka;

针对日志数据,由Flume来进行统一收集,并发送至kafka。

消息队列的数据既是离线数仓的原始数据,也是实时计算的原始数据,这样可以保证实时和离线的原始数据是统一的。

2. 计算层

Flink

有了源数据,在 计算层 经过Flink实时计算引擎做一些加工处理,然后落地到存储层中不同存储介质当中。

3. 存储层

HBase、Kafka、ES、Mysql、Hive、Redis

不同的 存储介质 是通过不同的应用场景来选择。

4. 数据应用层

风控、模型、图谱、大屏展示

通过存储层应用于不同的 数据应用 ,数据应用可能是我们的正式产品或者直接的业务系统

二、技术实现

1. 计算引擎

实时计算引擎的功能要求

提供高级 API,支持常见的数据操作比如关联聚合,最好是能支持 SQL

具有状态管理和自动支持久化方案,减少对存储的依赖

可靠的容错机制,低延时,最好能够保证Exactly-once

Flink的优势

Flink的API、容错机制与状态管理都满足实时数仓计算引擎的需求

Flink高吞吐、低延时的特性

端到端的Exactly-once

WaterMarkEvent Time的支持

Flink 不仅支持了大量常用的 SQL 语句,还有丰富的数据类型、内置函数以及灵活的自定义函数,基本覆盖了我们的开发场景

2. 存储引擎

根据不同的业务场景,使用最适合的存储引擎:

Kafka主要用于中间数据表的存储

ES主要针对日志数据的存储和分析

HBase、Redis可用于维表存储

Hive用于数据校验

Mysql可以用于指标计算结果的存储

三、数据分层

数据源:目前数据源主要是Binlog,通过Canal监控各个业务系统的Mysql,将binlog发送至kafka。

ODS层:主要将Binlog数据存储至Kafka,这一层不对数据进行任何操作,存储最原始的数据,Binlog 日志在这一层为库级别,即:一个库的变更数据存放在同一个 Kafka Topic 中。

DWD层:主要对数据进行简单的清洗。拆分主题,将库级别的主题拆分为表级别;打平数据,将data数组格式打平。

DWS层:主要根据不同的业务的需求,将该需求所涉及到的表进行join所得。

APP层:根据指标计算需求,对数据进行处理后,存储HBase,为了方便模型查询,主要将表存储为索引表和明细表,直接对数据进行指标计算后,将计算结果存储到HBase。

四、数据监控及校验

1. 数据监控

目前数据的监控的架构是pushgateway + Prometheus + Grafana

数据监控主要是接入Flink的Metric,通过Grafana对Flink系统指标及自定义指标进行图形化界面的展示,对关键指标进行监控报警

2. 数据校验

目前数据的监控的架构是Grafana + Mysql

Grafana用于监控指标的展示及相关阈值数据的报警,Mysql主要用于监控数据的存储

将每个服务的source收到的数据、sink发出的数据,根据表的不同将数据关键字段写入mysql中,通过统计各个阶段各个表中的数据条数,对数据完整性进行监控校验,若出现数据缺时,先查找原因,然后指定时间戳重启服务

五、系统管理

元数据管理

表,字段元数据管理,实时感知元数据的变化,大幅度降低使用数据的成本。

系统配置

对应用启动参数及相关配置参数的管理,对任务进行灵活配置及管理。

血缘管理

主要是梳理实时计算平台中数据依赖关系,以及实时任务的依赖关系,从底层ODS到DWD再到DWS,以及APP层用到哪些数据,将整个链度串联起来。

六、问题及解决方案

1. 数据倾斜

由于要拆分主题,要以table为key对数据进行keyBy,但是由于每个表的数据量相差较大,会出现数据倾斜

解决方案:

加盐,给key加前缀

前缀不能随便加,为了保证同一id的数据在相同的分区中,所以根据id_table进行keyBy

2. 数据重复

任务在进行自动或手动重启时,为了保证数据不丢失,数据会出现重复计算的问题,如果下游只是对数据进行HBase存储的话,由于幂等性,这种重复可以解。但是,如果下游要对数据进行聚合,这样会导致数据被计算多次,影响计算结果的准确性

解决方案:

上游在对数据进行发送时,对kafka producer 进行 exactly once的设置

在对数据统计时进行数据去重

3. 数据延时

由于所处理的数据表的大小不一样,处理大表时,会出现数据延时的问题。

解决方案:

针对大表数据增加并行度

4.数据乱序

由于Flink kafka producer默认是根据hash对数据进行随机分区,kafka consumer在对数据进行消费时,每个分区消费速度不同,这样最终在存储数据时,就会出现乱序即相同的id会出现老数据覆盖新数据的问题

解决方案:

对kafka每个阶段进行自定义分区,将id相同的数据分到同一个分区,保证同一id的数据的有序性

由于整个数据处理过程中可能会出现shuffle,导数数据重新乱序,所以在对数据存储前对数据进行排序

对数据进行排序的关键点时要保证每条数据的唯一性,即要有标记数据先后顺序的字段

5 . 数据唯一标记(很重要)

由于要对数据进行去重或者排序,所以要保证数据的唯一性

解决办法:

使用时间戳不可以,因为数据量很大的情况下,同一时间会处理上百条数据

在最初发出数据的时候,为数据打上标记,使用 partition + offset + idx 的组合来确认数据的唯一性及顺序性

6. 数据可靠性

我们对服务重启或对服务升级时,可能会出现数据的丢失

解决方案:

结合Flink 的checkpoint及savepoint机制保证数据的可靠性

开启Flink的checkpoint机制,服务进行自动重启时,会自动读取上次保存在checkpoint中offset,或者我们指定offset进行数据消费

对服务进行升级时,先将服务的状态保存至savepoint中,重启时指定savepoint进行服务启动,保证数据不丢失

7. 无感升级

由于我们目前数据量比较庞大,且在对服务进行升级时,耗时较长,会影响调用方的使用。

解决办法:

在对服务进行升级时,将数据写入备用库,等数据追上且服务稳定运行后,再将存储库进行切换

Flink的检查点和恢复机制定期的会保存应用程序状态的一致性检查点。在故障的情况下,应用程序的状态将会从最近一次完成的检查点恢复,并继续处理。尽管如此,可以使用检查点来重置应用程序的状态无法完全达到令人满意的一致性保证。相反,source和sink的连接器需要和Flink的检查点和恢复机制进行集成才能提供有意义的一致性保证。

对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。 一条数据不应该丢失,也不应该重复计算 在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。

Flink的 checkpoint机制和故障恢复机制给Flink内部提供了精确一次的保证,需要注意的是,所谓精确一次并不是说精确到每个event只执行一次,而是每个event对状态(计算结果)的影响只有一次。

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性

不同Source 和Sink的一致性保证

整个端到端的一致性级别取决于所有组件中一致性最弱的组件

Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用,可以保证应用程序不会丢失任何数据。尽管如此,应用程序可能会发出两次计算结果,因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次(一些结果已经发送出去了,这时任务故障,然后从上一次检查点恢复,这些结果将被重新计算一次然后发送出去)。所以,可重置读位置的source和Flink的恢复机制不足以提供端到端的恰好处理一次语义,即使应用程序的状态是恰好处理一次一致性级别。

端到端恰好处理一次语义一致性的应用程序需要特殊的sink连接器。sink连接器可以在不同的情况下使用两种技术来达到恰好处理一次一致性语义: 幂等性写入和事务性写入 。

所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了

必须保证在从检查点恢复以后,它将会覆盖之前已经写入的结果。

从Flink程序sink到的key-value存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。

构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中

事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。但是,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。

Flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log( WAL ,预写式日志)sink和 两阶段提交sink 。

把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统

简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么 sink 系统,都能用这种方式一批搞定

DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink

对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里

然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”

当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入

这种方式真正实现了 exactly-once,它需要一个提供事务支持的外部 sink 系统。

Flink 提供了 TwoPhaseCommitSinkFunction 接口。

外部 sink 系统必须提供事务支持,或者 sink 任务必须能够模拟外部系统上的事务

在 checkpoint 的间隔期间里,必须能够开启一个事务并接受数据写入

在收到 checkpoint 完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失

sink 任务必须能够在进程失败后恢复事务

提交事务必须是 幂等操作

使用flink+kafka来实现一个端对端一致性保证,source - transform - sink

图解Exactly-Once 两阶段提交

Exactly-once 两阶段提交1:

JobManager 协调各个 TaskManager 进行 checkpoint 存储 checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存

Exactly-once 两阶段提交2:

当开启了checkpoint ,JobManager 会将检查点分界线(barrier)注入数据流 barrier会在算子间传递下去

每个算子会对当前的状态做个快照,保存到状态后端

checkpoint 机制可以保证内部的状态一致性

每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里

sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;

遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成

sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认”

总结 Exactly-once 两阶段提交步骤

在使用kafka011 sink 时注意的点:

1.为了保证事务特性,在使用其他程序去消费我们flink sink 数据的kafka时,这个consumer需要设置了 isolation.level = read_committed ,那么它只会读取已经提交了的消息。

2.Checkpoint超时时间 必需大于 kafka 提交事务时间。

假如checkpoint失败时间高于 kafka事务等待时间,比如,设置了一个checkpoint最多等待10分钟,10分钟后会失败这个checkpoint的保存。而kafka 的事务只能等待5分钟,5分钟后把uncommitted的事务关掉。这个时候6分钟checkpoint成功了,但是对应kafka数据的事务已经失败。这样就无法保证Exactly-once的实现

1.流式计算分为无状态和有状态两种情况。 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。

(1)所有类型的窗口。例如,计算过去一小时的平均水位,就是有状态的计算。

(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。

(3)流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。

2.下图展示了无状态流处理和有状态流处理的主要区别。 无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。

上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。

3.有状态的算子和应用程序

Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。

在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:

算子状态(operator state)

键控状态(keyed state)

4.算子状态(operator state)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

Flink为算子状态提供三种基本数据结构:

列表状态(List state):将状态表示为一组数据的列表。

联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态

5.键控状态(keyed state)

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value?map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

6.状态后端(state backend)

每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)

状态后端主要负责两件事:

1)本地的状态管理

2)将检查点(checkpoint)状态写入远程存储

状态后端分类:

(1)MemoryStateBackend

内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。

(2)FsStateBackend

将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。

(3)RocksDBStateBackend

将所有状态序列化后,存入本地的RocksDB中存储。

7.状态一致性

当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?

1)一致性级别

在流处理中,一致性可以分为3个级别:

(1) at-most-once : 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。

(2) at-least-once : 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

(3) exactly-once : 这指的是系统保证在发生故障后得到的计数结果与正确值一致。

曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二。

(1)保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。

(2)流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。

最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。

Flink的一个重大价值在于, 它既保证了 exactly-once ,也具有低延迟和高吞吐的处理能力 。

从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。

2)端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:

1)source端 —— 需要外部源可重设数据的读取位置

2)link内部 —— 依赖checkpoint

3)sink端 —— 需要保证从故障恢复时,数据不会重复写入外部系统

而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。

4)幂等写入

所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

5)事务写入

需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。

8.检查点(checkpoint)

Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。

9.Flink+Kafka如何实现端到端的exactly-once语义

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

1)内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性

2)source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

3)sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction

内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

我们知道Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

当checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。

每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。

每个内部的transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。

sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。

当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

具体的两阶段提交步骤总结如下:

1)第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”

2)触发 checkpoint 操作,barrier从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知jobmanager

3)sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

4)jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

5)sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

6)外部kafka关闭事务,提交的数据可以正常消费了。

Flink 是一个流处理框架,支持流处理和批处理,特点是流处理有限,可容错,可扩展,高吞吐,低延迟。

流处理是处理一条,立马下一个节点会从缓存中取出,在下一个节点进行计算

批处理是只有处理一批完成后,才会经过网络传输到下一个节点

流处理的优点是低延迟 批处理的优点是高吞吐

flink同时支持两种,flink的网络传输是设计固定的缓存块为单位,用户可以设置缓存块的超时值来决定换存块什么时候进行传输。 数据大于0 进行处理就是流式处理。

如果设置为无限大就是批处理模型。

Flink 集群包括 JobManager 和 TaskManager .

JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包 等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要 部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

flink on yarn 是由client 提交 app到 RM 上, 然后RM 分配一个 AppMaster负责运行 Flink JobManager 和 Yarn AppMaster, 然后 AppMaster 分配 容器去运行 Flink TaskManger

SparkStreaming 是将流处理分成微批处理的作业, 最后的处理引擎是spark job

Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块,Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,然后分批次提交job到集群中去运行,运行每个 job的过程和真正的spark 任务没有任何区别。

JobScheduler, 负责 Job的调度通过定时器每隔一段时间根据Dstream的依赖关系生一个一个DAG图

ReceiverTracker负责数据的接收,管理和分配

ReceiverTracker在启动Receiver的时候他有ReceiverSupervisor,其实现是ReceiverSupervisorImpl, ReceiverSupervisor本身启 动的时候会启动Receiver,Receiver不断的接收数据,通过BlockGenerator将数据转换成Block。定时器会不断的把Block数据通会不断的把Block数据通过BlockManager或者WAL进行存储,数据存储之后ReceiverSupervisorlmpl会把存储后的数据的元数据Metadate汇报给ReceiverTracker,其实是汇报给ReceiverTracker中的RPC实体ReceiverTrackerEndpoin

spark on yarn 的cluster模式, Spark client 向RM提交job请求, RM会分配一个 AppMaster, driver 和 运行在AppMAster节点里, AM然后把Receiver作为一个Task提交给Spark Executor 节点, Receive启动接受数据,生成数据块,并通知Spark Appmaster, AM会根据数据块生成相应的Job, 并把Job 提交给空闲的 Executor 去执行。

1:需要关注流数据是否需要进行状态管理

2:At-least-once或者Exectly-once消息投递模式是否有特殊要求

3:对于小型独立的项目,并且需要低延迟的场景,建议使用storm

4:如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming

5:要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink

Flink 提供的Api右 DataStream 和 DataSet ,他们都是不可变的数据集合,不可以增加删除中的元素, 通过 Source 创建 DataStream 和 DataSet

在创建运行时有:

Flink的每一个Operator称为一个任务, Operator 的每一个实例称为子任务,每一个任务在JVM线程中执行。可以将多个子任务链接成一个任务,减少上下文切换的开销,降低延迟。

source 和 算子map 如果是 one by one 的关系,他们的数据交换可以通过缓存而不是网络通信

TaskManager 为控制执行任务的数量,将计算资源划分多个slot,每个slot独享计算资源,这种静态分配利于任务资源隔离。

同一个任务可以共享一个slot, 不同作业不可以。

这里因为 Source 和 Map 并行度都是4 采用直连方式,他们的数据通信采用缓存形式

所以一共需要两个TaskManager source,Map 一个,reduce一个, 每个TaskManager 要3个slot

JobManager 将 JobGraph 部署 ExecutionGraph

设置的并行度,可以让一个ExecJobVertex 对应 多个并行的ExecVertex 实例。

Flink通过状态机管理 ExecGraph的作业执行进度。

Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。

Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用。

数据源:

Sink:

时间:

处理时间:取自Operator的机器系统时间

事件时间: 由数据源产生

进入时间: 被Source节点观察时的系统时间

如果数据源没有自己正确创建水印,程序必须自己生成水印来确保基于事件的时间窗口可以正常工作。。

DataStream 提供了 周期性水印,间歇式水印,和递增式水印

flink提供了专门操作redis的Redis Sink

Redis Sink 提供用于向Redis发送数据的接口的类。接收器可以使用三种不同的方法与不同类型的Redis环境进行通信:

Redis Sink 核心类是 RedisMappe 是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法

使用RedisCommand设置数据结构类型时和redis结构对应关系。

Flink Redis Connector

(责任编辑:IT教学网)

更多

推荐网络工程师文章