Little Rabbit (Part 2) - The Deadly Traps
Everything Was Running Smoothly...
After migrating from HTTP to RabbitMQ RPC (Part 1), my team thought we had mastered the little rabbit.
Natural load balancing. Easy scaling. Messages never lost.
Everything ran smoothly in the dev environment. Ran smoothly in UAT. Even ran smoothly in production...
...until it didn't.
Trap #1: Channel Leak
Symptoms
One fine day, RabbitMQ started refusing new connections. The logs showed a terrifying message:
AMQP operation channel.open failed due to maximum channels limit

Looking at the dashboard, the number of channels was at... 65,535 - exactly the default RabbitMQ limit.
Understanding Connections and Channels
To understand this bug, you need to understand how RabbitMQ works:
A Connection is an actual TCP connection between your application and the RabbitMQ server. Creating connections is expensive (TCP handshake, authentication...).
A Channel is a "virtual channel" inside a connection. One connection can have many channels. Creating channels is much lighter than connections.
Think of a connection as a highway, and channels as the lanes on that highway.
The Culprit
A developer wrote code like this:
// Each request creates a new channel
public void sendMessage(String message) throws IOException {
    Channel channel = connection.createChannel();
    channel.basicPublish("exchange", "routing.key",
            null, message.getBytes());
    // Done processing... forgot to close the channel!
}

Each request creates a new channel, but nobody ever closes it.
1 million requests = 1 million open channels = RabbitMQ dies.
Solution: Singleton Channel
Instead of creating a new channel for each request, we used the Singleton pattern:
public class RabbitMQProducer {
    private static RabbitMQProducer instance;
    private final Connection connection;
    private final Channel channel; // Singleton channel

    private RabbitMQProducer() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            this.connection = factory.newConnection();
            this.channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            throw new IllegalStateException("Failed to connect to RabbitMQ", e);
        }
    }

    public static synchronized RabbitMQProducer getInstance() {
        if (instance == null) {
            instance = new RabbitMQProducer();
        }
        return instance;
    }

    public void sendMessage(String message) throws IOException {
        // Reuse the channel instead of creating a new one
        channel.basicPublish("exchange", "routing.key",
                null, message.getBytes());
    }
}

One connection, one channel, shared across all requests.
Note on Thread Safety
Channels in RabbitMQ are not thread-safe. If you have multiple threads using the same channel, you need to synchronize:
public synchronized void sendMessage(String message) throws IOException {
    channel.basicPublish("exchange", "routing.key",
            null, message.getBytes());
}

Or use a channel pool - a set of pre-created channels that threads borrow and return.
Trap #2: Reply Queue Explosion
This trap is far more terrifying.
RPC Pattern - How It Works
Remember how the RPC pattern works:
Request:
- Producer creates a dedicated reply queue to receive the response
- Sends the message with replyTo set to that queue
- Waits for the response on that queue

Response:
- Consumer receives the message and processes it
- Sends the response to the queue specified in replyTo
- Producer receives the response
The Problem
A developer wrote code like this:
public String callRPC(String request) throws IOException {
    // Create a NEW reply queue for EACH request
    String replyQueueName = channel.queueDeclare().getQueue();

    // Send request with replyTo
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .replyTo(replyQueueName)
            .correlationId(UUID.randomUUID().toString())
            .build();
    channel.basicPublish("", "rpc_queue", props, request.getBytes());

    // Wait for response...
    // Done... FORGOT TO DELETE THE REPLY QUEUE!
    return null; // (response handling elided in this sketch)
}

Each RPC request creates a new reply queue, and nobody deletes it after use.
- 1,000 requests = 1,000 queues
- 100,000 requests = 100,000 queues
- 1,000,000 requests = 1,000,000 queues
Consequences
RabbitMQ starts:
- Memory skyrocketing - each queue consumes memory
- Erlang processes increasing - each queue is an Erlang process
- Management UI slowing down - listing 1 million queues is a nightmare
- Eventually: crash
Solution 1: Direct Reply-to
RabbitMQ has a special feature called Direct Reply-to.
Instead of creating a dedicated queue, you use a pseudo-queue named amq.rabbitmq.reply-to:
public String callRPC(String request) throws IOException {
    // Use Direct Reply-to instead of creating a new queue
    String replyTo = "amq.rabbitmq.reply-to";

    // You must consume from the pseudo-queue BEFORE publishing,
    // in auto-ack mode, on this same channel
    channel.basicConsume(replyTo, true, (consumerTag, delivery) -> {
        // handle the response here
    }, consumerTag -> {});

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .replyTo(replyTo)
            .correlationId(UUID.randomUUID().toString())
            .build();
    channel.basicPublish("", "rpc_queue", props, request.getBytes());

    // RabbitMQ will route the response directly to the consumer
    // NO actual queue is created!
    return null; // (response handling elided in this sketch)
}

Advantages:
- No actual queue created
- Response routed directly to consumer
- No cleanup needed
Disadvantages:
- Only works on the same channel
- No message persistence (if consumer dies, response is lost)
Solution 2: Singleton Reply Queue
If you need persistence, use a single reply queue for the producer:
public class RPCClient {
    private static RPCClient instance;
    private Channel channel;
    private String replyQueueName; // Singleton reply queue
    private final Map<String, CompletableFuture<String>> pendingRequests;

    private RPCClient() throws IOException {
        // (connection and channel setup omitted - see Trap #1)

        // Create ONE reply queue only
        this.replyQueueName = channel.queueDeclare().getQueue();
        this.pendingRequests = new ConcurrentHashMap<>();

        // Consumer for the reply queue
        channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            String correlationId = delivery.getProperties().getCorrelationId();
            CompletableFuture<String> future = pendingRequests.remove(correlationId);
            if (future != null) {
                future.complete(new String(delivery.getBody()));
            }
        }, consumerTag -> {});
    }

    public String callRPC(String request) throws Exception {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(correlationId, future);

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .replyTo(replyQueueName) // ALWAYS use this queue
                .correlationId(correlationId)
                .build();
        channel.basicPublish("", "rpc_queue", props, request.getBytes());

        return future.get(30, TimeUnit.SECONDS);
    }
}

The idea:
- One reply queue for all requests
- Use correlationId to match responses with requests
- A ConcurrentHashMap tracks pending requests
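The correlation-matching mechanism can be exercised in isolation, without a broker. This sketch simulates a reply arriving and being matched to its pending future (the names `pending` and `onReply` are illustrative, not part of the RabbitMQ API):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CorrelationDemo {
    // Pending requests keyed by correlationId, as in the RPC client above
    static final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // What the reply-queue consumer does when a response arrives
    static void onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(body);
        }
        // An unknown correlationId (e.g. a late reply after timeout) is simply dropped
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put("req-1", future);

        onReply("req-1", "pong");           // simulated reply from the consumer
        System.out.println(future.get());   // prints "pong"
        System.out.println(pending.size()); // prints 0 - the entry was cleaned up
    }
}
```

Note that `remove` (rather than `get`) is what keeps the map from growing: every matched reply also cleans up its own entry.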
Trap #3: Non-Singleton Producer
You might think: "OK, I've fixed the channel leak and reply queue explosion."
But there's one more trap.
The Problem
In a multi-threaded or distributed environment (multiple instances), if each thread/instance creates its own producer:
// Thread 1
RPCClient client1 = new RPCClient(); // Creates reply queue 1
// Thread 2
RPCClient client2 = new RPCClient(); // Creates reply queue 2
// Thread 3
RPCClient client3 = new RPCClient(); // Creates reply queue 3
// ... 1000 threads = 1000 reply queues

You're back in the reply queue explosion trap!
Solution: Singleton Producer
The producer MUST be a Singleton:
public class RPCClient {
    private static volatile RPCClient instance;

    public static RPCClient getInstance() {
        if (instance == null) {
            synchronized (RPCClient.class) {
                if (instance == null) {
                    instance = new RPCClient();
                }
            }
        }
        return instance;
    }

    // Private constructor
    private RPCClient() {
        // ... setup connection, channel, reply queue
    }
}

Double-checked locking (note the volatile field and the second null check inside the lock) ensures only one instance is ever created.
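As an aside, the initialization-on-demand holder idiom achieves the same lazy, thread-safe singleton without explicit locking, relying on the JVM's class-initialization guarantees (the class name here is illustrative):

```java
// Lazy, thread-safe singleton without synchronized or volatile:
// the JVM initializes the Holder class (and thus INSTANCE) exactly once,
// on first access, with full happens-before guarantees.
public class HolderSingleton {
    private HolderSingleton() {
        // ... setup connection, channel, reply queue would go here
    }

    private static class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    public static HolderSingleton getInstance() {
        return Holder.INSTANCE;
    }
}
```
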
Summary of Traps
| Trap | Symptoms | Solution |
|---|---|---|
| Channel Leak | "maximum channels limit" | Singleton Channel or Channel Pool |
| Reply Queue Explosion | Memory increase, Erlang processes increase | Direct Reply-to or Singleton Reply Queue |
| Non-singleton Producer | Abnormally many reply queues | Singleton Producer pattern |
Recommended Config with Vert.x
If using the Vert.x RabbitMQ client, here's the recommended config:
RabbitMQOptions options = new RabbitMQOptions()
    .setUser(config.user)
    .setPassword(config.pass)
    .setHost(config.host)
    .setPort(config.port)
    .setAutomaticRecoveryEnabled(true)     // Auto reconnect
    .setReconnectInterval(10000L)          // 10s between retries
    .setReconnectAttempts(5)               // Retry 5 times
    .setConnectionTimeout(7000)            // 7s connection timeout
    .setHandshakeTimeout(7000)             // 7s handshake timeout
    .setRequestedChannelMax(5)             // Limit channel count
    .setNetworkRecoveryInterval(7000);     // 7s network recovery

Important notes:
- setRequestedChannelMax(5) - limit channels to detect leaks early
- setReconnectAttempts(5) - don't retry infinitely, to avoid connection storms
We Thought We Had Learned All the Lessons
After fixing all these traps, my team became much more confident.
Channels were managed strictly. Reply queues used Singleton. Producers were also Singleton.
The system ran stably for months.
Until the night of January 11th, 2023.
Teaser: The Night of 500,000 Connections
8:09 PM. SOC called.
"Hey, RabbitMQ has a problem."
Looking at the dashboard:
- Message rate: 0 (dropped to zero)
- Connections: 500,000 (normally 50,000)
- Erlang processes: increased 2.5x
Impact:
- 1.8 million transactions lost
- 446,000 users affected
What happened? And how did we handle it?
I'll tell that story in the next part of the series.
"Little Rabbit" Series
| Part | Title | Content |
|---|---|---|
| Part 1 | When HTTP Is No Longer Enough | RabbitMQ RPC, competing consumers |
| Part 2 | The Deadly Traps | Channel Leak, Reply Queue Explosion (you are here) |
| Part 3 | The Night of 500,000 Connections | A real incident and lessons learned |
| Part 4 | A War Without Winners | Little's Law, accept uncertainty |
Stay tuned!