竹笋

首页 » 问答 » 常识 » 打开百度语音模式,反复熟记Flink流计
TUhjnbcbe - 2023/3/6 9:24:00
青年白癜风有效治疗方法 http://pf.39.net/bdfyy/zjdy/150731/4667942.html

flink不仅可以支持实时流式处理,它也可以支持批量处理,其中批量处理也可以看作是实时处理的一个特殊情况。

flink网络传输数据流向

DataSetAPI主要可以分为3块来分析:DataSource、Transformation和Sink。

DataSource是程序的数据源输入。

Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,比如Map、FlatMap、Filter等操作。

Sink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

01DataSource组件:

DataSource对DataSet批处理而言,较频繁的操作,是读取HDFS中的文件数据。

1.基于集合fromCollection(Collection),从本地集合读取数据,主要是为了方便测试使用。

valenv=ExecutionEnvironment.getExecutionEnvironment

valtextDataSet:DataSet[String]=env.fromCollection(list)

env.fromCollection

2.基于文件readTextFile(path),基于HDFS中的数据进行计算分析。

readTextFile:从文件中读取:

valtextDataSet:DataSet[String]=env.readTextFile("/data/a.txt")

readTextFile:遍历目录

readTextFile可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式:

valparameters=newConfiguration

//recursive.file.enumeration开启递归

parameters.setBoolean("recursive.file.enumeration",true)

valfile=env.readTextFile("/data").withParameters(parameters)

withParameters

readTextFile:读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别,并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

valfile=env.readTextFile("/data/file.gz")

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此。

02Transformation组件

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此。

Map:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作。

将DataSet中的每一个元素转换为另外一个元素:

//使用map将List转换为一个Scala的样例类

caseclassUser(name:String,id:String)

valuserDataSet:DataSet[User]=textDataSet.map{

text=

valfieldArr=text.split(",")

User(fieldArr(0),fieldArr(1))

}

userDataSet.print()

FlatMap:输入一个元素,可以返回零个、一个或者多个元素。

将DataSet中的每一个元素转换为0...n个元素:

//使用flatMap操作,将集合中的数据:

//根据第一个元素,进行分组。

//根据第二个元素,进行聚合求值。

valresult=textDataSet.flatMap(line=line)

.groupBy(0)//根据第一个元素,进行分组。

.sum(1)//根据第二个元素,进行聚合求值。

result.print()

MapPartition:类似Map,一次处理一个分区的数据(如果在进行Map处理的时候,需要获取第三方资源连接,建议使用MapPartition)。

将一个分区中的元素转换为另一个元素:

//使用mapPartition操作,将List转换为一个scala的样例类

caseclassUser(name:String,id:String)

valresult:DataSet[User]=textDataSet.mapPartition(line={

line.map(index=User(index._1,index._2))

})

result.print()

Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下。

过滤出来一些符合条件的元素,返回boolean值为true的元素:

valsource:DataSet[String]=env.fromElements("java","scala","java")

//过滤出带java的数据

valfilter:DataSet[String]=source.filter(line=line.contains("java"))

filter.print()

Reduce:对数据,进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值。

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素:

//使用fromElements构建数据源

valsource=env.fromElements(("java",1),("scala",1),("java",1))

//使用map转换成DataSet元组

valmapData:DataSet[(String,Int)]=source.map(line=line)

//根据首个元素分组

valgroupData=mapData.groupBy(_._1)

//使用reduce聚合

valreduceData=groupData.reduce((x,y)=(x._1,x._2+y._2))

//打印测试

reduceData.print()

Aggregations:sum、max、min等。

在数据集上进行聚合求最值(最大值、最小值):

valdata=newmutable.MutableList[(Int,String,Double)]

data.+=((1,"yuwen",89.0))

data.+=((2,"shuxue",92.2))

data.+=((3,"yuwen",89.99))

//使用fromElements构建数据源

valinput:DataSet[(Int,String,Double)]=env.fromCollection(data)

//使用group执行分组操作

valvalue=input.groupBy(1)

//使用aggregate求最大值元素

.aggregate(Aggregations.MAX,2)

//打印测试

value.print()

Distinct:返回一个数据集中去重之后的元素。

去除重复的数据:

//数据源使用上一题的

//使用distinct操作,根据科目去除集合中重复的元组数据

valvalue:DataSet[(Int,String,Double)]=input.distinct(1)

value.print()

Join:内连接。

将两个DataSet按照一定条件连接到一起,形成新的DataSet:

//s1和s2数据集格式如下:

//DataSet[(Int,String,String,Double)]

valjoinData=s1.join(s2)//s1数据集joins2数据集

.where(0).equalTo(0){//join的条件

(s1,s2)=(s1._1,s1._2,s2._2,s1._3)

}

OuterJoin:外链接。

leftOuterJoin左外连接,左边的Dataset中的每一个元素,去连接右边的元素。

rightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素。

fullOuterJoin:全外连接,左右两边的元素,全部连接。

Cross:获取两个数据集的笛卡尔积。

交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集。

和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作:

Union:返回两个数据集的总和,数据类型需要一致。

联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重:

First-n:获取集合中的前N个元素。

取前N个数:input.first(2)//取前两个数

SortPartition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用,来完成对多个字段的排序。

根据指定的字段值进行分区的排序:

Flink针对DataSet,提供了一些数据分区规则,具体如下。

Rebalance:对数据集进行再平衡、重分区以及消除数据倾斜操作。

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生:数据分区不均匀,个别区域处理时间过长。

这个时候本来总体数据量,只需要10分钟解决的问题,出现了数据倾斜,机器1上的任务需要4个小时才能完成,那么其他3台机器执行完毕,也要等待机器1执行完毕后,才算整体将任务完成。

所以在实际的工作中,出现这种情况比较好的解决方案就是接下来要介绍的—rebalance(内部使用roundrobin方法,将数据均匀打散,这对于数据倾斜时是很好的选择。)

//使用rebalance操作,避免数据倾斜

valrebalance=filterData.rebalance()

Hash-Partition:根据指定Key的散列值对数据集进行分区。

partitionByHash按照指定的key进行hash分区:

valcollection=env.fromCollection(data)

valunique=collection.partitionByHash(1).mapPartition{

line=

line.map(x=(x._1,x._2,x._3))

}

Range-Partition:partitionByRange根据指定的Key对数据集进行范围分区。

valcollection=env.fromCollection(data)

valunique=collection.partitionByRange(x=x._1).mapPartition(line=line.map{

x=

(x._1,x._2,x._3)

})

CustomPartitioning:自定义分区规则,自定义分区需要实现Partitioner接口的partitionCustom(partitioner,“someKey”)或者partitionCustom(partitioner,0)。

valresult:DataSet[String]=sourceDataSet.partitionCustom(newMyPartitioner2,x=x+"")

03Sink组件:

Flink针对DataSet提供了大量的已经实现的Sink,支持多种文件的存储格式,包括text文件,CSV文件等。

1、writeAsText():将元素以字符串形式,逐行写入,这些字符串通过调用每个元素的toString()方法来获取。

将数据输出到文件,Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

//将数据写入本地文件

result.writeAsText("/data/a",WriteMode.OVERWRITE)

//将数据写入HDFS

result.writeAsText("hdfs://node01:/data/a",WriteMode.OVERWRITE)

writeAsCsv()是个半成品,只能写入DataSetTuple3String,String,Integerds2,这种类型的数据,不支持pojo类型的数据写入。

2、collect将数据输出到本地集合:

result.collect()方法返回一个可以关闭的行迭代器。除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以我们应该通过CloseableIterator#close()方法,主动地关闭作业以防止资源泄露。

result.collect()

3、print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。

result.print()方法将查询结果打印到本地控制台。

TableResult中的结果数据只能被访问一次,因此一个TableResult实例中,collect()方法和print()方法不能被同时使用。

对于流模式,TableResult.collect()方法或者TableResult.print方法保证端到端精确一次的数据交付。这就要求开启checkpointing。默认情况下checkpointing是禁止的,我们可以通过TableConfig设置checkpointing相关属性来开启checkpointing。因此一条结果数据只有在其对应的checkpointing完成后才能在客户端被访问。

1
查看完整版本: 打开百度语音模式,反复熟记Flink流计