flink不仅可以支持实时流式处理,它也可以支持批量处理,其中批量处理也可以看作是实时处理的一个特殊情况。
flink网络传输数据流向DataSetAPI主要可以分为3块来分析:DataSource、Transformation和Sink。
DataSource是程序的数据源输入。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,比如Map、FlatMap、Filter等操作。
Sink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。
01DataSource组件:
DataSource对DataSet批处理而言,较频繁的操作,是读取HDFS中的文件数据。
1.基于集合fromCollection(Collection),从本地集合读取数据,主要是为了方便测试使用。
valenv=ExecutionEnvironment.getExecutionEnvironmentvaltextDataSet:DataSet[String]=env.fromCollection(list)env.fromCollection2.基于文件readTextFile(path),基于HDFS中的数据进行计算分析。
readTextFile:从文件中读取:
valtextDataSet:DataSet[String]=env.readTextFile("/data/a.txt")
readTextFile:遍历目录
readTextFile可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式:
valparameters=newConfiguration
//recursive.file.enumeration开启递归
parameters.setBoolean("recursive.file.enumeration",true)
valfile=env.readTextFile("/data").withParameters(parameters)
withParametersreadTextFile:读取压缩文件
对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别,并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
valfile=env.readTextFile("/data/file.gz")
因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此。
02Transformation组件
因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此。
Map:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作。
将DataSet中的每一个元素转换为另外一个元素:
//使用map将List转换为一个Scala的样例类
caseclassUser(name:String,id:String)
valuserDataSet:DataSet[User]=textDataSet.map{
text=
valfieldArr=text.split(",")
User(fieldArr(0),fieldArr(1))
}
userDataSet.print()
FlatMap:输入一个元素,可以返回零个、一个或者多个元素。
将DataSet中的每一个元素转换为0...n个元素:
//使用flatMap操作,将集合中的数据:
//根据第一个元素,进行分组。
//根据第二个元素,进行聚合求值。
valresult=textDataSet.flatMap(line=line)
.groupBy(0)//根据第一个元素,进行分组。
.sum(1)//根据第二个元素,进行聚合求值。
result.print()
MapPartition:类似Map,一次处理一个分区的数据(如果在进行Map处理的时候,需要获取第三方资源连接,建议使用MapPartition)。
将一个分区中的元素转换为另一个元素:
//使用mapPartition操作,将List转换为一个scala的样例类
caseclassUser(name:String,id:String)
valresult:DataSet[User]=textDataSet.mapPartition(line={
line.map(index=User(index._1,index._2))
})
result.print()
Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下。
过滤出来一些符合条件的元素,返回boolean值为true的元素:
valsource:DataSet[String]=env.fromElements("java","scala","java")
//过滤出带java的数据
valfilter:DataSet[String]=source.filter(line=line.contains("java"))
filter.print()
Reduce:对数据,进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值。
可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素:
//使用fromElements构建数据源
valsource=env.fromElements(("java",1),("scala",1),("java",1))
//使用map转换成DataSet元组
valmapData:DataSet[(String,Int)]=source.map(line=line)
//根据首个元素分组
valgroupData=mapData.groupBy(_._1)
//使用reduce聚合
valreduceData=groupData.reduce((x,y)=(x._1,x._2+y._2))
//打印测试
reduceData.print()
Aggregations:sum、max、min等。
在数据集上进行聚合求最值(最大值、最小值):
valdata=newmutable.MutableList[(Int,String,Double)]
data.+=((1,"yuwen",89.0))
data.+=((2,"shuxue",92.2))
data.+=((3,"yuwen",89.99))
//使用fromElements构建数据源
valinput:DataSet[(Int,String,Double)]=env.fromCollection(data)
//使用group执行分组操作
valvalue=input.groupBy(1)
//使用aggregate求最大值元素
.aggregate(Aggregations.MAX,2)
//打印测试
value.print()
Distinct:返回一个数据集中去重之后的元素。
去除重复的数据:
//数据源使用上一题的
//使用distinct操作,根据科目去除集合中重复的元组数据
valvalue:DataSet[(Int,String,Double)]=input.distinct(1)
value.print()
Join:内连接。
将两个DataSet按照一定条件连接到一起,形成新的DataSet:
//s1和s2数据集格式如下:
//DataSet[(Int,String,String,Double)]
valjoinData=s1.join(s2)//s1数据集joins2数据集
.where(0).equalTo(0){//join的条件
(s1,s2)=(s1._1,s1._2,s2._2,s1._3)
}
OuterJoin:外链接。
leftOuterJoin左外连接,左边的Dataset中的每一个元素,去连接右边的元素。
rightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素。
fullOuterJoin:全外连接,左右两边的元素,全部连接。
Cross:获取两个数据集的笛卡尔积。
交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集。
和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作:
Union:返回两个数据集的总和,数据类型需要一致。
联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重:
First-n:获取集合中的前N个元素。
取前N个数:input.first(2)//取前两个数
SortPartition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用,来完成对多个字段的排序。
根据指定的字段值进行分区的排序:
Flink针对DataSet,提供了一些数据分区规则,具体如下。
Rebalance:对数据集进行再平衡、重分区以及消除数据倾斜操作。
Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生:数据分区不均匀,个别区域处理时间过长。
这个时候本来总体数据量,只需要10分钟解决的问题,出现了数据倾斜,机器1上的任务需要4个小时才能完成,那么其他3台机器执行完毕,也要等待机器1执行完毕后,才算整体将任务完成。
所以在实际的工作中,出现这种情况比较好的解决方案就是接下来要介绍的—rebalance(内部使用roundrobin方法,将数据均匀打散,这对于数据倾斜时是很好的选择。)
//使用rebalance操作,避免数据倾斜
valrebalance=filterData.rebalance()
Hash-Partition:根据指定Key的散列值对数据集进行分区。
partitionByHash按照指定的key进行hash分区:
valcollection=env.fromCollection(data)
valunique=collection.partitionByHash(1).mapPartition{
line=
line.map(x=(x._1,x._2,x._3))
}
Range-Partition:partitionByRange根据指定的Key对数据集进行范围分区。
valcollection=env.fromCollection(data)
valunique=collection.partitionByRange(x=x._1).mapPartition(line=line.map{
x=
(x._1,x._2,x._3)
})
CustomPartitioning:自定义分区规则,自定义分区需要实现Partitioner接口的partitionCustom(partitioner,“someKey”)或者partitionCustom(partitioner,0)。
valresult:DataSet[String]=sourceDataSet.partitionCustom(newMyPartitioner2,x=x+"")
03Sink组件:
Flink针对DataSet提供了大量的已经实现的Sink,支持多种文件的存储格式,包括text文件,CSV文件等。
1、writeAsText():将元素以字符串形式,逐行写入,这些字符串通过调用每个元素的toString()方法来获取。
将数据输出到文件,Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。
//将数据写入本地文件
result.writeAsText("/data/a",WriteMode.OVERWRITE)
//将数据写入HDFS
result.writeAsText("hdfs://node01:/data/a",WriteMode.OVERWRITE)
writeAsCsv()是个半成品,只能写入DataSetTuple3String,String,Integerds2,这种类型的数据,不支持pojo类型的数据写入。
2、collect将数据输出到本地集合:
result.collect()方法返回一个可以关闭的行迭代器。除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以我们应该通过CloseableIterator#close()方法,主动地关闭作业以防止资源泄露。
result.collect()
3、print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。
result.print()方法将查询结果打印到本地控制台。
TableResult中的结果数据只能被访问一次,因此一个TableResult实例中,collect()方法和print()方法不能被同时使用。
对于流模式,TableResult.collect()方法或者TableResult.print方法保证端到端精确一次的数据交付。这就要求开启checkpointing。默认情况下checkpointing是禁止的,我们可以通过TableConfig设置checkpointing相关属性来开启checkpointing。因此一条结果数据只有在其对应的checkpointing完成后才能在客户端被访问。