1st test:
The code under test is below:
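import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {
  def main(args: Array[String]) {
    val sc = new SparkContext
    val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval
    ssc.checkpoint("./checkpoint") // updateStateByKey requires a checkpoint directory
    val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
    val accStream = lines.map((_, "")).updateStateByKey(updateFunc)
    accStream.print()
    ssc.start()
    ssc.awaitTermination()
  }

  // Ignores both the new values and the previous state; always keeps the key with state 1
  def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}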
When I send 1 record (only one) with netcat, see screenshot:
The result is:
My question is: why is the result printed repeatedly? Why not just 1 time? (I sent only 1 record from the socket client.)
2nd test:
I tested again (with the Spark Streaming batch interval set to 5 seconds):
Send data:
The result is:
3rd test:
Using ConstantInputDStream
for testing, code below:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object SparkStreaming {
  def main(args: Array[String]) {
    val sc = new SparkContext
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("./checkpoint")
    val seq = Seq("key") // every 1 second send "key"
    val rdd = ssc.sparkContext.parallelize(seq)
    // use ConstantInputDStream as the input DStream
    val inputDStream = new ConstantInputDStream(ssc, rdd)
    val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
    map.print()
    ssc.start()
    ssc.awaitTermination()
  }

  // Same update function as in the 1st test
  def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}
The result is:
The result of the 3rd test is the same as the result of the 1st test.
In the 1st test, I sent "key" only in the 1st second.
In the 3rd test, ConstantInputDStream sends "key" every 1 second.
But why are the results the same? The result looks odd when using socketTextStream.
Could anyone tell me why? Thanks very much!
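The whole point of updateStateByKey is to save and accumulate state when needed. After updateStateByKey, the stream is a set of tuples of the keys and the values returned by the update function. Spark keeps the state of a key, and keeps calling the update function for it in every batch even when no new data arrives, until you return None from the update function instead of Some. Your update function always returns Some(1), so once "key" has been seen it is printed in every batch.
You can refer to the example implementation in this answer: How to process a subset of input records in a batch, i.e. the first second in a 3-sec batch time?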
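To make the "kept until you return None" behaviour concrete, here is a minimal sketch in plain Scala with no Spark dependency. It is only a simplified model of what updateStateByKey does per batch, not Spark's actual implementation. The names `UpdateStateSketch`, `keepForever`, `dropWhenIdle` and `step` are hypothetical and introduced purely for illustration. `keepForever` mirrors the update function from the question, while `dropWhenIdle` is one possible alternative that returns None when a batch brings no new values for a key.

```scala
object UpdateStateSketch {
  type Update = (Seq[String], Option[Int]) => Option[Int]

  // Same shape as the question's updateFunc: always Some(1), so a key is never dropped.
  val keepForever: Update = { case _ => Some(1) }

  // Hypothetical alternative: drop the key in any batch with no new values for it,
  // otherwise add the number of new values to the previous count.
  val dropWhenIdle: Update = {
    case (Seq(), _)         => None
    case (values, previous) => Some(previous.getOrElse(0) + values.size)
  }

  // Simplified model of one batch (an assumption, not Spark code): the update function
  // runs for every key that has new values OR existing state; None removes the key.
  def step(state: Map[String, Int], batch: Seq[(String, String)], f: Update): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { k =>
      f(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // One record in the first batch, then two empty batches (as in the 1st test).
    val batches = Seq(Seq("key" -> ""), Seq.empty, Seq.empty)
    for ((name, f) <- Seq("keepForever" -> keepForever, "dropWhenIdle" -> dropWhenIdle)) {
      batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
        val next = step(state, batch, f)
        println(s"$name: $next")
        next
      }
    }
  }
}
```

With `keepForever`, the state still contains "key" after the two empty batches, which matches the socket test printing the same result every second. With `dropWhenIdle`, the key disappears as soon as a batch arrives without it. Whether dropping idle keys is what you want depends on the job; if you need a running total across quiet periods, return Some(previous value) instead.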