博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming+Kafka 处理实时WIFI数据
阅读量:6578 次
发布时间:2019-06-24

本文共 13530 字,大约阅读时间需要 45 分钟。

 

 


觉得有用的话,点个赞啊~~~ O(∩_∩)O~~


业务背景

通过实时抽取华为ESIGHT系统的wifi数据,与校园的学生数据、课程数据、地理位置数据等进行关联,进行校园大数据的流数据处理与分析。

技术选型

  • Kafka调用ESIGHT的resutful API,接入无线数据;
  • Sparkstreaming将流数据与Hive中的其他校园数据关联分析
  • 使用ES For Hadoop将分析结果导出到ES集群中

Kafka Producer

技术常规,使用kafka接入ESIGHT数据,只需要注意

  • 默认的分区方法是否产生数据偏移
  • 如果偏移需要自定义kafka.producer.Partitioner

SparkStreaming 接收Kafka数据流

用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。

接收数据的方式有两种:

基于Receiver接收数据

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

需要注意的问题有:

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
  • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
  • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER

直连方式读取kafka数据

这种新的不基于Receiver的直接方式,是在Spark 1.3之后引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

  • 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

  • 高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

  • 一次且仅一次(extract-once)的事务机制: 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。 基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

Direct连接示例

import org.apache.spark.streaming.kafka.*; JavaPairInputDStream
directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);

但Direct连接方式为了能够进行异常恢复,需要考虑如何维护KafkaOffset的问题。通常由两种方式维护

  • 使用Spark的checkpoint机制,根据需要定期checkpoint并恢复。由于项目使用SparkSQL从Hive中拉取数据,可能由于SparkSQLContext的恢复处理不当,在恢复的时候会失败;
  • 通过SparkStreaming的API在Zookeeper中维护Kafka的Offset

使用Zookeeper维护KafkaOffset示例

import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import java.util.concurrent.atomic.AtomicReference;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.broadcast.Broadcast;import org.apache.spark.streaming.Duration;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka.HasOffsetRanges;import org.apache.spark.streaming.kafka.KafkaCluster;import org.apache.spark.streaming.kafka.KafkaUtils;import org.apache.spark.streaming.kafka.OffsetRange;import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;import kafka.common.TopicAndPartition;import kafka.message.MessageAndMetadata;import kafka.serializer.StringDecoder;import scala.Predef;import scala.Tuple2;import scala.collection.JavaConversions;import lombok.extern.slf4j.*;@Slf4jpublic class KafkaOffsetExample {	private static KafkaCluster kafkaCluster = null;	private static HashMap
kafkaParam = new HashMap
(); private static Broadcast
> kafkaParamBroadcast = null; private static scala.collection.immutable.Set
immutableTopics = null; /** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */ private static JavaInputDStream
createKafkaDStream(JavaStreamingContext jssc, Map
consumerOffsetsLong) { KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset"); JavaInputDStream
message = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(), consumerOffsetsLong, new Function
, String>() { private static final long serialVersionUID = 1L; @Override public String call(MessageAndMetadata
v1) throws Exception { return v1.message(); } }); return message; } private static Map
initConsumerOffset(String topic) { Set
topicSet = new HashSet
(); topicSet.add(topic); scala.collection.mutable.Set
mutableTopics = JavaConversions.asScalaSet(topicSet); immutableTopics = mutableTopics.toSet(); scala.collection.immutable.Set
topicAndPartitionSet = kafkaCluster .getPartitions(immutableTopics).right().get(); // kafka direct stream 初始化时使用的offset数据 Map
consumerOffsetsLong = new HashMap
(); if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) { KafkaOffsetExample.log.warn("没有保存offset, 各个partition offset 默认为0"); Set
topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { consumerOffsetsLong.put(topicAndPartition, 0L); } } else { KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset"); scala.collection.immutable.Map
consumerOffsetsTemp = kafkaCluster .getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get(); Map
consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp); Set
topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet); KafkaOffsetExample.log.warn("put data in consumerOffsetsLong"); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { Long offset = (Long) consumerOffsets.get(topicAndPartition); consumerOffsetsLong.put(topicAndPartition, offset); } } return consumerOffsetsLong; } private static JavaDStream
getAndUpdateKafkaOffset(JavaInputDStream
message, AtomicReference
offsetRanges) { JavaDStream
javaDStream = message.transform(new Function
, JavaRDD
>() { private static final long serialVersionUID = 1L; public JavaRDD
call(JavaRDD
rdd) throws Exception { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); for (int i = 0; i < offsets.length; i++) KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}", offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(), offsets[i].untilOffset()); return rdd; } }); KafkaOffsetExample.log.warn("foreachRDD"); // output javaDStream.foreachRDD(new VoidFunction
>() { private static final long serialVersionUID = 1L; public void call(JavaRDD
rdd) throws Exception { if (rdd.isEmpty()) { KafkaOffsetExample.log.warn("Empty RDD"); return; } for (OffsetRange o : offsetRanges.get()) { // 封装topic.partition 与 offset对应关系 java Map TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); Map
topicAndPartitionObjectMap = new HashMap
(); topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); KafkaOffsetExample.log.warn( "Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset()); // 转换java map to scala immutable.map scala.collection.mutable.Map
testMap = JavaConversions .mapAsScalaMap(topicAndPartitionObjectMap); scala.collection.immutable.Map
scalatopicAndPartitionObjectMap = testMap .toMap(new Predef.$less$colon$less
, Tuple2
>() { private static final long serialVersionUID = 1L; @Override public Tuple2
apply(Tuple2
v1) { return v1; } }); // 更新offset到kafkaCluster kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); } } }); return javaDStream; } private static void initKafkaParams() { kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST); kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT); kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET); kafkaParam.put("group.id", WIFIConfig.GROUP_ID); } private static KafkaCluster initKafkaCluster() { KafkaOffsetExample.log.warn("transform java Map to scala immutable.map"); // transform java Map to scala immutable.map scala.collection.mutable.Map
testMap = JavaConversions.mapAsScalaMap(kafkaParam); scala.collection.immutable.Map
scalaKafkaParam = testMap .toMap(new Predef.$less$colon$less
, Tuple2
>() { private static final long serialVersionUID = 1L; @Override public Tuple2
apply(Tuple2
arg0) { return arg0; } }); // init KafkaCluster KafkaOffsetExample.log.warn("Init KafkaCluster"); return new KafkaCluster(scalaKafkaParam); } public static void run() { initKafkaParams(); kafkaCluster = initKafkaCluster(); SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); // 得到rdd各个分区对应的offset, 并保存在offsetRanges中 KafkaOffsetExample.log.warn("initConsumer Offset"); Map
consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC); kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); JavaInputDStream
message = createKafkaDStream(jssc, consumerOffsetsLong); final AtomicReference
offsetRanges = new AtomicReference
(); JavaDStream
javaDStream = getAndUpdateKafkaOffset(message, offsetRanges); javaDStream.print(); jssc.start(); try { jssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { String testPath = "E:\\javaCodes\\svn\\SmartCampus\\Trunk\\smartcampus.etl.wifi\\src\\main\\resources\\WifiConfig.yaml"; WIFIConfig.init(testPath); KafkaOffsetExample.log.warn(WIFIConfig.toStr()); KafkaOffsetExample.run(); }}

  

 

SparkStreaming 数据处理

根据需要,将流式数据与Hive中的静态数据关联,结果通过Elasticsearch For Hadoop导出到ES集群中。

如果静态数据需要定时更新,可以在创建数据流后,在foreachRDD逻辑中,根据实际情况定期更新静态数据。

调优

由于个人经验较少,处理的数据量不大,以下内容大多是纸上谈兵,仅供参考。

合理的批处理时间(batchDuration)

  • 几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。
  • 如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。
  • 一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。
  • 在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,直达SparkStreaming刚刚能及时处理完上一个批处理的数据,这样就是目前情况的最优值。

 

合理的Kafka拉取量(maxRatePerPartition重要)

spark.streaming.kafka.maxRatePerPartition参数配置指定了每秒每一个topic的每一个分区获取的最大消息数。

对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的。这个参数默认是没有上限的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time。

 

缓存反复使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数。

设置合理的GC

长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

设置年老代为并发收集。--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

设置合理的CPU资源数

CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。

但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

设置合理的parallelism

partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

使用高性能的算子

这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey

  • 使用mapPartitions替代普通map

  • 使用foreachPartitions替代foreach

  • 使用filter之后进行coalesce操作

  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

  • 使用Kryo优化序列化性能 这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。 在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。

官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。val conf = new SparkConf().setMaster(...).setAppName(...) // 设置序列化器为KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册要序列化的自定义类型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

参考

转载地址:http://khyno.baihongyu.com/

你可能感兴趣的文章
如何构建一个简单的CAAS系统
查看>>
Charles破解SSL
查看>>
linux 下使用poppler qt5
查看>>
安装 Elasticsearch + Kubana + Marvel
查看>>
如何成为一名Chrome应用开发者
查看>>
Flex 布局教程:语法篇
查看>>
linux ssh修改端口
查看>>
Compass布局模块
查看>>
Android 开源SlideSwitch源码分析
查看>>
【转】shell十三问,为linux学习打基础(下)
查看>>
SpringBoot应用之ELK
查看>>
一个后端的前端学习之旅——4.第一个demo上线以及关于前端框架我的看法
查看>>
飞龙的程序员书单 – 思想、工程、架构、职业发展
查看>>
AngularJS学习笔记一(RequireJS + AngularJS)
查看>>
泛型与闭包
查看>>
[Leetcode] Contains Duplicate 包含重复
查看>>
工欲善其事,必先利其器_Xcode插件的安装
查看>>
利用终端快速安装cocoapods
查看>>
作为一名Java程序员一定要不断关注学习最前沿的技术 ...
查看>>
机器人摘苹果,果农的的“世界末日”来临了吗? ...
查看>>