以下文章来源于大数据左右手,作者左右
一.Flink提交系列
1.Flink怎么提交?
Local模式
JobManager和TaskManager共用一个JVM,只需要jdk支持,单节点运行,主要用来调试。
Standlone模式
Standlone是Flink自带的一个分布式集群,它不依赖其他的资源调度框架、不依赖yarn等。充当Master角色的是JobManager。充当Slave/Worker角色是TaskManager
Yarn模式
2.Flink集群规模?在Flink项目做了什么?
Flink群集大小时要考虑的一些方面:
1.记录数和每条记录的大小每秒到达流式传输框架的预期记录数以及每条记录的大小。不同的记录类型将具有不同的大小,这将最终影响Flink应用程序平稳运行所需的资源。
2.不同key的数量和每个键的状态大小。
3.状态更新的数量和状态后端的访问模式Java的堆状态后端上的各种访问模式可能会显着影响群集的大小以及Flink作业所需的资源。
4.网络容量网络容量不仅会受到Flink应用程序本身的影响,还会受到可能正在与之交互的外部服务(如Kafka或HDFS)的影响。此类外部服务可能会导致网络出现额外流量。例如,启用replication可能会在网络的消息brokers之间创建额外的流量。
5.磁盘带宽。
6.机器数量及其可用CPU和内存。
Flink项目做了什么?
实时监控:
用户行为预警,服务器攻击预警.....
实时报表:
活动直播大屏:双11、双12
对外数据产品时效性
数据化运营
流数据分析:
实时计算相关指标反馈及时调整决策
内容投放、无线智能推送、实时个性化推荐等
实时仓库:
数据实时清洗、归并、结构化
数仓的补充和优化
3.Flink提交作业的流程,以及与Yarn怎么交互?
(1)提交App之前,先上传Flink的Jar包和配置到HDFS,以便JobManager和TaskManager共享HDFS的数据。
(2)客户端向ResourceManager提交Job,ResouceManager接到请求后,先分配container资源,然后通知NodeManager启动ApplicationMaster。
(3)ApplicationMaster会加载HDFS的配置,启动对应的JobManager,然后JobManager会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。
(4)接着,JobManager会向ResourceManager申请资源,ResouceManager接到请求后,继续分配container资源,然后通知ApplictaionMaster启动更多的TaskManager(先分配好container资源,再启动TaskManager)。container在启动TaskManager时也会从HDFS加载数据。
(5)TaskManager启动后,会向JobManager发送心跳包。JobManager向TaskManager分配任务。
4.Flink提交Job的方式以及参数设置?
./bin/flinkrun-tyarn-session\
-Dyarn.application.id=application_XXXX_YY\
./examples/streaming/TopSpeedWindowing.jar
./bin/flinkrun-tyarn-per-job
--detached./examples/streaming/TopSpeedWindowing.jar
./bin/flinkrun-application-tyarn-application
./examples/streaming/TopSpeedWindowing.jar
yn(实际)=Math.ceil(p/ys)
ys(总共)=yn(实际)*ys(指定)
ys(使用)=p(指定)
flinkrun
-c,--classFlink应用程序的入口
-C,--classpath指定所有节点都可以访问到的url,可用于多个应用程序都需要的工具类加载
-d,--detached是否使用分离模式,就是提交任务,cli是否退出,加了-d参数,cli会退出
-n,--allowNonRestoredState允许跳过无法还原的savepoint。比如删除了代码中的部分operator
-p,--parallelism执行并行度
-s,--fromSavepoint从savepoint恢复任务
-sae,--shutdownOnAttachedExit以attached模式提交,客户端退出的时候关闭集群
flinkyarn-cluster模式
-d,--detached是否使用分离模式
-m,--jobmanager指定提交的jobmanager
-yat,--yarnapplicationType设置yarn应用的类型
-yD使用给定属性的值
-yd,--yarndetached使用yarn分离模式
-yh,--yarnhelpyarnsession的帮助
-yid,--yarnapplicationId挂到正在运行的yarnsession上
-yj,--yarnjarFlinkjar文件的路径
-yjm,--yarnjobManagerMemoryjobmanager的内存(单位M)
-ynl,--yarnnodeLabel指定YARN应用程序YARN节点标签
-ynm,--yarnname自定义yarn应用名称
-yq,--yarnquery显示yarn的可用资源
-yqu,--yarnqueue指定yarn队列
-ys,--yarnslots指定每个taskmanager的slots数
-yt,--yarnship在指定目录中传输文件
-ytm,--yarntaskManagerMemory每个taskmanager的内存
-yz,--yarnzookeeperNamespace用来创建ha的zk子路径的命名空间
-z,--zookeeperNamespace用来创建ha的zk子路径的命名空间
5.Flink的JobManger?有多少个JobManager?
JobManger
(1)控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JM所控制执行。
(2)JM会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(ogicaldataflowgraph)和打包了所有的类、库和其它资源的JAR包。
(3)JM会把Jobgraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(Executiongraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(Resourcemanager)请求执行任务必要的资源,也就是任务管理器(Taskmanager)上的插槽slot。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TM上。而在运行过程中JM会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
有多少个JobManager
集群默认只有一个JobManager。但为了防止单点故障,我们配置了高可用。我们公司一般配置一个主JobManager,两个备用JobManager,然后结合ZooKeeper的使用,来达到高可用。
6.JobManger在集群启动过程中起到什么作用?
JobManager的职责主要是接收Flink作业,调度Task,收集作业状态和管理TaskManager。它包含一个Actor,并且做如下操作:
RegisterTaskManager:它由想要注册到JobManager的TaskManager发送。注册成功会通过AcknowledgeRegistration消息进行Ack。
SubmitJob:由提交作业到系统的Client发送。提交的信息是JobGraph形式的作业描述信息。
CancelJob:请求取消指定id的作业。成功会返回CancellationSuccess,否则返回CancellationFailure。
UpdateTaskExecutionState:由TaskManager发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回true,否则返回false。
RequestNextInputSplit:TaskManager上的Task请求下一个输入split,成功则返回NextInputSplit,否则返回null。
JobStatusChanged:它意味着作业的状态(RUNNING,CANCELING,FINISHED,等)发生变化。这个消息由ExecutionGraph发送。
7.Flink的TaskManager?
(1)Flink中的工作进程。通常在Flink中会有多个TM运行,每个TM都包含了一定数量的插槽slots。插槽的数量限制了TM能够执行的任务数量。
(2)启动之后,TM会向资源管理器注册它的插槽;收到资源管理器的指令后,TM就会将一个或者多个插槽提供给JM调用。TM就可以向插槽分配任务tasks来执行了。
(3)在执行过程中,一个TM可以跟其它运行同一应用程序的TM交换数据。
TaskManager相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的Flink应用编译打包,提交到JobManager,然后JobManager会根据已注册在JobManager中TaskManager的资源情况,将任务分配给有资源的TaskManager节点,然后启动并运行任务。TaskManager从JobManager接收需要部署的任务,然后使用Slot资源启动Task,建立数据接入的网络连接,接收数据并开始数据处理。同时TaskManager之间的数据交互都是通过数据流的方式进行的。可以看出,Flink的任务运行其实是采用多线程的方式,这和MapReduce多JVM进行的方式有很大的区别,Flink能够极大提高CPU使用效率,在多个任务和Task之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池进行对资源进行有效管理。
8.说一下slot,业务中一个TaskManager设置几个slot?
jobManager:负责接收FlinkClient提交的Job,并将Job分发到TaskManager执行,一个JobManager包含一个或多个TaskManager。
TaskManager:负责执行Client提交的Job。每个TaskManager可以有一个或多个slot,但slot的个数不能多于cpu-cores。
slot:slot是Flink任务的最小执行单位,并行度上限不能大于slot的数量。
9.Flink的并行度?
Flink中的任务被分为多个并行任务来执行,其中每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。设置并行度一般在四个层面设置(优先级由高到低)
操作算子层面
执行环境层面
客户端层面
系统层面
10.Flink计算资源的调度是如何实现的?
TaskManager中最细粒度的资源是Taskslot,代表了一个固定大小的资源子集,每个TaskManager会将其所占有的资源平分给它的slot。
通过调整taskslot的数量,用户可以定义task之间是如何相互隔离的。每个TaskManager有一个slot,也就意味着每个task运行在独立的JVM中。每个TaskManager有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输,也能共享一些数据结构,一定程度上减少了每个task的消耗。每个slot可以接受单个task,也可以接受多个连续task组成的pipeline。
11.Flink的Slot和Parallelism
slot是指taskmanager的并发执行能力,假设我们将taskmanager.numberOfTaskSlots配置为3那么每一个taskmanager中分配3个TaskSlot,3个taskmanager一共有9个TaskSlot。
parallelism是指taskmanager实际使用的并发能力。假设我们把parallelism.default设置为1,那么9个TaskSlot只能用1个,有8个空闲。
12.OperatorChains了解吗?
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。
将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是OperatorChains(算子链)。
13.Flink的运行必须依赖Hadoop组件吗?
Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。
但是Flink集成Yarn做资源调度,也可以读写HDFS,或者利用HDFS做检查点。
14.怎么修改正在运行的Flink程序?如果有新的实时指标你们是怎么上线的?
正在运行的Flink程序:修改不了。可动态加载配置广播等等
完成作业开发
作业调试,并且通过语法检查后
上线作业,即可将数据发布至生产环境。
二.状态编程系列
1.说一下状态编程(operatorstate,keyedstate)
我们知道Flink上的聚合和窗口操作,一般都是基于KeyedStream的,数据会按照key的哈希值进行分区,聚合处理的结果也应该是只对当前key有效。然而同一个分区(也就是slot)上执行的任务实例,可能会包含多个key的数据,它们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。
容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。
处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。
状态的分类:
托管状态(ManagedState)和原始状态(RawState)
托管状态是由Flink的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。
对于具体的状态内容,Flink也提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。
聚合、窗口等算子中内置的状态,就都是托管状态。
我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。
算子状态(OperatorState)和按键分区状态
1.按键分区状态
按键分区状态(KeyedState)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。KeyedState类似于一个分布式的映射(map)数据结构,所有的状态会根据key保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的key,从map存储中读取出对应的状态值。所以具有相同key的所有数据都会到访问相同的状态,而不同key的状态之间是彼此隔离的。
值状态(ValueState):
状态中只保存一个“值”(value)。
列表状态(ListState):
以列表(List)的形式组织起来。在ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。
映射状态(MapState):
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。
归约状态(ReducingState):
类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。
聚合状态(AggregatingState):
与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。
2.算子状态
算子状态(OperatorState)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个OperatorState。
算子状态的实际应用场景不如KeyedState多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。
当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。
状态类型:ListState、UnionListState和BroadcastState。
ListState
KeyedState中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。
列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-splitredistribution)。
算子状态中不会存在“键组”(keygroup)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
联合列表状态(UnionListState)
与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(unionredistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。
这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
2.10个int以数组的形式保存,保存在什么状态好?VlaueState还是ListState?存在哪个的性能比较好?
ValueState[Array[Int]]update形式。
ListState[Int]:add形式添加。
对于操控来说ListState方便取值与更改。
按键分区状态(KeyedState)选择ValueStateListState。
算子状态(OperatorState)选择ListState。
3.使用MapStage,groupbyid如何设计?id不放在key行不行?
MapState是KeyedState,也就是keyBy后才能使用MapState。所以State中肯定要保存key。
groupbyid。假设id有id1、id2这两个值,id1、id2就是key。
id的设计是结合业务场景,就是把同类数据或者同逻辑数据放到一起计算处理。
4.Flink是如何管理kafka的offset,使用什么类型的状态保存offset?
checkpoint是Flink的内部机制,可以从故障中恢复。通俗的理解是checkpoint是Flink应用程序状态的一致性副本,包括输入的读取位置(offset)。如果发生故障,Flink将通过从checkpoint加载状态后端并从恢复的读取位置继续恢复应用程序,可以做到所谓的断点续传。
checkpoint使Flink具有容错能力,并确保在发生故障时具有容错的能力。应用程序可以定期触发检查点。
Flink中的Kafka消费者将Flink的检查点机制与有状态运算符集成在一起,其状态是所有Kafka分区中的读取偏移量。触发checkpoint时,每个分区的偏移量都存储在checkpoint中。Flink的checkpoint机制确保所有操作员任务的存储状态是一致的。
即它们基于相同的输入数据。当所有操作员任务成功存储其状态时,检查点完成。因此,当从潜在的系统故障重新启动时,系统提供一次性状态更新保证。
privatetransientListStateTuple2KafkaTopicPartition,LongunionOffsetStates;
publicfinalvoidsnapshotState(FunctionSnapshotContextcontext)throwsException{
if(!running){
LOG.debug("snapshotState()calledonclosedsource");
}else{
unionOffsetStates.clear();
finalAbstractFetcher?,?fetcher=this.kafkaFetcher;
if(fetcher==null){
//thefetcherhasnotyetbeeninitialized,whichmeansweneedtoreturnthe
//originallyrestoredoffsetsortheassignedpartitions
for(Map.EntryKafkaTopicPartition,LongsubscribedPartition:
subscribedPartitionsToStartOffsets.entrySet()){
unionOffsetStates.add(
Tuple2.of(
subscribedPartition.getKey(),subscribedPartition.getValue()));
}
if(offsetCommitMode==OffsetCommitMode.ON_CHECKPOINTS){
//themapcannotbeasynchronouslyupdated,becauseonlyonecheckpointcall
//canhappen
//onthisfunctionatatime:eithersnapshotState()or
//notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(),restoredState);
}
}else{
HashMapKafkaTopicPartition,LongcurrentOffsets=fetcher.snapshotCurrentState();
if(offsetCommitMode==OffsetCommitMode.ON_CHECKPOINTS){
//themapcannotbeasynchronouslyupdated,becauseonlyonecheckpointcall
//canhappen
//onthisfunctionatatime:eithersnapshotState()or
//notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(),currentOffsets);
}
for(Map.EntryKafkaTopicPartition,LongkafkaTopicPartitionLongEntry:
currentOffsets.entrySet()){
unionOffsetStates.add(
Tuple2.of(
kafkaTopicPartitionLongEntry.getKey(),
kafkaTopicPartitionLongEntry.getValue()));
}
}
if(offsetCommitMode==OffsetCommitMode.ON_CHECKPOINTS){
//truncatethemapofpendingoffsetsto