i trying read streaming data, using spark python, , change data format streaming data. seems cannot read stream...
here steps:
i opened 1 terminal, cd input data folder, type command line
ls part-* | xargs -i % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
then open terminal, type
setenv spark_home /user/abc/downloads/spark-1.5.2-bin-hadoop2.6/
, run spark locally. type command${spark_home}/bin/spark-submit --master local /user/abc/test.py localhost 9999
run code.
below code, testing whether reading streaming data , change data format... shows error: 16/01/28 22:41:37 info receiversupervisorimpl: starting receiver 16/01/28 22:41:37 info receiversupervisorimpl: called receiver onstart 16/01/28 22:41:37 info receiversupervisorimpl: receiver started again 16/01/28 22:41:37 info socketreceiver: connecting localhost:9999 16/01/28 22:41:37 info socketreceiver: connected localhost:9999 16/01/28 22:41:37 info socketreceiver: closed socket localhost:9999 16/01/28 22:41:37 warn receiversupervisorimpl: restarting receiver delay 2000 ms: socket data stream had no more data
if re-run ls part-* | xargs -i % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
, still shows same error.... know how solve problem?
import sys import re pyspark import sparkcontext pyspark.sql.context import sqlcontext pyspark.sql import row pyspark.streaming import streamingcontext sc = sparkcontext(appname="test") ssc = streamingcontext(sc, 5) sqlcontext = sqlcontext(sc) def get_tuple(r): m = re.search('\[(.*?)\]',r) s = m.group(1) fs = s.split(',') in range(len(fs)): if > 1: fs[i] = float(fs[i]) return fs def main(): indata = ssc.sockettextstream(sys.argv[1], int(sys.argv[2])) inrdd = indata.map(lambda r: get_tuple(r)) features = row('feature_vec') features_rdd = inrdd.map(lambda r: features(r)) features_rdd.pprint(num=10) ssc.start() ssc.awaittermination() if __name__ == "__main__": main()
problem solved. spark command line should add [*] spark streaming, this:
${spark_home}/bin/spark-submit --master local[*] /user/abc/test.py localhost 9999
then output appear
Comments
Post a Comment