/**
 * Protected API - called by nextCommand to check and possibly handle an incoming
 * Command before it is returned to the caller of nextCommand. If this method
 * returns true, the command is considered handled and is not passed back to
 * nextCommand's caller; if it returns false, nextCommand returns the command as
 * usual. This is used in subclasses to implement handling of Basic.Return and
 * Basic.Deliver messages, as well as Channel.Close and Connection.Close.
 *
 * <p>NOTE(review): in this class the visible caller is
 * {@code handleCompleteInboundCommand}, which forwards the command to the
 * outstanding RPC continuation when this method returns false. The references
 * to {@code nextCommand} above may be stale - confirm.
 *
 * @param command the command to handle asynchronously
 * @return true if we handled the command; otherwise the caller should consider it "unhandled"
 * @throws IOException if the implementation fails while handling the command
 */
public abstract boolean processAsync(Command command) throws IOException;
/**
 * Monitor object for this channel. The RPC and transmit methods of this class
 * synchronize on it, and it is also used with wait()/notifyAll() to park and
 * wake threads (see quiescingTransmit and nextOutstandingRpc).
 */
protected final Object _channelMutex = new Object();

/** The connection this channel is associated with. */
private final AMQConnection _connection;

/** This channel's channel number. */
private final int _channelNumber;

/**
 * Command being assembled. handleFrame feeds incoming frames into it and
 * replaces it with a fresh instance once a command is complete.
 */
private AMQCommand _command = new AMQCommand();

/**
 * The current outstanding RPC request, if any. (Could become a queue in future.)
 * Reset to null by nextOutstandingRpc when the continuation is taken.
 */
private RpcContinuation _activeRpc = null;

/**
 * Whether transmission of content-bearing methods should be blocked.
 * While true, quiescingTransmit waits on _channelMutex before sending a
 * command whose method has content.
 */
public volatile boolean _blockContent = false;
/** * Private API - When the Connection receives a Frame for this * channel, it passes it to this method. * @param frame the incoming frame * @throws IOException if an error is encountered */ public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command); } }
/** * Private API - handle a command which has been assembled * @throws IOException if there's any problem * * @param command the incoming command * @throws IOException */ public void handleCompleteInboundCommand(AMQCommand command) throws IOException { // First, offer the command to the asynchronous-command // handling mechanism, which gets to act as a filter on the // incoming command stream. If processAsync() returns true, // the command has been dealt with by the filter and so should // not be processed further. It will return true for // asynchronous commands (deliveries/returns/other events), // and false for commands that should be passed on to some // waiting RPC continuation. if (!processAsync(command)) { // The filter decided not to handle/consume the command, // so it must be some reply to an earlier RPC. nextOutstandingRpc().handleCommand(command); markRpcFinished(); } }
/**
 * Takes the currently outstanding RPC continuation off this channel.
 *
 * <p>Under the channel mutex, the active continuation is read, the slot is
 * reset to null, and every thread waiting on the mutex is woken up.
 *
 * @return the continuation that was outstanding, or null if there was none
 */
public RpcContinuation nextOutstandingRpc() {
    synchronized (_channelMutex) {
        final RpcContinuation pending = _activeRpc;
        _activeRpc = null;
        // Wake every thread parked on the channel mutex.
        _channelMutex.notifyAll();
        return pending;
    }
}
nextOutstandingRpc方法将当前的_activeRpc返回,并将AMQChannel的_activeRpc置为null,同时调用_channelMutex.notifyAll()唤醒在该锁上等待的线程。
接下来几个方法联系性很强:
/**
 * Protected API - sends a {@link Method} to the broker and waits for the
 * next in-bound Command from the broker: only for use from
 * non-connection-MainLoop threads!
 *
 * @param m the method to send
 * @return the reply command produced by the blocking RPC
 * @throws IOException if sending the request fails
 * @throws ShutdownSignalException if a shutdown signal is delivered while waiting for the reply
 */
public AMQCommand rpc(Method m) throws IOException, ShutdownSignalException {
    final AMQCommand reply = privateRpc(m);
    return reply;
}
/**
 * Sends a {@link Method} to the broker and waits, for at most the given
 * timeout, for the reply command.
 *
 * @param m the method to send
 * @param timeout how long to wait for the reply; passed unchanged to the
 *                blocking continuation (units not visible here - presumably
 *                milliseconds, confirm)
 * @return the reply command
 * @throws IOException if sending the request fails
 * @throws ShutdownSignalException if a shutdown signal is delivered while waiting for the reply
 * @throws TimeoutException if no reply arrives within the timeout
 */
public AMQCommand rpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException {
    final AMQCommand reply = privateRpc(m, timeout);
    return reply;
}
private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException { SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(); rpc(m, k); // At this point, the request method has been sent, and we // should wait for the reply to arrive. // // Calling getReply() on the continuation puts us to sleep // until the connection's reader-thread throws the reply over // the fence. return k.getReply(); }
/**
 * Performs a blocking RPC with a time limit: registers a fresh blocking
 * continuation, sends the method, then waits for the reply for at most
 * the given timeout.
 *
 * <p>NOTE(review): nothing in this method unregisters the continuation when
 * the wait times out - confirm how a late reply and subsequent RPCs on this
 * channel behave after a TimeoutException.
 *
 * @param m the method to send
 * @param timeout how long to wait for the reply; passed unchanged to
 *                getReply(int) (units not visible here - presumably
 *                milliseconds, confirm)
 * @return the reply command
 * @throws IOException if sending the request fails
 * @throws ShutdownSignalException if a shutdown signal is delivered to the continuation
 * @throws TimeoutException if no reply arrives within the timeout
 */
private AMQCommand privateRpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException {
    SimpleBlockingRpcContinuation continuation = new SimpleBlockingRpcContinuation();
    rpc(m, continuation);

    AMQCommand reply = continuation.getReply(timeout);
    return reply;
}
/**
 * Sends a method to the broker and registers the continuation that will
 * receive the reply. Unlike quiescingRpc, this first calls ensureIsOpen(),
 * so the request is only issued if that check passes.
 *
 * @param m the method to send
 * @param k the continuation to hand the reply to
 * @throws IOException if sending the request fails
 */
public void rpc(Method m, RpcContinuation k) throws IOException {
    synchronized (_channelMutex) {
        // The open-check and the send happen under one lock acquisition.
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}
/**
 * Registers the continuation via enqueueRpc and then transmits the method
 * via quiescingTransmit, both under the channel mutex. No open-check is
 * performed here; see rpc(Method, RpcContinuation) for the checked variant.
 *
 * @param m the method to send
 * @param k the continuation to hand the reply to
 * @throws IOException if sending the request fails
 */
public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
    synchronized (_channelMutex) {
        // Register the continuation before the request goes out.
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}
public void quiescingTransmit(Method m) throws IOException { synchronized (_channelMutex) { quiescingTransmit(new AMQCommand(m)); } } public void quiescingTransmit(AMQCommand c) throws IOException { synchronized (_channelMutex) { if (c.getMethod().hasContent()) { while (_blockContent) { try { _channelMutex.wait(); } catch (InterruptedException e) {}
// This is to catch a situation when the thread wakes up during // shutdown. Currently, no command that has content is allowed // to send anything in a closing state. ensureIsOpen(); } } c.transmit(this); } }
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。
注意privateRpc(Method, int timeout)方法的最后一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切地说是等待MainLoop线程处理之后的返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。
/**
 * An {@link RpcContinuation} that lets a caller block until the reply (or a
 * shutdown signal) is handed over. The hand-off goes through a
 * BlockingValueOrException: handleCommand stores the transformed reply,
 * handleShutdownSignal stores the exception, and getReply retrieves
 * whichever was stored.
 *
 * @param <T> the type the raw reply command is transformed into
 */
public static abstract class BlockingRpcContinuation<T> implements RpcContinuation {
    /** Hand-off point between the thread delivering the reply and the waiting caller. */
    public final BlockingValueOrException<T, ShutdownSignalException> _blocker =
        new BlockingValueOrException<T, ShutdownSignalException>();

    /**
     * Transforms the incoming reply and publishes it to the blocker.
     *
     * @param command the reply command
     */
    public void handleCommand(AMQCommand command) {
        T reply = transformReply(command);
        _blocker.setValue(reply);
    }

    /**
     * Publishes a shutdown signal to the blocker in place of a reply.
     *
     * @param signal the shutdown signal
     */
    public void handleShutdownSignal(ShutdownSignalException signal) {
        _blocker.setException(signal);
    }

    /**
     * Retrieves the reply from the blocker, without a time limit.
     *
     * @return the transformed reply
     * @throws ShutdownSignalException if a shutdown signal was stored instead of a reply
     */
    public T getReply() throws ShutdownSignalException {
        T reply = _blocker.uninterruptibleGetValue();
        return reply;
    }

    /**
     * Retrieves the reply from the blocker, giving up after the timeout.
     *
     * @param timeout passed unchanged to the blocker (units not visible
     *                here - presumably milliseconds, confirm)
     * @return the transformed reply
     * @throws ShutdownSignalException if a shutdown signal was stored instead of a reply
     * @throws TimeoutException if the blocker gives up waiting
     */
    public T getReply(int timeout) throws ShutdownSignalException, TimeoutException {
        T reply = _blocker.uninterruptibleGetValue(timeout);
        return reply;
    }

    /**
     * Converts the raw reply command into the value handed to the caller.
     *
     * @param command the reply command
     * @return the value to publish through the blocker
     */
    public abstract T transformReply(AMQCommand command);
}
public static class SimpleBlockingRpcContinuation extends BlockingRpcContinuation<AMQCommand> { public AMQCommand transformReply(AMQCommand command) { return command; } }