前言
最近由于工作需要,要分析大几百G的Nginx日志数据。之前也有过类似的需求,但那个时候数据量不多。一次只有几百兆,或者几个G。因为数据都在Hive里面,当时的做法是:把数据从Hive导到MySQL,然后写代码查询MySQL并处理。如果你的处理逻辑比较简单,或只是查询统计,不会涉及上游的服务调用,也可以直接写HiveSQL。
上面的做法在面对少量数据时还可以应付,对于大量数据就很不可取了。从Hive导数据到MySQL,光这一步就够呛,就更别说自己写的Java脚本效率性能如何了。请教同事过后,告诉我可以用Spark,并潇洒地丢给我一个Spark-Demo的jar包。之前只接触过HDFS和Hive,Spark只听说过,也准备学,但一直没时间。这下好了,有了带薪学习的机会。其实照着同事给我的jar包,照葫芦画瓢也能写出来,但是很多API都非常陌生,写出来的代码自己也不放心,所以还是有必要学学Spark的。
不过从头开始,完整学一遍Spark的话,时间肯定不够。当时接需求时,虽然知道自己不会,但是还挺相信自己的学习能力的,承诺了开发时间。所以我们的目标就是——用Spark处理Hive里面的数据,并把结果输出到MySQL中。
学习一个新知识的正常路径是:了解产生背景、了解整体架构、分模块学习功能和了解API、实战、深入学习原理和优化。由于这次目的性很强,在第三步时,只用学习跟本次需求相关的模块即可,然后就可以实战了。先从以下两个问题入手,初步了解Spark。
可以用Spark做什么?
并行处理分布在集群中的大规模数据集。()
执行交互式查询语句来探索数据集并进行数据集可视化。
使用MLlib构建、训练,以及评估机器学习模型。
使用各种数据流实现端到端的数据流水线。
分析图数据和社交网络。
我们本次的目标就是用Spark处理大规模的数据集。
为什么选择Spark而不是MR?
Spark为中间计算结果提供了基于内存的存储,这让它比HadoopMR快了很多。Spark还整合了各种上层库,比如用于机器学习的库MLlib、提供交互式查询功能的SparkSQL、支持操作实时数据的流处理库StructuredStreaming,以及图计算库GraphX。这些库都提供了易用的API。
初步了解Spark
Spark支持Scala、Java、Python、SQL和R等编程语言。其提供了大量模块化功能,可以适用于各种场景。其中包括SparkSQL、SparkStructuredStreaming、SparkMLlib,以及GraphX等模块。模块化带来的好处就是扩展性高,Spark的重心在于快速的分布式计算引擎,而不是存储。和ApacheHadoop同时包含计算和存储不同,Spark解耦了计算和存储。这意味着你可以用Spark读取存储在各种数据源(ApacheHadoop、ApacheCassandra、ApacheHBase、MongoDB、ApacheHive、RDBMS等)中的数据,并在内存中进行处理。你还可以扩展Spark的DataFrameReader和DataFrameWriter,以便将其他数据源(如ApacheKafka、Kinesis、Azure存储、亚马逊S3)的数据读取为DataFrame的逻辑数据抽象,以进行操作。
Spark提供了一种称作RDD(resilientdistributeddataset,弹性分布式数据集)的简单逻辑数据结构,它是Spark最基本的抽象。Spark各种其他高级的结构化数据抽象(比如DataFrame和Dataset)都是基于RDD构建的。
RDD是Spark最基本的抽象。RDD关联着三个至关重要的属性:
依赖关系:告诉Spark如何从输入中构建RDD,Spark可以根据这些依赖关系重新执行操作,以此重建出RDD。这一属性赋予了RDD容错的弹性。
分区:分区允许Spark将工作以分区为单位,分配到多个执行器上进行并行计算。
计算函数:就是操作RDD的函数,可以生成RDD所表示数据的Iterator[T]对象。
RDD的操作可以分为转化操作和行动操作。顾名思义,转化操作就是将SparkDataFrame转化为新的DataFrame,而不改变原有数据的操作。比如select()、filter()这样的操作,不会改变原有数据,这些操作只会将转化结果作为新的DataFrame返回。一般转化操作后,会迎来一个行动操作。比如通过filter()过滤数据,最后通过count()统计过滤后的数据。这个count()就是行动操作。
上面提到了DataFrame,它是一个结构化、有格式的,且支持一些特定操作的数据集。就像分布式内存中的表一样,每列都有名字,有表结构定义,每列都有特定的数据类型。
实战Demo
引入Jar包,这里导入的版本不是很高,是因为公司的Spark集群也是2.3版本的,要跟你安装的Spark版本保持一致。
dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion2.11.8/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion2.3.2/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_2.11/artifactIdversion2.3.2/versionscopeprovided/scope/dependency
下面代码中有必要的注释,带序号的注释会在代码之后会展开说说。
publicclassSparkDemo{//数据库相关配置privatestaticfinalPropertiesconnectionProperties=newProperties();privatestaticfinalStringHIVE_DATABASE="****";privatestaticfinalStringHIVE_TABLE_NAME="****";privatestaticfinalStringJDBC_URL="****";privatestaticfinalStringMYSQL_TABLE_NAME="****";static{connectionProperties.put("user","*****");connectionProperties.put("password","*****");connectionProperties.put("driver","