/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */ private final Object monitor = new Object(); /** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */ private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>(); private final IntAllocator channelNumberAllocator;
private final ConsumerWorkService workService;
private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();
/** Maximum channel number available on this connection. */ private final int _channelMax; private final ThreadFactory threadFactory;
这上面的成员变量下面会有涉及。
对于ChannelManager的使用,是AMQConnection中的成员变量:
/** Object that manages a set of channels */ private volatile ChannelManager _channelManager;
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) { if (channelMax == 0) { // The framing encoding only allows for unsigned 16-bit integers // for the channel number channelMax = (1 << 16) - 1; } _channelMax = channelMax; channelNumberAllocator = new IntAllocator(1, channelMax);
public Channel createChannel(int channelNumber) throws IOException { ensureIsOpen(); ChannelManager cm = _channelManager; if (cm == null) return null; return cm.createChannel(this, channelNumber); } public Channel createChannel() throws IOException { ensureIsOpen(); ChannelManager cm = _channelManager; if (cm == null) return null; return cm.createChannel(this); }
这里就是调用了ChannelManager的createChannel方法。
下面是ChannelManager中关于创建Channel的代码:
public ChannelN createChannel(AMQConnection connection) throws IOException { ChannelN ch; synchronized (this.monitor) { int channelNumber = channelNumberAllocator.allocate(); if (channelNumber == -1) { return null; } else { ch = addNewChannel(connection, channelNumber); } } ch.open(); // now that it's been safely added return ch; }
public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException { ChannelN ch; synchronized (this.monitor) { if (channelNumberAllocator.reserve(channelNumber)) { ch = addNewChannel(connection, channelNumber); } else { return null; } } ch.open(); // now that it's been safely added return ch; }
private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException { if (_channelMap.containsKey(channelNumber)) { // That number's already allocated! Can't do it // This should never happen unless something has gone // badly wrong with our implementation. throw new IllegalStateException("We have attempted to " + "create a channel with a number that is already in " + "use. This should never happen. " + "Please report this as a bug."); } ChannelN ch = instantiateChannel(connection, channelNumber, this.workService); _channelMap.put(ch.getChannelNumber(), ch); return ch; }
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { return new ChannelN(connection, channelNumber, workService); }
/** * Package method: open the channel. * This is only called from {@link ChannelManager}. * @throws IOException if any problem is encountered */ public void open() throws IOException { // wait for the Channel.OpenOk response, and ignore it exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)); }