10 KiB
10 KiB
Watermill AMQP Detailed Reference
Full AMQP Config Struct
type Config struct {
Connection ConnectionConfig
Marshaler Marshaler
Exchange ExchangeConfig
Queue QueueConfig
QueueBind QueueBindConfig
Publish PublishConfig
Consume ConsumeConfig
TopologyBuilder TopologyBuilder
TLSConfig *tls.Config
}
ConnectionConfig
type ConnectionConfig struct {
AmqpURI string
TLSConfig *tls.Config
}
ExchangeConfig
type ExchangeConfig struct {
GenerateName func(topic string) string
Type string // "fanout", "topic", "direct", "headers"
Durable bool
AutoDeleted bool
Internal bool
NoWait bool
Arguments amqp.Table
}
QueueConfig
type QueueConfig struct {
GenerateName func(topic string) string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Arguments amqp.Table // x-message-ttl, x-dead-letter-exchange, etc.
}
QueueBindConfig
type QueueBindConfig struct {
GenerateRoutingKey func(topic string) string
NoWait bool
Arguments amqp.Table
}
PublishConfig
type PublishConfig struct {
GenerateRoutingKey func(topic string) string
Mandatory bool
Immediate bool
}
ConsumeConfig
type ConsumeConfig struct {
Qos QosConfig
Consumer string
Exclusive bool
NoLocal bool
NoWait bool
Arguments amqp.Table
}
type QosConfig struct {
PrefetchCount int
PrefetchSize int
Global bool
}
Pre-Built Config Functions
NewDurableQueueConfig
- No exchange declared
- Queue name = topic name
- Persistent delivery mode
- Best for: work queues, task distribution
NewNonDurableQueueConfig
- Same as above but non-durable, non-persistent
- Best for: temporary/ephemeral queues
NewDurablePubSubConfig
- Fanout exchange (all consumers get all messages)
- Requires
GenerateQueueNameFuncfor unique queue per consumer - Persistent delivery mode
- Best for: event broadcasting, notifications
NewNonDurablePubSubConfig
- Same as above but non-durable
- Best for: real-time updates where persistence isn't needed
NewDurableTopicConfig / NewNonDurableTopicConfig
- Topic exchange with routing key pattern matching
- Best for: selective message routing by pattern
Marshaler
type Marshaler interface {
Marshal(msg *message.Message) (amqp091.Publishing, error)
Unmarshal(amqpMsg amqp091.Delivery) (*message.Message, error)
}
DefaultMarshaler
type DefaultMarshaler struct {
// Customize the amqp091.Publishing before sending
PostprocessPublishing func(amqp091.Publishing) amqp091.Publishing
// Set true for non-persistent delivery (higher throughput, no disk writes)
NotPersistentDeliveryMode bool
// Header key for storing Watermill message UUID (default: "Watermill_MessageUUID")
MessageUUIDHeaderKey string
}
Example: adding content type
marshaler := amqp.DefaultMarshaler{
PostprocessPublishing: func(p amqp091.Publishing) amqp091.Publishing {
p.ContentType = "application/json"
p.CorrelationId = "my-correlation-id"
return p
},
}
TopologyBuilder
Default: DefaultTopologyBuilder — declares exchanges, queues, and bindings automatically.
For custom topology (pre-existing infrastructure):
type MyTopologyBuilder struct{}
func (t *MyTopologyBuilder) BuildTopology(channel *amqp091.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error {
// Custom declarations or no-ops for pre-declared topology
return nil
}
func (t *MyTopologyBuilder) ExchangeDeclare(channel *amqp091.Channel, exchangeName string, config Config) error {
return nil // skip exchange declaration
}
Publisher Usage
publisher, err := amqp.NewPublisher(config, logger)
if err != nil {
return err
}
defer publisher.Close() // IMPORTANT: flushes unsent messages
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"event":"user_created"}`))
msg.Metadata.Set("correlation_id", "abc-123")
err = publisher.Publish("events.user", msg)
- Thread-safe
- Publish blocks until broker confirms receipt (with persistent delivery)
- Topic maps to exchange/queue/routing-key depending on config
Subscriber Usage
subscriber, err := amqp.NewSubscriber(config, logger)
if err != nil {
return err
}
defer subscriber.Close()
messages, err := subscriber.Subscribe(ctx, "events.user")
if err != nil {
return err
}
for msg := range messages {
var event UserEvent
if err := json.Unmarshal(msg.Payload, &event); err != nil {
msg.Nack() // redelivery
continue
}
// process event...
msg.Ack() // acknowledge
}
- Channel closes when
Close()is called or context is canceled - Must Ack/Nack every message
- Next message delivered only after Ack (with prefetch=1)
Router Detailed Reference
RouterConfig
type RouterConfig struct {
CloseTimeout time.Duration // default: 30s
}
Handler Registration
// Full handler: consume + publish
handler := router.AddHandler(
"handler_name", // must be unique
"subscribe_topic", // consumed topic
subscriber, // Subscriber implementation
"publish_topic", // produced topic
publisher, // Publisher implementation
handlerFunc, // func(msg *Message) ([]*Message, error)
)
// Consumer-only handler
router.AddConsumerHandler(
"consumer_name",
"subscribe_topic",
subscriber,
noPublishHandlerFunc, // func(msg *Message) error
)
Handler-Level Middleware
handler := router.AddHandler(...)
handler.AddMiddleware(
middleware.Timeout(5 * time.Second),
)
Context Helpers
middleware.HandlerNameFromCtx(msg.Context()) // "handler_name"
middleware.SubscriberNameFromCtx(msg.Context()) // e.g. "amqp.Subscriber"
middleware.PublisherNameFromCtx(msg.Context())
middleware.SubscribeTopicFromCtx(msg.Context())
middleware.PublishTopicFromCtx(msg.Context())
Middleware Detailed Reference
Retry
middleware.Retry{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 10 * time.Second,
Multiplier: 2.0,
MaxElapsedTime: 30 * time.Second,
RandomizationFactor: 0.5,
OnRetryHook: func(retryNum int, delay time.Duration) {
log.Printf("retry %d after %v", retryNum, delay)
},
ShouldRetry: func(err error) bool {
return !errors.Is(err, ErrPermanent) // skip retry for permanent errors
},
Logger: logger,
}.Middleware
Circuit Breaker
import "github.com/sony/gobreaker"
cb := middleware.NewCircuitBreaker(gobreaker.Settings{
Name: "my-handler",
MaxRequests: 3, // half-open state max requests
Interval: 10 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 5
},
})
router.AddMiddleware(cb.Middleware)
Timeout
router.AddMiddleware(middleware.Timeout(5 * time.Second))
// Handler must check: msg.Context().Done()
Throttle
// 10 messages per second
throttle := middleware.NewThrottle(10, time.Second)
router.AddMiddleware(throttle.Middleware)
Poison Queue
// All errors go to poison queue
pq, err := middleware.PoisonQueue(publisher, "failed_messages")
router.AddMiddleware(pq)
// Only specific errors
pq, err := middleware.PoisonQueueWithFilter(publisher, "failed_messages",
func(err error) bool {
return errors.Is(err, ErrUnprocessable)
},
)
Deduplicator
dedup, err := middleware.NewDeduplicator(
middleware.NewMessageHasherSHA256(), // or NewMessageHasherAdler32()
middleware.NewInMemoryExpiringKeyRepository(10 * time.Minute),
)
router.AddMiddleware(dedup.Middleware)
Correlation ID
// Set on first message entering system
middleware.SetCorrelationID(watermill.NewUUID(), msg)
// Read in any handler downstream
corrID := middleware.MessageCorrelationID(msg)
// Auto-propagate (add as router middleware)
router.AddMiddleware(middleware.CorrelationID)
Common AMQP Patterns
Work Queue (competing consumers)
// Multiple workers share the same queue
config := amqp.NewDurableQueueConfig("amqp://localhost:5672/")
// All subscribers with the same config compete for messages
Fanout (pub/sub broadcast)
config := amqp.NewDurablePubSubConfig(
"amqp://localhost:5672/",
amqp.GenerateQueueNameTopicNameWithSuffix("service-name"),
)
// Each service gets its own queue bound to the fanout exchange
Topic Routing
config := amqp.NewDurableTopicConfig(
"amqp://localhost:5672/",
func(topic string) string { return "events" }, // exchange
func(topic string) string { return "service." + topic }, // queue
)
// Use routing key patterns like "user.*", "order.#"
Dead Letter Queue
config := amqp.Config{
// ...
Queue: amqp.QueueConfig{
GenerateName: func(topic string) string { return topic },
Durable: true,
Arguments: amqp.Table{
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "dead_letter",
"x-message-ttl": int32(300000), // 5 minutes
},
},
}
Delayed/Scheduled Messages
Use the DelayOnError middleware or RabbitMQ delayed message exchange plugin.
Logging
// Development
logger := watermill.NewStdLogger(debug, trace)
// Production (slog)
logger := watermill.NewSlogLogger(slog.Default())
Advanced: Outbox Pattern
Watermill supports the transactional outbox pattern via the Forwarder component — publish messages to a database first, then forward to the broker. This guarantees no message loss even if the broker is down.
Advanced: FanIn / FanOut
- FanIn: Merge multiple topics into one handler
- FanOut: Duplicate messages to multiple subscribers from one topic