竹笋

首页 » 问答 » 环境 » 大数据培训Flink流怎么来处理API
TUhjnbcbe - 2023/6/9 21:41:00
北京白癜风医院 https://wapjbk.39.net/yiyuanzaixian/bjzkbdfyy/

5.1Environment

5.1.1getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

valenv:ExecutionEnvironment=ExecutionEnvironment.getExecutionEnvironment

valenv=StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。

5.1.2createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

valenv=StreamExecutionEnvironment.createLocalEnvironment(1)

5.1.3createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

valenv=ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",,"YOURPATH//wordcount.jar")

5.2Source

5.2.1从集合读取数据

//定义样例类,传感器id,时间戳,温度

caseclassSensorReading(id:String,timestamp:Long,temperature:Double)

objectSensor{

defmain(args:Array[String]):Unit={

valenv=StreamExecutionEnvironment.getExecutionEnvironment

valstream1=env

.fromCollection(List(

SensorReading("sensor_1",,35.8),

SensorReading("sensor_6",,15.4),

SensorReading("sensor_7",,6.7),

SensorReading("sensor_10",,38.1)

))

stream1.print("stream1:").setParallelism(1)

env.execute()

}

}

5.2.2从文件读取数据

valstream2=env.readTextFile("YOUR_FILE_PATH")

5.2.3以kafka消息队列的数据作为来源

需要引入kafka连接器的依赖:

pom.xml

!--

1
查看完整版本: 大数据培训Flink流怎么来处理API