RabbitMQ 学习笔记
RabbitMQ tutorial in Spring AMQP
HelloWorld
@SpringBootApplication
public class LearnRabbitmqHelloWorldApplication {
public static void main(String[] args) {
SpringApplication.run(LearnRabbitmqHelloWorldApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(AmqpTemplate template) {
return args -> template.convertAndSend("myqueue", "foo");
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
}
}
代码为使用 Spring AMQP 简单实现一个生产者和消费者的例子
值得注意的是,如果 RabbitListener 或者 convertAndSend 指定的 queue 不存在的话,启动时会报错,可以通过声明一个 Queue
类型的 bean 让 AMQP 扫描到之后自动在 RabbitMQ 中创建
Work Queues
Work Queues 的目的是创建一个用于在多个消费者之间分配耗时任务的队列,
@SpringBootApplication
public class LearnrabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(LearnrabbitmqApplication.class, args);
}
AtomicInteger dots = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@Bean
public ApplicationRunner runner(AmqpTemplate template, Queue queue) {
return args -> {
for (int i = 0; i < 5; i++) {
StringBuilder builder = new StringBuilder("Hello");
if (dots.incrementAndGet() == 4) {
dots.set(1);
}
builder.append(".".repeat(Math.max(0, dots.get())));
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
};
}
@Bean
public LongTaskReceiver rec1() {
return new LongTaskReceiver(1);
}
@Bean
public LongTaskReceiver rec2() {
return new LongTaskReceiver(2);
}
}
@RabbitListener(queues = "myqueue")
class LongTaskReceiver {
private final Integer flag;
LongTaskReceiver(Integer flag) {
this.flag = flag;
}
@RabbitHandler
public void receive(String in) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + this.flag +
" [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + this.flag +
" [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}
上述代码创建了一个生产者发送了五条消息,两个消费者从相同的队列中读取消息进行消费。
此处我们创建 RabbitListener
时使用的是在类上的注解,并且通过 RabbitHandler
注解指定处理消息的方法。
消息确认
对于长时间任务,可能会存在任务执行中出现错误的情况,对于 Spring AMQP 来说,Spring 会默认监听容器执行结果,对于出现异常的情况,会默认执行 channel.basicReject(deliveryTag, requeue)
来告知 RabbitMQ 消息未能成功被消费。(参见Consumer Acknowledgements and Publisher Confirms)
如果 Listener 抛出 AmqpRejectAndDontRequeueException
或者配置文件中设置 defaultRequeueRejected=false
则不会对一场消息进行 reject。
在 Listener 处理完消息之后,会调用 channel.basicAck()
来告知 RabbitMQ 消息被确认。
消息的确认必须和消息接受在同一个 channel,否则的话会引起一个 channel-level protocol exception