Message Queue Architecture
RabbitMQ configuration and queue management for the Notification Service.
Overview
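The Notification Service uses RabbitMQ as a message broker to decouple API requests from email sending. The API publishes a job and returns immediately; workers send the email asynchronously, so a slow or failing SMTP server does not block API requests, failed jobs can be retried, and throughput scales with the number of workers.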
Queue Architecture
┌─────────────────────────────────────────────────────────────┐
│                       RabbitMQ Broker                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                Email Queue (Primary)                │   │
│   │  - Name: email-queue                                │   │
│   │  - Durable: true                                    │   │
│   │  - Max length: 100,000                              │   │
│   │  - DLX: email-dlx                                   │   │
│   └──────────────┬──────────────────────────────────────┘   │
│                  │                                          │
│                  │  On retryable failure (republish)        │
│                  │                                          │
│   ┌──────────────▼──────────────────────────────────────┐   │
│   │                Retry Queue (Delayed)                │   │
│   │  - Name: email-retry-queue                          │   │
│   │  - Durable: true                                    │   │
│   │  - TTL: varies (exponential backoff)                │   │
│   │  - Routes back to: email-queue                      │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │             Dead Letter Exchange (DLX)              │   │
│   │  - Name: email-dlx                                  │   │
│   │  - Type: direct                                     │   │
│   │  - Durable: true                                    │   │
│   └──────────────┬──────────────────────────────────────┘   │
│                  │                                          │
│                  │  Routes to                               │
│                  │                                          │
│   ┌──────────────▼──────────────────────────────────────┐   │
│   │               Dead Letter Queue (DLQ)               │   │
│   │  - Name: email-dlq                                  │   │
│   │  - Durable: true                                    │   │
│   │  - Manual intervention required                     │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Queue Configuration
Email Queue (Primary)
Primary queue for all notification jobs.
Properties:
{
name: 'email-queue',
durable: true,
arguments: {
'x-max-length': 100000,
'x-dead-letter-exchange': 'email-dlx',
'x-dead-letter-routing-key': 'email-dlq'
}
}
Characteristics:
- Durable: Survives broker restart
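- Max length: 100,000 messages (bounds queue growth; with the default drop-head overflow behavior, the oldest messages are dead-lettered once the limit is reached)
- Dead letter exchange: Messages that are rejected or NACKed with requeue=false are routed to the DLX
- Fair dispatch: Workers receive messages round-robin
- Prefetch count: 1 (a consumer-side setting; each worker holds one unacknowledged message at a time)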
Retry Queue
Temporary queue for delayed retries with exponential backoff.
Properties:
{
name: 'email-retry-queue',
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'email-queue'
}
}
Retry Logic:
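- The publisher sets a per-message TTL; no worker consumes from this queue, so the message waits until it expires
- On expiry, the message is dead-lettered through the default exchange ('') with routing key email-queue, which routes it back to the primary queue
- TTL increases with each retry: 10s, 30s, 60s, 120s, 300s
- RabbitMQ expires messages only at the head of a queue, so a message with a long TTL can hold back shorter-TTL messages queued behind it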
Example TTL Calculation:
const delays = [10000, 30000, 60000, 120000, 300000]; // milliseconds
const ttl = delays[Math.min(retryCount, delays.length - 1)];
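The TTL computed above still has to reach the broker as a per-message property. A minimal sketch, assuming the client exposes the AMQP `expiration` property as a string of milliseconds (as amqplib's publish options do); `retryPublishOptions` and `RetryPublishOptions` are illustrative names, not part of the service:

```typescript
// Delays mirror the schedule documented above (milliseconds).
const delays: readonly number[] = [10_000, 30_000, 60_000, 120_000, 300_000];

interface RetryPublishOptions {
  persistent: boolean;
  contentType: string;
  expiration: string; // per-message TTL in milliseconds, encoded as a string
}

// retryCount is zero-based: 0 selects the first delay (10 s).
// Values outside the table are clamped to the nearest valid index.
function retryPublishOptions(retryCount: number): RetryPublishOptions {
  const index = Math.min(Math.max(retryCount, 0), delays.length - 1);
  return {
    persistent: true,
    contentType: 'application/json',
    expiration: String(delays[index]),
  };
}
```

The returned object is meant to be passed as the options argument when publishing to email-retry-queue.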
Dead Letter Exchange (DLX)
Exchange that receives messages that failed all retry attempts.
Properties:
{
name: 'email-dlx',
type: 'direct',
durable: true
}
Routing:
- Binds to Dead Letter Queue
- Routing key: email-dlq
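To make the routing rule concrete, here is a small model of how a direct exchange selects queues: a message is delivered to every queue whose binding key equals the message's routing key exactly. The `Binding` type and `routeDirect` function are illustrative only and do not talk to a broker.

```typescript
interface Binding {
  queue: string;
  routingKey: string;
}

// Binding assumed from the Routing section: email-dlx -> email-dlq with key 'email-dlq'.
const dlxBindings: readonly Binding[] = [
  { queue: 'email-dlq', routingKey: 'email-dlq' },
];

// A direct exchange delivers to every queue bound with exactly the message's routing key.
function routeDirect(bindings: readonly Binding[], routingKey: string): string[] {
  return bindings
    .filter((binding) => binding.routingKey === routingKey)
    .map((binding) => binding.queue);
}
```

In this model a routing key with no matching binding yields an empty list, which is why the primary queue's x-dead-letter-routing-key has to match the DLQ binding key.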
Dead Letter Queue (DLQ)
Final destination for messages that cannot be processed.
Properties:
{
name: 'email-dlq',
durable: true
}
Handling:
- Manual inspection required
- Common issues: Invalid email, SMTP errors, template errors
- Can republish to main queue after fixing issues
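A job taken from the DLQ still carries its exhausted retry count, so republishing it unchanged would likely send it straight back on the next failure. The sketch below assumes the job shape from the Message Format section and resets the counter before republishing; `resetForRepublish` is a hypothetical helper, not an existing function of the service.

```typescript
interface RetriableJob {
  timestamp: number;
  retries: number;
}

// Returns a copy ready for email-queue: retry counter cleared, timestamp refreshed.
// The input job is left untouched.
function resetForRepublish<T extends RetriableJob>(job: T, now: number = Date.now()): T {
  return { ...job, retries: 0, timestamp: now };
}
```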
Message Format
Job Structure
interface EmailJob {
type: 'verification' | 'notification';
data: EmailJobData;
timestamp: number;
retries: number;
}
interface EmailJobData {
// Common fields
notificationId: string;
callbackUrl?: string | null;
userId?: string | null;
// For verification emails
email?: string;
username?: string;
verificationLink?: string;
subject?: string;
// For custom notifications
to?: string;
message?: string;
}
Example: Verification Email
{
"type": "verification",
"data": {
"notificationId": "123e4567-e89b-12d3-a456-426614174000",
"email": "user@example.com",
"username": "john_doe",
"verificationLink": "https://app.com/verify?token=abc123",
"subject": "Verify your email address",
"userId": "456e7890-e12b-34c5-d678-901234567890",
"callbackUrl": "https://app.com/api/webhooks/email-sent"
},
"timestamp": 1707217200000,
"retries": 0
}
Example: Custom Notification
{
"type": "notification",
"data": {
"notificationId": "789e0123-e45b-67c8-d901-234567890abc",
"to": "user@example.com",
"subject": "Order Confirmation",
"message": "Your order #12345 has been confirmed!",
"userId": "456e7890-e12b-34c5-d678-901234567890",
"callbackUrl": null
},
"timestamp": 1707217200000,
"retries": 0
}
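Because workers receive these payloads as raw bytes, it can help to validate them before processing so that malformed jobs fail fast. The following is a sketch only: the required fields per job type are an assumption inferred from the two examples above, and `parseEmailJob` is an illustrative name.

```typescript
type EmailJobType = 'verification' | 'notification';

interface ParsedEmailJob {
  type: EmailJobType;
  data: Record<string, unknown>;
  timestamp: number;
  retries: number;
}

// Assumed minimum fields per job type, inferred from the two examples above.
const REQUIRED_FIELDS: Record<EmailJobType, readonly string[]> = {
  verification: ['notificationId', 'email', 'verificationLink'],
  notification: ['notificationId', 'to', 'message'],
};

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Parses a raw message body and throws a descriptive error if it is malformed.
function parseEmailJob(raw: string): ParsedEmailJob {
  const job: unknown = JSON.parse(raw);
  if (!isRecord(job)) throw new Error('job must be a JSON object');

  const { type, data, timestamp, retries } = job;
  if (type !== 'verification' && type !== 'notification') {
    throw new Error(`unknown job type: ${String(type)}`);
  }
  if (!isRecord(data)) throw new Error('data must be an object');
  if (typeof timestamp !== 'number' || typeof retries !== 'number') {
    throw new Error('timestamp and retries must be numbers');
  }
  for (const field of REQUIRED_FIELDS[type]) {
    if (typeof data[field] !== 'string' || data[field] === '') {
      throw new Error(`${type} job is missing data.${field}`);
    }
  }
  return { type, data, timestamp, retries };
}
```

A job that fails this check will never succeed on retry, so it is a natural candidate for the non-retryable path.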
Message Flow
Normal Flow (Success)
1. API publishes message to email-queue
└─ Message properties: persistent, content-type: application/json
2. Worker consumes message from email-queue
└─ Prefetch: 1 (one message at a time)
3. Worker processes message
├─ Update DB: status = 'sending'
├─ Render email template
├─ Send via SMTP
└─ Update DB: status = 'sent'
4. Worker ACKs message
└─ Message removed from queue
Retry Flow (Temporary Failure)
1. API publishes message to email-queue
2. Worker consumes message
3. Worker processes message (fails - e.g., SMTP timeout)
└─ Update DB: status = 'retrying'
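4. Worker republishes the job (retry count incremented) to email-retry-queue with a per-message TTL
└─ Worker ACKs the original message (a NACK with requeue=false would dead-letter it to email-dlq instead)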
5. Message expires in retry queue after TTL
└─ Routes back to email-queue
6. Worker consumes message again (retry count incremented)
7. Worker processes message (success)
└─ Update DB: status = 'sent'
8. Worker ACKs message
└─ Message removed from queue
Dead Letter Flow (Max Retries Exceeded)
1. Worker attempts every retry up to MAX_RETRIES (all fail)
2. Worker processes message (fails - retry limit reached)
└─ Update DB: status = 'failed', errorMessage = '...'
3. Worker NACKs message with requeue=false
└─ Message sent to DLX
4. DLX routes message to DLQ
└─ Message stored in email-dlq
5. Manual intervention required
├─ Inspect message in RabbitMQ management UI
├─ Fix underlying issue (e.g., invalid email)
└─ Optionally republish to email-queue
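The three flows above reduce to a single decision made after each processing attempt. A minimal sketch, assuming a job is dead-lettered once the number of retries already attempted has reached the limit; `decideOutcome` and `Outcome` are illustrative names:

```typescript
type Outcome = 'ack' | 'retry' | 'dead-letter';

// retriesAttempted counts retries already made before this attempt (0 on first delivery).
function decideOutcome(
  succeeded: boolean,
  retriesAttempted: number,
  maxRetries: number,
): Outcome {
  if (succeeded) return 'ack';
  return retriesAttempted >= maxRetries ? 'dead-letter' : 'retry';
}
```

With a limit of 3, the first delivery and the first two retries map to 'retry' on failure, and a failure on the third retry maps to 'dead-letter'.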
Connection Management
Publish Channel
Dedicated channel for publishing messages from API service.
import { ConfirmChannel } from 'amqplib';

class RabbitMQConnection {
  // this.connection is established by connect() (not shown)
  private publishChannel?: ConfirmChannel;

  // Lazily create one confirm channel and reuse it for every publish
  async getPublishChannel(): Promise<ConfirmChannel> {
    if (!this.publishChannel) {
      this.publishChannel = await this.connection.createConfirmChannel();
    }
    return this.publishChannel;
  }

  async publish(queue: string, message: Buffer): Promise<void> {
    const channel = await this.getPublishChannel();
    channel.sendToQueue(queue, message, {
      persistent: true,
      contentType: 'application/json',
    });
    await channel.waitForConfirms(); // Wait for broker confirmation
  }
}
Consume Channel
Dedicated channel for consuming messages in workers.
import { Channel } from 'amqplib';

class EmailWorker {
  private consumeChannel!: Channel; // assigned in start()

  async start(): Promise<void> {
    this.consumeChannel = await rabbitMQ.getConsumeChannel();
    // Prefetch: process one message at a time
    await this.consumeChannel.prefetch(1);
    // Start consuming
    await this.consumeChannel.consume('email-queue', async (msg) => {
      if (!msg) return; // null means the broker cancelled this consumer
      try {
        await this.processMessage(msg);
        this.consumeChannel.ack(msg); // Success
      } catch (error) {
        // requeue=false: the broker dead-letters the message via email-dlx to email-dlq
        this.consumeChannel.nack(msg, false, false);
      }
    });
  }
}
Connection Recovery
connection.on('error', (err) => {
logger.error('RabbitMQ connection error', { error: err.message });
});
connection.on('close', () => {
logger.warn('RabbitMQ connection closed, reconnecting...');
setTimeout(() => this.connect(), 5000);
});
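The handler above reconnects after a fixed 5 seconds. If the broker stays down for a while, a capped exponential delay is a common alternative that avoids hammering it with reconnect attempts. This sketch only computes the delay; the base and cap values are assumptions, and `reconnectDelayMs` is not part of the service.

```typescript
// Delay before reconnect attempt number `attempt` (0-based): base * 2^attempt, capped.
function reconnectDelayMs(
  attempt: number,
  baseMs: number = 1_000,
  capMs: number = 30_000,
): number {
  const exponential = baseMs * 2 ** Math.max(attempt, 0);
  return Math.min(exponential, capMs);
}
```

The result would replace the literal 5000 in the setTimeout call, with the attempt counter reset to 0 after a successful connection.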
Configuration
Environment Variables
RABBITMQ_URL=amqp://user:pass@rabbitmq:5672
EMAIL_QUEUE_NAME=email-queue
EMAIL_RETRY_QUEUE_NAME=email-retry-queue
EMAIL_DLQ_NAME=email-dlq
RABBITMQ_DLX_EXCHANGE=email-dlx
Queue Settings
export default {
rabbitmq: {
url: process.env.RABBITMQ_URL,
exchanges: {
dlx: process.env.RABBITMQ_DLX_EXCHANGE || 'email-dlx'
},
queues: {
email: process.env.EMAIL_QUEUE_NAME || 'email-queue',
emailRetry: process.env.EMAIL_RETRY_QUEUE_NAME || 'email-retry-queue',
emailDlq: process.env.EMAIL_DLQ_NAME || 'email-dlq'
},
routingKeys: {
emailDlq: 'email-dlq'
},
settings: {
maxLength: 100000,
prefetch: 1
}
}
};
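The settings object above falls back to defaults for names but passes RABBITMQ_URL through unchecked, so a missing URL would only surface at connect time. A hedged sketch of a fail-fast loader follows; it takes an env-like object rather than reading process.env directly, and `loadRabbitSettings` and `RabbitSettings` are illustrative names.

```typescript
interface RabbitSettings {
  url: string;
  dlxExchange: string;
  queues: { email: string; emailRetry: string; emailDlq: string };
}

type Env = Record<string, string | undefined>;

// Builds settings from an env-like object and fails fast on a missing or malformed URL.
function loadRabbitSettings(env: Env): RabbitSettings {
  const url = env['RABBITMQ_URL'];
  if (!url) throw new Error('RABBITMQ_URL is required');
  if (!/^amqps?:\/\//.test(url)) {
    throw new Error('RABBITMQ_URL must start with amqp:// or amqps://');
  }
  return {
    url,
    dlxExchange: env['RABBITMQ_DLX_EXCHANGE'] || 'email-dlx',
    queues: {
      email: env['EMAIL_QUEUE_NAME'] || 'email-queue',
      emailRetry: env['EMAIL_RETRY_QUEUE_NAME'] || 'email-retry-queue',
      emailDlq: env['EMAIL_DLQ_NAME'] || 'email-dlq',
    },
  };
}
```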
Monitoring
RabbitMQ Management UI
Access at http://localhost:15672 (default credentials: guest/guest; by default the guest user can only log in from localhost)
Key Metrics:
- Queue depth (messages ready)
- Consumer count (active workers)
- Message rates (publish/deliver/ack)
- Unacked messages (in-flight)
Queue Depth Monitoring
# Check queue depth
curl -u guest:guest http://localhost:15672/api/queues/%2F/email-queue
# Alert if queue depth > 10,000
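The JSON returned by that endpoint can be turned into alerts with a small pure function. This is a sketch under assumptions: it reads only three fields of the management API's queue object, uses the 10,000 threshold from the comment above, and adds a "no consumers" check that is not in the original; `queueAlerts` is an illustrative name.

```typescript
// Subset of the fields in the management API's queue object.
interface QueueStats {
  messages_ready: number;
  messages_unacknowledged: number;
  consumers: number;
}

// Returns human-readable alerts; an empty array means the queue looks healthy.
function queueAlerts(stats: QueueStats, maxReady: number = 10_000): string[] {
  const alerts: string[] = [];
  if (stats.messages_ready > maxReady) {
    alerts.push(`queue depth ${stats.messages_ready} exceeds ${maxReady}`);
  }
  if (stats.consumers === 0 && stats.messages_ready > 0) {
    alerts.push('messages are waiting but no consumers are connected');
  }
  return alerts;
}
```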
Dead Letter Queue Inspection
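# Peek at up to 10 messages in the DLQ (ack_requeue_true returns them to the queue;
# ack_requeue_false would remove them)
curl -u guest:guest http://localhost:15672/api/queues/%2F/email-dlq/get \
  -X POST \
  -H 'content-type: application/json' \
  -d '{"count":10,"ackmode":"ack_requeue_true","encoding":"auto"}'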
Retry Strategy
Exponential Backoff
const RETRY_DELAYS = [
10000, // 10 seconds
30000, // 30 seconds
60000, // 1 minute
120000, // 2 minutes
300000 // 5 minutes
];
const MAX_RETRIES = RETRY_DELAYS.length;
function getRetryDelay(retryCount: number): number {
return RETRY_DELAYS[Math.min(retryCount, RETRY_DELAYS.length - 1)];
}
Retry Logic
// Called from the consumer callback, which ACKs when this resolves and
// NACKs with requeue=false (dead-lettering to the DLQ) when it throws.
async function processMessage(msg: ConsumeMessage): Promise<void> {
  const job: EmailJob = JSON.parse(msg.content.toString());
  try {
    await sendEmail(job.data);
    // Success - the caller ACKs the message
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    job.retries++;
    if (job.retries > MAX_RETRIES) {
      // Max retries exceeded - mark as failed and rethrow so the caller NACKs to the DLQ
      await updateStatus(job.data.notificationId, 'failed', reason);
      throw error;
    }
    // Retry - publish a copy to the retry queue with a TTL; the caller ACKs the original
    await updateStatus(job.data.notificationId, 'retrying');
    const delay = getRetryDelay(job.retries - 1); // first retry uses RETRY_DELAYS[0]
    await publishToRetryQueue(job, delay);
  }
}
Best Practices
Publishing
- Always use persistent messages (persistent: true)
- Use confirm channel and wait for confirmations
- Set content-type header (application/json)
- Include timestamp in message payload
- Keep messages small (< 1 MB)
Consuming
- Set prefetch to 1 for fair distribution
- Always ACK or NACK messages
- Use separate channels for publishing and consuming
- Implement idempotency (check if notification already processed)
- Handle consumer cancellation gracefully
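The idempotency point matters because a message can be delivered more than once, for example when a worker crashes after sending but before ACKing. A minimal sketch of the check, using an in-memory set as a stand-in for a durable store such as the notification's status in the database; `ProcessedNotifications` is a hypothetical class.

```typescript
// In-memory stand-in for a durable store (for example a status column in the database).
class ProcessedNotifications {
  private readonly seen = new Set<string>();

  // Returns true the first time an ID is claimed and false for any repeat.
  claim(notificationId: string): boolean {
    if (this.seen.has(notificationId)) return false;
    this.seen.add(notificationId);
    return true;
  }
}
```

A worker would call claim with the job's notificationId before sending and simply ACK the message when it returns false.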
Error Handling
- Log all errors with context (notification ID, retry count)
- Distinguish between retryable and non-retryable errors
- Update database status before NACK
- Monitor DLQ regularly
- Set up alerts for high DLQ depth
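One way to separate retryable from non-retryable errors is to classify by error code. The sketch below assumes a hypothetical error shape with a `code` for socket-level failures and a `responseCode` for SMTP replies; it relies on the SMTP convention that 4xx replies are transient and 5xx replies are permanent. Which codes the real mail client surfaces is an assumption to verify.

```typescript
// Hypothetical error shape: `code` for socket-level failures, `responseCode` for SMTP replies.
interface SendError {
  code?: string;
  responseCode?: number;
}

const TRANSIENT_NETWORK_CODES = new Set(['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED']);

function isRetryable(error: SendError): boolean {
  if (error.code !== undefined && TRANSIENT_NETWORK_CODES.has(error.code)) return true;
  // SMTP 4xx replies are transient failures; 5xx replies are permanent.
  if (error.responseCode !== undefined) {
    return error.responseCode >= 400 && error.responseCode < 500;
  }
  return false; // unknown errors are treated as permanent in this sketch
}
```

Non-retryable failures can then skip the retry queue and go directly to the DLQ, which keeps invalid addresses from consuming the whole retry schedule.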
Performance
- Use connection pooling for high-throughput workloads
- Monitor queue depth and scale workers accordingly
- Use lazy queues for large message backlogs
- Enable publisher confirms for reliability
- Set message TTL to prevent infinite retention
Troubleshooting
Queue Depth Growing
Symptom: Messages accumulating in queue
Possible Causes:
- Workers stopped or crashed
- SMTP server slow or unresponsive
- Workers processing slower than publishing rate
Solutions:
- Scale up workers: docker-compose up --scale workers=5
- Check worker logs for errors
- Verify SMTP connectivity
- Increase worker resources (CPU, memory)
Messages in DLQ
Symptom: High count in email-dlq
Possible Causes:
- Invalid email addresses
- SMTP authentication failures
- Template rendering errors
- Network issues
Solutions:
- Inspect messages in RabbitMQ UI
- Check worker error logs
- Fix root cause (update template, verify SMTP credentials)
- Republish messages to email-queue after fixing
Connection Errors
Symptom: Workers unable to connect to RabbitMQ
Solutions:
- Verify RabbitMQ is running: docker ps | grep rabbitmq
- Check connection URL: echo $RABBITMQ_URL
- Test connectivity: telnet rabbitmq 5672
- Check firewall rules
- Review RabbitMQ logs: docker logs rabbitmq
Next Steps
- API Design - REST API architecture
- Deployment - Production deployment
- Database Schema - PostgreSQL tables