RabbitMQ 学习笔记

2022-08-16
2分钟阅读时长

RabbitMQ tutorial in Spring AMQP

HelloWorld

@SpringBootApplication
public class LearnRabbitmqHelloWorldApplication {

    public static void main(String[] args) {
        SpringApplication.run(LearnRabbitmqHelloWorldApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(AmqpTemplate template) {
        return args -> template.convertAndSend("myqueue", "foo");
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @RabbitListener(queues = "myqueue")
    public void listen(String in) {
        System.out.println(in);
    }

}

代码为使用 Spring AMQP 简单实现一个生产者和消费者的例子

值得注意的是,如果 RabbitListener 或者 convertAndSend 指定的 queue 不存在的话,启动时会报错,可以通过声明一个 Queue 类型的 bean 让 AMQP 扫描到之后自动在 RabbitMQ 中创建

Work Queues

Work Queues 的目的是创建一个用于在多个消费者之间分配耗时任务的队列,

@SpringBootApplication
public class LearnrabbitmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(LearnrabbitmqApplication.class, args);
    }


    AtomicInteger dots = new AtomicInteger(0);

    AtomicInteger count = new AtomicInteger(0);

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ApplicationRunner runner(AmqpTemplate template, Queue queue) {
        return args -> {
            for (int i = 0; i < 5; i++) {
                StringBuilder builder = new StringBuilder("Hello");
                if (dots.incrementAndGet() == 4) {
                    dots.set(1);
                }
                builder.append(".".repeat(Math.max(0, dots.get())));
                builder.append(count.incrementAndGet());
                String message = builder.toString();
                template.convertAndSend(queue.getName(), message);
                System.out.println(" [x] Sent '" + message + "'");
            }
        };
    }

    @Bean
    public LongTaskReceiver rec1() {
        return new LongTaskReceiver(1);
    }

    @Bean
    public LongTaskReceiver rec2() {
        return new LongTaskReceiver(2);
    }

}

@RabbitListener(queues = "myqueue")
class LongTaskReceiver {

    private final Integer flag;

    LongTaskReceiver(Integer flag) {
        this.flag = flag;
    }

    @RabbitHandler
    public void receive(String in) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + this.flag +
                " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + this.flag +
                " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

上述代码创建了一个生产者发送了五条消息,两个消费者从相同的队列中读取消息进行消费。 此处我们创建 RabbitListener 时使用的是在类上的注解,并且通过 RabbitHandler 注解指定处理消息的方法。

消息确认

对于长时间任务,可能会存在任务执行中出现错误的情况,对于 Spring AMQP 来说,Spring 会默认监听容器执行结果,对于出现异常的情况,会默认执行 channel.basicReject(deliveryTag, requeue) 来告知 RabbitMQ 消息未能成功被消费。(参见Consumer Acknowledgements and Publisher Confirms

如果 Listener 抛出 AmqpRejectAndDontRequeueException 或者配置文件中设置 defaultRequeueRejected=false 则不会对一场消息进行 reject。

在 Listener 处理完消息之后,会调用 channel.basicAck() 来告知 RabbitMQ 消息被确认。

消息的确认必须和消息接受在同一个 channel,否则的话会引起一个 channel-level protocol exception