There is a fair amount of info online about bulk loading into HBase with Spark Streaming using Scala (these two were particularly useful) and some info for Java, but there seems to be a lack of info on doing it with PySpark. My questions are:

- How can data be bulk loaded into HBase using PySpark?
- Most examples in any language only show a single column per row being upserted. How can I upsert multiple columns per row?

The code I have is as follows:
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        context = SparkContext(appName="PythonHBaseBulkLoader")
        streamingContext = StreamingContext(context, 5)

        stream = streamingContext.textFileStream("file:///test/input")

        stream.foreachRDD(bulk_load)

        streamingContext.start()
        streamingContext.awaitTermination()
What I need is the bulk load function:

    def bulk_load(rdd):
        # ???
I've made some progress previously, along with many and various errors (as documented here and here). So, after much trial and error, I present here the best I have come up with. It works well and successfully bulk loads data (using Puts or HFiles). I am perfectly willing to believe it is not the best method, so comments/other answers are welcome. The rest of this assumes you're using CSV data.
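For concreteness, here is a minimal sketch (my own illustration, not part of the original post) of the CSV shape the code below assumes: a row key followed by one value per column, written into the directory watched by textFileStream.

    sample_rows = [
        "row1,value1,value2,value3",   # hypothetical row key plus three column values
        "row2,value4,value5,value6",
    ]

    # A new file appearing in the watched directory (file:///test/input above)
    # is picked up on the next streaming interval. In practice, write the file
    # elsewhere and move it in so it appears atomically.
    with open("/test/input/sample.csv", "w") as f:
        f.write("\n".join(sample_rows) + "\n")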
Bulk loading with Puts

By far the easiest way to bulk load is to create a Put request for each cell in the CSV and queue them up to HBase:

    def bulk_load(rdd):
        # Your configuration will differ. Insert your own quorum, parent znode and table name.
        conf = {"hbase.zookeeper.quorum": "localhost:2181",
                "zookeeper.znode.parent": "/hbase-unsecure",
                "hbase.mapred.outputtable": "test",
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

        # Split the input into individual lines, then convert each CSV line to key-value pairs
        load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                      .flatMap(csv_to_key_value)

        load_rdd.saveAsNewAPIHadoopDataset(conf=conf,
                                           keyConverter=keyConv,
                                           valueConverter=valueConv)
The function csv_to_key_value is where the magic happens:

    def csv_to_key_value(row):
        cols = row.split(",")  # Split on commas
        # Each cell becomes a tuple of (key, [key, column-family, column-descriptor, value]).
        # Hard-coded here for three value columns, but the same pattern works for any n >= 1.
        result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
                  (cols[0], [cols[0], "f2", "c2", cols[2]]),
                  (cols[0], [cols[0], "f3", "c3", cols[3]]))
        return result
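To make the shape concrete, here is what csv_to_key_value returns for one hypothetical line (my own illustration). All three pairs share the same row key, which is how multiple columns end up on one row:

    example = csv_to_key_value("row1,a,b,c")
    # example == (("row1", ["row1", "f1", "c1", "a"]),
    #             ("row1", ["row1", "f2", "c2", "b"]),
    #             ("row1", ["row1", "f3", "c3", "c"]))
    # One (key, [row-key, family, qualifier, value]) pair per cell; each pair
    # becomes a Put against the same row.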
The value converter we defined earlier will convert these tuples into HBase Puts.
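As a quick way to exercise this outside the streaming job, something like the following sketch (my own, with a hypothetical app name and fake rows) should work, provided the job is submitted with the HBase jars and the Spark examples jar that contains the Python converters on the classpath:

    from pyspark import SparkContext

    sc = SparkContext(appName="BulkLoadSmokeTest")  # hypothetical app name
    test_rdd = sc.parallelize(["row1,a,b,c", "row2,d,e,f"])  # two fake CSV rows
    bulk_load(test_rdd)  # should leave two rows of three cells each in the 'test' table
    sc.stop()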
Bulk loading with HFiles

Bulk loading with HFiles is more efficient: rather than issuing a Put request for each cell, an HFile is written directly and the RegionServer is simply told to point to the new HFile. This uses Py4J, so before the Python code we have to write a small Java program:
    import py4j.GatewayServer;
    import org.apache.hadoop.hbase.*;

    public class GatewayApplication {

        public static void main(String[] args)
        {
            GatewayApplication app = new GatewayApplication();
            GatewayServer server = new GatewayServer(app);
            server.start();
        }
    }
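Before wiring this into the streaming job, it can be worth checking from Python that the gateway is reachable and has the Hadoop/HBase classes on its classpath. A minimal sketch, assuming the GatewayServer above is running with its default settings:

    from py4j.java_gateway import JavaGateway

    gateway = JavaGateway()  # connects to the GatewayServer started by the Java program
    # Instantiating a Hadoop Configuration through the gateway proves the JVM is
    # reachable and the Hadoop jars are on its classpath.
    conf_obj = gateway.jvm.org.apache.hadoop.conf.Configuration()
    print(conf_obj.toString())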
Compile this and run it. Leave it running for as long as your streaming is happening. Now update bulk_load as follows:

    from py4j.java_gateway import JavaGateway

    def bulk_load(rdd):
        # The output class changes; everything else stays the same.
        conf = {"hbase.zookeeper.quorum": "localhost:2181",
                "zookeeper.znode.parent": "/hbase-unsecure",
                "hbase.mapred.outputtable": "test",
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}  # not "org.apache.hadoop.hbase.client.Put"

        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

        # startTime is assumed to be defined elsewhere (e.g. a per-batch timestamp string)
        # so each batch writes to a fresh HFile directory.
        load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                      .flatMap(csv_to_key_value)\
                      .sortByKey(True)

        # Don't process empty RDDs
        if not load_rdd.isEmpty():
            # saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
            load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                            "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                            conf=conf,
                                            keyConverter=keyConv,
                                            valueConverter=valueConv)

            # The HFiles have been written, but HBase doesn't know about them yet.
            # Get a link to the Py4J gateway
            gateway = JavaGateway()

            # Convert conf into a fully fledged Configuration object
            config = dict_to_conf(conf)

            # Set up our HTable
            htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "test")
            # Set up our path
            path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
            # Get the bulk loader
            loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
            # Load the HFiles
            loader.doBulkLoad(path, htable)
        else:
            print("Nothing to process")
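One aside not covered in the original post: HTable was deprecated in HBase 1.x and removed in 2.0, so on a 1.x cluster the same step can go through the Connection API instead (2.x reshuffles the bulk-load tooling again, so check your version's docs). A sketch of how the block inside the if branch might look, assuming those classes are available through the gateway:

    # Replaces the HTable/doBulkLoad(path, htable) lines above on HBase 1.x.
    table_name = gateway.jvm.org.apache.hadoop.hbase.TableName.valueOf("test")
    connection = gateway.jvm.org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(config)
    table = connection.getTable(table_name)
    region_locator = connection.getRegionLocator(table_name)
    admin = connection.getAdmin()

    loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
    loader.doBulkLoad(path, admin, table, region_locator)

    table.close()
    connection.close()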
Finally, the fairly straightforward dict_to_conf:

    def dict_to_conf(conf):
        gateway = JavaGateway()
        config = gateway.jvm.org.apache.hadoop.conf.Configuration()
        for key, value in conf.items():
            config.set(key, value)
        return config
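A variant worth considering (my own suggestion, not from the original post): starting from HBaseConfiguration.create() instead of a bare Configuration picks up any hbase-site.xml on the gateway JVM's classpath, so the Python dict only needs to carry the overrides.

    def dict_to_hbase_conf(conf):  # hypothetical helper name
        gateway = JavaGateway()
        # HBaseConfiguration.create() loads hbase-default.xml/hbase-site.xml first...
        config = gateway.jvm.org.apache.hadoop.hbase.HBaseConfiguration.create()
        # ...and the dict entries then override those defaults.
        for key, value in conf.items():
            config.set(key, value)
        return config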
As you can see, bulk loading with HFiles is more complex than using Puts, but depending on your data load it is probably worth it, since once you have it working it's not that difficult.

One last note on something that caught me off guard: HFiles expect the data they receive to be written in lexical order. This is not always guaranteed to be true, especially since "10" < "9". If you have designed your key to be unique, then this can be fixed easily:

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)  # Sort in ascending order
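For illustration (my own example): Python's string sort matches the lexical, byte-wise order HFiles expect, and it differs from numeric order, which is exactly the trap described above.

    print(sorted(["1", "2", "9", "10"]))   # ['1', '10', '2', '9'] -- lexical order
    # sortByKey(True) applies this same ordering to the row keys before the
    # HFiles are written, which is what HFileOutputFormat2 requires.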