How to use updateStateByKey() with socketTextStream in Spark Streaming?


First test:

The test code is below:

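    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamTest {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext()
        val ssc = new StreamingContext(sc, Seconds(1))
        // updateStateByKey requires a checkpoint directory
        ssc.checkpoint("./checkpoint")

        val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
        val accStream = lines.map((_, "")).updateStateByKey(updateFunc)
        accStream.print()
        ssc.start()
        ssc.awaitTermination()
      }

      // Ignores both the new values and the previous state; always keeps the key with state 1
      def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
    }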

When I send one record (only one) with netcat, see the screenshot:

[screenshot]

The result is:

[screenshot]

My question is: why is the result printed in every batch? Why not just once? (I sent only one record from the socket client.)

Second test:

I tested again, with the Spark Streaming batch interval set to 5 seconds:

Data sent:

[screenshot]

The result is:

[screenshot]

Third test:

Testing with ConstantInputDStream; the code is below:

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.ConstantInputDStream

    object SparkStreaming {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext()
        val ssc = new StreamingContext(sc, Seconds(1))
        // updateStateByKey requires a checkpoint directory
        ssc.checkpoint("./checkpoint")
        val seq = Seq("key") // "key" is sent once every second
        val rdd = ssc.sparkContext.parallelize(seq)
        // ConstantInputDStream emits the same RDD in every batch
        val inputDStream = new ConstantInputDStream(ssc, rdd)

        val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
        map.print()
        ssc.start()
        ssc.awaitTermination()
      }

      def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
    }

The result is:

[screenshot]

The result of the third test is the same as the result of the first test.

In the first test, I sent "key" only once, in the first second.

In the third test, ConstantInputDStream sends "key" every second.

But why are the results the same? The result looks odd when using socketTextStream.

Could anyone tell me why? Thanks very much!

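The whole point of updateStateByKey is to save and accumulate state when needed. After updateStateByKey, the stream is a set of tuples of the keys and the values returned by the update function. Spark applies the update function to every key that already has state in each batch, whether or not new data arrived for it, and it keeps the state of a key until you return None from the update function instead of Some. Since your update function always returns Some(1), "key" stays in the state and is printed in every batch, even though you sent it only once.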
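
To illustrate the effect of returning None versus Some, here is a minimal pure-Scala sketch with no Spark dependency. The names `step`, `alwaysSome` and `dropWhenIdle` are hypothetical and only for illustration; `step` is not Spark's implementation, it just mimics the documented behaviour that the update function runs once per batch for every key with new values or existing state. `dropWhenIdle` is one possible alternative, assuming you want a key to disappear as soon as a batch brings no new records for it.

```scala
object UpdateStateSketch {
  type Update = (Seq[String], Option[Int]) => Option[Int]

  // The update function from the question: always Some(1), so the key is never dropped.
  val alwaysSome: Update = (_, _) => Some(1)

  // Alternative: count the records seen so far, and return None when a batch
  // has no new records for the key, which removes the key from the state.
  val dropWhenIdle: Update = (newValues, state) =>
    if (newValues.isEmpty) None
    else Some(state.getOrElse(0) + newValues.size)

  // Hypothetical helper that mimics one batch: the update function is applied
  // to every key that has new values OR existing state; None results are dropped.
  def step(state: Map[String, Int], batch: Seq[(String, String)], f: Update): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      f(grouped.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Batch 1 carries one record; batches 2 and 3 are empty, as in the socket test.
    val batches: Seq[Seq[(String, String)]] = Seq(Seq("key" -> ""), Seq.empty, Seq.empty)
    for ((name, f) <- Seq("alwaysSome" -> alwaysSome, "dropWhenIdle" -> dropWhenIdle)) {
      // State after each of the three batches
      val states = batches.scanLeft(Map.empty[String, Int])((s, b) => step(s, b, f)).tail
      println(s"$name: $states")
    }
  }
}
```

With `alwaysSome`, the state still contains "key" after all three batches, which matches what the socket test prints. With `dropWhenIdle`, "key" is present only after the first batch and the state is empty afterwards.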

You can refer to the example implementation in this answer: How to process a subset of input records in a batch, i.e. the first second in 3-sec batch time?

