本篇翻译的是RabbitMQ官方文档关于API的内容,原文链接:http://www.rabbitmq.com/api-guide.html
。博主对其内容进行大体上的翻译,有些许部分会保留英文,个人觉得这样更加有韵味,如果全部翻译成中文,会存在偏差,文不达意(主要是功力浅薄~~)。文章也对部分内容进行一定的解释,增强对相关知识点的理解。
Overview
RabbitMQ java client uses com.rabbitmq.client as its top-level package, 关键的classes和interface如下:
- Channel
- Connection
- ConnectionFactory
- Consumer
AMQP协议层面的操作通过Channel接口实现。Connection是用来open Channels的,可以注册event handlers,也可以在结束是close connections. Connection是通过ConnectionFactory来进行初始化操作的,当然也需要配置不同的connection设置,比如vhost或者username等。
Connections and Channels
关键的API如Connection和Channel,分别代表了AMQP-0-9-1的connection和channel。典型的包导入如下:
import com.rabbitmq.client.Connection; |
Connecting to a broker
下面的代码用来在给定的参数(hostname, port number等)下连接一个AMQP broker:
ConnectionFactory factory = new ConnectionFactory(); |
也可以选择使用URI来实现,示例如下:
ConnectionFactory factory = new ConnectionFactory(); |
Connection接口被用来open一个channel:
Channel channel = conn.createChannel(); |
这样在创建之后,Channel可以用来发送或者接受消息了。
在使用完之后,关闭连接:
channel.close(); |
显示的关闭channel是一个很好的习惯,但这不是必须的,在基本的connection关闭的时候channel也会自动的关闭。
Using Exchanges and Queues
AMQP的high-level构建模块exchanges和queues是Client端应用所必须的。在使用之前必须先“declared”(声明),确保在使用之前已经存在,如果不存在则创建它,这些操作都包含在declare里。
下面的代码是演示如何declare一个exchange和queue:
channel.exchangeDeclare(exchangeName, "direct", true); |
上面创建了一个durable, non-autodelete并且绑定类型为direct的exchange以及一个non-durable, exclusive,autodelete的queue(此queue的名称由broker端自动生成)。这里的exchange和queue也都没有设置特殊的arguments。
上面的代码也展示了如果使用routing key将queue和exchange绑定起来。上面声明的queue具备如下特性:排他的(只对当前client同一个Connection可用, 同一个Connection的不同的Channel可共用),并且也会在client连接断开时自动删除。
如果要在client共享一个queue,可以做如下声明:
channel.exchangeDeclare(exchangeName, "direct", true); |
这里的queue是durable的,非排他的,non-autodelete, 而且也有一个确定的已知的名称(又Client指定而非broker端自动生成)。
注意:Channel的API方法都是可以重载的,比如exchangeDeclare,queueDeclare根据参数的不同,可以有不同的重载形式,根据自身的需要去进行调用。
Publish messages
如果要发送一个消息可以采用Channel.basicPublish的方式:
byte[] messageBodyBytes = "Hello, world!".getBytes(); |
为了更好的控制,你也可以使用mandatory这个属性,或者可以发送一些特定属性的消息:
channel.basicPublish(exchangeName, routingKey, mandatory, |
这个方法发送了一条消息,这条消息的delivery mode为2,即消息需要被持久化在broker中,同时priority优先级为1,content-type为text/plain。你可以可以自己设定消息的属性:
channel.basicPublish(exchangeName, routingKey, |
你也可以发送一条带有header的消息:
Map<String, Object> headers = new HashMap<String, Object>(); |
你也可以发送一条带有超时时间expiration的消息:
channel.basicPublish(exchangeName, routingKey, |
以上只是举例,由于篇幅关系,这里就不一一列举所有的可能情形了。
Channel#basicPublish方法在以下两种情形下会被阻塞,具体可以参考http://www.rabbitmq.com/alarms.html
:
- When memory use goes above the configured limit.(内存不够)
- When disk space drops below the configured limit.(磁盘空间不足)
Channles and Concurrency Consideration(Thread Safaty)
Channel实例不能在线程建共享,应用程序应该为每一个线程开辟一个Channel, 而不是在多线程建共享Channel。某些情况下Channel的操作可以并发运行,但是某些情况下并发会导致在网络上错误的帧交叉,同时也会影响publisher confirm, 故多线程共享Channel是非线程安全的。
Receiving messages by subscription
import com.rabbitmq.client.Consumer; |
接受消息一般是通过实现Consumer接口或者继承DefaultConsumer来实现。当调用与Consumer相关的API方法时,不同的订阅采用consumer tags以作彼此的区分,在同一个Channel中的Consumer也需要通过唯一的consumer tags以作区分。。
消费消息demo如下:
boolean autoAck = false; |
注意到上面代码我们显示的设置autoAck=false, 对于Consumer来说这个设置是非常必要的。(译者注:具体可以参考RabbitMQ之消息确认机制(事务+Confirm)中Consumer确认那一章节。)
同时对于Consumer来说重写handleDelivery方法也是十分方便的。更复杂的Consumer会重写(override)更多的方法,比如handleShutdownSignal当channels和connections close的时候会调用,handleConsumeOk在其他callback方法之前调用,返回consumer tags.
Consumer同样可以override handleCancelOk和handleCancel方法,这样在显示的或者隐式的取消的时候调用。
你可以通过Channel.basicCancel方法显示的cancel一个指定的Consumer:
channel.basicCancel(consumerTag); |
(译者注:这句代码首先触发handleConsumerOk,之后触发handleDelivery方法,最后触发handleCancelOk方法。)
单个Consumer在Connection上都分配单个的线程来调用这些callback的方法,也就是说Consumer这里安全的调用阻塞式的方法,比如queueDeclare, txCommit, basicCancel或者basicPublish。
每个Channel都有自己的独立的线程。最常用的用法是一个Channel对应一个Consumer, 也就是意味着Consumers彼此间没有任何关联。当然你也可以在一个Channel中维持多个Consumers, 但是要注意一个问题,如果在Channel的一个Consumer一直在运行,那么对于其他Consumer的callbacks而言会被hold up(耽搁)。
Retrieving individual messages
通过Channel.basicGet可以一个一个的获取消息,其返回值是GetResponse(from which the header information(properties) and message body can be extracted)。
示例Demo如下:
boolean autoAck = false; |
如果设置autoAck为false,那么你同样需要显示的调用Channel.basicAck来确认消息已经被成功的接受了:
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message |
(译者注:有关RabbitMQ的消费端的更多信息可以参考:RabbitMQ之Consumer消费模式(Push & Pull))
Handing unroutable messages
如果一个消息在publish的时候设置了mandatory标记,如果消息没有成功的路由到某个队列的时候,broker端会通过Basic.Return返回回来。
这时候客户端需要实现ReturnListener这个接口,并且调用Channel.setReturnListener。 如果client没有配置相关的return listener那么相应的需要被returned的消息就会被drop掉。
channel.setReturnListener(new ReturnListener() { |
(译者注:有关mandatory的更多内容可以参考:RabbitMQ之mandatory和immediate。)
Shutdown Protocol
Overview of the AMQP client shutdown
AMQP-0-9-1的connection和channel采用同样的方式来管理网络失败,内部错误以及显示的local shutdown。
AMQP-0-9-1的connection和channel具备如下的生命周期状态(lifecycle states):
- open: the object is ready to use.
- closing:当前对象被显示的通知调用shutdown,这样就产生了一个shutdown的请求至lower-layer的对象进行相应的操作,并等待这些shutdown操作的完成。
- closed:当前对象已经接受到所有的shutdown完成的通知,并且也shutdown了自身。
这些对象最终成closed的状态,而不管是由于什么原因引起的,或者是一个applicatin request,或者是内部client library的失败,或者是a remote network request, 亦或者是network failure。
AMQP的connecton和channel对象控制(possess)了shutdown-related的方法:addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。当connection和channel转向closed状态时会调用ShutdownListener, 而且如果将一个ShutdownListener注册到一个已经处于closed状态的object(特指connection或者channel的对象)时,会立刻调用ShutdownListener。
- getCloseReason():可以让你知道the object’s shutdown的原因。
- isOpen():检测the objects当前的是否处于open状态。
- close(int closeCode, String closeMessage):显示的通知the object执行shutdown。
示例代码:
import com.rabbitmq.client.ShutdownSignalException; |
Information about the ircumstances of a shutdown
当触发ShutdownListener的时候,就可以获取到ShutdownSignalException,这个ShutdownSignalException包含了close的原因,这个原因也可以通过getCloseReason()方法获取。
ShutdownSignalException提供了多个方法用来分析shutdown的原因。isHardError()方法可以知道是connection还是channel的error,getReason()方法可以获取cause相关的信息(以AMQP method的形式,com.rabbitmq.client.Method:AMQP.Channel.Close or AMQP.Connection.Close):
public void shutdownCompleted(ShutdownSignalException cause) |
Atomicity and use of the isOpen() method
我们并不推荐在生产环境的代码上使用channel或者connection的isOpen()方法,这个isOpen()方法的返回值依赖于shutdown cause的存在,有可能会产生竞争。
(译者添加:关于isOpen依赖于shutdown cause, isOpen的实现代码如下:)
public boolean isOpen() { |
错误的使用方式如下:
public void brokenMethod(Channel channel) |
正确的使用方式:
public void validMethod(Channel channel) |
Advanced Connection options
Consumer thread pool
默认情况下客户端会自动分配一个ExecutorService给Consumer线程,同样你也可以使用自定义的线程池,比如:
ExecutorService es = Executors.newFixedThreadPool(20); |
当connection关闭的时候,默认的ExecutorService会被shutdown,但是如果是自定义的ExecutorService将不会被自动的shutdown,所以Clients程序需要在最终关闭的时候手动的去执行shutdown(),否则将会阻止JVM的正常关闭。
同一个executor service可以被多个connections共用。除非有明显的证据证明默认的ExecutorService不能满足当前Consumer callbacks的需要,否则不建议使用自定义的ExecutorService.
Using Lists of Hosts
可以通过使用Address来执行newConnection(). com.rabbitmq.client.Address的使用是比较方便的,例如:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1) |
如果hostname1:portnumber1成功了连接,而hostname2:portnumber2连接失败了,connection照样会成功returned, 也不会跑出IOException。这个和你重复设置host,port然后调用factory.newConnection()直到有一组成功为止一个效果。
同样可以指定自定义的ExecutorService, 比如:factory.newConnection(es, addrArr)。
If you want moew control over the host to connect to, see the support for service discovery.
Service discovery with the AddressResolver interface
在版本3.6.6开始,可以通过AddressResolver接口的实现来创建connection:
Connection conn = factory.newConnection(addressResolver); |
AddressResolver接口如下:
public interface AddressResolver { |
使用AddressResolver可以更好的实现custom service discovery逻辑,和“automatic recovery”组合使用,客户端可以自动的和broker nodes连接.AddressResolver也可以有效的配合负载均衡策略。
AddressResolver有两个实现:DnsRecordIpAddressResolver和DnsSrvRecordAddressResolver.(博主没用过AddressResolver,这里就不多做解释了)
Heartbeat Timeout
有关Heartbeat的内容请参考Heatbeats guide。(原文就是这么说的。)
Custom Thread Factories
略。和Google App Engine有关。
Support for Java non-blocking IO
4.0版本开始客户端引入了java的NIO,这里引入NIO的目的不是为了比BIO的更快,而是是的更加容易的控制资源。
对于默认的BIO模式,每个connection都需要一个独立的线程来进行网络通讯。但在NIO模式下,你可以控制网络通讯读写线程的数量。
如果你的java程序需要许多的connections(几十个或者几百个),那么使用NIO模式是一个很好的选择。相比BIO而言,你所使用的线程数很少,通过设置合理的线程数,你可以不必担心性能的损耗,尤其是在connections不怎么busy的时候。
NIO必须被显示的设置:
ConnectionFactory connectionFactory = new ConnectionFactory(); |
你也可以设置NIO的参数:
connectionFactory.setNioParams(new NioParams().setNbIoThreads(4)); |
NIO模式下使用合理的默认值,同时你也可以根据自身的负载情况来进行合理的变换。
Automatic Recovery From Network Failures
Connection Recovery
客户端和broker之间的网络通讯可能会失败。RabbitMQ java client支持connections和拓扑topology(指queues, exchanges, bindings and consumers)的自动回复。自动恢复过程有如下几个步骤:
- Reconnect
- Restore connection listeners
- Re-open channels
- Restore channel listeners
- Restore channel basic.qos setting, publisher confirms and transaction settings
topology的恢复包括如下行为,performed for every channel:
- Re-declare exchange (exception for predefined ones)
- Re-declare queues
- Recover all bindings
- Recover all consumers
在版本4.0.0开始,自动回复默认是开启的。你也通过factory.setAutomaticRecoveryEnabled(boolean)可以手动的设置automatic onnection recovery.
ConnectionFactory factory = new ConnectionFactory(); |
如果由于某些异常(比如RabbitMQ节点始终连接不上)而导致的恢复失败。那么会在某个特定的时间间隔内重试,默认此间隔为5s,当然此值可配:
ConnectionFactory factory = new ConnectionFactory(); |
Recovery Listeners
It is possible to register one or more recovery listeners on recoverable connections and channels. 当connection recovery启用的时候,通过调用ConnectionFactory#newConnection和Connection#createChannel返回的connections实现com.rabbitmq.client.Recoverable. 这里提供了两个方法:addRecoveryListener和removerRecoveryListener.
/** |
当然你必须将connections和channels强制转换为Recoverable的才能使用这些方法。
Effects on Publishing
消息通过Channel,basicPublish发布,如果connection down了那么消息就会丢失。客户端不会在connection恢复之后重新delivery这些消息。为了确保消息的可靠性,可以参考Publisher Confims.(或者可以参考博主的博文:RabbitMQ之消息确认机制(事务+Confirm))。
Topology Recovery
Topology recovery涉及到exchanges, queues, bindings and consumer.当automatic recovery可用时topology recovery默认也可用。当然topology也可显示的设置为disabled:
ConnectionFactory factory = new ConnectionFactory(); |
Manual Acknowledgements and Automatic Recovery
当autoAck设置为false的时候,在消息delivery和ack的时候有可能会由于网络原因故障,在connection recovery之后,RabbitMQ会将所有的channels的delivery tags进行重置。这就意味着basic.ack, basic,nack以及basic.reject带有old delivery tags的将会引起channel exception。为了解决这个为题,RabbitMQ java client会记录和更新相应的delivery tags来确保在恢复期间保持单调递增。带有过时的delivery tags的ack将不会被发送。采用manual ack和automatic recovery的应用必须具备处理redeliveries的能力。
Unhandled Exceptions
在connection, channel, recovery, consumer生命周期内涉及的未被处理的异常可以委托给exception handler. Exception handler实现了ExceptionHandler这个接口,默认情况下使用的是DefaultExceptionHandler, 只是在标准输出流中打印一些exception的细节。
你可以使用ConnectionFactory#setExceptionHandler来override这个handler,这个handler可以被ConnectionFactory创建的所有的Connections所使用:
ConnectionFactory factory = new ConnectionFactory(); |