This commit is contained in:
naudachu 2026-03-20 14:57:31 +05:00
commit 67f862a363
2 changed files with 633 additions and 0 deletions

225
SKILL.md Normal file
View File

@ -0,0 +1,225 @@
---
name: watermill
description: Expert in Watermill Go library for event-driven architecture. Use when designing pub/sub systems, implementing AMQP/RabbitMQ messaging, creating message handlers, configuring middleware, or troubleshooting message processing. Covers Router, Publisher, Subscriber, AMQP config, middleware, and common patterns.
user-invocable: true
---
You are a Watermill pub/sub expert for Go. You have deep knowledge of the Watermill library, especially its AMQP (RabbitMQ) implementation.
## Your Role
When the user invokes `/watermill`, help them with:
- Designing event-driven architectures using Watermill
- Writing publishers, subscribers, handlers, and routers
- Configuring AMQP (RabbitMQ) pub/sub with proper topology
- Choosing and configuring middleware
- Debugging message delivery issues
- Migrating from raw amqp091-go to Watermill
If the user provides arguments (e.g., `/watermill how to set up retry middleware`), focus your response on that specific topic.
Always reference the detailed knowledge below and the separate reference.md file for comprehensive API details.
## Quick Reference
### Installation
```bash
go get github.com/ThreeDotsLabs/watermill
go get github.com/ThreeDotsLabs/watermill-amqp/v3
```
### Core Interfaces
```go
// Publisher
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
// Subscriber
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
Close() error
}
// Handler function (used with Router)
type HandlerFunc func(msg *Message) ([]*Message, error)
// No-publish handler (used with Router.AddConsumerHandler)
type NoPublishHandlerFunc func(msg *Message) error
```
### Message
```go
msg := message.NewMessage(watermill.NewUUID(), payload) // payload is []byte
msg.Metadata.Set("key", "value")
msg.Ack() // acknowledge successful processing
msg.Nack() // negative ack, triggers redelivery
```
### AMQP Pre-Built Configs (pick one)
```go
// Simple queue (1 consumer gets each message)
config := amqp.NewDurableQueueConfig(amqpURI)
// Pub/Sub fanout (all consumers get every message)
config := amqp.NewDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("my-service"))
// Topic exchange (routing key pattern matching)
config := amqp.NewDurableTopicConfig(amqpURI, exchangeNameGenerator, queueNameGenerator)
```
### Router Setup
```go
logger := watermill.NewStdLogger(false, false)
router, err := message.NewRouter(message.RouterConfig{}, logger)
// Plugins
router.AddPlugin(plugin.SignalsHandler) // graceful SIGTERM handling
// Global middleware
router.AddMiddleware(
middleware.CorrelationID,
middleware.Retry{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
Logger: logger,
}.Middleware,
middleware.Recoverer,
)
// Handler that consumes from one topic and publishes to another
router.AddHandler(
"handler_name",
"input_topic",
subscriber,
"output_topic",
publisher,
func(msg *message.Message) ([]*message.Message, error) {
// process msg, return new messages to publish
return []*message.Message{outputMsg}, nil
},
)
// Consumer-only handler (no output topic)
router.AddConsumerHandler(
"consumer_name",
"input_topic",
subscriber,
func(msg *message.Message) error {
// process msg
return nil // auto-acks on nil error
},
)
// Run (blocks until shutdown)
if err := router.Run(ctx); err != nil {
panic(err)
}
```
### AMQP Custom Config
```go
config := amqp.Config{
Connection: amqp.ConnectionConfig{
AmqpURI: "amqp://guest:guest@localhost:5672/",
},
Marshaler: amqp.DefaultMarshaler{},
Exchange: amqp.ExchangeConfig{
GenerateName: func(topic string) string { return topic },
Type: "direct", // "fanout", "topic", "direct", "headers"
Durable: true,
},
Queue: amqp.QueueConfig{
GenerateName: amqp.GenerateQueueNameTopicNameWithSuffix("my-service"),
Durable: true,
},
QueueBind: amqp.QueueBindConfig{
GenerateRoutingKey: func(topic string) string { return topic },
},
Publish: amqp.PublishConfig{
GenerateRoutingKey: func(topic string) string { return topic },
},
Consume: amqp.ConsumeConfig{
Qos: amqp.QosConfig{
PrefetchCount: 10,
},
},
TopologyBuilder: &amqp.DefaultTopologyBuilder{},
}
```
## Key Concepts
### Watermill "topic" != AMQP "topic exchange"
Watermill's `topic` parameter is an abstract name used to generate:
- Exchange name (via `Exchange.GenerateName`)
- Queue name (via `Queue.GenerateName`)
- Routing key (via `Publish.GenerateRoutingKey`)
Enable debug logging to see the actual AMQP names being used.
### Delivery Semantics
- **At-least-once delivery** — handlers MUST be idempotent
- Router auto-acks on `nil` error, auto-nacks on error return
- `Close()` on publisher flushes unsent messages — always defer it
### Consumer Groups (Kafka-style)
Use queue name suffixes to create independent consumer groups:
```go
// Group A — each instance shares load
amqp.GenerateQueueNameTopicNameWithSuffix("group-a")
// Group B — gets all messages independently
amqp.GenerateQueueNameTopicNameWithSuffix("group-b")
```
## Built-in Middleware Summary
| Middleware | Purpose |
|---|---|
| `Retry` | Exponential backoff retry on handler error |
| `CircuitBreaker` | Fail fast on repeated errors (uses gobreaker) |
| `Timeout` | Cancel msg context after duration |
| `Recoverer` | Catch panics, convert to errors with stacktrace |
| `CorrelationID` | Propagate correlation ID across message chain |
| `Deduplicator` | Drop duplicate messages (in-memory by default) |
| `Throttle` | Rate limit message processing |
| `PoisonQueue` | Route unprocessable messages to a dead letter topic |
| `InstantAck` | Ack immediately before handler runs (throughput over safety) |
| `IgnoreErrors` | Whitelist specific errors as non-failures |
| `Duplicator` | Process messages twice (idempotency testing) |
| `DelayOnError` | Add delay metadata for backoff on reprocessing |
## Common Patterns
### Poison Queue (DLQ)
```go
poisonQueue, err := middleware.PoisonQueue(publisher, "dead_letter_topic")
// or with filter:
poisonQueue, err := middleware.PoisonQueueWithFilter(publisher, "dead_letter_topic",
func(err error) bool { return errors.Is(err, ErrPermanent) },
)
router.AddMiddleware(poisonQueue)
```
### Dynamic Handler Management
```go
// Add handler while router is running
router.AddHandler(...)
router.RunHandlers(ctx) // idempotent, safe to call multiple times
// Stop specific handler
handler.Stop()
<-handler.Stopped() // wait for completion
```
### Context Values in Handlers
```go
func handler(msg *message.Message) ([]*message.Message, error) {
handlerName := middleware.HandlerNameFromCtx(msg.Context())
topic := middleware.SubscribeTopicFromCtx(msg.Context())
// ...
}
```
For detailed API reference, configuration structs, and advanced patterns, see the reference.md file in this skill directory.

408
reference.md Normal file
View File

@ -0,0 +1,408 @@
# Watermill AMQP Detailed Reference
## Full AMQP Config Struct
```go
type Config struct {
Connection ConnectionConfig
Marshaler Marshaler
Exchange ExchangeConfig
Queue QueueConfig
QueueBind QueueBindConfig
Publish PublishConfig
Consume ConsumeConfig
TopologyBuilder TopologyBuilder
TLSConfig *tls.Config
}
```
### ConnectionConfig
```go
type ConnectionConfig struct {
AmqpURI string
TLSConfig *tls.Config
}
```
### ExchangeConfig
```go
type ExchangeConfig struct {
GenerateName func(topic string) string
Type string // "fanout", "topic", "direct", "headers"
Durable bool
AutoDeleted bool
Internal bool
NoWait bool
Arguments amqp.Table
}
```
### QueueConfig
```go
type QueueConfig struct {
GenerateName func(topic string) string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Arguments amqp.Table // x-message-ttl, x-dead-letter-exchange, etc.
}
```
### QueueBindConfig
```go
type QueueBindConfig struct {
GenerateRoutingKey func(topic string) string
NoWait bool
Arguments amqp.Table
}
```
### PublishConfig
```go
type PublishConfig struct {
GenerateRoutingKey func(topic string) string
Mandatory bool
Immediate bool
}
```
### ConsumeConfig
```go
type ConsumeConfig struct {
Qos QosConfig
Consumer string
Exclusive bool
NoLocal bool
NoWait bool
Arguments amqp.Table
}
type QosConfig struct {
PrefetchCount int
PrefetchSize int
Global bool
}
```
## Pre-Built Config Functions
### NewDurableQueueConfig
- No exchange declared
- Queue name = topic name
- Persistent delivery mode
- Best for: work queues, task distribution
### NewNonDurableQueueConfig
- Same as above but non-durable, non-persistent
- Best for: temporary/ephemeral queues
### NewDurablePubSubConfig
- Fanout exchange (all consumers get all messages)
- Requires `GenerateQueueNameFunc` for unique queue per consumer
- Persistent delivery mode
- Best for: event broadcasting, notifications
### NewNonDurablePubSubConfig
- Same as above but non-durable
- Best for: real-time updates where persistence isn't needed
### NewDurableTopicConfig / NewNonDurableTopicConfig
- Topic exchange with routing key pattern matching
- Best for: selective message routing by pattern
## Marshaler
```go
type Marshaler interface {
Marshal(msg *message.Message) (amqp091.Publishing, error)
Unmarshal(amqpMsg amqp091.Delivery) (*message.Message, error)
}
```
### DefaultMarshaler
```go
type DefaultMarshaler struct {
// Customize the amqp091.Publishing before sending
PostprocessPublishing func(amqp091.Publishing) amqp091.Publishing
// Set true for non-persistent delivery (higher throughput, no disk writes)
NotPersistentDeliveryMode bool
// Header key for storing Watermill message UUID (default: "Watermill_MessageUUID")
MessageUUIDHeaderKey string
}
```
Example: adding content type
```go
marshaler := amqp.DefaultMarshaler{
PostprocessPublishing: func(p amqp091.Publishing) amqp091.Publishing {
p.ContentType = "application/json"
p.CorrelationId = "my-correlation-id"
return p
},
}
```
## TopologyBuilder
Default: `DefaultTopologyBuilder` — declares exchanges, queues, and bindings automatically.
For custom topology (pre-existing infrastructure):
```go
type MyTopologyBuilder struct{}
func (t *MyTopologyBuilder) BuildTopology(channel *amqp091.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error {
// Custom declarations or no-ops for pre-declared topology
return nil
}
func (t *MyTopologyBuilder) ExchangeDeclare(channel *amqp091.Channel, exchangeName string, config Config) error {
return nil // skip exchange declaration
}
```
## Publisher Usage
```go
publisher, err := amqp.NewPublisher(config, logger)
if err != nil {
return err
}
defer publisher.Close() // IMPORTANT: flushes unsent messages
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"event":"user_created"}`))
msg.Metadata.Set("correlation_id", "abc-123")
err = publisher.Publish("events.user", msg)
```
- Thread-safe
- Publish blocks until broker confirms receipt (with persistent delivery)
- Topic maps to exchange/queue/routing-key depending on config
## Subscriber Usage
```go
subscriber, err := amqp.NewSubscriber(config, logger)
if err != nil {
return err
}
defer subscriber.Close()
messages, err := subscriber.Subscribe(ctx, "events.user")
if err != nil {
return err
}
for msg := range messages {
var event UserEvent
if err := json.Unmarshal(msg.Payload, &event); err != nil {
msg.Nack() // redelivery
continue
}
// process event...
msg.Ack() // acknowledge
}
```
- Channel closes when `Close()` is called or context is canceled
- Must Ack/Nack every message
- Next message delivered only after Ack (with prefetch=1)
## Router Detailed Reference
### RouterConfig
```go
type RouterConfig struct {
CloseTimeout time.Duration // default: 30s
}
```
### Handler Registration
```go
// Full handler: consume + publish
handler := router.AddHandler(
"handler_name", // must be unique
"subscribe_topic", // consumed topic
subscriber, // Subscriber implementation
"publish_topic", // produced topic
publisher, // Publisher implementation
handlerFunc, // func(msg *Message) ([]*Message, error)
)
// Consumer-only handler
router.AddConsumerHandler(
"consumer_name",
"subscribe_topic",
subscriber,
noPublishHandlerFunc, // func(msg *Message) error
)
```
### Handler-Level Middleware
```go
handler := router.AddHandler(...)
handler.AddMiddleware(
middleware.Timeout(5 * time.Second),
)
```
### Context Helpers
```go
middleware.HandlerNameFromCtx(msg.Context()) // "handler_name"
middleware.SubscriberNameFromCtx(msg.Context()) // e.g. "amqp.Subscriber"
middleware.PublisherNameFromCtx(msg.Context())
middleware.SubscribeTopicFromCtx(msg.Context())
middleware.PublishTopicFromCtx(msg.Context())
```
## Middleware Detailed Reference
### Retry
```go
middleware.Retry{
MaxRetries: 3,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 10 * time.Second,
Multiplier: 2.0,
MaxElapsedTime: 30 * time.Second,
RandomizationFactor: 0.5,
OnRetryHook: func(retryNum int, delay time.Duration) {
log.Printf("retry %d after %v", retryNum, delay)
},
ShouldRetry: func(err error) bool {
return !errors.Is(err, ErrPermanent) // skip retry for permanent errors
},
Logger: logger,
}.Middleware
```
### Circuit Breaker
```go
import "github.com/sony/gobreaker"
cb := middleware.NewCircuitBreaker(gobreaker.Settings{
Name: "my-handler",
MaxRequests: 3, // half-open state max requests
Interval: 10 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 5
},
})
router.AddMiddleware(cb.Middleware)
```
### Timeout
```go
router.AddMiddleware(middleware.Timeout(5 * time.Second))
// Handler must check: msg.Context().Done()
```
### Throttle
```go
// 10 messages per second
throttle := middleware.NewThrottle(10, time.Second)
router.AddMiddleware(throttle.Middleware)
```
### Poison Queue
```go
// All errors go to poison queue
pq, err := middleware.PoisonQueue(publisher, "failed_messages")
router.AddMiddleware(pq)
// Only specific errors
pq, err := middleware.PoisonQueueWithFilter(publisher, "failed_messages",
func(err error) bool {
return errors.Is(err, ErrUnprocessable)
},
)
```
### Deduplicator
```go
dedup, err := middleware.NewDeduplicator(
middleware.NewMessageHasherSHA256(), // or NewMessageHasherAdler32()
middleware.NewInMemoryExpiringKeyRepository(10 * time.Minute),
)
router.AddMiddleware(dedup.Middleware)
```
### Correlation ID
```go
// Set on first message entering system
middleware.SetCorrelationID(watermill.NewUUID(), msg)
// Read in any handler downstream
corrID := middleware.MessageCorrelationID(msg)
// Auto-propagate (add as router middleware)
router.AddMiddleware(middleware.CorrelationID)
```
## Common AMQP Patterns
### Work Queue (competing consumers)
```go
// Multiple workers share the same queue
config := amqp.NewDurableQueueConfig("amqp://localhost:5672/")
// All subscribers with the same config compete for messages
```
### Fanout (pub/sub broadcast)
```go
config := amqp.NewDurablePubSubConfig(
"amqp://localhost:5672/",
amqp.GenerateQueueNameTopicNameWithSuffix("service-name"),
)
// Each service gets its own queue bound to the fanout exchange
```
### Topic Routing
```go
config := amqp.NewDurableTopicConfig(
"amqp://localhost:5672/",
func(topic string) string { return "events" }, // exchange
func(topic string) string { return "service." + topic }, // queue
)
// Use routing key patterns like "user.*", "order.#"
```
### Dead Letter Queue
```go
config := amqp.Config{
// ...
Queue: amqp.QueueConfig{
GenerateName: func(topic string) string { return topic },
Durable: true,
Arguments: amqp.Table{
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "dead_letter",
"x-message-ttl": int32(300000), // 5 minutes
},
},
}
```
### Delayed/Scheduled Messages
Use the `DelayOnError` middleware or RabbitMQ delayed message exchange plugin.
## Logging
```go
// Development
logger := watermill.NewStdLogger(debug, trace)
// Production (slog)
logger := watermill.NewSlogLogger(slog.Default())
```
## Advanced: Outbox Pattern
Watermill supports the transactional outbox pattern via the Forwarder component — publish messages to a database first, then forward to the broker. This guarantees no message loss even if the broker is down.
## Advanced: FanIn / FanOut
- **FanIn**: Merge multiple topics into one handler
- **FanOut**: Duplicate messages to multiple subscribers from one topic