apache spark - Serialization error when using a non-serializable object in driver code -


i'm using spark streaming process stream processing each partition (saving events hbase), ack last event in each rdd driver receiver, receiver can ack source in turn.

public class streamprocessor {    final ackclient ackclient;    public streamprocessor(ackclient ackclient) {     this.ackclient = ackclient;   }    public void process(final javareceiverinputdstream<event> inputdstream)     inputdstream.foreachrdd(rdd -> {       javardd<event> lastevents = rdd.mappartition(events -> {         // ------ code executes on worker -------         // process events 1 one; don't use ackclient here         // return event max delivery tag here       });       // ------ code executes on driver -------       event lastevent = .. // find event max delivery tag across partitions       ackclient.ack(lastevent); // use ackclient ack last event     });   } } 

the problem here following error (even though seems work fine):

org.apache.spark.sparkexception: task not serializable     @ org.apache.spark.util.closurecleaner$.ensureserializable(closurecleaner.scala:166)     @ org.apache.spark.util.closurecleaner$.clean(closurecleaner.scala:158)     @ org.apache.spark.sparkcontext.clean(sparkcontext.scala:1435)     @ org.apache.spark.rdd.rdd.mappartitions(rdd.scala:602)     @ org.apache.spark.api.java.javarddlike$class.mappartitions(javarddlike.scala:141)     @ org.apache.spark.api.java.javardd.mappartitions(javardd.scala:32) ... caused by: java.io.notserializableexception: <some non-serializable object used ackclient> ... 

it seems spark trying serialize ackclient send workers, thought code inside mappartitions serialized/shipped workers, , code @ rdd level (i.e. inside foreachrdd not inside mappartitions) not serialized/shipped workers.

can confirm if thinking correct or not? , if correct, should reported bug?

you correct, fixed in 1.1. however, if @ stack trace, cleaner throwing being invoked in mappartitions

at org.apache.spark.sparkcontext.clean(sparkcontext.scala:1435)

at org.apache.spark.rdd.rdd.mappartitions(rdd.scala:602)

so, problem has mappartitions. make sure aren't accidentally wrapping this, common issue


Comments

Popular posts from this blog

javascript - gulp-nodemon - nodejs restart after file change - Error: listen EADDRINUSE events.js:85 -

Fatal Python error: Py_Initialize: unable to load the file system codec. ImportError: No module named 'encodings' -

oracle - Changing start date for system jobs related to automatic statistics collections in 11g -