Kafka -> Spark Streaming -> HBase: Task not serializable, Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING


I am trying to write data, produced by the Kafka command-line producer to a topic, into HBase.

I am facing a problem and am unable to proceed. I build the code below into a jar and run it through spark-submit.

Am I doing something wrong inside foreachRDD()? What is wrong at the SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error message below?

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaDemo")
        .setMaster("local")
        .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// One receiver thread per topic.
int numThreads = 2;
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("nonview", numThreads);

// Receiver-based Kafka stream: (streaming context, ZooKeeper quorum, consumer group, topics).
JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jsc, "localhost", "viewConsumer", topicMap);

// Keep only the message value from each (key, value) pair.
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        // Turn each line into an HBase Put keyed by a random row key.
        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                new PairFunction<String, ImmutableBytesWritable, Put>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                        Put put = new Put(Bytes.toBytes("rowKey" + Math.random()));
                        put.addColumn(Bytes.toBytes("firstFamily"),
                                Bytes.toBytes("firstColumn"),
                                Bytes.toBytes(line + "fc"));
                        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                    }
                });

        // Save to HBase via Spark's built-in Hadoop API method.
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null;
    }
});

jsc.start();
jsc.awaitTermination();
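For reference, newAPIJobConfiguration1 is used above but its creation is not shown. Note that the "Job in state DEFINE" message in the Caused by section comes from Hadoop's Job.toString() being invoked while Spark's SerializationDebugger inspects the captured closure, so the Job object itself is being pulled into the task. A minimal sketch of how such a configuration is typically built for saveAsNewAPIHadoopDataset with HBase's TableOutputFormat (the table name "demoTable" is a placeholder, not from the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: the original post does not show this setup.
// HBaseConfiguration.create() picks up hbase-site.xml from the classpath.
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "demoTable"); // placeholder table name

// The Job wraps the output configuration; only its Configuration
// should be shipped to the workers, not the Job object itself.
Job newAPIJobConfiguration1 = Job.getInstance(hbaseConf);
newAPIJobConfiguration1.setOutputFormatClass(TableOutputFormat.class);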

Error:

./bin/spark-submit --class "SparkKafkaDemo" --master local /Users/kvk/IntelliJWorkspace/HBaseDemo/HBaseDemo.jar

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
    at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
    at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
    at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
    at java.lang.String.valueOf(String.java:2847)
    at java.lang.StringBuilder.append(StringBuilder.java:128)
    at scala.StringContext.standardInterpolator(StringContext.scala:122)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 24 more

Please add the Kryo serializer:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
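For example, set it on the SparkConf before the JavaStreamingContext is created (a sketch based on the code above):

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaDemo")
        .setMaster("local")
        .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");

// Register Kryo as Spark's serializer before any context is created.
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(1));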
