RabbitMQ
EventFlow ships with a RabbitMQ integration that fans every persisted domain event out to an exchange. This is useful when downstream systems (read models, legacy services, analytics pipelines, and so on) must react to aggregate changes without being tightly coupled to the write model.
The integration focuses on publishing. It does not create queues or start consumers for you—topology remains an infrastructure concern so you can keep the messaging contract explicit.
Prerequisites
- RabbitMQ 3.8 or newer (older versions work, but automatic recovery and federation are more reliable on ≥3.8).
- The EventFlow.RabbitMQ package alongside the core EventFlow packages.
- A pre-provisioned exchange (typically a durable topic exchange) plus the queues/bindings you want to consume from. EventFlow does not declare exchanges or queues automatically.
Install and register the publisher
Add the package to your project:
dotnet add package EventFlow.RabbitMQ
Enable the publisher when you build your EventFlowOptions.
RabbitMqConfiguration.With exposes the following knobs:
- uri – The AMQP URI, including credentials and vhost. Use amqps:// when TLS is required.
- persistent – Whether RabbitMQ should persist messages to disk (true by default). Set this to false for transient data.
- modelsPrConnection – How many channels (models) the integration pools per connection. Increase the value if you have a high write rate and observe channel contention.
- exchange – Name of the exchange EventFlow publishes to. The exchange must already exist.
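Credentials and vhost names often contain characters that must be percent-encoded inside an AMQP URI. The following is a minimal Python sketch for assembling such a URI before handing it to the configuration. The helper name `build_amqp_uri` and the host names are hypothetical and not part of EventFlow; the default ports (5672 for amqp, 5671 for amqps) follow the usual RabbitMQ convention.

```python
from urllib.parse import quote


def build_amqp_uri(user, password, host, vhost="/", port=None, tls=False):
    """Assemble an AMQP URI with percent-encoded credentials and vhost.

    Hypothetical helper for illustration; not part of EventFlow.
    """
    scheme = "amqps" if tls else "amqp"
    if port is None:
        # Conventional RabbitMQ ports: 5671 with TLS, 5672 without.
        port = 5671 if tls else 5672
    # safe="" escapes every reserved character, so the default vhost "/"
    # becomes "%2F" and an "@" in a password cannot break the authority part.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"{scheme}://{credentials}@{host}:{port}/{quote(vhost, safe='')}"


print(build_amqp_uri("guest", "guest", "localhost"))
# amqp://guest:guest@localhost:5672/%2F
```

Encoding the vhost matters most for the default vhost `/`, which has to appear as `%2F` in the path segment.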
Once configured, EventFlow registers an ISubscribeSynchronousToAll subscriber that ships each domain event to RabbitMQ right after the event is committed to the event store. The command is considered complete only after the publish succeeds (or ultimately fails), so RabbitMQ errors surface to the caller.
Exchange and routing conventions
By default messages are published with:
- Exchange – The value supplied via RabbitMqConfiguration.With (defaults to eventflow).
- Routing key – eventflow.domainevent.{aggregate-name}.{event-name}.{event-version}, where each segment is slugified (lowercase, dashes for PascalCase).
For example, an event named UserRegistered version 1 from CustomerAggregate produces:
eventflow.domainevent.customer.user-registered.1
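Consumers sometimes need to derive the routing key themselves, for example to build bindings from a list of event names. Below is a minimal Python sketch of the convention described above. The function names are hypothetical, the slug rule is an approximation of "lowercase, dashes for PascalCase", and the aggregate name is assumed to be already resolved to `Customer`; EventFlow's own slug logic may treat edge cases such as acronyms differently.

```python
import re


def slugify(name: str) -> str:
    """Approximate the slug rule: PascalCase words joined by dashes, lowercased."""
    # Insert a dash wherever a lowercase letter or digit is followed by an uppercase letter.
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "-", name).lower()


def routing_key(aggregate_name: str, event_name: str, event_version: int) -> str:
    """Build eventflow.domainevent.{aggregate-name}.{event-name}.{event-version}."""
    return ".".join(
        ["eventflow", "domainevent", slugify(aggregate_name), slugify(event_name), str(event_version)]
    )


print(routing_key("Customer", "UserRegistered", 1))
# eventflow.domainevent.customer.user-registered.1
```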
Creating queues and bindings
EventFlow does not create queues. Bind your own queues to the configured exchange using the routing keys that matter to a consumer. With the default topic exchange, you can subscribe to an entire aggregate or event family:
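- eventflow.domainevent.customer.# – All events from CustomerAggregate. The # wildcard covers the remaining event-name and version segments; a single * would match only one segment.
- eventflow.domainevent.*.user-registered.* – Any UserRegistered event regardless of aggregate or version.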
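Binding patterns are easy to get subtly wrong because `*` matches exactly one dot-separated word while `#` matches zero or more. The following Python sketch reimplements the topic-exchange matching rule so patterns can be checked against sample routing keys without a broker. `topic_matches` is a hypothetical helper for experimentation, not a RabbitMQ or EventFlow API.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # "#" may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        # "*" matches exactly one word; anything else must match literally.
        return (p[i] == "*" or p[i] == k[j]) and match(i + 1, j + 1)

    return match(0, 0)


key = "eventflow.domainevent.customer.user-registered.1"
print(topic_matches("eventflow.domainevent.customer.#", key))           # True
print(topic_matches("eventflow.domainevent.*.user-registered.*", key))  # True
print(topic_matches("eventflow.domainevent.customer.*", key))           # False
```

The last call returns False because the routing key has two segments after the aggregate name (event name and version), and a single `*` covers only one of them.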
Provision queues and bindings with a script or infrastructure as code, before your service starts or during deployment.
Message payload and headers
The integration serializes the aggregate event using EventFlow’s regular JSON serializer. Metadata is sent alongside the message in two places:
- Body – JSON payload with the actual event data. This is identical to what the event store persists.
- Headers – A Dictionary<string,string> containing EventFlow metadata such as:
  - event_name, event_version
  - aggregate_id, aggregate_name, aggregate_sequence_number
  - event_id, batch_id, timestamp, timestamp_epoch
  - source_id when available
Example body:
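As an illustration only, the Python sketch below decodes a body on the consumer side. The payload shape is hypothetical: the fields `UserId` and `Email`, their casing, and the UTF-8 encoding are assumptions made for this example, and the real body is whatever your event class serializes to.

```python
import json

# Hypothetical UserRegistered payload; field names and casing are assumptions.
raw_body = b'{"UserId": "customer-5b0d9af0", "Email": "jane@example.com"}'

# Assumes the body is UTF-8 encoded JSON.
event = json.loads(raw_body.decode("utf-8"))

print(event["Email"])
# jane@example.com
```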
Example headers:
Header | Example value |
---|---|
event_name | user-registered |
event_version | 1 |
aggregate_name | customer |
aggregate_id | customer-5b0d9af0 |
aggregate_sequence_number | 42 |
event_id | 01JF2ZNKX1QZS5CJ1V6AQ13RPT |
timestamp | 2025-09-20T17:53:41.2012129Z |
Leverage these headers to enrich logs, implement idempotency, or drive filtering logic in consumers.
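One way to use the headers for idempotency is to remember each event_id that has been handled and skip redeliveries. The Python sketch below shows the idea with an in-memory set. `IdempotentHandler` is a hypothetical class, and a real consumer would normally keep the seen IDs in a durable store so that deduplication survives restarts.

```python
class IdempotentHandler:
    """Wrap a message handler so each event_id is processed at most once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # in-memory only; assumption for brevity

    def handle(self, headers: dict, body: bytes) -> bool:
        """Return True if the message was processed, False if it was a duplicate."""
        event_id = headers["event_id"]
        if event_id in self._seen:
            return False
        self._handler(headers, body)
        # Record the ID only after the handler succeeds, so a failed
        # attempt can be retried on redelivery.
        self._seen.add(event_id)
        return True


processed = []
consumer = IdempotentHandler(lambda headers, body: processed.append(headers["event_id"]))
headers = {"event_id": "01JF2ZNKX1QZS5CJ1V6AQ13RPT", "event_name": "user-registered"}

print(consumer.handle(headers, b"{}"))  # True
print(consumer.handle(headers, b"{}"))  # False (redelivery is skipped)
```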
Reliability characteristics
- Persistent messages – Enabled by default: messages are published with basicProperties.Persistent = true unless you set persistent to false.
- Connection pooling – A connection is opened per URI and keeps a pool of AMQP channels (models) to avoid throttling. Tune modelsPrConnection for your throughput profile.
- Automatic recovery – The RabbitMQ client enables topology and automatic connection recovery so brief network blips self-heal.
- Retry strategy – Transient BrokerUnreachableException, OperationInterruptedException, and EndOfStreamException are retried up to three times with a 25 ms backoff. Replace IRabbitMqRetryStrategy in the container if you need custom retry logic.
Failures that propagate after retries cause the current command to fail; the publish will be retried the next time the command is executed or when the aggregate emits subsequent events.
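The retry behaviour described above can be pictured with a short Python sketch. It is not EventFlow's implementation: `TransientBrokerError` is a hypothetical stand-in for the transient exception types, and "up to three times" is interpreted here as three retries after the initial attempt with a fixed 25 ms delay, which is an assumption.

```python
import time


class TransientBrokerError(Exception):
    """Hypothetical stand-in for transient broker exceptions."""


def publish_with_retry(publish, retries=3, backoff_seconds=0.025, sleep=time.sleep):
    """Call publish(); on a transient error, wait and retry up to `retries` times."""
    attempt = 0
    while True:
        try:
            return publish()
        except TransientBrokerError:
            if attempt >= retries:
                # Retries exhausted: let the error surface to the caller.
                raise
            attempt += 1
            sleep(backoff_seconds)


attempts = []


def flaky_publish():
    # Fails twice, then succeeds on the third call.
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientBrokerError("broker unreachable")
    return "published"


print(publish_with_retry(flaky_publish))  # published
print(len(attempts))                      # 3
```

Non-transient exceptions are not caught, so they propagate immediately, mirroring the idea that only a known set of transient failures is worth retrying.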
Customizing the integration
- Alternate exchange or routing key – Replace the registered IRabbitMqMessageFactory with your own implementation to target different exchanges, enrich headers, or transform the payload.
- Custom publish mechanics – Override IRabbitMqPublisher if you need publisher confirms, tracing, or batch semantics.
- Asynchronous publishing – If you prefer to publish outside the command execution pipeline, register your own ISubscribeAsynchronousToAll implementation and publish from there instead of relying on the built-in synchronous publisher.
Troubleshooting
- NOT_FOUND - no exchange – The exchange name does not exist. Create it manually or update the configuration.
- NO_ROUTE warnings – Nothing is bound to the routing key. Check your queue bindings.
- Channel busy or blocked – Increase modelsPrConnection or scale out publishers.
- Silent drops – Inspect consumer acknowledgements and dead-letter queues; EventFlow only publishes and cannot observe downstream failures.
For general guidance on subscribers and out-of-order delivery considerations, review the subscribers documentation.