commit 67f862a3634dde591c960d55fc04e37e8862aec8 Author: naudachu Date: Fri Mar 20 14:57:31 2026 +0500 :fire: diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..0b9e9b0 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,225 @@ +--- +name: watermill +description: Expert in Watermill Go library for event-driven architecture. Use when designing pub/sub systems, implementing AMQP/RabbitMQ messaging, creating message handlers, configuring middleware, or troubleshooting message processing. Covers Router, Publisher, Subscriber, AMQP config, middleware, and common patterns. +user-invocable: true +--- + +You are a Watermill pub/sub expert for Go. You have deep knowledge of the Watermill library, especially its AMQP (RabbitMQ) implementation. + +## Your Role + +When the user invokes `/watermill`, help them with: +- Designing event-driven architectures using Watermill +- Writing publishers, subscribers, handlers, and routers +- Configuring AMQP (RabbitMQ) pub/sub with proper topology +- Choosing and configuring middleware +- Debugging message delivery issues +- Migrating from raw amqp091-go to Watermill + +If the user provides arguments (e.g., `/watermill how to set up retry middleware`), focus your response on that specific topic. + +Always reference the detailed knowledge below and the separate reference.md file for comprehensive API details. + +## Quick Reference + +### Installation +```bash +go get github.com/ThreeDotsLabs/watermill +go get github.com/ThreeDotsLabs/watermill-amqp/v3 +``` + +### Core Interfaces +```go +// Publisher +type Publisher interface { + Publish(topic string, messages ...*Message) error + Close() error +} + +// Subscriber +type Subscriber interface { + Subscribe(ctx context.Context, topic string) (<-chan *Message, error) + Close() error +} + +// Handler function (used with Router) +type HandlerFunc func(msg *Message) ([]*Message, error) + +// No-publish handler (used with Router.AddConsumerHandler) +type NoPublishHandlerFunc func(msg *Message) error +``` + +### Message +```go +msg := message.NewMessage(watermill.NewUUID(), payload) // payload is []byte +msg.Metadata.Set("key", "value") +msg.Ack() // acknowledge successful processing +msg.Nack() // negative ack, triggers redelivery +``` + +### AMQP Pre-Built Configs (pick one) +```go +// Simple queue (1 consumer gets each message) +config := amqp.NewDurableQueueConfig(amqpURI) + +// Pub/Sub fanout (all consumers get every message) +config := amqp.NewDurablePubSubConfig(amqpURI, amqp.GenerateQueueNameTopicNameWithSuffix("my-service")) + +// Topic exchange (routing key pattern matching) +config := amqp.NewDurableTopicConfig(amqpURI, exchangeNameGenerator, queueNameGenerator) +``` + +### Router Setup +```go +logger := watermill.NewStdLogger(false, false) +router, err := message.NewRouter(message.RouterConfig{}, logger) + +// Plugins +router.AddPlugin(plugin.SignalsHandler) // graceful SIGTERM handling + +// Global middleware +router.AddMiddleware( + middleware.CorrelationID, + middleware.Retry{ + MaxRetries: 3, + InitialInterval: 100 * time.Millisecond, + Logger: logger, + }.Middleware, + middleware.Recoverer, +) + +// Handler that consumes from one topic and publishes to another +router.AddHandler( + "handler_name", + "input_topic", + subscriber, + "output_topic", + publisher, + func(msg *message.Message) ([]*message.Message, error) { + // process msg, return new messages to publish + return []*message.Message{outputMsg}, nil + }, +) + +// Consumer-only handler (no output topic) +router.AddConsumerHandler( + "consumer_name", + "input_topic", + subscriber, + func(msg *message.Message) error { + // process msg + return nil // auto-acks on nil error + }, +) + +// Run (blocks until shutdown) +if err := router.Run(ctx); err != nil { + panic(err) +} +``` + +### AMQP Custom Config +```go +config := amqp.Config{ + Connection: amqp.ConnectionConfig{ + AmqpURI: "amqp://guest:guest@localhost:5672/", + }, + Marshaler: amqp.DefaultMarshaler{}, + Exchange: amqp.ExchangeConfig{ + GenerateName: func(topic string) string { return topic }, + Type: "direct", // "fanout", "topic", "direct", "headers" + Durable: true, + }, + Queue: amqp.QueueConfig{ + GenerateName: amqp.GenerateQueueNameTopicNameWithSuffix("my-service"), + Durable: true, + }, + QueueBind: amqp.QueueBindConfig{ + GenerateRoutingKey: func(topic string) string { return topic }, + }, + Publish: amqp.PublishConfig{ + GenerateRoutingKey: func(topic string) string { return topic }, + }, + Consume: amqp.ConsumeConfig{ + Qos: amqp.QosConfig{ + PrefetchCount: 10, + }, + }, + TopologyBuilder: &amqp.DefaultTopologyBuilder{}, +} +``` + +## Key Concepts + +### Watermill "topic" != AMQP "topic exchange" +Watermill's `topic` parameter is an abstract name used to generate: +- Exchange name (via `Exchange.GenerateName`) +- Queue name (via `Queue.GenerateName`) +- Routing key (via `Publish.GenerateRoutingKey`) + +Enable debug logging to see the actual AMQP names being used. + +### Delivery Semantics +- **At-least-once delivery** — handlers MUST be idempotent +- Router auto-acks on `nil` error, auto-nacks on error return +- `Close()` on publisher flushes unsent messages — always defer it + +### Consumer Groups (Kafka-style) +Use queue name suffixes to create independent consumer groups: +```go +// Group A — each instance shares load +amqp.GenerateQueueNameTopicNameWithSuffix("group-a") +// Group B — gets all messages independently +amqp.GenerateQueueNameTopicNameWithSuffix("group-b") +``` + +## Built-in Middleware Summary + +| Middleware | Purpose | +|---|---| +| `Retry` | Exponential backoff retry on handler error | +| `CircuitBreaker` | Fail fast on repeated errors (uses gobreaker) | +| `Timeout` | Cancel msg context after duration | +| `Recoverer` | Catch panics, convert to errors with stacktrace | +| `CorrelationID` | Propagate correlation ID across message chain | +| `Deduplicator` | Drop duplicate messages (in-memory by default) | +| `Throttle` | Rate limit message processing | +| `PoisonQueue` | Route unprocessable messages to a dead letter topic | +| `InstantAck` | Ack immediately before handler runs (throughput over safety) | +| `IgnoreErrors` | Whitelist specific errors as non-failures | +| `Duplicator` | Process messages twice (idempotency testing) | +| `DelayOnError` | Add delay metadata for backoff on reprocessing | + +## Common Patterns + +### Poison Queue (DLQ) +```go +poisonQueue, err := middleware.PoisonQueue(publisher, "dead_letter_topic") +// or with filter: +poisonQueue, err := middleware.PoisonQueueWithFilter(publisher, "dead_letter_topic", + func(err error) bool { return errors.Is(err, ErrPermanent) }, +) +router.AddMiddleware(poisonQueue) +``` + +### Dynamic Handler Management +```go +// Add handler while router is running +router.AddHandler(...) +router.RunHandlers(ctx) // idempotent, safe to call multiple times + +// Stop specific handler +handler.Stop() +<-handler.Stopped() // wait for completion +``` + +### Context Values in Handlers +```go +func handler(msg *message.Message) ([]*message.Message, error) { + handlerName := middleware.HandlerNameFromCtx(msg.Context()) + topic := middleware.SubscribeTopicFromCtx(msg.Context()) + // ... +} +``` + +For detailed API reference, configuration structs, and advanced patterns, see the reference.md file in this skill directory. diff --git a/reference.md b/reference.md new file mode 100644 index 0000000..429dae8 --- /dev/null +++ b/reference.md @@ -0,0 +1,408 @@ +# Watermill AMQP Detailed Reference + +## Full AMQP Config Struct + +```go +type Config struct { + Connection ConnectionConfig + Marshaler Marshaler + Exchange ExchangeConfig + Queue QueueConfig + QueueBind QueueBindConfig + Publish PublishConfig + Consume ConsumeConfig + TopologyBuilder TopologyBuilder + TLSConfig *tls.Config +} +``` + +### ConnectionConfig +```go +type ConnectionConfig struct { + AmqpURI string + TLSConfig *tls.Config +} +``` + +### ExchangeConfig +```go +type ExchangeConfig struct { + GenerateName func(topic string) string + Type string // "fanout", "topic", "direct", "headers" + Durable bool + AutoDeleted bool + Internal bool + NoWait bool + Arguments amqp.Table +} +``` + +### QueueConfig +```go +type QueueConfig struct { + GenerateName func(topic string) string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Arguments amqp.Table // x-message-ttl, x-dead-letter-exchange, etc. +} +``` + +### QueueBindConfig +```go +type QueueBindConfig struct { + GenerateRoutingKey func(topic string) string + NoWait bool + Arguments amqp.Table +} +``` + +### PublishConfig +```go +type PublishConfig struct { + GenerateRoutingKey func(topic string) string + Mandatory bool + Immediate bool +} +``` + +### ConsumeConfig +```go +type ConsumeConfig struct { + Qos QosConfig + Consumer string + Exclusive bool + NoLocal bool + NoWait bool + Arguments amqp.Table +} + +type QosConfig struct { + PrefetchCount int + PrefetchSize int + Global bool +} +``` + +## Pre-Built Config Functions + +### NewDurableQueueConfig +- No exchange declared +- Queue name = topic name +- Persistent delivery mode +- Best for: work queues, task distribution + +### NewNonDurableQueueConfig +- Same as above but non-durable, non-persistent +- Best for: temporary/ephemeral queues + +### NewDurablePubSubConfig +- Fanout exchange (all consumers get all messages) +- Requires `GenerateQueueNameFunc` for unique queue per consumer +- Persistent delivery mode +- Best for: event broadcasting, notifications + +### NewNonDurablePubSubConfig +- Same as above but non-durable +- Best for: real-time updates where persistence isn't needed + +### NewDurableTopicConfig / NewNonDurableTopicConfig +- Topic exchange with routing key pattern matching +- Best for: selective message routing by pattern + +## Marshaler + +```go +type Marshaler interface { + Marshal(msg *message.Message) (amqp091.Publishing, error) + Unmarshal(amqpMsg amqp091.Delivery) (*message.Message, error) +} +``` + +### DefaultMarshaler +```go +type DefaultMarshaler struct { + // Customize the amqp091.Publishing before sending + PostprocessPublishing func(amqp091.Publishing) amqp091.Publishing + + // Set true for non-persistent delivery (higher throughput, no disk writes) + NotPersistentDeliveryMode bool + + // Header key for storing Watermill message UUID (default: "Watermill_MessageUUID") + MessageUUIDHeaderKey string +} +``` + +Example: adding content type +```go +marshaler := amqp.DefaultMarshaler{ + PostprocessPublishing: func(p amqp091.Publishing) amqp091.Publishing { + p.ContentType = "application/json" + p.CorrelationId = "my-correlation-id" + return p + }, +} +``` + +## TopologyBuilder + +Default: `DefaultTopologyBuilder` — declares exchanges, queues, and bindings automatically. + +For custom topology (pre-existing infrastructure): +```go +type MyTopologyBuilder struct{} + +func (t *MyTopologyBuilder) BuildTopology(channel *amqp091.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error { + // Custom declarations or no-ops for pre-declared topology + return nil +} + +func (t *MyTopologyBuilder) ExchangeDeclare(channel *amqp091.Channel, exchangeName string, config Config) error { + return nil // skip exchange declaration +} +``` + +## Publisher Usage + +```go +publisher, err := amqp.NewPublisher(config, logger) +if err != nil { + return err +} +defer publisher.Close() // IMPORTANT: flushes unsent messages + +msg := message.NewMessage(watermill.NewUUID(), []byte(`{"event":"user_created"}`)) +msg.Metadata.Set("correlation_id", "abc-123") + +err = publisher.Publish("events.user", msg) +``` + +- Thread-safe +- Publish blocks until broker confirms receipt (with persistent delivery) +- Topic maps to exchange/queue/routing-key depending on config + +## Subscriber Usage + +```go +subscriber, err := amqp.NewSubscriber(config, logger) +if err != nil { + return err +} +defer subscriber.Close() + +messages, err := subscriber.Subscribe(ctx, "events.user") +if err != nil { + return err +} + +for msg := range messages { + var event UserEvent + if err := json.Unmarshal(msg.Payload, &event); err != nil { + msg.Nack() // redelivery + continue + } + // process event... + msg.Ack() // acknowledge +} +``` + +- Channel closes when `Close()` is called or context is canceled +- Must Ack/Nack every message +- Next message delivered only after Ack (with prefetch=1) + +## Router Detailed Reference + +### RouterConfig +```go +type RouterConfig struct { + CloseTimeout time.Duration // default: 30s +} +``` + +### Handler Registration +```go +// Full handler: consume + publish +handler := router.AddHandler( + "handler_name", // must be unique + "subscribe_topic", // consumed topic + subscriber, // Subscriber implementation + "publish_topic", // produced topic + publisher, // Publisher implementation + handlerFunc, // func(msg *Message) ([]*Message, error) +) + +// Consumer-only handler +router.AddConsumerHandler( + "consumer_name", + "subscribe_topic", + subscriber, + noPublishHandlerFunc, // func(msg *Message) error +) +``` + +### Handler-Level Middleware +```go +handler := router.AddHandler(...) +handler.AddMiddleware( + middleware.Timeout(5 * time.Second), +) +``` + +### Context Helpers +```go +middleware.HandlerNameFromCtx(msg.Context()) // "handler_name" +middleware.SubscriberNameFromCtx(msg.Context()) // e.g. "amqp.Subscriber" +middleware.PublisherNameFromCtx(msg.Context()) +middleware.SubscribeTopicFromCtx(msg.Context()) +middleware.PublishTopicFromCtx(msg.Context()) +``` + +## Middleware Detailed Reference + +### Retry +```go +middleware.Retry{ + MaxRetries: 3, + InitialInterval: 100 * time.Millisecond, + MaxInterval: 10 * time.Second, + Multiplier: 2.0, + MaxElapsedTime: 30 * time.Second, + RandomizationFactor: 0.5, + OnRetryHook: func(retryNum int, delay time.Duration) { + log.Printf("retry %d after %v", retryNum, delay) + }, + ShouldRetry: func(err error) bool { + return !errors.Is(err, ErrPermanent) // skip retry for permanent errors + }, + Logger: logger, +}.Middleware +``` + +### Circuit Breaker +```go +import "github.com/sony/gobreaker" + +cb := middleware.NewCircuitBreaker(gobreaker.Settings{ + Name: "my-handler", + MaxRequests: 3, // half-open state max requests + Interval: 10 * time.Second, + Timeout: 30 * time.Second, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.ConsecutiveFailures > 5 + }, +}) +router.AddMiddleware(cb.Middleware) +``` + +### Timeout +```go +router.AddMiddleware(middleware.Timeout(5 * time.Second)) +// Handler must check: msg.Context().Done() +``` + +### Throttle +```go +// 10 messages per second +throttle := middleware.NewThrottle(10, time.Second) +router.AddMiddleware(throttle.Middleware) +``` + +### Poison Queue +```go +// All errors go to poison queue +pq, err := middleware.PoisonQueue(publisher, "failed_messages") +router.AddMiddleware(pq) + +// Only specific errors +pq, err := middleware.PoisonQueueWithFilter(publisher, "failed_messages", + func(err error) bool { + return errors.Is(err, ErrUnprocessable) + }, +) +``` + +### Deduplicator +```go +dedup, err := middleware.NewDeduplicator( + middleware.NewMessageHasherSHA256(), // or NewMessageHasherAdler32() + middleware.NewInMemoryExpiringKeyRepository(10 * time.Minute), +) +router.AddMiddleware(dedup.Middleware) +``` + +### Correlation ID +```go +// Set on first message entering system +middleware.SetCorrelationID(watermill.NewUUID(), msg) + +// Read in any handler downstream +corrID := middleware.MessageCorrelationID(msg) + +// Auto-propagate (add as router middleware) +router.AddMiddleware(middleware.CorrelationID) +``` + +## Common AMQP Patterns + +### Work Queue (competing consumers) +```go +// Multiple workers share the same queue +config := amqp.NewDurableQueueConfig("amqp://localhost:5672/") +// All subscribers with the same config compete for messages +``` + +### Fanout (pub/sub broadcast) +```go +config := amqp.NewDurablePubSubConfig( + "amqp://localhost:5672/", + amqp.GenerateQueueNameTopicNameWithSuffix("service-name"), +) +// Each service gets its own queue bound to the fanout exchange +``` + +### Topic Routing +```go +config := amqp.NewDurableTopicConfig( + "amqp://localhost:5672/", + func(topic string) string { return "events" }, // exchange + func(topic string) string { return "service." + topic }, // queue +) +// Use routing key patterns like "user.*", "order.#" +``` + +### Dead Letter Queue +```go +config := amqp.Config{ + // ... + Queue: amqp.QueueConfig{ + GenerateName: func(topic string) string { return topic }, + Durable: true, + Arguments: amqp.Table{ + "x-dead-letter-exchange": "dlx", + "x-dead-letter-routing-key": "dead_letter", + "x-message-ttl": int32(300000), // 5 minutes + }, + }, +} +``` + +### Delayed/Scheduled Messages +Use the `DelayOnError` middleware or RabbitMQ delayed message exchange plugin. + +## Logging + +```go +// Development +logger := watermill.NewStdLogger(debug, trace) + +// Production (slog) +logger := watermill.NewSlogLogger(slog.Default()) +``` + +## Advanced: Outbox Pattern +Watermill supports the transactional outbox pattern via the Forwarder component — publish messages to a database first, then forward to the broker. This guarantees no message loss even if the broker is down. + +## Advanced: FanIn / FanOut +- **FanIn**: Merge multiple topics into one handler +- **FanOut**: Duplicate messages to multiple subscribers from one topic