/** Current state, used to decide how to handle each incoming frame. */ private enum CAState { EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE } private CAState state;
/** The method for this command */ private Method method;
/** The content header for this command */ private AMQContentHeader contentHeader;
/** The fragments of this command's content body - a list of byte[] */ private final List<byte[]> bodyN; /** sum of the lengths of all fragments */ private int bodyLength;
/** No bytes of content body not yet accumulated */ private long remainingBodyBytes;
while (_running) { Frame frame = _frameHandler.readFrame();
if (frame != null) { _missedHeartbeats = 0; if (frame.type == AMQP.FRAME_HEARTBEAT) { // Ignore it: we've already just reset the heartbeat counter. } else { if (frame.channel == 0) { // the special channel _channel0.handleFrame(frame); } else { if (isOpen()) { // If we're still _running, but not isOpen(), then we // must be quiescing, which means any inbound frames // for non-zero channels (and any inbound commands on // channel zero that aren't Connection.CloseOk) must // be discarded. ChannelManager cm = _channelManager; if (cm != null) { cm.getChannel(frame.channel).handleFrame(frame); } } } } } else { // Socket timeout waiting for a frame. // Maybe missed heartbeat. handleSocketTimeout(); } }
public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command); } }
/** * Sends this command down the named channel on the channel's * connection, possibly in multiple frames. * @param channel the channel on which to transmit the command * @throws IOException if an error is encountered */ public void transmit(AMQChannel channel) throws IOException { int channelNumber = channel.getChannelNumber(); AMQConnection connection = channel.getConnection();
synchronized (assembler) { Method m = this.assembler.getMethod(); connection.writeFrame(m.toFrame(channelNumber)); if (m.hasContent()) { byte[] body = this.assembler.getContentBody();