I'm taking data from a spout. Each bolt inserts its mapped fields into a different table in the database, but the database tables have constraints. In my test setup there are two tables, user_details and my_details, and the constraints require user_details to be filled first (it must be inserted first); only after that can my_details be inserted.

When I run the topology, only the user_details table gets inserted. When the bolts perform their insert queries, the database allows PsqlBolt to insert first (because of the constraints), while PsqlBolt1 throws an exception saying the user id was not found. So I put a sleep(1000) in PsqlBolt1, which works when there are 2 bolts. But when I apply the same approach to many bolts (12), the waiting time keeps increasing and bolt execution fails, saying the bolt wait time was exceeded.

How can I make the user fields insert first, and have PsqlBolt1 start inserting only after that?
My topology class:
    public class Topology {

        ConnectionProvider cp;
        protected static final String JDBC_CONF = "jdbc.conf";
        protected static final String TABLE_NAME = "users";
        protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id"
                + " and user_department.user_id = ?";

        public static void main(String[] args) throws Exception {
            String argument = args[0];
            JdbcMapper jdbcMapper;
            TopologyBuilder builder = new TopologyBuilder();

            // Connection settings shared by both bolts
            Map map = Maps.newHashMap();
            map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
            map.put("dataSource.url", "jdbc:postgresql://localhost:5432/twitter_analysis?user=postgres");
            ConnectionProvider cp = new MyConnectionProvider(map);

            // First bolt: inserts into user_details
            jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
            List<Column> schemaColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER),
                    new Column("user_name", Types.VARCHAR), new Column("create_date", Types.TIMESTAMP));
            JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
            PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
                    .withInsertQuery("insert into user_details (id, user_name, created_timestamp) values (?,?,?)");

            builder.setSpout("myspout", new UserSpout(), 1);
            builder.setBolt("psql_bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");

            // Second bolt: inserts into my_details
            jdbcMapper = new SimpleJdbcMapper("my_details", cp);
            List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id", Types.INTEGER),
                    new Column("my_name", Types.VARCHAR));
            JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
            PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
                    .withInsertQuery("insert into my_details (my_id, my_name) values (?,?)");

            //builder.setSpout("myspout", new UserSpout(), 1);
            builder.setBolt("psql_bolt1", userPersistanceBolt1, 1).shuffleGrouping("myspout");

            Config conf = new Config();
            conf.put(JDBC_CONF, map);
            conf.setDebug(true);
            conf.setNumWorkers(3);

            if (argument.equalsIgnoreCase("runlocally")) {
                System.out.println("running topology locally...");
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("twitter test storm-postgresql", conf, builder.createTopology());
            } else {
                System.out.println("running topology on cluster...");
                StormSubmitter.submitTopology("topology_psql", conf, builder.createTopology());
            }
        }
    }
My bolts. PsqlBolt1:
    public class PsqlBolt1 extends AbstractJdbcBolt {

        private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);

        private String tableName;
        private String insertQuery;
        private JdbcMapper jdbcMapper;

        public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
            super(connectionProvider);
            this.jdbcMapper = jdbcMapper;
        }

        public PsqlBolt1 withInsertQuery(String insertQuery) {
            this.insertQuery = insertQuery;
            System.out.println("query passsed.....");
            return this;
        }

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
            super.prepare(map, topologyContext, collector);
            if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
                throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
            }
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                // Workaround: wait so that PsqlBolt can insert the user row first
                Thread.sleep(1000);
                List<Column> columns = jdbcMapper.getColumns(tuple);
                List<List<Column>> columnLists = new ArrayList<List<Column>>();
                columnLists.add(columns);
                if (!StringUtils.isBlank(tableName)) {
                    this.jdbcClient.insert(this.tableName, columnLists);
                } else {
                    this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
                }
                this.collector.ack(tuple);
            } catch (Exception e) {
                this.collector.reportError(e);
                this.collector.fail(tuple);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
PsqlBolt:
    public class PsqlBolt extends AbstractJdbcBolt {

        private static final Logger LOG = Logger.getLogger(PsqlBolt.class);

        private String tableName;
        private String insertQuery;
        private JdbcMapper jdbcMapper;

        public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
            super(connectionProvider);
            this.jdbcMapper = jdbcMapper;
        }

        public PsqlBolt withTableName(String tableName) {
            this.tableName = tableName;
            return this;
        }

        public PsqlBolt withInsertQuery(String insertQuery) {
            this.insertQuery = insertQuery;
            System.out.println("query passsed.....");
            return this;
        }

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
            super.prepare(map, topologyContext, collector);
            if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
                throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
            }
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                List<Column> columns = jdbcMapper.getColumns(tuple);
                List<List<Column>> columnLists = new ArrayList<List<Column>>();
                columnLists.add(columns);
                if (!StringUtils.isBlank(tableName)) {
                    this.jdbcClient.insert(this.tableName, columnLists);
                } else {
                    this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
                }
                this.collector.ack(tuple);
            } catch (Exception e) {
                this.collector.reportError(e);
                this.collector.fail(tuple);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
When I applied the same approach to many bolts, the topology colour changed to red (wait state).

Here are the bolt wait times: the first bolt doesn't have a sleep, I put a 1-second sleep in the second bolt, and the rest of the bolts have 2-second sleeps.

How can I replace the sleep and still get this ordering, or would the problem be solved if I increased the number of supervisors?
You can restructure your topology so that the spout sends message M to bolt 1. Bolt 1 takes its action on the message and forwards the same message to bolt 2 only if the action was successful. This way, there is a strict ordering between the actions.
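The chained layout can be illustrated with a minimal, Storm-free sketch. Everything in it is a hypothetical stand-in rather than Storm or storm-jdbc API: `FakeDatabase` imitates the two tables and the constraint between them, `Message` plays the role of a tuple, and `UserStage` and `DetailsStage` play the roles of bolt 1 and bolt 2. The point is only that stage 2 never sees a message before stage 1 has inserted the parent row, so no sleep is needed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Plain-Java sketch of the chained layout; none of these types are Storm classes.
class ChainedInsertSketch {

    /** In-memory stand-in for the two tables and the constraint between them. */
    static class FakeDatabase {
        final Set<Integer> userDetails = new HashSet<>();
        final Map<Integer, String> myDetails = new HashMap<>();

        void insertUser(int id) {
            userDetails.add(id);
        }

        void insertMyDetails(int id, String name) {
            // Mimics the constraint from the question: the user row must already exist.
            if (!userDetails.contains(id)) {
                throw new IllegalStateException("user id " + id + " not found");
            }
            myDetails.put(id, name);
        }
    }

    /** Hypothetical message type standing in for a tuple. */
    static class Message {
        final int id;
        final String name;

        Message(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Stage 1: insert the parent row, then forward the same message. */
    static class UserStage implements Consumer<Message> {
        private final FakeDatabase db;
        private final Consumer<Message> next;

        UserStage(FakeDatabase db, Consumer<Message> next) {
            this.db = db;
            this.next = next;
        }

        @Override
        public void accept(Message m) {
            db.insertUser(m.id); // if this throws, the message is never forwarded
            next.accept(m);      // forwarded only after the parent insert succeeded
        }
    }

    /** Stage 2: insert the dependent row; it only ever sees forwarded messages. */
    static class DetailsStage implements Consumer<Message> {
        private final FakeDatabase db;

        DetailsStage(FakeDatabase db) {
            this.db = db;
        }

        @Override
        public void accept(Message m) {
            db.insertMyDetails(m.id, m.name);
        }
    }

    /** Wires source -> stage 1 -> stage 2 and pushes `count` messages through. */
    static FakeDatabase runPipeline(int count) {
        FakeDatabase db = new FakeDatabase();
        Consumer<Message> pipeline = new UserStage(db, new DetailsStage(db));
        for (int id = 1; id <= count; id++) {
            pipeline.accept(new Message(id, "name-" + id));
        }
        return db;
    }

    public static void main(String[] args) {
        FakeDatabase db = runPipeline(3);
        System.out.println("user_details rows: " + db.userDetails.size());
        System.out.println("my_details rows: " + db.myDetails.size());
    }
}
```

In the topology from the question, the equivalent change would presumably be to subscribe "psql_bolt1" to "psql_bolt" instead of "myspout" (for example `shuffleGrouping("psql_bolt")`), and to have PsqlBolt declare its output fields and emit the tuple, anchored to the input, only after its insert succeeds. The `Thread.sleep` in PsqlBolt1 would then no longer be needed. The same pattern extends to 12 bolts by chaining each dependent bolt behind the bolt that inserts its parent row.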