RabbitMQ integration into NestJS β
We've had to implement a NestJS custom transport strategy to support all the features we want from RabbitMQ. We did this because there's some large limitations in the native NestJS implementation.
- No multi queue support
- No exchanges
- Limited config
- Support basically the same features as redis
Dependencies β
Our current implementation uses a few dependencies (without NestJS being counted)
"amqplib"Official rabbit client"amqp-connection-manager"Handles reconnection for the rabbit client"class-transformer"validate message we pass though rabbit"class-validator"format message from rabbit to our classes
Moving Parts β
It breaks down into a few parts
"RabbitStrategy"This glues all the parts together. Gets metadata from"DiscoveryModule"uses"RabbitModule"to create routing on RabbitMQ and calls consumers when a message exists."RabbitModule"Handles clients, creating channels and publishing messages"DiscoveryModule"Read metadata on compennants loaded inside an app"Decorator"Add metadata to be found by"DiscoveryModule"and used by"RabbitStrategy"to create routing on RabbitMQ"Messages"Type and contract to be used to send messages through RabbitMQ. This is used by"RabbitModule"for publishing and"RabbitStrategy"when consuming
graph TD
classDef our fill:#44AF69
classDef nest fill:#F8333C
classDef action fill:#FCAB10
classDef dep fill:#2B9EB3
subgraph Color code
Our[part build be us]:::our -->
Nest[part native to NestJs]:::nest -->
Action[Action a part does]:::action -->
dep[Dependencies]:::dep
end
Bootstrap[Bootstrap app creation]:::nest --similar to workers--> App
App:::nest --> Strategy[RabbitStrategy]:::our --> App
App --> DiscovertyModule:::our
Strategy -.Fetch metadata to create rabbitmq routing ..-> DiscovertyModule
Strategy -.Use metadata to create queue/exchange .-> RabbitService
DiscovertyModule --> MetaData[Find metadata on component]:::action --List of components from NestJS --> ModulesContainer:::nest
ModulesContainer ----> Contoller:::nest
RabbitCustom --> AddMetaData[Decorator add metadata]:::action --> Contoller
RabbitQueue["@RabbitQueue"]:::our --> RabbitCustom:::our
RabbitRpc["@RabbitRpc"]:::our --> RabbitCustom
RabbitEvent["@RabbitEvent"]:::our --> RabbitCustom
RabbitTask["@RabbitTask"]:::our --> RabbitCustom
App --> RabbitModule:::our --> RabbitService:::our --> amqpSub[amqp client to subscribe]:::dep
RabbitService --> amqpPub[amqp client to publish]:::dep2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Decorators β
There's 4 base decorators with 1 being used to create custom logic.
@RabbitCustom β
This is the base decorator that is used by all the other decorators by passing config to "RabbitCustom". In most situations we should be using one of the generic rabbit decorators.
@RabbitQueue β
This is the simplest. listen to a static queue for a message and consume it.
Example of it's usage can be found packages/service/ashitaka/src/modules/consumer/deadLetter/deadLetterConsumer.controller.ts
@RabbitRpc β
This can be used to replace api calls. It's best used for short actions; the default is 5 seconds (under 5 min max). The way this works is by having a response queue inside the message created by the producer. When the consumer has finished or failed the task it'll publish the response into the response queue. When everything is finished the temporary queue is removed/destroyed.
Example of it's usage can be found packages/service/ashitaka/src/modules/consumer/hash/hashConsumer.controller.ts
graph TD
ResponseQueue --Listen to created queue--> Client
Client --Publish message into queue--> Exchange
Exchange[Topic exchange] --> Queue1[exclusive queue #1]
Exchange --> Queue2[exclusive queue #2]
Queue1 --> Response[Handle response]
Queue2 --> Response
Response --Publish into response queue--> ResponseQueue[Response queue]2
3
4
5
6
7
8
@RabbitTask β
This is used when you want to run a task one at a time with using ack to control the flow of messages.
Example of it's usage can be found packages/service/ashitaka/src/modules/consumer/yuba-log/yuba-log-consumer.controller.ts
graph TD
Exchange[Topic exchange] --> Queue1[exclusive queue #1]
Exchange --> Queue2[exclusive queue #2]2
3
@RabbitEvent β
This pattern is used for pub/sub. It can handle the same message being sent to different consumers based on a "type" being used as routing key.
Example of it's usage can be found packages/service/ashitaka/src/modules/consumer/authentication/authentication-message-consumer.controller.ts
graph TD
Exchange[Topic exchange] --Routing key 'PL_EMPLOYEE_LOGIN_SUCCESSFUL_EVENT'--> Exchange1
Exchange --Routing key 'USER_LOGIN_SUCCESSFUL_EVENT'--> Exchange2
Exchange1[Fanout exchange] --message copy#1--> QueueA1[exclusive queue#1]
Exchange1 --message copy#2--> QueueB1[exclusive queue#2]
Exchange2[Fanout exchange] --message copy#1--> QueueA2[exclusive queue#1]
Exchange2 --message copy#2--> QueueB2[exclusive queue#2]2
3
4
5
6
7
Message lifecycle β
All messages being published are validated on both ends(producer, consumer). We use this pattern on all messages with the exception of RPC having a few extra steps.
If a message fails to find a route by RabbitMQ it becomes a dead letter and it gets put in our dead letter exchange to be consumed and custom logic to be applied. This logic could be requeueing, logging or generating metrics.
Message have a few ways to become a dead letter:
- Fail to be routed
- Failed to have a ack be called for 30 minutes
- Message TTL expires (Message have a lifespan of 1 day)
- Message exceeds the queue length limit
graph TD
subgraph "Client/Producer"
Client -->
messageCheck[Validate message] -->
publish[Publish message]
end
publish --> rabbitmq[RabbitMQ] -->
foundRoute[RabbitMQ found a valid route] --> serviceA
subgraph serviceA [Service]
ConsumeMessageA[Consume message] -->
messageCheckA[Validate message] --"Message is valid call consumer function" --> Consumer[Controller function]
messageCheckA --> FailedValidate[Validation failed push error to sentry] --"Run 'Ack' so message doesn't fall in dead letter" -->
FailAck[Ack]
end
rabbitmq -->
notFoundRoute[RabbiMQ did not find a route] --Publish to alternateExchange -->
deadLetter[DeadLetter exchange] -->
deadLetterConsumer[Consume DeadLetter] --> serviceB
TimeoutAck[Message hit timeout] --"'Ack' didn't get sent in time" --> deadLetter
subgraph serviceB [Service]
ConsumeMessageB[Consume message] -->
messageCheckC[Validate message] --The validation is an empty object is basically skip --> ConsumerA[Controller function]
ConsumerA --> log
ConsumerA --> metric
ConsumerA --> etc
end2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Rpc messages have a few extra steps. We depend on RpcExceptionFilter to handle the error states so we don't have any hanging rpc calls from an unknown error being thrown.
graph TD
consume[Consume message] -.Response queue found inside message .-> tempQueue
consume --> validate[Validate message] --> callConsumer
subgraph "RPC handle inside of NestJS service"
callConsumer[Call handle] --> useFilter[Wrap filter around the handler] --> Handle[Handler Function] -->
Error --> catch[Filter catch thrown error] --> response[Response with error] --> responsePublish
catch --> log[Log Error]
Handle --A success should handle its ack and response internally --> Success --> responsePublish
end
responsePublish --> tempQueue2
3
4
5
6
7
8
9
10
For the rpc client it's simple they have X retries with a timeout of X for each call before throwing an error. More info can be found in packages/lib/nest-scaffold/src/nest/rabbitmq/module/rabbit.service.ts
graph TD
publishRpc -->
validate[Validate Message] ---->
retryWrapper[Retry Wrapper] --Promise.race --> timeout[Timeout will throw an error]
retryWrapper --Promise.race --> publishLogic
subgraph publishLogic [Publish Logic]
createQueue[Create temp queue] --inject temp queue into message --> Publish -->
awaitConsume[Await response on temp queue]
end
publishLogic --Error in response --> FinalError --> Done
publishLogic --> Success --> Done
publishLogic --> UnknownError --Cleanup resources --> retry[Retry a new publish]
timeout --> retry --> retryWrapper
retry --Hit max retry throw error and done --> Done2
3
4
5
6
7
8
9
10
11
12
13
14
15
π