Streaming to HBase with pyspark (hadoop)


There is a fair amount of info online about bulk loading to HBase with Spark Streaming using Scala (these two were particularly useful) and some info for Java, but there seems to be a lack of info on doing it with PySpark. My questions are:

  • How can data be bulk loaded into HBase using PySpark?
  • Most examples in any language only show a single column per row being upserted. How can I upsert multiple columns per row?

The code I currently have is as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    context = SparkContext(appName="PythonHBaseBulkLoader")
    streamingContext = StreamingContext(context, 5)

    stream = streamingContext.textFileStream("file:///test/input")

    stream.foreachRDD(bulk_load)

    streamingContext.start()
    streamingContext.awaitTermination()

What I need is the bulk load function:

def bulk_load(rdd):
    #???

I've made some progress previously, but with many and various errors (as documented here and here).

So after much trial and error, I present here the best I have come up with. It works well, and successfully bulk loads data (using Puts or HFiles). I am perfectly willing to believe that it is not the best method, so comments/other answers are welcome. This assumes you're using CSV data.
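
For concreteness, the code below assumes rows of roughly this shape: a unique row key followed by three values (these example keys and values are made up):

key1,valueA1,valueA2,valueA3
key2,valueB1,valueB2,valueB3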

Bulk loading with Puts

This is by far the easiest way to bulk load: it creates a Put request for each cell in the CSV and queues them up to HBase.

def bulk_load(rdd):
    # Your configuration will be different. Insert your own quorum, parent node and table name
    conf = {"hbase.zookeeper.quorum": "localhost:2181",
            "zookeeper.znode.parent": "/hbase-unsecure",
            "hbase.mapred.outputtable": "test",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    # Split the input into individual lines, then convert each CSV line to key-value pairs
    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)
    load_rdd.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

The function csv_to_key_value is where the magic happens:

def csv_to_key_value(row):
    cols = row.split(",")  # Split on commas
    # Each cell is a tuple of (key, [key, column-family, column-descriptor, value])
    # Works for n >= 1 value columns
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
              (cols[0], [cols[0], "f2", "c2", cols[2]]),
              (cols[0], [cols[0], "f3", "c3", cols[3]]))
    return result

The value converter we defined earlier will convert these tuples into HBase Puts.
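
To make the multi-column mapping concrete, here is a quick illustrative check (run locally, not part of the streaming job) of what csv_to_key_value produces for one made-up row:

# Hypothetical input row: row key "r1" followed by three values
row = "r1,v1,v2,v3"

for key, value in csv_to_key_value(row):
    print(key, value)

# r1 ['r1', 'f1', 'c1', 'v1']
# r1 ['r1', 'f2', 'c2', 'v2']
# r1 ['r1', 'f3', 'c3', 'v3']

Each tuple becomes one cell, so multiple columns per row are upserted simply by emitting one (key, [key, family, qualifier, value]) entry per column, all sharing the same row key.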

Bulk loading with HFiles

Bulk loading with HFiles is more efficient: rather than issuing a Put request for each cell, an HFile is written directly and the RegionServer is simply told to point to the new HFile. This uses Py4J, so before the Python code we have to write a small Java program:

import py4j.GatewayServer;
import org.apache.hadoop.hbase.*;

public class GatewayApplication {

    public static void main(String[] args)
    {
        GatewayApplication app = new GatewayApplication();
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}

Compile this and run it. Leave it running as long as your streaming is happening. Now update bulk_load as follows:

from py4j.java_gateway import JavaGateway

def bulk_load(rdd):
    # The output class changes, everything else stays the same
    conf = {"hbase.zookeeper.quorum": "localhost:2181",
            "zookeeper.znode.parent": "/hbase-unsecure",
            "hbase.mapred.outputtable": "test",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    # Split into lines, convert to key-value pairs, and sort by key (see the note at the end)
    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)
    # Don't process empty RDDs
    if not load_rdd.isEmpty():
        # saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
        load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                        "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                        conf=conf,
                                        keyConverter=keyConv,
                                        valueConverter=valueConv)
        # The file has been written, but HBase doesn't know about it yet

        # Get a link to Py4J
        gateway = JavaGateway()
        # Convert our conf dict to a fully fledged Configuration type
        config = dict_to_conf(conf)
        # Set up our HTable
        htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "test")
        # Set up our path
        path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
        # Get a bulk loader
        loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
        # Load the HFile
        loader.doBulkLoad(path, htable)
    else:
        print("Nothing to process")

And finally, the fairly straightforward dict_to_conf:

def dict_to_conf(conf):
    gateway = JavaGateway()
    config = gateway.jvm.org.apache.hadoop.conf.Configuration()
    # Copy each entry of the Python dict into the Hadoop Configuration
    for key, val in conf.items():
        config.set(key, val)
    return config

As you can see, bulk loading with HFiles is more complex than using Puts, but depending on your data load it is probably worth it, since once you get it working it's not that difficult.

One last note on something that caught me off guard: HFiles expect the data they receive to be written in lexical order. This is not always guaranteed to be true, especially since "10" < "9". If you have designed your key to be unique, then this can be fixed easily:

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
              .flatMap(csv_to_key_value)\
              .sortByKey(True)  # Sort in ascending order
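
If your row keys are numeric, one simple way to make lexical order agree with numeric order is to zero-pad the key to a fixed width when you build it. A minimal sketch of that idea (the helper name and the width of 10 are my own assumptions, not part of the code above):

def pad_key(key, width=10):
    # Zero-pad numeric row keys so lexical order matches numeric order,
    # e.g. "9" -> "0000000009" sorts before "10" -> "0000000010"
    return str(key).zfill(width)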
