# Watermill AMQP Detailed Reference

## Full AMQP Config Struct

```go
type Config struct {
	Connection      ConnectionConfig
	Marshaler       Marshaler
	Exchange        ExchangeConfig
	Queue           QueueConfig
	QueueBind       QueueBindConfig
	Publish         PublishConfig
	Consume         ConsumeConfig
	TopologyBuilder TopologyBuilder
	TLSConfig       *tls.Config
}
```

### ConnectionConfig

```go
type ConnectionConfig struct {
	AmqpURI   string
	TLSConfig *tls.Config
}
```

### ExchangeConfig

```go
type ExchangeConfig struct {
	GenerateName func(topic string) string
	Type         string // "fanout", "topic", "direct", "headers"
	Durable      bool
	AutoDeleted  bool
	Internal     bool
	NoWait       bool
	Arguments    amqp091.Table
}
```

### QueueConfig

```go
type QueueConfig struct {
	GenerateName func(topic string) string
	Durable      bool
	AutoDelete   bool
	Exclusive    bool
	NoWait       bool
	Arguments    amqp091.Table // x-message-ttl, x-dead-letter-exchange, etc.
}
```

### QueueBindConfig

```go
type QueueBindConfig struct {
	GenerateRoutingKey func(topic string) string
	NoWait             bool
	Arguments          amqp091.Table
}
```

### PublishConfig

```go
type PublishConfig struct {
	GenerateRoutingKey func(topic string) string
	Mandatory          bool
	Immediate          bool
}
```

### ConsumeConfig

```go
type ConsumeConfig struct {
	Qos       QosConfig
	Consumer  string
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Arguments amqp091.Table
}

type QosConfig struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}
```

## Pre-Built Config Functions

### NewDurableQueueConfig

- No exchange declared
- Queue name = topic name
- Persistent delivery mode
- Best for: work queues, task distribution

### NewNonDurableQueueConfig

- Same as above but non-durable, non-persistent
- Best for: temporary/ephemeral queues

### NewDurablePubSubConfig

- Fanout exchange (all consumers get all messages)
- Requires a queue name generator (a `func(topic string) string`) so that each consumer gets its own queue
- Persistent delivery mode
- Best for: event broadcasting, notifications

### NewNonDurablePubSubConfig

- Same as above but non-durable
- Best for: real-time updates where persistence isn't needed

### NewDurableTopicConfig / NewNonDurableTopicConfig

- Topic exchange with routing key pattern matching
- Best for: selective message routing by pattern

## Marshaler

```go
type Marshaler interface {
	Marshal(msg *message.Message) (amqp091.Publishing, error)
	Unmarshal(amqpMsg amqp091.Delivery) (*message.Message, error)
}
```

### DefaultMarshaler

```go
type DefaultMarshaler struct {
	// Customize the amqp091.Publishing before sending
	PostprocessPublishing func(amqp091.Publishing) amqp091.Publishing

	// Set true for non-persistent delivery (higher throughput, no disk writes)
	NotPersistentDeliveryMode bool

	// Header key for storing Watermill message UUID (default: "_watermill_message_uuid")
	MessageUUIDHeaderKey string
}
```

Example: setting the content type and a correlation ID

```go
marshaler := amqp.DefaultMarshaler{
	PostprocessPublishing: func(p amqp091.Publishing) amqp091.Publishing {
		p.ContentType = "application/json"
		p.CorrelationId = "my-correlation-id"
		return p
	},
}
```

## TopologyBuilder

Default: `DefaultTopologyBuilder`, which declares exchanges, queues, and bindings automatically.

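
To make the name-generator fields above more concrete, here is a minimal stand-alone sketch of how one topic could resolve to an exchange name, a queue name, and a routing key. It does not call Watermill: `nameFunc`, `topologyNames`, `withSuffix`, and `resolve` are hypothetical helpers, and the `<topic>_<suffix>` queue naming is an assumption chosen for illustration.

```go
package main

import "fmt"

// nameFunc has the same shape as the GenerateName and GenerateRoutingKey fields.
type nameFunc func(topic string) string

// topologyNames is a hypothetical helper type; it is not part of Watermill.
type topologyNames struct {
	Exchange   string
	Queue      string
	RoutingKey string
}

// withSuffix returns a generator that yields "<topic>_<suffix>".
// The naming convention is an assumption made for this sketch.
func withSuffix(suffix string) nameFunc {
	return func(topic string) string { return topic + "_" + suffix }
}

// resolve applies the three generators to a single topic.
func resolve(topic string, exchange, queue, routingKey nameFunc) topologyNames {
	return topologyNames{
		Exchange:   exchange(topic),
		Queue:      queue(topic),
		RoutingKey: routingKey(topic),
	}
}

func main() {
	sameAsTopic := func(topic string) string { return topic }
	noKey := func(string) string { return "" } // fanout exchanges ignore the routing key

	n := resolve("events.user", sameAsTopic, withSuffix("billing"), noKey)
	fmt.Printf("exchange=%q queue=%q routingKey=%q\n", n.Exchange, n.Queue, n.RoutingKey)
}
```

With a pub/sub style setup like this, two services using different suffixes would end up with two separate queues for the same topic, which is what lets each of them receive every message.
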
For custom topology (pre-existing infrastructure):

```go
type MyTopologyBuilder struct{}

func (t *MyTopologyBuilder) BuildTopology(channel *amqp091.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error {
	// Custom declarations or no-ops for pre-declared topology
	return nil
}

func (t *MyTopologyBuilder) ExchangeDeclare(channel *amqp091.Channel, exchangeName string, config Config) error {
	return nil // skip exchange declaration
}
```

## Publisher Usage

```go
publisher, err := amqp.NewPublisher(config, logger)
if err != nil {
	return err
}
defer publisher.Close() // IMPORTANT: flushes unsent messages

msg := message.NewMessage(watermill.NewUUID(), []byte(`{"event":"user_created"}`))
msg.Metadata.Set("correlation_id", "abc-123")

if err := publisher.Publish("events.user", msg); err != nil {
	return err
}
```

- Thread-safe
- Publish blocks until broker confirms receipt (with persistent delivery)
- Topic maps to exchange/queue/routing-key depending on config

## Subscriber Usage

```go
subscriber, err := amqp.NewSubscriber(config, logger)
if err != nil {
	return err
}
defer subscriber.Close()

messages, err := subscriber.Subscribe(ctx, "events.user")
if err != nil {
	return err
}

for msg := range messages {
	var event UserEvent
	if err := json.Unmarshal(msg.Payload, &event); err != nil {
		msg.Nack() // triggers redelivery
		continue
	}
	// process event...
	msg.Ack() // acknowledge
}
```

- Channel closes when `Close()` is called or context is canceled
- Must Ack/Nack every message
- Next message delivered only after Ack (with prefetch=1)

## Router Detailed Reference

### RouterConfig

```go
type RouterConfig struct {
	CloseTimeout time.Duration // default: 30s
}
```

### Handler Registration

```go
// Full handler: consume + publish
handler := router.AddHandler(
	"handler_name",    // must be unique
	"subscribe_topic", // consumed topic
	subscriber,        // Subscriber implementation
	"publish_topic",   // produced topic
	publisher,         // Publisher implementation
	handlerFunc,       // func(msg *Message) ([]*Message, error)
)

// Consumer-only handler
router.AddConsumerHandler(
	"consumer_name",
	"subscribe_topic",
	subscriber,
	noPublishHandlerFunc, // func(msg *Message) error
)
```

### Handler-Level Middleware

```go
handler := router.AddHandler(...)
handler.AddMiddleware(
	middleware.Timeout(5 * time.Second),
)
```

### Context Helpers

These helpers live in the `message` package, not in `middleware`.

```go
message.HandlerNameFromCtx(msg.Context())    // "handler_name"
message.SubscriberNameFromCtx(msg.Context()) // e.g. "amqp.Subscriber"
message.PublisherNameFromCtx(msg.Context())
message.SubscribeTopicFromCtx(msg.Context())
message.PublishTopicFromCtx(msg.Context())
```

## Middleware Detailed Reference

### Retry

```go
middleware.Retry{
	MaxRetries:          3,
	InitialInterval:     100 * time.Millisecond,
	MaxInterval:         10 * time.Second,
	Multiplier:          2.0,
	MaxElapsedTime:      30 * time.Second,
	RandomizationFactor: 0.5,
	OnRetryHook: func(retryNum int, delay time.Duration) {
		log.Printf("retry %d after %v", retryNum, delay)
	},
	ShouldRetry: func(err error) bool {
		return !errors.Is(err, ErrPermanent) // skip retry for permanent errors
	},
	Logger: logger,
}.Middleware
```

### Circuit Breaker

```go
import "github.com/sony/gobreaker"

cb := middleware.NewCircuitBreaker(gobreaker.Settings{
	Name:        "my-handler",
	MaxRequests: 3, // max requests allowed in half-open state
	Interval:    10 * time.Second,
	Timeout:     30 * time.Second,
	ReadyToTrip: func(counts gobreaker.Counts) bool {
		return counts.ConsecutiveFailures > 5
	},
})
router.AddMiddleware(cb.Middleware)
```

### Timeout

```go
router.AddMiddleware(middleware.Timeout(5 * time.Second))
// The timeout only cancels the message context; the handler must
// watch msg.Context().Done() and stop its work when it fires.
```

### Throttle

```go
// 10 messages per second
throttle := middleware.NewThrottle(10, time.Second)
router.AddMiddleware(throttle.Middleware)
```

### Poison Queue

```go
// All errors go to poison queue
pq, err := middleware.PoisonQueue(publisher, "failed_messages")
if err != nil {
	return err
}
router.AddMiddleware(pq)

// Only specific errors
filteredPQ, err := middleware.PoisonQueueWithFilter(publisher, "failed_messages",
	func(err error) bool {
		return errors.Is(err, ErrUnprocessable)
	},
)
if err != nil {
	return err
}
router.AddMiddleware(filteredPQ)
```

### Deduplicator

```go
dedup, err := middleware.NewDeduplicator(
	middleware.NewMessageHasherSHA256(), // or NewMessageHasherAdler32()
	middleware.NewInMemoryExpiringKeyRepository(10 * time.Minute),
)
router.AddMiddleware(dedup.Middleware)
```

### Correlation ID

```go
// Set on first message entering system
middleware.SetCorrelationID(watermill.NewUUID(), msg)

// Read in any handler downstream
corrID := middleware.MessageCorrelationID(msg)

// Auto-propagate (add as router middleware)
router.AddMiddleware(middleware.CorrelationID)
```

## Common AMQP Patterns

### Work Queue (competing consumers)

```go
// Multiple workers share the same queue
config := amqp.NewDurableQueueConfig("amqp://localhost:5672/")
// All subscribers with the same config compete for messages
```

### Fanout (pub/sub broadcast)

```go
config := amqp.NewDurablePubSubConfig(
	"amqp://localhost:5672/",
	amqp.GenerateQueueNameTopicNameWithSuffix("service-name"),
)
// Each service gets its own queue bound to the fanout exchange
```

### Topic Routing

```go
config := amqp.NewDurableTopicConfig(
	"amqp://localhost:5672/",
	func(topic string) string { return "events" },           // exchange
	func(topic string) string { return "service." + topic }, // queue
)
// Use routing key patterns like "user.*", "order.#"
```

### Dead Letter Queue

```go
config := amqp.Config{
	// ...
	Queue: amqp.QueueConfig{
		GenerateName: func(topic string) string { return topic },
		Durable:      true,
		// Table comes from the RabbitMQ client package (amqp091)
		Arguments: amqp091.Table{
			"x-dead-letter-exchange":    "dlx",
			"x-dead-letter-routing-key": "dead_letter",
			"x-message-ttl":             int32(300000), // 5 minutes
		},
	},
}
```

### Delayed/Scheduled Messages

Use the `DelayOnError` middleware or the RabbitMQ delayed message exchange plugin.

## Logging

```go
// Development: debug and trace are booleans that enable those log levels
logger := watermill.NewStdLogger(debug, trace)

// Production (slog), as an alternative to the logger above
logger := watermill.NewSlogLogger(slog.Default())
```

## Advanced: Outbox Pattern

Watermill supports the transactional outbox pattern via the Forwarder component: messages are first published to a database, then forwarded to the broker. Because the message is stored before it reaches the broker, it is not lost if the broker is temporarily down.

## Advanced: FanIn / FanOut

- **FanIn**: Merge multiple topics into one handler
- **FanOut**: Duplicate messages to multiple subscribers from one topic
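
The two shapes in the list above can be illustrated with plain Go channels. This is only a conceptual sketch using the standard library, not Watermill's own FanIn or FanOut components; `emit`, `fanIn`, `fanOut`, and `collect` are hypothetical helpers, and string channels stand in for topics.

```go
package main

import (
	"fmt"
	"sync"
)

// emit returns a channel that yields the given messages and then closes.
func emit(msgs ...string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, m := range msgs {
			ch <- m
		}
	}()
	return ch
}

// fanIn merges several inputs into one output.
// The output closes once every input has been drained.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for m := range in {
				out <- m
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanOut copies every message from in to each of n outputs.
// Outputs are unbuffered, so all of them must be read concurrently.
func fanOut(in <-chan string, n int) []<-chan string {
	outs := make([]chan string, n)
	readOnly := make([]<-chan string, n)
	for i := range outs {
		outs[i] = make(chan string)
		readOnly[i] = outs[i]
	}
	go func() {
		for m := range in {
			for _, o := range outs {
				o <- m
			}
		}
		for _, o := range outs {
			close(o)
		}
	}()
	return readOnly
}

// collect drains every channel concurrently and returns the messages per channel.
func collect(chs ...<-chan string) [][]string {
	res := make([][]string, len(chs))
	var wg sync.WaitGroup
	for i, ch := range chs {
		wg.Add(1)
		go func(i int, ch <-chan string) {
			defer wg.Done()
			for m := range ch {
				res[i] = append(res[i], m)
			}
		}(i, ch)
	}
	wg.Wait()
	return res
}

func main() {
	merged := collect(fanIn(emit("a1", "a2"), emit("b1")))[0]
	fmt.Println("fan-in received", len(merged), "messages")

	copies := collect(fanOut(emit("e1", "e2"), 2)...)
	fmt.Println("fan-out copies:", copies)
}
```

The fan-in side gives no ordering guarantee across inputs, while each fan-out copy preserves the order of the source. A slow reader on any fan-out output stalls the others in this sketch, which is one reason a broker-backed queue per subscriber is usually preferable in production.
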