database - How to execute one bolt after the other when each bolt takes data from the same spout? -


I'm taking data from a spout. Each bolt inserts the mapped fields into a different table in the database, but the database tables have constraints. In my test setup there are 2 tables, named user-details and my-details, and the constraints require the users table to be filled first (its row must be inserted first); only after that can the my-details table be inserted. When I run the topology, only the users table is getting inserted: when the bolts perform their insert queries, the database allows psqlbolt to insert first (because of the constraints), and psqlbolt1 throws an exception saying the user's id was not found. So I put a sleep(1000) in psqlbolt1, and with that the 2 bolts are working. But when I apply the same approach to many bolts (12), the waiting time keeps increasing and bolt execution fails, saying the bolt wait time was exceeded. How can I make the user fields execute first, so that psqlbolt1 starts inserting only after that?

my topology class

/*
 * Topology entry point: wires one spout to two JDBC-writing bolts and submits the result.
 *
 * main(args):
 *   - args[0] selects the run mode: "runlocally" (case-insensitive) submits to a localcluster
 *     under the name "twitter test storm-postgresql"; any other value submits through
 *     stormsubmitter under the name "topology_psql". args[0] is read unconditionally, so the
 *     program needs at least one argument.
 *   - A map holding "datasourceclassname" and "datasource.url" is used to build a
 *     myconnectionprovider and is also stored in the topology config under jdbc_conf.
 *   - Spout "myspout" (userspout, parallelism 1) feeds BOTH bolts:
 *       "psql_bolt"  -> psqlbolt  writing to user_details (columns user_id, user_name, create_date)
 *       "psql_bolt1" -> psqlbolt1 writing to my_details   (columns my_id, my_name)
 *     Each bolt is attached with shufflegrouping("myspout"), i.e. both subscribe directly to the spout.
 *   - The config enables debug and requests 3 workers.
 *
 * NOTE(review): nothing in this wiring appears to order the user_details insert before the
 *   my_details insert for the same tuple; the two bolts consume the spout independently.
 * NOTE(review): the local variable cp in main shadows the instance field cp, which is never assigned here.
 * NOTE(review): the local jdbcmapper is assigned twice (simplejdbcmapper(table_name, cp) and
 *   simplejdbcmapper("my_details", cp)) but is never passed to a bolt; the bolts receive
 *   mapper / mapper1, which are built from the explicit column lists.
 * NOTE(review): select_query is not referenced anywhere in the visible code.
 * NOTE(review): identifiers are all lowercase and the SQL text lacks keywords (e.g. no INTO after
 *   "insert") - presumably an artifact of how this snippet was extracted; TODO confirm against the original.
 */
public class topology {  connectionprovider cp; protected static final string jdbc_conf = "jdbc.conf"; protected static final string table_name = "users"; protected static final string select_query = "select dept_name department, user_department department.dept_id = user_department.dept_id" +         " , user_department.user_id = ?"; public static void main(string[] args) throws exception{     string argument = args[0];     jdbcmapper jdbcmapper;    topologybuilder builder = new topologybuilder(); map map = maps.newhashmap();         map.put("datasourceclassname", "org.postgresql.ds.pgsimpledatasource");     map.put("datasource.url","jdbc:postgresql://localhost:5432/twitter_analysis?user=postgres"); connectionprovider cp = new myconnectionprovider(map);      jdbcmapper = new simplejdbcmapper(table_name, cp);      list<column> schemacolumns = lists.newarraylist(new column("user_id", types.integer), new column             ("user_name",types.varchar),new column("create_date", types.timestamp));        jdbcmapper mapper = new simplejdbcmapper(schemacolumns);          psqlbolt userpersistancebolt = new psqlbolt(cp, mapper)     .withinsertquery("insert user_details (id, user_name, created_timestamp) values (?,?,?)");      builder.setspout("myspout", new userspout(), 1);      builder.setbolt("psql_bolt", userpersistancebolt,1).shufflegrouping("myspout");        jdbcmapper = new simplejdbcmapper("my_details", cp);      list<column> schemacolumns1 = lists.newarraylist(new column("my_id", types.integer), new column             ("my_name",types.varchar));        jdbcmapper mapper1 = new simplejdbcmapper(schemacolumns1);          psqlbolt1 userpersistancebolt1 = new psqlbolt1(cp, mapper1)     .withinsertquery("insert my_details (my_id, my_name) values (?,?)");      //builder.setspout("myspout", new userspout(), 1);      builder.setbolt("psql_bolt1", userpersistancebolt1,1).shufflegrouping("myspout");  config conf = new config();      conf.put(jdbc_conf, map);      
conf.setdebug(true);          conf.setnumworkers(3);       if (argument.equalsignorecase("runlocally")){         system.out.println("running topology locally...");         localcluster cluster = new localcluster();         cluster.submittopology("twitter test storm-postgresql", conf, builder.createtopology());     }     else {         system.out.println("running topology on cluster...");         stormsubmitter.submittopology("topology_psql", conf, builder.createtopology());      } }} 

my bolts:psql1

/*
 * JDBC insert bolt used for the second table (wired as "psql_bolt1" in the topology).
 *
 * Constructor: passes the connectionprovider to abstractjdbcbolt and stores the jdbcmapper that
 *   turns a tuple into a list of columns.
 * withinsertquery(insertquery): stores the insert statement, prints "query passsed....." to
 *   stdout, and returns this so calls can be chained.
 * prepare(...): calls super.prepare, then throws illegalargumentexception when both tablename
 *   and insertquery are blank.
 * execute(tuple): sleeps 1000 ms, maps the tuple to columns, wraps them in a one-element list,
 *   then inserts either by table name (when tablename is not blank) or with the stored insert
 *   query; acks the tuple on success. Any exception is reported through the collector and the
 *   tuple is failed.
 * declareoutputfields(...): declares nothing, so this bolt emits no stream.
 *
 * NOTE(review): the 1000 ms sleep runs for every tuple and only delays this bolt; it does not by
 *   itself guarantee that the other bolt's insert has already been committed.
 * NOTE(review): the catch-all handler also covers the interruption the sleep can raise, and the
 *   thread's interrupt status is not restored in that case.
 * NOTE(review): tablename is never assigned in the visible code of this class (there is no
 *   withtablename here), so the insert-by-table-name branch looks unreachable - confirm.
 * NOTE(review): log is declared but not used in the visible code.
 */
public class psqlbolt1 extends abstractjdbcbolt { private static final logger log = logger.getlogger(psqlbolt1.class);   private string tablename;  private string insertquery;  private jdbcmapper jdbcmapper;  public psqlbolt1(connectionprovider connectionprovider,  jdbcmapper jdbcmapper) {     super(connectionprovider);     this.jdbcmapper = jdbcmapper; } public psqlbolt1 withinsertquery(string insertquery) {     this.insertquery = insertquery; system.out.println("query passsed.....");     return this; } @override public void prepare(map map, topologycontext topologycontext, outputcollector collector) {     super.prepare(map, topologycontext, collector);     if(stringutils.isblank(tablename) && stringutils.isblank(insertquery)) {         throw new illegalargumentexception("you must supply either tablename or insert query.");     } }  @override public void execute(tuple tuple) {     try {   thread.sleep(1000);         list<column> columns = jdbcmapper.getcolumns(tuple);         list<list<column>> columnlists = new arraylist<list<column>>();         columnlists.add(columns);         if(!stringutils.isblank(tablename)) {             this.jdbcclient.insert(this.tablename, columnlists);         } else {             this.jdbcclient.executeinsertquery(this.insertquery, columnlists);         }         this.collector.ack(tuple);     } catch (exception e) {         this.collector.reporterror(e);         this.collector.fail(tuple);     } }  @override public void declareoutputfields(outputfieldsdeclarer outputfieldsdeclarer) {  }} 

psqlbolt:

/*
 * JDBC insert bolt used for the first table (wired as "psql_bolt" in the topology).
 *
 * Constructor: passes the connectionprovider to abstractjdbcbolt and stores the jdbcmapper that
 *   turns a tuple into a list of columns.
 * withtablename(tablename) / withinsertquery(insertquery): builder-style setters that return
 *   this; withinsertquery additionally prints "query passsed....." to stdout.
 * prepare(...): calls super.prepare, then throws illegalargumentexception when both tablename
 *   and insertquery are blank.
 * execute(tuple): maps the tuple to columns, wraps them in a one-element list, then inserts by
 *   table name when tablename is not blank, otherwise with the stored insert query; acks the
 *   tuple on success. Any exception is reported through the collector and the tuple is failed.
 * declareoutputfields(...): declares nothing, so this bolt emits no stream and no downstream
 *   bolt can subscribe to its output as written.
 *
 * NOTE(review): when both tablename and insertquery are set, the table-name path takes precedence.
 * NOTE(review): log is declared but not used in the visible code.
 */
public class psqlbolt extends abstractjdbcbolt { private static final logger log = logger.getlogger(psqlbolt.class);  private string tablename;  private string insertquery;  private jdbcmapper jdbcmapper;  public psqlbolt(connectionprovider connectionprovider,  jdbcmapper jdbcmapper) {     super(connectionprovider);     this.jdbcmapper = jdbcmapper; } public psqlbolt withtablename(string tablename) {     this.tablename = tablename;     return this; }    public psqlbolt withinsertquery(string insertquery) {     this.insertquery = insertquery; system.out.println("query passsed.....");     return this; } @override public void prepare(map map, topologycontext topologycontext, outputcollector collector) {     super.prepare(map, topologycontext, collector);     if(stringutils.isblank(tablename) && stringutils.isblank(insertquery)) {         throw new illegalargumentexception("you must supply either tablename or insert query.");     } }  @override public void execute(tuple tuple) {     try {          list<column> columns = jdbcmapper.getcolumns(tuple);         list<list<column>> columnlists = new arraylist<list<column>>();         columnlists.add(columns);         if(!stringutils.isblank(tablename)) {             this.jdbcclient.insert(this.tablename, columnlists);         } else {             this.jdbcclient.executeinsertquery(this.insertquery, columnlists);         }         this.collector.ack(tuple);     } catch (exception e) {         this.collector.reporterror(e);         this.collector.fail(tuple);     } }  @override public void declareoutputfields(outputfieldsdeclarer outputfieldsdeclarer) {  }} 

When I applied the same approach to many bolts, the topology colour changed to red (wait state). [enter image description here]

Here are the bolts' wait times. The first bolt doesn't have a sleep. I kept a 1-second sleep in the second bolt, and the rest of the bolts have a 2-second sleep. [enter image description here]

How can I replace the sleep and still have the work performed in order? Or, if I increase the number of supervisors, will the problem be solved?

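You can restructure the topology such that the spout sends message M to bolt 1. Bolt 1 can take action on the message and forward the same message to bolt 2 if the action is successful. That way, there is a strict ordering between the actions. In the topology above, that means having psql_bolt emit the tuple after its insert succeeds (declaring its output fields) and subscribing psql_bolt1 to "psql_bolt" instead of to "myspout".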


Comments