Getting started with RabbitMQ in Spring Boot

Ruhshan Ahmed Abir
Published in Javarevisited
11 min read, Apr 9, 2022


RabbitMQ is one of the most widely deployed open source message brokers. It gives you features that help make your application asynchronous and offload plumbing logic so that you can focus on your business code. In this article I show the easiest way to get started with RabbitMQ in Spring Boot projects: automating queue and exchange creation, configuring retries on the consumer, handling errors, and adding distributed tracing.

Let me share a funny incident. When I first started working with RabbitMQ, I went to the website and found the tutorials page. It had six tutorials covering common scenarios, each available in several languages. And hey, the Java tutorials were right under the Python ones! It kinda still looks the same…

Then I skimmed through a few of the Java tutorials and was amazed at how easy it was to use. You just create a connection factory, open a channel, publish with the channel's basicPublish method, and receive with delivery callbacks.

Those steps helped me stitch together my use case and even deploy my app to production. On another project I started the same way, but somewhere in my mind I kept hearing a voice: “There should be a cleaner way! There should be a better way!” And trust me, that better way was always right in front of my eyes; I was just too blind to see it. On the same tutorials page there was a Spring AMQP version of each tutorial, which was exactly what I needed but never knew existed.

Spring AMQP makes development with RabbitMQ more springy. It provides listener containers, RabbitTemplate, and RabbitAdmin. So let’s see it in action!

Set up RabbitMQ

First of all, we need a running RabbitMQ instance to interact with. If you don’t have one at your disposal, you can use the following Docker Compose file to get one quickly.

version: '3.1'

services:
  rabbitmq:
    image: rabbitmq:management
    restart: "no"
    ports:
      - "5672:5672"
      - "15672:15672"

Here, port 5672 is used by applications, and port 15672 serves the web interface for management.

Open your browser and go to localhost:15672; you’ll see the login page.

Use guest/guest as the username and password. If you deploy RabbitMQ to production, always change these default credentials. But for now they are fine.

Add dependency in Spring boot app

To use RabbitMQ we need to add the spring-boot-starter-amqp dependency. As I am using Maven, I’ll add the following to my pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

I also need to add the following configuration to the active profile’s YAML file:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

Now Spring AMQP knows where our RabbitMQ instance is and which credentials to use to connect to it.

To make the examples relatable, here is what we’ll do. We’ll expose a user-registration controller that receives a payload containing name, email, and so on, then publishes it to a queue. We assume that registering a user requires calling some other services and APIs, so we make it asynchronous and let a consumer do all of that work.

Publish messages to queue

I’ve created a controller class and added the other relevant classes for it. This is what the controller looks like:

Notice that I’ve injected RabbitTemplate into the controller and used it in the createUser method to publish the message. Now we need to create a queue. For that, log in to the RabbitMQ management console and navigate to the Queues tab, where there is an Add a new queue option.

Keep the other fields at their defaults and set the name to user-registration.

Now spin up the application and hit the API with an appropriate payload. For me it looks like this:

curl --location --request POST 'http://localhost:8091/api/user' \
  --header 'Content-Type: application/json' \
  --data-raw '{"username":"Johnson","email":"johnsn@mail.com","mobileNumber": "017000000111"}'

If everything works correctly, we’ll see 1 ready message in the queue.

To see the message, click on the queue name and use the Get Message(s) option. Uh oh! The payload is not readable; it’s shown base64-encoded, because by default Spring AMQP sends the object in Java-serialized binary form. We’ll take care of this later, I promise. Now back to coding. This is the line I used to publish my message:

rabbitTemplate.convertAndSend("", "user-registration", request);

Here the first argument is the exchange, the second is the routing key, and the last is the payload. But where is the queue?

In the RabbitMQ universe, an exchange is like a post office, a queue is like the physical location, and the routing key is the address of that location. When we created the queue without binding it to any exchange, it was automatically bound to the default exchange, with the queue’s name as the routing key. That is why publishing with an empty string as the exchange and the queue name as the routing key delivered the message to the queue. A producer never sends messages directly to a queue.
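
The default-exchange behaviour described above can be pictured with a small in-memory model. This is only a sketch in plain Java, not the RabbitMQ or Spring AMQP API; the class DefaultExchangeSketch and its methods are made up for illustration. It assumes that a message whose routing key matches no queue is simply discarded, which is what a broker does with unroutable messages unless the publisher asks for them to be returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of RabbitMQ's default exchange (hypothetical class,
// not part of any RabbitMQ or Spring AMQP library).
class DefaultExchangeSketch {

    // Queue name -> messages waiting in that queue.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    // Declaring a queue implicitly binds it to the default exchange,
    // using the queue's own name as the routing key.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Publishes via the default exchange (the empty string).
    // Returns true if a queue matched the routing key, false if the
    // message was unroutable and therefore discarded.
    boolean publish(String exchange, String routingKey, String payload) {
        if (!exchange.isEmpty()) {
            throw new IllegalArgumentException("sketch models only the default exchange");
        }
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.addLast(payload);
        return true;
    }

    // Number of messages currently waiting in the named queue.
    int readyCount(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("user-registration");

        boolean routed = broker.publish("", "user-registration", "{\"username\":\"Johnson\"}");
        boolean lost = broker.publish("", "no-such-queue", "{\"username\":\"Johnson\"}");

        System.out.println("routed to user-registration: " + routed);
        System.out.println("routed to no-such-queue: " + lost);
        System.out.println("ready messages: " + broker.readyCount("user-registration"));
    }
}
```

The routing key does all the work here: the producer never names a queue, it only names an address that happens to equal the queue's name.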

Create queue automatically

Creating and configuring the queue through the management console works fine for now. But in the real world, this is not ideal. Imagine there is not one queue but ten.

Every time a new teammate is onboarded, or you change your device, you need to create the queues. When the app goes from the dev environment to the test environment, you need to create the queues. When the app goes to production, someone needs to create them. This gets tedious and error-prone. Spring AMQP provides RabbitAdmin to automate these tasks.

The best part is, you just need to create a bean that returns a Queue object containing the queue’s name. It is good practice to keep all RabbitMQ-related beans in the same configuration class. Right now my RabbitMQConfig looks like this:

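import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    // RabbitAdmin declares this queue on the broker if it does not exist yet.
    @Bean
    public Queue createUserRegistrationQueue() {
        return new Queue("q.user-registration");
    }
}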

A common naming convention is to give queue names a q. prefix.

Go ahead and delete any previous queues from the management portal, and change the routing key in the controller’s convertAndSend call to q.user-registration so that it matches the new queue name. Then restart your application and hit the API; the queue should get created and the message delivered.

Add consumer to process messages

A publisher is only half the fun if there is no consumer for the published messages. Adding a consumer is as simple as annotating a method with @RabbitListener and naming the queue. My consumer for user registration requests looks like this:

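import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class UserRegistrationListener {
    // Invoked for every message that arrives on q.user-registration.
    @RabbitListener(queues = {"q.user-registration"})
    public void onUserRegistration(UserRegistrationRequest event) {
        log.info("User Registration Event Received: {}", event);
    }
}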

If you hit the API endpoint for registration and then check the logs, you’ll see the received event logged.

Looks nice! I’m receiving the expected payload in the consumer, and Spring AMQP’s message converter handles serialization and deserialization so that I get a pretty Java object to work with. However, if we check the message directly in the management portal, we’ll still see it as base64. This is not ideal for development and troubleshooting. We want to see messages in a format readable by us humans.

To achieve that, we need to declare a Jackson2JsonMessageConverter bean and set it as the message converter on RabbitTemplate. These beans go in our RabbitMQConfig file:

Note that we used the default cachingConnectionFactory to construct the RabbitTemplate. To do that, we inject the cachingConnectionFactory through the constructor. After that, comment out the listener to check the message in the portal. Otherwise, the message will be delivered to the consumer method before we have a chance to peek at it.

If we hit the API now and check the queue, we’ll see the payload as readable text in the management portal.
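
To see why the converter makes the payload readable, here is a small JDK-only sketch comparing the two wire formats. It assumes, as a simplification, that the unreadable payload came from Java serialization (which Spring AMQP's default converter uses for Serializable objects), and it hand-builds the JSON string rather than calling Jackson. DemoRegistration is a hypothetical stand-in for the article's UserRegistrationRequest.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical stand-in for the article's UserRegistrationRequest.
class DemoRegistration implements Serializable {
    private static final long serialVersionUID = 1L;
    final String username;
    final String email;

    DemoRegistration(String username, String email) {
        this.username = username;
        this.email = email;
    }
}

class PayloadFormatSketch {

    // Java-serializes the object and base64-encodes the bytes, similar to how
    // a binary message body is displayed in a management UI.
    static String javaSerializedAsBase64(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(buffer.toByteArray());
    }

    // Hand-written JSON, only to avoid a Jackson dependency in this sketch.
    // It does no escaping, so it is only safe for simple demo values.
    static String asJson(DemoRegistration r) {
        return "{\"username\":\"" + r.username + "\",\"email\":\"" + r.email + "\"}";
    }

    public static void main(String[] args) {
        DemoRegistration request = new DemoRegistration("Johnson", "johnsn@mail.com");
        System.out.println("Java serialization: " + javaSerializedAsBase64(request));
        System.out.println("JSON:               " + asJson(request));
    }
}
```

The first line printed is an opaque base64 string, while the second is the kind of text a management portal can show as-is.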

Notify by email and sms to users after registration

Let’s say that after completing a user’s registration we want to notify the user by email and SMS. Sure, we can write the corresponding services and call them after registration. Then our code might look like this:

@RabbitListener(queues = {"q.user-registration"})
public void onUserRegistration(UserRegistrationRequest event) {

    log.info("User Registration Event Received: {}", event);

    /*
        Relevant business logic for user registration
    */

    emailService.sendEmail(event.getEmail());
    smsService.sendSms(event.getMobileNumber());

}

But there are some issues with this approach. The notification system is now tightly coupled with the registration system. If not handled properly, an exception in the sendEmail method might prevent SMS delivery too. And you might not want the application that does a heavy task like user registration to also spend its compute resources on a lightweight task like sending email. What can we do here? RabbitMQ to the rescue.

We will create two queues and annotate our methods with @RabbitListener, naming the designated queues. Then we will create a special type of exchange, called a fanout exchange, and bind the queues to it. The speciality of this exchange is that, regardless of the routing key, every queue bound to it receives the message.

We need to add the following bean to our RabbitMQ config.

@Bean
public Declarables createPostRegistrationSchema() {

    return new Declarables(
            new FanoutExchange("x.post-registration"),
            new Queue("q.send-email"),
            new Queue("q.send-sms"),
            // A fanout exchange ignores the routing keys given in these bindings.
            new Binding("q.send-email", Binding.DestinationType.QUEUE, "x.post-registration", "send-email", null),
            new Binding("q.send-sms", Binding.DestinationType.QUEUE, "x.post-registration", "send-sms", null));

}

Here, the first line declares the exchange; by convention, exchange names start with x.

The next two lines declare the queues, and the last two lines declare the bindings of the queues to the x.post-registration exchange.

After restarting the application, go to the Exchanges tab of the management portal. You’ll see an exchange with our given name alongside the other exchanges.

Click on it and you’ll see the bindings.

Now I have added two consumers, one for sms and one for email:

@Service
@Slf4j
public class SendSmsService {

    @RabbitListener(queues = "q.send-sms")
    public void sendSms(UserRegistrationRequest request) {
        log.info("Sending sms to {} ", request.getMobileNumber());
    }
}
--------------------------------------------------------------------
@Service
@Slf4j
public class SendEmailService {

    @RabbitListener(queues = "q.send-email")
    public void sendEmail(UserRegistrationRequest request) {
        log.info("Sending email to {}", request.getEmail());
    }
}

Finally, add the following line to the UserRegistrationListener:

rabbitTemplate.convertAndSend("x.post-registration", "", event);

Then restart the application and hit the API. The logs should show both the SMS and the email consumers handling the event.

Cool right? Submit once and receive in multiple consumers. Now these look properly decoupled.
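
The fanout behaviour can also be modelled in a few lines of plain Java. This is a toy sketch, not broker code: FanoutSketch and its methods are hypothetical names, and queues are just in-memory lists. It shows the one property that matters here, namely that both the binding key and the publish-time routing key are ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange (hypothetical class).
class FanoutSketch {

    // Bound queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to the exchange. The binding key is accepted for
    // symmetry with other exchange types but plays no role in routing.
    void bind(String queueName, String bindingKey) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the payload to every bound queue, whatever the routing key is.
    // Returns the number of queues that received a copy.
    int publish(String routingKey, String payload) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(payload);
        }
        return boundQueues.size();
    }

    // Read-only view of the messages delivered to the named queue.
    List<String> messagesIn(String queueName) {
        List<String> queue = boundQueues.get(queueName);
        return queue == null
                ? Collections.emptyList()
                : Collections.unmodifiableList(queue);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("q.send-email", "send-email");
        exchange.bind("q.send-sms", "send-sms");

        int copies = exchange.publish("", "registration of Johnson");

        System.out.println("copies delivered: " + copies);
        System.out.println("q.send-email: " + exchange.messagesIn("q.send-email"));
        System.out.println("q.send-sms: " + exchange.messagesIn("q.send-sms"));
    }
}
```

This is why the listener can publish with an empty routing key: with a fanout exchange, the set of bound queues alone decides who gets a copy.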

Enable graceful retry mechanism

As mentioned before, registering a user involves calling other services and APIs. Some of those calls may raise exceptions, and for some exceptions retrying is a good option. You can hand the headache of retrying over to RabbitMQ just by throwing the exception. Let’s simulate the scenario:

@RabbitListener(queues = {"q.user-registration"})
public void onUserRegistration(UserRegistrationRequest event) {
    log.info("User Registration Event Received: {}", event);

    executeRegistration(event);

    rabbitTemplate.convertAndSend("x.post-registration", "", event);
}

private void executeRegistration(UserRegistrationRequest event) {
    log.info("Executing User Registration Event: {}", event);

    throw new RuntimeException("Registration Failed");
}

We call the executeRegistration method from our listener, and inside this method we throw a RuntimeException. Now restart the application and hit the API. Wow! It’s retrying… problem solved! But wait… how long will it retry? Well, until the exception goes away, because the failed message keeps getting requeued and redelivered. In this case we can change the code and restart, but in the real world this won’t be possible. If the exception keeps occurring, we need to stop at some point and also wait between attempts. You did notice that right now it’s retrying too fast, right?

To solve that we need to declare two beans: a SimpleRabbitListenerContainerFactory and a RetryOperationsInterceptor. Then we add the interceptor to the listener container factory’s advice chain.

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    // 3 attempts in total; waits start at 2000 ms, double each time, capped at 100000 ms.
    return RetryInterceptorBuilder.stateless().maxAttempts(3)
            .backOffOptions(2000, 2.0, 100000)
            .build();
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // cachingConnectionFactory is the connection factory injected through the config class constructor.
    configurer.configure(factory, cachingConnectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    factory.setAdviceChain(retryInterceptor());
    return factory;
}

Now restart the application and hit the API. Since maxAttempts counts the initial attempt, the message should be retried only twice: first after two seconds, then after four more seconds.
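
The timing follows from the three arguments to backOffOptions: initial interval, multiplier, and maximum interval, all in milliseconds. The sketch below reproduces that arithmetic in plain Java, under the assumption that each wait is the previous one times the multiplier, capped at the maximum. BackoffScheduleSketch is a hypothetical helper, not a Spring Retry class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors exponential back-off arithmetic.
class BackoffScheduleSketch {

    // Returns the waits (in ms) between consecutive attempts.
    // maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
    static List<Long> waitsBetweenAttempts(int maxAttempts, long initialIntervalMs,
                                           double multiplier, long maxIntervalMs) {
        List<Long> waits = new ArrayList<>();
        double interval = initialIntervalMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add((long) Math.min(interval, (double) maxIntervalMs));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Same numbers as the retry interceptor in the article.
        System.out.println(waitsBetweenAttempts(3, 2000, 2.0, 100000));
        // With more attempts the cap eventually kicks in.
        System.out.println(waitsBetweenAttempts(8, 2000, 2.0, 100000));
    }
}
```

With the article's settings this gives waits of 2000 ms and 4000 ms; the 100000 ms cap only matters once there are many more attempts.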

Here is a catch: naming the bean rabbitListenerContainerFactory overrides the default listener container factory, so this retry mechanism is activated for all listeners. If you want it only for UserRegistrationListener, rename the factory bean to something else, like registrationListenerContainerFactory, then reference it in the @RabbitListener annotation like this:

@RabbitListener(queues = {"q.user-registration"}, containerFactory = "registrationListenerContainerFactory")

Notice that once the retries are exhausted, the message is dropped. Thus, messages will be lost if exceptions keep occurring during retries. This might not be ideal for many cases. We may want to keep track of failures for further processing or investigation.

Add dead letter queue for failed registrations

A dead letter queue is just another queue; what makes it special is when and how we use it. First we’ll declare a schema for it.

@Bean
public Declarables createDeadLetterSchema() {
    return new Declarables(
            new DirectExchange("x.registration-failure"),
            new Queue("q.fall-back-registration"),
            new Binding("q.fall-back-registration", Binding.DestinationType.QUEUE, "x.registration-failure", "fall-back", null)
    );
}

Just a normal exchange, a queue, and their binding. I’ve set the routing key to “fall-back”, which will be needed later.

Now we will configure q.user-registration so that it knows it has a dead letter exchange. Delete the previous queue first: RabbitMQ does not change the arguments of a queue that already exists, so otherwise the updated configuration will not take effect.

@Bean
public Queue createUserRegistrationQueue() {

    return QueueBuilder.durable("q.user-registration")
            .withArgument("x-dead-letter-exchange", "x.registration-failure")
            .withArgument("x-dead-letter-routing-key", "fall-back")
            .build();
}

Here we declared the queue with its dead letter configuration: x-dead-letter-exchange names the exchange that dead-lettered messages are republished to, and x-dead-letter-routing-key sets the routing key they are republished with.

One last piece of the puzzle. To make sure RabbitMQ routes the message to the dead letter exchange (DLX), we need to throw an AmqpRejectAndDontRequeueException from the listener. But we cannot just throw it ourselves, as we are using a retry interceptor. Instead, we tell the retry interceptor to throw this exception when its attempts are exhausted, like this:

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless().maxAttempts(3)
            .backOffOptions(2000, 2.0, 100000)
            .recoverer(new RejectAndDontRequeueRecoverer())
            .build();
}

Now we have to add a listener that receives messages from the dead letter queue:

@Service
@Slf4j
public class FallBackRegistrationService {

    @RabbitListener(queues = {"q.fall-back-registration"})
    public void onRegistrationFailure(UserRegistrationRequest failedRegistration) {
        log.info("Executing fallback for failed registration {}", failedRegistration);
    }

}

Now restart the application and check the logs. After the retry attempt entries, there will be a new line from the fallback listener.

Now it’s up to you what to do here. You may save the message in a database, call some other service, or publish it to another queue.
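
Putting the pieces together, the overall flow is: try the listener up to the maximum number of attempts, and if every attempt fails, hand the message to the dead letter queue instead of dropping it. The following plain-Java sketch models only that control flow. DeadLetterFlowSketch is a hypothetical class, it skips the waits between attempts, and it is not how Spring AMQP is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of "retry, then dead-letter" (hypothetical class).
class DeadLetterFlowSketch {

    // Messages that exhausted their attempts.
    final List<String> deadLetters = new ArrayList<>();
    // Total listener invocations, kept for inspection.
    int attemptsMade = 0;

    // Invokes the listener up to maxAttempts times. Returns true on success;
    // on exhaustion the message is moved to deadLetters and false is returned.
    boolean deliver(String message, int maxAttempts, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attemptsMade++;
            try {
                listener.accept(message);
                return true;
            } catch (RuntimeException e) {
                // A real setup would wait here according to the back-off policy.
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        DeadLetterFlowSketch flow = new DeadLetterFlowSketch();

        boolean ok = flow.deliver("registration of Johnson", 3, message -> {
            throw new RuntimeException("Registration Failed");
        });

        System.out.println("processed: " + ok);
        System.out.println("attempts: " + flow.attemptsMade);
        System.out.println("dead letters: " + flow.deadLetters);
    }
}
```

In the real setup the broker does the last step: the rejected message is republished through x.registration-failure with the fall-back routing key, and the fallback listener picks it up.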

Add distributed tracing

That’s the easiest part. Just add Spring Cloud Sleuth to your dependencies and you’ll see trace IDs propagating through all the logs.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

This is particularly helpful when different applications or microservices are involved.

That’s it for today.

All the code for this demo is available here.
