While running some Spark Streaming tests recently I tripped over a small problem of my own making; writing it down here.
Here is the relevant part of the code:

```scala
package com.ybs.screen.test.data

import java.lang
import java.util.Properties

import com.ybs.screen.constant.Constants
import com.ybs.screen.model.{ProperModel, UnitInfo}
import com.ybs.screen.utils.PropertiesUtil
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming

import scala.collection.JavaConverters._

object DemoTest {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sparkSession: SparkSession = PropertiesUtil.getSparkSessionTest(sparkConf)
    sparkSession.sparkContext.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))

    // Kafka cluster and topic
    val kafkaBrokers: String = ProperModel.getString(Constants.KAFKA_METADATA_BROKER_LIST)
    val kafkaTopics: String = ProperModel.getString(Constants.KAFKA_TOPICS)
    val kafkaParam = Map(
      "bootstrap.servers" -> kafkaBrokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group4",
      // "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // ssc.checkpoint("./streaming_checkpoint")

    // Pull data from Kafka, resuming from previously saved offsets
    val inputDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set(kafkaTopics), kafkaParam,
        getLastOffsets(kafkaParam, Set(kafkaTopics)))) // helper not shown in this excerpt

    val value: DStream[String] = inputDStream.map(x => x.value())
    EsSparkStreaming.saveToEs(value, "test/doc")

    // BUG: after the map() above, the RDDs in `value` are MapPartitionsRDDs,
    // not KafkaRDDs, so this cast fails at runtime
    value.foreachRDD(rdd => {
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
```
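The snippet calls a `getLastOffsets` helper that is not shown. Purely for illustration, here is a hypothetical sketch of what such a helper might look like, assuming it uses a plain `KafkaConsumer` to look up the consumer group's last committed offset per partition; the signature and behavior are my assumptions, not the original code, and it reuses the imports already present in `DemoTest`:

```scala
// Hypothetical sketch: fetch the group's last committed offset for each partition,
// so ConsumerStrategies.Subscribe can resume the stream from where it left off.
def getLastOffsets(kafkaParam: Map[String, Object],
                   topics: Set[String]): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, String](kafkaParam.asJava)
  try {
    val partitions = topics.flatMap { topic =>
      consumer.partitionsFor(topic).asScala
        .map(p => new TopicPartition(p.topic(), p.partition()))
    }
    partitions.flatMap { tp =>
      // committed() returns null when this group has never committed the partition
      Option(consumer.committed(tp)).map(meta => tp -> meta.offset())
    }.toMap
  } finally {
    consumer.close()
  }
}
```

Partitions without a committed offset are simply left out of the map, so the consumer falls back to its `auto.offset.reset` policy for them.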
When committing the offsets, the job failed with a ClassCastException: `org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges`. The reason becomes clear from the KafkaRDD source in spark-streaming-kafka-0-10:
```scala
private[spark] class KafkaRDD[K, V](
    sc: SparkContext,
    val kafkaParams: ju.Map[String, Object],
    val offsetRanges: Array[OffsetRange],
    val preferredHosts: ju.Map[TopicPartition, String],
    useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
// Only KafkaRDD mixes in HasOffsetRanges, so only a KafkaRDD can be cast to it;
// and only the first-hand RDDs obtained directly from the InputDStream are KafkaRDDs.
```
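That explains the failure: `inputDStream.map(x => x.value())` produces MapPartitionsRDDs, which do not mix in `HasOffsetRanges`. The pattern recommended by the Spark Streaming + Kafka integration guide is to read the offset ranges at the top of a `foreachRDD` on the first-hand stream, before any transformation; inside tasks, the per-partition range can then be looked up through `TaskContext`:

```scala
inputDStream.foreachRDD { rdd =>
  // Works: here rdd really is a KafkaRDD, so the cast succeeds
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { _ =>
    // KafkaRDD partitions correspond 1:1 to offset ranges
    val o = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} offsets ${o.fromOffset} -> ${o.untilOffset}")
  }
}
```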
Once the cause is known, the fix is simple. Either keep everything, including the Elasticsearch write, inside a `foreachRDD` on the `inputDStream` itself, where the offset ranges are still readable, or read and commit the offsets from the `inputDStream` first and run the other operations afterwards. I went with the cruder of the two: commit the offsets directly on the `inputDStream` as soon as I have it, then do everything else.
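For reference, a minimal sketch of the first alternative. It assumes the RDD-level writer `EsSpark.saveToEs` from elasticsearch-hadoop (`org.elasticsearch.spark.rdd.EsSpark`) in place of the DStream-level `EsSparkStreaming`; writing first and committing only afterwards gives at-least-once delivery:

```scala
import org.elasticsearch.spark.rdd.EsSpark

inputDStream.foreachRDD { rdd =>
  // Capture the ranges while rdd is still a first-hand KafkaRDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Transform and write inside the same foreachRDD
  val values = rdd.map(_.value())
  EsSpark.saveToEs(values, "test/doc")

  // Commit only after the write succeeded: at-least-once semantics
  inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```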
```scala
// Commit on the InputDStream itself: its RDDs are first-hand KafkaRDDs,
// so the cast to HasOffsetRanges now succeeds
inputDStream.foreachRDD(rdd => {
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
```