更新时间:2022-07-19 GMT+08:00

Java

由于Spark开源版本升级,为避免出现API兼容性或可靠性问题,建议用户使用配套版本的开源API。

Spark Core常用接口

Spark主要使用到如下这几个类:

  • JavaSparkContext:是Spark的对外接口,负责向调用该类的Java应用提供Spark的各种功能,如连接Spark集群,创建RDD,累积量和广播量等。它的作用相当于一个容器。
  • SparkConf:Spark应用配置类,如设置应用名称,执行模式,executor内存等。
  • JavaRDD:用于在java应用中定义JavaRDD的类,功能类似于scala中的RDD(Resilient Distributed Dataset)类。
  • JavaPairRDD:表示key-value形式的JavaRDD类。提供的方法有groupByKey,reduceByKey等。
  • Broadcast:广播变量类。广播变量允许保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。
  • StorageLevel:数据存储级别。有内存(MEMORY_ONLY),磁盘(DISK_ONLY),内存+磁盘(MEMORY_AND_DISK)等。

JavaRDD支持两种类型的操作:Transformation和Action,这两种类型的常用方法如表1表2

表1 Transformation

方法

说明

<R> JavaRDD<R> map(Function<T,R> f)

对RDD中的每个element使用Function。

JavaRDD<T> filter(Function<T,Boolean> f)

对RDD中所有元素调用Function,返回为true的元素。

<U> JavaRDD<U> flatMap(FlatMapFunction<T,U> f)

先对RDD所有元素调用Function,然后将结果扁平化。

JavaRDD<T> sample(boolean withReplacement, double fraction, long seed)

抽样。

JavaRDD<T> distinct(int numPartitions)

去除重复元素。

JavaPairRDD<K,Iterable<V>> groupByKey(int numPartitions)

返回(K,Seq[V]),将key相同的value组成一个集合。

JavaPairRDD<K,V> reduceByKey(Function2<V,V,V> func, int numPartitions)

对key相同的value调用Function。

JavaPairRDD<K,V> sortByKey(boolean ascending, int numPartitions)

按照key来进行排序,ascending为true时是升序否则为降序。

JavaPairRDD<K,scala.Tuple2<V,W>> join(JavaPairRDD<K,W> other)

当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数。

JavaPairRDD<K,scala.Tuple2<Iterable<V>,Iterable<W>>> cogroup(JavaPairRDD<K,W> other, int numPartitions)

当有两个KV的dataset(K,V)和(K,W),返回的是<K,scala.Tuple2<Iterable<V>,Iterable<W>>>的dataset,numTasks为并发的任务数。

JavaPairRDD<T,U> cartesian(JavaRDDLike<U,?> other)

返回该RDD与其它RDD的笛卡尔积。

表2 Action

方法

说明

T reduce(Function2<T,T,T> f)

对RDD中的元素调用Function2。

java.util.List<T> collect()

返回包含RDD中所有元素的一个数组。

long count()

返回的是dataset中的element的个数。

T first()

返回的是dataset中的第一个元素。

java.util.List<T> take(int num)

返回前n个elements。

java.util.List<T> takeSample(boolean withReplacement, int num, long seed)

对dataset随机抽样,返回有num个元素组成的数组。withReplacement表示是否使用replacement。

void saveAsTextFile(String path, Class<? extends org.apache.hadoop.io.compress.CompressionCodec> codec)

把dataset写到一个text file、hdfs、或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中。

java.util.Map<K,Object> countByKey()

对每个key出现的次数做统计。

void foreach(VoidFunction<T> f)

在数据集的每一个元素上,运行函数func。

java.util.Map<T,Long> countByValue()

对RDD中每个元素出现的次数进行统计。

Spark Streaming常用接口

Spark Streaming中常见的类有:

  • JavaStreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。
  • JavaDStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。
  • JavaPairDStream:KV DStream的接口,提供reduceByKey和join等操作。
  • JavaReceiverInputDStream<T>:定义任何从网络接受数据的输入流。

Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。

表3 Spark Streaming方法

方法

说明

JavaReceiverInputDStream<java.lang.String> socketStream(java.lang.String hostname,int port)

创建一个输入流,通过TCP socket从对应的hostname和端口接受数据。接受的字节被解析为UTF8格式。默认的存储级别为Memory+Disk。

JavaDStream<java.lang.String> textFileStream(java.lang.String directory)

入参directory为HDFS目录,该方法创建一个输入流检测可兼容Hadoop文件系统的新文件,并且读取为文本文件。

void start()

启动Streaming计算。

void awaitTermination()

当前进程等待终止,如Ctrl+C等。

void stop()

终止Streaming计算。

<T> JavaDStream<T> transform(java.util.List<JavaDStream<?>> dstreams,Function2<java.util.List<JavaRDD<?>>,Time,JavaRDD<T>> transformFunc)

对每个RDD进行function操作,得到一个新的DStream。这个函数中JavaRDDs的顺序和list中对应的DStreams保持一致。

<T> JavaDStream<T> union(JavaDStream<T> first,java.util.List<JavaDStream<T>> rest)

从多个具备相同类型和滑动时间的DStream中创建统一的DStream。

表4 Streaming增强特性接口

方法

说明

JAVADStreamKafkaWriter.writeToKafka()

支持将DStream中的数据批量写入到Kafka。

JAVADStreamKafkaWriter.writeToKafkaBySingle()

支持将DStream中的数据逐条写入到Kafka。

Spark SQL常用接口

Spark SQL中重要的类有:

  • SQLContext:是Spark SQL功能和DataFrame的主入口。
  • DataFrame:是一个以命名列方式组织的分布式数据集
  • DataFrameReader:从外部存储系统加载DataFrame的接口。
  • DataFrameStatFunctions:实现DataFrame的统计功能。
  • UserDefinedFunction:用户自定义的函数。

常见的Actions方法有:

表5 Spark SQL方法介绍

方法

说明

Row[] collect()

返回一个数组,包含DataFrame的所有列。

long count()

返回DataFrame的行数。

DataFrame describe(java.lang.String... cols)

计算统计信息,包含计数,平均值,标准差,最小值和最大值。

Row first()

返回第一行。

Row[] head(int n)

返回前n行。

void show()

用表格形式显示DataFrame的前20行。

Row[] take(int n)

返回DataFrame中的前n行。

表6 基本的DataFrame Functions介绍

方法

说明

void explain(boolean extended)

打印出SQL语句的逻辑计划和物理计划。

void printSchema()

打印schema信息到控制台。

registerTempTable

将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。

DataFrame toDF(java.lang.String... colNames)

返回一个列重命名的DataFrame。

DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols)

根据不同的列,按照升序或者降序排序。

GroupedData rollup(Column... cols)

对当前的DataFrame特定列进行多维度的回滚操作。