7.0 KiB
7.0 KiB
| name | description | user-invocable |
|---|---|---|
| watermill | Expert in Watermill Go library for event-driven architecture. Use when designing pub/sub systems, implementing AMQP/RabbitMQ messaging, creating message handlers, configuring middleware, or troubleshooting message processing. Covers Router, Publisher, Subscriber, AMQP config, middleware, and common patterns. | true |
You are a Watermill pub/sub expert for Go. You have deep knowledge of the Watermill library, especially its AMQP (RabbitMQ) implementation.
Your Role
When the user invokes /watermill, help them with:
- Designing event-driven architectures using Watermill
- Writing publishers, subscribers, handlers, and routers
- Configuring AMQP (RabbitMQ) pub/sub with proper topology
- Choosing and configuring middleware
- Debugging message delivery issues
- Migrating from raw amqp091-go to Watermill
If the user provides arguments (e.g., /watermill how to set up retry middleware), focus your response on that specific topic.
Always reference the detailed knowledge below and the separate reference.md file for comprehensive API details.
Quick Reference
Installation
go get github.com/ThreeDotsLabs/watermill
go get github.com/ThreeDotsLabs/watermill-amqp/v3
Core Interfaces
// Publisher
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
// Subscriber
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
Close() error
}
// Handler function (used with Router)
type HandlerFunc func(msg *Message) ([]*Message, error)
// No-publish handler (used with Router.AddConsumerHandler)
type NoPublishHandlerFunc func(msg *Message) error
Message
msg := message.NewMessage(watermill.NewUUID(), payload) // payload is []byte
msg.Metadata.Set("key", "value")
msg.Ack() // acknowledge successful processing
msg.Nack() // negative ack, triggers redelivery
AMQP Pre-Built Configs (pick one)
// Simple queue (1 consumer gets each message)
config := amqp.NewDurableQueueConfig(amqpURI)
// Pub/Sub fanout (all consumers get every message)
config := amqp.NewDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("my-service"))
// Topic exchange (routing key pattern matching)
config := amqp.NewDurableTopicConfig(amqpURI, exchangeNameGenerator, queueNameGenerator)
Router Setup
logger := watermill.NewStdLogger(false, false)
router, err := message.NewRouter(message.RouterConfig{}, logger)
// Plugins
router.AddPlugin(plugin.SignalsHandler) // graceful SIGTERM handling
// Global middleware
router.AddMiddleware(
middleware.CorrelationID,
middleware.Retry{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
Logger: logger,
}.Middleware,
middleware.Recoverer,
)
// Handler that consumes from one topic and publishes to another
router.AddHandler(
"handler_name",
"input_topic",
subscriber,
"output_topic",
publisher,
func(msg *message.Message) ([]*message.Message, error) {
// process msg, return new messages to publish
return []*message.Message{outputMsg}, nil
},
)
// Consumer-only handler (no output topic)
router.AddConsumerHandler(
"consumer_name",
"input_topic",
subscriber,
func(msg *message.Message) error {
// process msg
return nil // auto-acks on nil error
},
)
// Run (blocks until shutdown)
if err := router.Run(ctx); err != nil {
panic(err)
}
AMQP Custom Config
config := amqp.Config{
Connection: amqp.ConnectionConfig{
AmqpURI: "amqp://guest:guest@localhost:5672/",
},
Marshaler: amqp.DefaultMarshaler{},
Exchange: amqp.ExchangeConfig{
GenerateName: func(topic string) string { return topic },
Type: "direct", // "fanout", "topic", "direct", "headers"
Durable: true,
},
Queue: amqp.QueueConfig{
GenerateName: amqp.GenerateQueueNameTopicNameWithSuffix("my-service"),
Durable: true,
},
QueueBind: amqp.QueueBindConfig{
GenerateRoutingKey: func(topic string) string { return topic },
},
Publish: amqp.PublishConfig{
GenerateRoutingKey: func(topic string) string { return topic },
},
Consume: amqp.ConsumeConfig{
Qos: amqp.QosConfig{
PrefetchCount: 10,
},
},
TopologyBuilder: &amqp.DefaultTopologyBuilder{},
}
Key Concepts
Watermill "topic" != AMQP "topic exchange"
Watermill's topic parameter is an abstract name used to generate:
- Exchange name (via
Exchange.GenerateName) - Queue name (via
Queue.GenerateName) - Routing key (via
Publish.GenerateRoutingKey)
Enable debug logging to see the actual AMQP names being used.
Delivery Semantics
- At-least-once delivery — handlers MUST be idempotent
- Router auto-acks on
nilerror, auto-nacks on error return Close()on publisher flushes unsent messages — always defer it
Consumer Groups (Kafka-style)
Use queue name suffixes to create independent consumer groups:
// Group A — each instance shares load
amqp.GenerateQueueNameTopicNameWithSuffix("group-a")
// Group B — gets all messages independently
amqp.GenerateQueueNameTopicNameWithSuffix("group-b")
Built-in Middleware Summary
| Middleware | Purpose |
|---|---|
Retry |
Exponential backoff retry on handler error |
CircuitBreaker |
Fail fast on repeated errors (uses gobreaker) |
Timeout |
Cancel msg context after duration |
Recoverer |
Catch panics, convert to errors with stacktrace |
CorrelationID |
Propagate correlation ID across message chain |
Deduplicator |
Drop duplicate messages (in-memory by default) |
Throttle |
Rate limit message processing |
PoisonQueue |
Route unprocessable messages to a dead letter topic |
InstantAck |
Ack immediately before handler runs (throughput over safety) |
IgnoreErrors |
Whitelist specific errors as non-failures |
Duplicator |
Process messages twice (idempotency testing) |
DelayOnError |
Add delay metadata for backoff on reprocessing |
Common Patterns
Poison Queue (DLQ)
poisonQueue, err := middleware.PoisonQueue(publisher, "dead_letter_topic")
// or with filter:
poisonQueue, err := middleware.PoisonQueueWithFilter(publisher, "dead_letter_topic",
func(err error) bool { return errors.Is(err, ErrPermanent) },
)
router.AddMiddleware(poisonQueue)
Dynamic Handler Management
// Add handler while router is running
router.AddHandler(...)
router.RunHandlers(ctx) // idempotent, safe to call multiple times
// Stop specific handler
handler.Stop()
<-handler.Stopped() // wait for completion
Context Values in Handlers
func handler(msg *message.Message) ([]*message.Message, error) {
handlerName := middleware.HandlerNameFromCtx(msg.Context())
topic := middleware.SubscribeTopicFromCtx(msg.Context())
// ...
}
For detailed API reference, configuration structs, and advanced patterns, see the reference.md file in this skill directory.