Kafka -> Spark Streaming -> HBase: Task not serializable, caused by java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
I am trying to write data produced by a Kafka command-line producer on a topic into HBase.
I am facing a problem and am unable to proceed. From the code below I am creating a jar and running it through spark-submit.
Am I doing something wrong inside foreachRDD()? What is wrong at the
SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
line in the error message below?
    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaKafkaDemo")
            .setMaster("local")
            .setSparkHome("/users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    int numThreads = 2;
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("nonview", numThreads);

    // Receiver-based Kafka stream: ZooKeeper quorum, consumer group, topic -> thread count
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jsc, "localhost", "viewconsumer", topicMap);

    // Keep only the message value of each (key, value) pair
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
            JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                    new PairFunction<String, ImmutableBytesWritable, Put>() {
                        @Override
                        public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                            Put put = new Put(Bytes.toBytes("rowkey" + Math.random()));
                            put.addColumn(Bytes.toBytes("firstfamily"),
                                          Bytes.toBytes("firstcolumn"),
                                          Bytes.toBytes(line + "fc"));
                            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                        }
                    });
            // Save to HBase - Spark built-in API method
            hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
            return null;
        }
    });

    jsc.start();
    jsc.awaitTermination();
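For reference, newAPIJobConfiguration1 is built earlier in the program; a minimal sketch of how such an HBase output configuration is usually constructed for saveAsNewAPIHadoopDataset (the ZooKeeper quorum and table name below are placeholders, not the actual values):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    // HBase client configuration; quorum and table name are placeholders
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set("hbase.zookeeper.quorum", "localhost");
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "someTable");

    // New-API MapReduce Job whose Configuration is passed to saveAsNewAPIHadoopDataset
    Job newAPIJobConfiguration1 = Job.getInstance(hbaseConf);
    newAPIJobConfiguration1.setOutputFormatClass(TableOutputFormat.class);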
Error:
    ./bin/spark-submit --class "SparkKafkaDemo" --master local /users/kvk/intellijworkspace/hbasedemo/hbasedemo.jar

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
        at org.apache.spark.rdd.RDD.map(RDD.scala:286)
        at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
        at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
        at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
        at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
        at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
        at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
        at java.lang.String.valueOf(String.java:2847)
        at java.lang.StringBuilder.append(StringBuilder.java:128)
        at scala.StringContext.standardInterpolator(StringContext.scala:122)
        at scala.StringContext.s(StringContext.scala:90)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
        at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
        at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 24 more
Please add serialization:

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");