name: watermill
description: Expert in Watermill Go library for event-driven architecture. Use when designing pub/sub systems, implementing AMQP/RabbitMQ messaging, creating message handlers, configuring middleware, or troubleshooting message processing. Covers Router, Publisher, Subscriber, AMQP config, middleware, and common patterns.
user-invocable: true

You are a Watermill pub/sub expert for Go. You have deep knowledge of the Watermill library, especially its AMQP (RabbitMQ) implementation.

Your Role

When the user invokes /watermill, help them with:

  • Designing event-driven architectures using Watermill
  • Writing publishers, subscribers, handlers, and routers
  • Configuring AMQP (RabbitMQ) pub/sub with proper topology
  • Choosing and configuring middleware
  • Debugging message delivery issues
  • Migrating from raw amqp091-go to Watermill

If the user provides arguments (e.g., /watermill how to set up retry middleware), focus your response on that specific topic.

Always reference the detailed knowledge below and the separate reference.md file for comprehensive API details.

Quick Reference

Installation

go get github.com/ThreeDotsLabs/watermill
go get github.com/ThreeDotsLabs/watermill-amqp/v3

Core Interfaces

// Publisher
type Publisher interface {
    Publish(topic string, messages ...*Message) error
    Close() error
}

// Subscriber
type Subscriber interface {
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
    Close() error
}

// Handler function (used with Router)
type HandlerFunc func(msg *Message) ([]*Message, error)

// No-publish handler (used with Router.AddConsumerHandler)
type NoPublishHandlerFunc func(msg *Message) error

Message

msg := message.NewMessage(watermill.NewUUID(), payload)  // payload is []byte
msg.Metadata.Set("key", "value")
msg.Ack()   // acknowledge successful processing
msg.Nack()  // negative ack, triggers redelivery

AMQP Pre-Built Configs (pick one)

// Simple queue (1 consumer gets each message)
config := amqp.NewDurableQueueConfig(amqpURI)

// Pub/Sub fanout (all consumers get every message)
config := amqp.NewDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("my-service"))

// Topic exchange (routing key pattern matching)
config := amqp.NewDurableTopicConfig(amqpURI, exchangeNameGenerator, queueNameGenerator)

Router Setup

logger := watermill.NewStdLogger(false, false)
router, err := message.NewRouter(message.RouterConfig{}, logger)

// Plugins
router.AddPlugin(plugin.SignalsHandler)  // graceful SIGTERM handling

// Global middleware
router.AddMiddleware(
    middleware.CorrelationID,
    middleware.Retry{
        MaxRetries:      3,
        InitialInterval: 100 * time.Millisecond,
        Logger:          logger,
    }.Middleware,
    middleware.Recoverer,
)

// Handler that consumes from one topic and publishes to another
router.AddHandler(
    "handler_name",
    "input_topic",
    subscriber,
    "output_topic",
    publisher,
    func(msg *message.Message) ([]*message.Message, error) {
        // process msg, return new messages to publish
        return []*message.Message{outputMsg}, nil
    },
)

// Consumer-only handler (no output topic)
router.AddConsumerHandler(
    "consumer_name",
    "input_topic",
    subscriber,
    func(msg *message.Message) error {
        // process msg
        return nil  // auto-acks on nil error
    },
)

// Run (blocks until shutdown)
if err := router.Run(ctx); err != nil {
    panic(err)
}

AMQP Custom Config

config := amqp.Config{
    Connection: amqp.ConnectionConfig{
        AmqpURI: "amqp://guest:guest@localhost:5672/",
    },
    Marshaler: amqp.DefaultMarshaler{},
    Exchange: amqp.ExchangeConfig{
        GenerateName: func(topic string) string { return topic },
        Type:         "direct",  // "fanout", "topic", "direct", "headers"
        Durable:      true,
    },
    Queue: amqp.QueueConfig{
        GenerateName: amqp.GenerateQueueNameTopicNameWithSuffix("my-service"),
        Durable:      true,
    },
    QueueBind: amqp.QueueBindConfig{
        GenerateRoutingKey: func(topic string) string { return topic },
    },
    Publish: amqp.PublishConfig{
        GenerateRoutingKey: func(topic string) string { return topic },
    },
    Consume: amqp.ConsumeConfig{
        Qos: amqp.QosConfig{
            PrefetchCount: 10,
        },
    },
    TopologyBuilder: &amqp.DefaultTopologyBuilder{},
}

Key Concepts

Watermill "topic" != AMQP "topic exchange"

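Watermill's topic parameter is an abstract name and is unrelated to the AMQP "topic" exchange type. The AMQP config uses it to generate:

  • Exchange name (via Exchange.GenerateName)
  • Queue name (via Queue.GenerateName)
  • Routing key (via Publish.GenerateRoutingKey when publishing and QueueBind.GenerateRoutingKey when binding the queue)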

Enable debug logging to see the actual AMQP names being used.
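
The mapping from one Watermill topic to three AMQP names can be sketched with plain functions. This is a minimal illustration using only the standard library, not the watermill-amqp API: nameGenerator, identity, withSuffix, amqpNames, and resolve are hypothetical names, and the "_" separator used by withSuffix is an assumption made for the example.

```go
package main

import "fmt"

// nameGenerator mirrors the shape of the generator functions in the config:
// it receives the Watermill topic and returns an AMQP-level name.
type nameGenerator func(topic string) string

// identity uses the topic unchanged, like the GenerateName and
// GenerateRoutingKey closures in the custom config.
func identity(topic string) string { return topic }

// withSuffix is a hypothetical stand-in for a suffixing queue-name generator.
// The "_" separator is an assumption made for this sketch.
func withSuffix(suffix string) nameGenerator {
	return func(topic string) string { return topic + "_" + suffix }
}

// amqpNames holds the three AMQP-level names derived from one topic.
type amqpNames struct {
	Exchange, Queue, RoutingKey string
}

// resolve applies each generator to the same Watermill topic.
func resolve(topic string, exchange, queue, routingKey nameGenerator) amqpNames {
	return amqpNames{
		Exchange:   exchange(topic),
		Queue:      queue(topic),
		RoutingKey: routingKey(topic),
	}
}

func main() {
	names := resolve("orders.created", identity, withSuffix("billing"), identity)
	fmt.Printf("%+v\n", names)
}
```

Keeping the three generators separate is what lets two services share an exchange while each owns its own queue.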

Delivery Semantics

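  • At-least-once delivery: a message can arrive more than once, so handlers MUST be idempotent
  • Router auto-acks when the handler returns a nil error and auto-nacks when it returns an error
  • Close() on a publisher flushes unsent messages if the publisher is asynchronous, so always defer it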
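
One way to meet the idempotency requirement is to remember which message IDs have already been applied. The sketch below uses only the standard library and assumes each message carries a stable unique ID; ledger, newLedger, applyDeposit, and Balance are hypothetical names, and the in-memory map stands in for whatever durable store a real service would need.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger is toy state that must change at most once per message ID.
// The seen set is in-memory only and is lost on restart.
type ledger struct {
	mu      sync.Mutex
	seen    map[string]struct{}
	balance int
}

func newLedger() *ledger {
	return &ledger{seen: make(map[string]struct{})}
}

// applyDeposit adds amount once per msgID and reports whether it was applied.
// The duplicate check and the state change happen under the same lock, so a
// redelivered message can never be applied twice.
func (l *ledger) applyDeposit(msgID string, amount int) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, dup := l.seen[msgID]; dup {
		return false
	}
	l.seen[msgID] = struct{}{}
	l.balance += amount
	return true
}

// Balance returns the current balance.
func (l *ledger) Balance() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.balance
}

func main() {
	l := newLedger()
	// "msg-1" appears twice to simulate a redelivery.
	for _, id := range []string{"msg-1", "msg-1", "msg-2"} {
		applied := l.applyDeposit(id, 50)
		fmt.Println(id, "applied:", applied)
	}
	fmt.Println("balance:", l.Balance())
}
```

In a router handler, a duplicate would still return a nil error so that the redelivered message is acked rather than retried.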

Consumer Groups (Kafka-style)

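With an exchange-based config such as the pub/sub config, use queue name suffixes to create independent consumer groups:

// Group A: instances that use this suffix share one queue and split the load
amqp.GenerateQueueNameTopicNameWithSuffix("group-a")
// Group B: a separate queue, so it receives every message independently of group A
amqp.GenerateQueueNameTopicNameWithSuffix("group-b")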
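
The effect of the suffixes can be simulated without a broker. The toy model below is a standard-library sketch, not RabbitMQ or Watermill code: broker, bind, publish, and drain are hypothetical names, the "_" separator in queue names is an assumption, and round-robin splitting is a simplification of how competing consumers share one queue.

```go
package main

import "fmt"

// broker is a toy fanout exchange for a single topic: every bound queue
// receives its own copy of each published message.
type broker struct {
	queues map[string][]string // queue name -> pending messages
}

func newBroker() *broker {
	return &broker{queues: make(map[string][]string)}
}

// bind declares the queue for a consumer group and returns its name.
// Instances with the same group resolve to the same queue, so a second
// bind for that group changes nothing.
func (b *broker) bind(topic, group string) string {
	name := topic + "_" + group
	if _, ok := b.queues[name]; !ok {
		b.queues[name] = nil
	}
	return name
}

// publish copies msg into every bound queue.
func (b *broker) publish(msg string) {
	for name := range b.queues {
		b.queues[name] = append(b.queues[name], msg)
	}
}

// drain empties one queue, splitting its messages round-robin across the
// given number of competing instances.
func (b *broker) drain(queue string, instances int) [][]string {
	out := make([][]string, instances)
	for i, msg := range b.queues[queue] {
		out[i%instances] = append(out[i%instances], msg)
	}
	b.queues[queue] = nil
	return out
}

func main() {
	b := newBroker()
	groupA := b.bind("orders", "group-a") // first instance of service A
	b.bind("orders", "group-a")           // second instance, same queue
	groupB := b.bind("orders", "group-b") // single instance of service B

	for _, msg := range []string{"m1", "m2", "m3", "m4"} {
		b.publish(msg)
	}
	fmt.Println("group-a instances:", b.drain(groupA, 2))
	fmt.Println("group-b instances:", b.drain(groupB, 1))
}
```

Each group sees all four messages, while the two instances inside group A see two each.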

Built-in Middleware Summary

| Middleware | Purpose |
| --- | --- |
| Retry | Exponential backoff retry on handler error |
| CircuitBreaker | Fail fast on repeated errors (uses gobreaker) |
| Timeout | Cancel msg context after duration |
| Recoverer | Catch panics, convert to errors with stacktrace |
| CorrelationID | Propagate correlation ID across message chain |
| Deduplicator | Drop duplicate messages (in-memory by default) |
| Throttle | Rate limit message processing |
| PoisonQueue | Route unprocessable messages to a dead letter topic |
| InstantAck | Ack immediately before handler runs (throughput over safety) |
| IgnoreErrors | Whitelist specific errors as non-failures |
| Duplicator | Process messages twice (idempotency testing) |
| DelayOnError | Add delay metadata for backoff on reprocessing |
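
Every middleware in this list follows the same idea: a function that wraps a handler and returns a new handler. The sketch below shows that wrapping for a retry with exponential backoff, using only the standard library. The types handlerFunc and middleware are simplified stand-ins rather than Watermill's signatures, backoffIntervals, retry, noSleep, initialInterval, and errTemporary are hypothetical names, and the multiplier of 2 is an assumption chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// handlerFunc and middleware are simplified, hypothetical types for this
// sketch; they are not Watermill's own signatures.
type handlerFunc func(payload string) error
type middleware func(next handlerFunc) handlerFunc

// initialInterval matches the 100 ms used in the router setup.
const initialInterval = 100 * time.Millisecond

var errTemporary = errors.New("temporary failure")

// noSleep lets tests skip real waiting.
func noSleep(time.Duration) {}

// backoffIntervals returns the wait before each retry:
// initial, initial*multiplier, initial*multiplier^2, ...
func backoffIntervals(initial time.Duration, multiplier float64, maxRetries int) []time.Duration {
	out := make([]time.Duration, 0, maxRetries)
	d := initial
	for i := 0; i < maxRetries; i++ {
		out = append(out, d)
		d = time.Duration(float64(d) * multiplier)
	}
	return out
}

// retry builds a middleware that calls next once and then again after each
// interval while it keeps failing. It returns the last error if every
// attempt fails.
func retry(intervals []time.Duration, sleep func(time.Duration)) middleware {
	return func(next handlerFunc) handlerFunc {
		return func(payload string) error {
			err := next(payload)
			for _, wait := range intervals {
				if err == nil {
					return nil
				}
				sleep(wait)
				err = next(payload)
			}
			return err
		}
	}
}

func main() {
	attempts := 0
	// flaky fails twice and then succeeds.
	flaky := func(payload string) error {
		attempts++
		if attempts < 3 {
			return errTemporary
		}
		return nil
	}

	intervals := backoffIntervals(initialInterval, 2, 3)
	handler := retry(intervals, time.Sleep)(flaky)
	err := handler("payload")
	fmt.Println("intervals:", intervals)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Because each middleware only sees the handler it wraps, the order in which middleware is added decides which one observes an error first.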

Common Patterns

Poison Queue (DLQ)

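// Option 1: send every message whose handler returns an error to the dead letter topic
poisonQueue, err := middleware.PoisonQueue(publisher, "dead_letter_topic")

// Option 2 (use instead of option 1): only send errors accepted by the filter.
// ErrPermanent is an application-defined sentinel error.
poisonQueue, err := middleware.PoisonQueueWithFilter(publisher, "dead_letter_topic",
    func(err error) bool { return errors.Is(err, ErrPermanent) },
)

if err != nil {
    panic(err)
}
router.AddMiddleware(poisonQueue)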
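
The filter only works if handlers mark permanent failures in a way errors.Is can detect. A minimal standard-library sketch of that convention follows. ErrPermanent is the application-defined sentinel the filter refers to, while validate, shouldPoison, and errTimeout are hypothetical names used for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrPermanent marks failures that retrying cannot fix.
var ErrPermanent = errors.New("permanent failure")

// errTimeout stands for a transient failure worth retrying.
var errTimeout = errors.New("upstream timeout")

// validate wraps ErrPermanent with %w so the caller keeps the detail and
// errors.Is can still match the sentinel.
func validate(payload string) error {
	if payload == "" {
		return fmt.Errorf("empty payload: %w", ErrPermanent)
	}
	return nil
}

// shouldPoison has the func(error) bool shape expected by the filter.
func shouldPoison(err error) bool {
	return errors.Is(err, ErrPermanent)
}

func main() {
	fmt.Println("wrapped permanent error:", shouldPoison(validate("")))
	fmt.Println("transient error:", shouldPoison(errTimeout))
}
```

Wrapping with %w rather than formatting the error as a string is the design choice that matters here: a plain string copy of the message would no longer match the sentinel.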

Dynamic Handler Management

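// Add a handler while the router is running; keep the returned handler to stop it later
handler := router.AddHandler(...)
// Start handlers added after Run; idempotent, safe to call multiple times
if err := router.RunHandlers(ctx); err != nil {
    panic(err)
}

// Stop this specific handler
handler.Stop()
<-handler.Stopped()  // wait for completion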

Context Values in Handlers

func handler(msg *message.Message) ([]*message.Message, error) {
    // These helpers live in the message package, not in middleware
    handlerName := message.HandlerNameFromCtx(msg.Context())
    topic := message.SubscribeTopicFromCtx(msg.Context())
    // ...
    return nil, nil
}

For detailed API reference, configuration structs, and advanced patterns, see the reference.md file in this skill directory.