watermill/reference.md

10 KiB

Watermill AMQP Detailed Reference

Full AMQP Config Struct

type Config struct {
    Connection      ConnectionConfig
    Marshaler       Marshaler
    Exchange        ExchangeConfig
    Queue           QueueConfig
    QueueBind       QueueBindConfig
    Publish         PublishConfig
    Consume         ConsumeConfig
    TopologyBuilder TopologyBuilder
    // TLS is configured via Connection.TLSConfig (see ConnectionConfig below).
}

ConnectionConfig

type ConnectionConfig struct {
    AmqpURI   string
    TLSConfig *tls.Config
}

ExchangeConfig

type ExchangeConfig struct {
    GenerateName func(topic string) string
    Type         string  // "fanout", "topic", "direct", "headers"
    Durable      bool
    AutoDeleted  bool
    Internal     bool
    NoWait       bool
    Arguments    amqp091.Table
}

QueueConfig

type QueueConfig struct {
    GenerateName func(topic string) string
    Durable      bool
    AutoDelete   bool
    Exclusive    bool
    NoWait       bool
    Arguments    amqp091.Table  // x-message-ttl, x-dead-letter-exchange, etc.
}

QueueBindConfig

type QueueBindConfig struct {
    GenerateRoutingKey func(topic string) string
    NoWait             bool
    Arguments          amqp091.Table
}

PublishConfig

type PublishConfig struct {
    GenerateRoutingKey func(topic string) string
    Mandatory          bool
    Immediate          bool
}

ConsumeConfig

type ConsumeConfig struct {
    Qos       QosConfig
    Consumer  string
    Exclusive bool
    NoLocal   bool
    NoWait    bool
    Arguments amqp091.Table
}

type QosConfig struct {
    PrefetchCount int
    PrefetchSize  int
    Global        bool
}

Pre-Built Config Functions

NewDurableQueueConfig

  • No exchange declared (messages are routed through the broker's default exchange)
  • Queue name = topic name
  • Persistent delivery mode
  • Best for: work queues, task distribution

NewNonDurableQueueConfig

  • Same as above but non-durable, non-persistent
  • Best for: temporary/ephemeral queues

NewDurablePubSubConfig

  • Fanout exchange (all consumers get all messages)
  • Requires a QueueNameGenerator (e.g. GenerateQueueNameTopicNameWithSuffix) so each consumer group gets its own queue
  • Persistent delivery mode
  • Best for: event broadcasting, notifications

NewNonDurablePubSubConfig

  • Same as above but non-durable
  • Best for: real-time updates where persistence isn't needed

NewDurableTopicConfig / NewNonDurableTopicConfig

  • Topic exchange with routing key pattern matching
  • Best for: selective message routing by pattern

Marshaler

type Marshaler interface {
    Marshal(msg *message.Message) (amqp091.Publishing, error)
    Unmarshal(amqpMsg amqp091.Delivery) (*message.Message, error)
}

DefaultMarshaler

type DefaultMarshaler struct {
    // Customize the amqp091.Publishing before sending
    PostprocessPublishing func(amqp091.Publishing) amqp091.Publishing

    // Set true for non-persistent delivery (higher throughput, no disk writes)
    NotPersistentDeliveryMode bool

    // Header key for storing Watermill message UUID (default: "_watermill_message_uuid")
    MessageUUIDHeaderKey string
}

Example: setting content type and correlation ID

marshaler := amqp.DefaultMarshaler{
    PostprocessPublishing: func(p amqp091.Publishing) amqp091.Publishing {
        p.ContentType = "application/json"
        p.CorrelationId = "my-correlation-id"
        return p
    },
}

TopologyBuilder

Default: DefaultTopologyBuilder — declares exchanges, queues, and bindings automatically.

For custom topology (pre-existing infrastructure):

type MyTopologyBuilder struct{}

func (t *MyTopologyBuilder) BuildTopology(channel *amqp091.Channel, queueName string, exchangeName string, config amqp.Config, logger watermill.LoggerAdapter) error {
    // Custom declarations or no-ops for pre-declared topology
    return nil
}

func (t *MyTopologyBuilder) ExchangeDeclare(channel *amqp091.Channel, exchangeName string, config amqp.Config) error {
    return nil  // skip exchange declaration
}

Publisher Usage

publisher, err := amqp.NewPublisher(config, logger)
if err != nil {
    return err
}
defer publisher.Close()  // IMPORTANT: flushes unsent messages

msg := message.NewMessage(watermill.NewUUID(), []byte(`{"event":"user_created"}`))
msg.Metadata.Set("correlation_id", "abc-123")

err = publisher.Publish("events.user", msg)
  • Thread-safe
  • Publish blocks until the broker confirms receipt only when Publish.ConfirmDelivery is enabled; otherwise it returns once the message has been sent
  • Topic maps to exchange/queue/routing-key depending on config

Subscriber Usage

subscriber, err := amqp.NewSubscriber(config, logger)
if err != nil {
    return err
}
defer subscriber.Close()

messages, err := subscriber.Subscribe(ctx, "events.user")
if err != nil {
    return err
}

for msg := range messages {
    var event UserEvent
    if err := json.Unmarshal(msg.Payload, &event); err != nil {
        msg.Nack()  // redelivery
        continue
    }
    // process event...
    msg.Ack()  // acknowledge
}
  • Channel closes when Close() is called or context is canceled
  • Must Ack/Nack every message
  • Next message delivered only after Ack (with prefetch=1)

Router Detailed Reference

RouterConfig

type RouterConfig struct {
    CloseTimeout time.Duration  // default: 30s
}

Handler Registration

// Full handler: consume + publish
handler := router.AddHandler(
    "handler_name",     // must be unique
    "subscribe_topic",  // consumed topic
    subscriber,         // Subscriber implementation
    "publish_topic",    // produced topic
    publisher,          // Publisher implementation
    handlerFunc,        // func(msg *Message) ([]*Message, error)
)

// Consumer-only handler
router.AddConsumerHandler(
    "consumer_name",
    "subscribe_topic",
    subscriber,
    noPublishHandlerFunc,  // func(msg *Message) error
)

Handler-Level Middleware

handler := router.AddHandler(...)
handler.AddMiddleware(
    middleware.Timeout(5 * time.Second),
)

Context Helpers

message.HandlerNameFromCtx(msg.Context())     // "handler_name"
message.SubscriberNameFromCtx(msg.Context())  // e.g. "amqp.Subscriber"
message.PublisherNameFromCtx(msg.Context())
message.SubscribeTopicFromCtx(msg.Context())
message.PublishTopicFromCtx(msg.Context())

Middleware Detailed Reference

Retry

middleware.Retry{
    MaxRetries:          3,
    InitialInterval:     100 * time.Millisecond,
    MaxInterval:         10 * time.Second,
    Multiplier:          2.0,
    MaxElapsedTime:      30 * time.Second,
    RandomizationFactor: 0.5,
    OnRetryHook: func(retryNum int, delay time.Duration) {
        log.Printf("retry %d after %v", retryNum, delay)
    },
    ShouldRetry: func(err error) bool {
        return !errors.Is(err, ErrPermanent)  // skip retry for permanent errors
    },
    Logger: logger,
}.Middleware

Circuit Breaker

import "github.com/sony/gobreaker"

cb := middleware.NewCircuitBreaker(gobreaker.Settings{
    Name:        "my-handler",
    MaxRequests: 3,             // half-open state max requests
    Interval:    10 * time.Second,
    Timeout:     30 * time.Second,
    ReadyToTrip: func(counts gobreaker.Counts) bool {
        return counts.ConsecutiveFailures > 5
    },
})
router.AddMiddleware(cb.Middleware)

Timeout

router.AddMiddleware(middleware.Timeout(5 * time.Second))
// The middleware only cancels the message context; the handler must watch <-msg.Context().Done() and return when it fires

Throttle

// 10 messages per second
throttle := middleware.NewThrottle(10, time.Second)
router.AddMiddleware(throttle.Middleware)

Poison Queue

// All errors go to poison queue
pq, err := middleware.PoisonQueue(publisher, "failed_messages")
router.AddMiddleware(pq)

// Only specific errors
pq, err := middleware.PoisonQueueWithFilter(publisher, "failed_messages",
    func(err error) bool {
        return errors.Is(err, ErrUnprocessable)
    },
)

Deduplicator

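repo, err := middleware.NewMapExpiringKeyRepository(10 * time.Minute)
if err != nil {
    return err
}
dedup := &middleware.Deduplicator{
    KeyFactory: middleware.NewMessageHasherSHA256(middleware.MessageHasherReadLimitMinimum),  // or NewMessageHasherAdler32(...)
    Repository: repo,
    Timeout:    time.Second,
}
router.AddMiddleware(dedup.Middleware)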

Correlation ID

// Set on first message entering system
middleware.SetCorrelationID(watermill.NewUUID(), msg)

// Read in any handler downstream
corrID := middleware.MessageCorrelationID(msg)

// Auto-propagate (add as router middleware)
router.AddMiddleware(middleware.CorrelationID)

Common AMQP Patterns

Work Queue (competing consumers)

// Multiple workers share the same queue
config := amqp.NewDurableQueueConfig("amqp://localhost:5672/")
// All subscribers with the same config compete for messages

Fanout (pub/sub broadcast)

config := amqp.NewDurablePubSubConfig(
    "amqp://localhost:5672/",
    amqp.GenerateQueueNameTopicNameWithSuffix("service-name"),
)
// Each service gets its own queue bound to the fanout exchange

Topic Routing

config := amqp.NewDurableTopicConfig(
    "amqp://localhost:5672/",
    "events",             // exchange name
    "my-service.events",  // queue name
)
// Use routing key patterns like "user.*", "order.#"

Dead Letter Queue

config := amqp.Config{
    // ...
    Queue: amqp.QueueConfig{
        GenerateName: func(topic string) string { return topic },
        Durable:      true,
        Arguments: amqp091.Table{
            "x-dead-letter-exchange":    "dlx",
            "x-dead-letter-routing-key": "dead_letter",
            "x-message-ttl":             int32(300000),  // 5 minutes
        },
    },
}

Delayed/Scheduled Messages

Use the DelayOnError middleware or RabbitMQ delayed message exchange plugin.

Logging

// Development
logger := watermill.NewStdLogger(debug, trace)

// Production (slog)
logger := watermill.NewSlogLogger(slog.Default())

Advanced: Outbox Pattern

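Watermill supports the transactional outbox pattern via the Forwarder component: messages are first written to a database in the same transaction as your business data, and the Forwarder then relays them to the broker. Because the message is persisted with the transaction, it is not lost even if the broker is temporarily down.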

Advanced: FanIn / FanOut

  • FanIn: Merge messages from multiple source topics into a single target topic
  • FanOut: Deliver each message from one topic to multiple subscribers/handlers