博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
解决spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRange问题
阅读量:2178 次
发布时间:2019-05-01

本文共 3360 字,大约阅读时间需要 11 分钟。

最近在做sparkstreaming测试的时候,自己出了一个小问题,记录下.

贴部分代码:

package com.ybs.screen.test.dataimport java.langimport java.util.Propertiesimport com.ybs.screen.constant.Constantsimport com.ybs.screen.model.{ProperModel, UnitInfo}import com.ybs.screen.utils.PropertiesUtilimport org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}import org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka010._import org.apache.spark.{SparkConf, SparkContext, TaskContext}import org.elasticsearch.spark.streaming.EsSparkStreamingimport scala.collection.JavaConverters._object DemoTest {  def main(args: Array[String]): Unit = {    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")    val sparkSession: SparkSession = PropertiesUtil.getSparkSessionTest(sparkConf)    sparkSession.sparkContext.setLogLevel("WARN")    val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext,Seconds(10))    //kafka集群和topic    val kafkaBrokers:String = ProperModel.getString(Constants.KAFKA_METADATA_BROKER_LIST)    val kafkaTopics: String =ProperModel.getString( Constants.KAFKA_TOPICS)    val kafkaParam = Map(      "bootstrap.servers" -> kafkaBrokers,      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> "group4",      //      "auto.offset.reset" -> "latest",      "enable.auto.commit" -> (false: lang.Boolean)    )    //    ssc.checkpoint("./streaming_checkpoint")    //从kafka获取数据    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](      ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String, String](Set(kafkaTopics), kafkaParam, getLastOffsets(kafkaParam,Set(kafkaTopics))))    val value: DStream[String] = inputDStream.map(x => x.value())    EsSparkStreaming.saveToEs(value, "test/doc")    value.foreachRDD(rdd =>{      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges      inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)    })    ssc.start()    ssc.awaitTermination()  }	}

在保存offset的时候,报了一个错误spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRange保存offset报错

在网上查了一下发现只有从kafka拿到的inputDStream,才能转换为kafkaRDD. 后面做其他操作的时候会把kafkaRDD转换为非kafkaRDD,这时候就会报错,贴一下源码

private[spark] class KafkaRDD[K, V](    sc: SparkContext,    val kafkaParams: ju.Map[String, Object],    val offsetRanges: Array[OffsetRange],    val preferredHosts: ju.Map[TopicPartition, String],    useConsumerCache: Boolean) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges//只有KafkaRDD才可以转换成OffsetRange//且只有通过InputDStream所得到的第一手数据才包含KafkaRDD

知道了原因以后,解决起来就简单了,可以在获得的inputDStream里面操作,获取偏移量,将存往elasticsearch的操作放在inputDStream里面, 或者在获取到inputDStream的时候先保存offset.然后再操作,这里我采取了笨一点的方法,在拿到inputDStream的时候就直接先存了offset.再进行其他操作

inputDStream.foreachRDD(rdd =>{      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges      inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)    })

参考:

转载地址:http://ezjkb.baihongyu.com/

你可能感兴趣的文章
分布式系统理论基础5:选举、多数派和租约
查看>>
分布式系统理论基础6:Raft、Zab
查看>>
分布式系统理论进阶7:Paxos变种和优化
查看>>
分布式系统理论基础8:zookeeper分布式协调服务
查看>>
搞懂分布式技术1:分布式系统的一些基本概念
查看>>
搞懂分布式技术2:分布式一致性协议与Paxos,Raft算法
查看>>
搞懂分布式技术3:初探分布式协调服务zookeeper
查看>>
搞懂分布式技术4:ZAB协议概述与选主流程详解
查看>>
搞懂分布式技术5:Zookeeper的配置与集群管理实战
查看>>
搞懂分布式技术6:Zookeeper典型应用场景及实践
查看>>
搞懂分布式技术10:LVS实现负载均衡的原理与实践
查看>>
搞懂分布式技术11:分布式session解决方案与一致性hash
查看>>
搞懂分布式技术12:分布式ID生成方案
查看>>
搞懂分布式技术13:缓存的那些事
查看>>
搞懂分布式技术14:Spring Boot使用注解集成Redis缓存
查看>>
搞懂分布式技术15:缓存更新的套路
查看>>
搞懂分布式技术16:浅谈分布式锁的几种方案
查看>>
搞懂分布式技术17:浅析分布式事务
查看>>
搞懂分布式技术18:分布式事务常用解决方案
查看>>
搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务
查看>>