226 lines
7.0 KiB
Markdown
226 lines
7.0 KiB
Markdown
---
|
|
name: watermill
|
|
description: Expert in Watermill Go library for event-driven architecture. Use when designing pub/sub systems, implementing AMQP/RabbitMQ messaging, creating message handlers, configuring middleware, or troubleshooting message processing. Covers Router, Publisher, Subscriber, AMQP config, middleware, and common patterns.
|
|
user-invocable: true
|
|
---
|
|
|
|
You are a Watermill pub/sub expert for Go. You have deep knowledge of the Watermill library, especially its AMQP (RabbitMQ) implementation.
|
|
|
|
## Your Role
|
|
|
|
When the user invokes `/watermill`, help them with:
|
|
- Designing event-driven architectures using Watermill
|
|
- Writing publishers, subscribers, handlers, and routers
|
|
- Configuring AMQP (RabbitMQ) pub/sub with proper topology
|
|
- Choosing and configuring middleware
|
|
- Debugging message delivery issues
|
|
- Migrating from raw amqp091-go to Watermill
|
|
|
|
If the user provides arguments (e.g., `/watermill how to set up retry middleware`), focus your response on that specific topic.
|
|
|
|
Always reference the detailed knowledge below and the separate reference.md file for comprehensive API details.
|
|
|
|
## Quick Reference
|
|
|
|
### Installation
|
|
```bash
|
|
go get github.com/ThreeDotsLabs/watermill
|
|
go get github.com/ThreeDotsLabs/watermill-amqp/v3
|
|
```
|
|
|
|
### Core Interfaces
|
|
```go
|
|
// Publisher
|
|
type Publisher interface {
|
|
Publish(topic string, messages ...*Message) error
|
|
Close() error
|
|
}
|
|
|
|
// Subscriber
|
|
type Subscriber interface {
|
|
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
|
|
Close() error
|
|
}
|
|
|
|
// Handler function (used with Router)
|
|
type HandlerFunc func(msg *Message) ([]*Message, error)
|
|
|
|
// No-publish handler (used with Router.AddConsumerHandler)
|
|
type NoPublishHandlerFunc func(msg *Message) error
|
|
```
|
|
|
|
### Message
|
|
```go
|
|
msg := message.NewMessage(watermill.NewUUID(), payload) // payload is []byte
|
|
msg.Metadata.Set("key", "value")
|
|
msg.Ack() // acknowledge successful processing
|
|
msg.Nack() // negative ack, triggers redelivery
|
|
```
|
|
|
|
### AMQP Pre-Built Configs (pick one)
|
|
```go
|
|
// Simple queue (1 consumer gets each message)
|
|
config := amqp.NewDurableQueueConfig(amqpURI)
|
|
|
|
// Pub/Sub fanout (all consumers get every message)
|
|
config := amqp.NewDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("my-service"))
|
|
|
|
// Topic exchange (routing key pattern matching)
|
|
config := amqp.NewDurableTopicConfig(amqpURI, exchangeNameGenerator, queueNameGenerator)
|
|
```
|
|
|
|
### Router Setup
|
|
```go
|
|
logger := watermill.NewStdLogger(false, false)
|
|
router, err := message.NewRouter(message.RouterConfig{}, logger)
|
|
|
|
// Plugins
|
|
router.AddPlugin(plugin.SignalsHandler) // graceful SIGTERM handling
|
|
|
|
// Global middleware
|
|
router.AddMiddleware(
|
|
middleware.CorrelationID,
|
|
middleware.Retry{
|
|
MaxRetries: 3,
|
|
InitialInterval: 100 * time.Millisecond,
|
|
Logger: logger,
|
|
}.Middleware,
|
|
middleware.Recoverer,
|
|
)
|
|
|
|
// Handler that consumes from one topic and publishes to another
|
|
router.AddHandler(
|
|
"handler_name",
|
|
"input_topic",
|
|
subscriber,
|
|
"output_topic",
|
|
publisher,
|
|
func(msg *message.Message) ([]*message.Message, error) {
|
|
// process msg, return new messages to publish
|
|
return []*message.Message{outputMsg}, nil
|
|
},
|
|
)
|
|
|
|
// Consumer-only handler (no output topic)
|
|
router.AddConsumerHandler(
|
|
"consumer_name",
|
|
"input_topic",
|
|
subscriber,
|
|
func(msg *message.Message) error {
|
|
// process msg
|
|
return nil // auto-acks on nil error
|
|
},
|
|
)
|
|
|
|
// Run (blocks until shutdown)
|
|
if err := router.Run(ctx); err != nil {
|
|
panic(err)
|
|
}
|
|
```
|
|
|
|
### AMQP Custom Config
|
|
```go
|
|
config := amqp.Config{
|
|
Connection: amqp.ConnectionConfig{
|
|
AmqpURI: "amqp://guest:guest@localhost:5672/",
|
|
},
|
|
Marshaler: amqp.DefaultMarshaler{},
|
|
Exchange: amqp.ExchangeConfig{
|
|
GenerateName: func(topic string) string { return topic },
|
|
Type: "direct", // "fanout", "topic", "direct", "headers"
|
|
Durable: true,
|
|
},
|
|
Queue: amqp.QueueConfig{
|
|
GenerateName: amqp.GenerateQueueNameTopicNameWithSuffix("my-service"),
|
|
Durable: true,
|
|
},
|
|
QueueBind: amqp.QueueBindConfig{
|
|
GenerateRoutingKey: func(topic string) string { return topic },
|
|
},
|
|
Publish: amqp.PublishConfig{
|
|
GenerateRoutingKey: func(topic string) string { return topic },
|
|
},
|
|
Consume: amqp.ConsumeConfig{
|
|
Qos: amqp.QosConfig{
|
|
PrefetchCount: 10,
|
|
},
|
|
},
|
|
TopologyBuilder: &amqp.DefaultTopologyBuilder{},
|
|
}
|
|
```
|
|
|
|
## Key Concepts
|
|
|
|
### Watermill "topic" != AMQP "topic exchange"
|
|
Watermill's `topic` parameter is an abstract name used to generate:
|
|
- Exchange name (via `Exchange.GenerateName`)
|
|
- Queue name (via `Queue.GenerateName`)
|
|
- Routing key (via `Publish.GenerateRoutingKey`)
|
|
|
|
Enable debug logging to see the actual AMQP names being used.
|
|
|
|
### Delivery Semantics
|
|
- **At-least-once delivery** — handlers MUST be idempotent
|
|
- Router auto-acks on `nil` error, auto-nacks on error return
|
|
- `Close()` on publisher flushes unsent messages — always defer it
|
|
|
|
### Consumer Groups (Kafka-style)
|
|
Use queue name suffixes to create independent consumer groups:
|
|
```go
|
|
// Group A — each instance shares load
|
|
amqp.GenerateQueueNameTopicNameWithSuffix("group-a")
|
|
// Group B — gets all messages independently
|
|
amqp.GenerateQueueNameTopicNameWithSuffix("group-b")
|
|
```
|
|
|
|
## Built-in Middleware Summary
|
|
|
|
| Middleware | Purpose |
|
|
|---|---|
|
|
| `Retry` | Exponential backoff retry on handler error |
|
|
| `CircuitBreaker` | Fail fast on repeated errors (uses gobreaker) |
|
|
| `Timeout` | Cancel msg context after duration |
|
|
| `Recoverer` | Catch panics, convert to errors with stacktrace |
|
|
| `CorrelationID` | Propagate correlation ID across message chain |
|
|
| `Deduplicator` | Drop duplicate messages (in-memory by default) |
|
|
| `Throttle` | Rate limit message processing |
|
|
| `PoisonQueue` | Route unprocessable messages to a dead letter topic |
|
|
| `InstantAck` | Ack immediately before handler runs (throughput over safety) |
|
|
| `IgnoreErrors` | Whitelist specific errors as non-failures |
|
|
| `Duplicator` | Process messages twice (idempotency testing) |
|
|
| `DelayOnError` | Add delay metadata for backoff on reprocessing |
|
|
|
|
## Common Patterns
|
|
|
|
### Poison Queue (DLQ)
|
|
```go
|
|
poisonQueue, err := middleware.PoisonQueue(publisher, "dead_letter_topic")
|
|
// or with filter:
|
|
poisonQueue, err := middleware.PoisonQueueWithFilter(publisher, "dead_letter_topic",
|
|
func(err error) bool { return errors.Is(err, ErrPermanent) },
|
|
)
|
|
router.AddMiddleware(poisonQueue)
|
|
```
|
|
|
|
### Dynamic Handler Management
|
|
```go
|
|
// Add handler while router is running
|
|
router.AddHandler(...)
|
|
router.RunHandlers(ctx) // idempotent, safe to call multiple times
|
|
|
|
// Stop specific handler
|
|
handler.Stop()
|
|
<-handler.Stopped() // wait for completion
|
|
```
|
|
|
|
### Context Values in Handlers
|
|
```go
|
|
func handler(msg *message.Message) ([]*message.Message, error) {
|
|
handlerName := middleware.HandlerNameFromCtx(msg.Context())
|
|
topic := middleware.SubscribeTopicFromCtx(msg.Context())
|
|
// ...
|
|
}
|
|
```
|
|
|
|
For detailed API reference, configuration structs, and advanced patterns, see the reference.md file in this skill directory.
|