i have create tcp/ip server rpc communications. i'm bound using provided java lib handles "rpc" stuff. lib received hdlc messages contain protobuf data. lib uses request , response handlers handle hdlc , protobuf part. lib can used serial connections networked connections.
we'd use netty tcp server this. when calling lib expects java.io.inputstream
, java.io.outputstream
in "rpc" method.
i have simple blocking setup, in create server socket, , pass socket.getinputstream()
, socket.getoutputstream()
rpc method. next need register (set) of rpc handlers rpc object, , clients can connect , data can send. seems pretty straightforward me.
i've setup netty "echo" server , want use rpc library netty. i'm struggling how convert received data required inputstream
, how convert outputstream
of rpc lib can send client. need decoder/encoder, or there simpler way this? , if so, how transform bytebuf
inputstream
, outputstream
format can send on network?
inputstream
the other api has readpacket() method
if library has readpacket method, can use bytebufinputstream
in combination replayingdecoder
, relatively easy implement:
public class rpcinputhandler extends replayingdecoder<object> { rpc upstream = ....; protected void decode(channelhandlercontext ctx, bytebuf buf) throws exception { upstream.readpacket(new bytebufinputstream(buf)); state(null); } }
the remote api uses threads read stream
if upstream library uses separate thread process incoming messages, going lose 1 of main advantages of netty: low thread count high number of connections.
public class rpcinputhandler extends simplechannelinboundhandler<bytebuf> { rpc upstream = ....; pipedinputstream in; pipedoutputstream out; @override public void channelactive(channelhandlercontext ctx) throws exception { in = new pipedinputstream(); out = new pipedoutputstream(in); upstream.startinput(in); } @override public void channelinactive(channelhandlercontext ctx) throws exception { out.close(); // sends eof other pipe } // method called messagereceived(channelhandlercontext, i) in 5.0. public void channelread0(channelhandlercontext ctx, bytebuf msg) throws exception { byte[] data = new byte[msg.readablebytes()]; msg.readbytes(data); out.write(data); } }
outputstream
making custom outputstream writes bytes our connection simple, methods map directly over
public class outputstreamnetty extends outputstream { final channel channel = ...; channelfuture lastfuture; private void checkfuture() throws ioexception { if(lastfuture.isdone()) { if(lastfuture.cause() != null) { throw new ioexception("downstream write problem", lastfuture.cause() ); } lastfuture = null; } } private void addfuture(channelfuture f) { if(lastfuture == null) { lastfuture = f; } } public void close() throws ioexception { checkfuture() addfuture(channel.close()); } public void flush() throws ioexception { checkfuture() addfuture(channel.flush()); } public void write(byte[] b, int off, int len) throws ioexception { checkfuture() bytebuf f = channel.alloc().buffer(len); f.writebytes(b, off, len); addfuture(channel.write(f)); } public abstract void write(int b) throws ioexception { checkfuture() bytebuf f = channel.alloc().buffer(1); f.writebyte(b); addfuture(channel.write(f)); } }
Comments
Post a Comment