Watermill Rules (WM-01..10)
WM-01: Watermill as a Named Component in Unified Server (CRITICAL)
The Watermill router MUST be registered as a named component via server.WithWatermillRouter(name, configure) — the same pattern as WithHTTPHandler and WithGRPCServer. The With* option owns the AMQP connection, the middleware stack, and the router lifecycle; the caller provides only handler registration via the callback.
This ensures:
- Middleware stack (retry, correlation, recovery) is consistent across all services
- Broker config is centralized — swapping AMQP for Kafka changes one file
- Shutdown ordering is explicit via server.OnShutdown(server.Stop(name))
Check procedure:
- Scan main.go for direct Watermill router creation (message.NewRouter, amqp.NewSubscriber)
- Flag any middleware setup outside server/watermill.go
- Verify the Watermill component appears in OnShutdown with correct ordering
Correct:
// internal/common/server/watermill.go
func WithWatermillRouter(
	name string,
	configure func(*message.Router, message.Subscriber),
) Option {
	return func(s *Server) {
		wmLogger := watermill.NewStdLoggerWithOut(os.Stdout, true, false)
		amqpURI := os.Getenv("AMQP_URI")
		amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

		sub, err := amqp.NewSubscriber(amqpConfig, wmLogger)
		if err != nil {
			panic(err)
		}

		r, err := message.NewRouter(message.RouterConfig{}, wmLogger)
		if err != nil {
			panic(err)
		}

		r.AddMiddleware(
			wmMiddleware.CorrelationID,
			wmMiddleware.Recoverer,
			wmMiddleware.Retry{MaxRetries: 3}.Middleware,
		)

		configure(r, sub)

		s.addComponent(name, component{
			name: name,
			start: func(ctx context.Context) error {
				return r.Run(ctx)
			},
			stop: func(ctx context.Context) error {
				return r.Close()
			},
		})
	}
}
// main.go — registered as named component
server.New(
	server.WithWatermillRouter("events", func(r *message.Router, sub message.Subscriber) {
		ports.RegisterEventHandlers(r, sub, application)
	}),
	server.WithHTTPHandler("api", createHandler),
	server.OnShutdown(
		server.Stop("events"),    // 1. stop consuming
		server.Stop("api"),       // 2. drain HTTP
		server.StopFunc(cleanup), // 3. close clients
	),
).Run(ctx)
Wrong:
// main.go — VIOLATION: infrastructure in main
func main() {
	sub, _ := amqp.NewSubscriber(amqpConfig, logger)          // VIOLATION
	r, _ := message.NewRouter(message.RouterConfig{}, logger) // VIOLATION
	r.AddMiddleware(wmMiddleware.Recoverer)                   // VIOLATION
	r.Run(context.Background())
}
// main.go — VIOLATION: standalone RunWatermillRouter without unified server
server.RunWatermillRouter(func(r *message.Router, sub message.Subscriber) { ... })
// Cannot coordinate shutdown with other transports
WM-02: Publisher Factory Returns (Publisher, Close, Error) Triple (CRITICAL)
Publisher creation MUST follow the same (client, closeFunc, error) triple-return pattern as client.NewTrainerClient() and client.NewUsersClient(). Config comes from environment variables.
Check procedure:
- Verify the publisher factory lives in internal/common/client/watermill.go
- Must return (message.Publisher, func() error, error)
- Must read AMQP_URI from the environment
- The error case must return a no-op close function, never nil
Correct:
// internal/common/client/watermill.go
func NewWatermillPublisher() (pub message.Publisher, close func() error, err error) {
	amqpURI := os.Getenv("AMQP_URI")
	if amqpURI == "" {
		return nil, func() error { return nil }, errors.New("empty env AMQP_URI")
	}

	logger := watermill.NewStdLoggerWithOut(os.Stdout, true, false)
	config := amqp.NewDurableQueueConfig(amqpURI)

	publisher, err := amqp.NewPublisher(config, logger)
	if err != nil {
		return nil, func() error { return nil }, errors.Wrap(err, "cannot create watermill publisher")
	}

	return publisher, publisher.Close, nil
}
Wrong:
// VIOLATION: returns raw connection, no close function
func NewPublisher() *amqp.Publisher {
	pub, _ := amqp.NewPublisher(config, logger)
	return pub
}

// VIOLATION: nil close function on error path
func NewPublisher() (message.Publisher, func() error, error) {
	// ...
	return nil, nil, err // nil close panics on defer
}
WM-03: Event Handlers Live in Ports (CRITICAL)
Watermill event handlers are inbound adapters — they are ports, just like HTTP and gRPC handlers. They MUST:
- Live in ports/
- Hold app.Application
- Delegate to command/query handlers
- Contain NO business logic
Check procedure:
- Scan for message.HandlerFunc or func(*message.Message) error signatures
- These MUST be in the ports/ package
- Must import app/, app/command/, or app/query/ — not domain/ directly
- Must follow the same delegation pattern as HTTP/gRPC handlers
Correct:
// ports/event.go
type EventHandlers struct {
	app app.Application
}

func RegisterEventHandlers(r *message.Router, sub message.Subscriber, application app.Application) {
	handlers := EventHandlers{app: application}

	r.AddNoPublisherHandler(
		"OnTrainingScheduled",
		"training.scheduled",
		sub,
		handlers.OnTrainingScheduled,
	)
}

func (h EventHandlers) OnTrainingScheduled(msg *message.Message) error {
	var event TrainingScheduledEvent
	if err := json.Unmarshal(msg.Payload, &event); err != nil {
		return err
	}

	return h.app.Commands.ScheduleTraining.Handle(
		msg.Context(),
		command.ScheduleTraining{Hour: event.Hour},
	)
}
Wrong:
// adapters/event_handler.go — VIOLATION: handler in adapters/
func HandleTrainingScheduled(msg *message.Message) error {
	repo.Save(ctx, training) // VIOLATION: direct repo access
}

// app/command/schedule_training.go — VIOLATION: message parsing in app layer
func (h handler) Handle(ctx context.Context, msg *message.Message) error { ... }
WM-04: Event Publisher Adapter Implements Domain Interface (WARNING)
Publishing events MUST go through an adapter that implements an interface defined in the app or domain layer. The app layer defines what events to publish; the adapter knows how.
This keeps Watermill as a swappable infrastructure detail.
Check procedure:
- Look for message.Publisher usage — it MUST NOT appear in app/ or domain/
- An interface like EventPublisher should be in app/command/services.go or similar
- The concrete adapter in adapters/ implements it using Watermill
Correct:
// app/command/services.go
type TrainingEventPublisher interface {
	TrainingScheduled(ctx context.Context, t training.Training) error
	TrainingCancelled(ctx context.Context, trainingUUID string) error
}

// adapters/training_event_publisher.go
type WatermillTrainingEventPublisher struct {
	pub message.Publisher
}

func NewWatermillTrainingEventPublisher(pub message.Publisher) WatermillTrainingEventPublisher {
	return WatermillTrainingEventPublisher{pub: pub}
}

func (p WatermillTrainingEventPublisher) TrainingScheduled(ctx context.Context, t training.Training) error {
	payload, err := json.Marshal(TrainingScheduledEvent{UUID: t.UUID(), Hour: t.Time()})
	if err != nil {
		return err
	}

	msg := message.NewMessage(watermill.NewUUID(), payload)
	// assign a correlation ID for the new flow (reading it off a fresh message would yield "")
	middleware.SetCorrelationID(watermill.NewUUID(), msg)

	return p.pub.Publish("training.scheduled", msg)
}
Wrong:
// app/command/schedule_training.go — VIOLATION: Watermill in app layer
import "github.com/ThreeDotsLabs/watermill/message"

func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
	msg := message.NewMessage(watermill.NewUUID(), payload) // VIOLATION
	h.publisher.Publish("topic", msg)                       // VIOLATION: infra detail
}
WM-05: Topic Naming Uses Domain Language (WARNING)
Topic/queue names MUST use domain language with dot notation: {aggregate}.{past-tense-event}. No CRUD names, no technical prefixes.
Correct:
training.scheduled
training.cancelled
training.reschedule_requested
hour.made_available
Wrong:
create-training // VIOLATION: CRUD name
events.training.created // VIOLATION: redundant "events" prefix, CRUD
TRAINING_QUEUE // VIOLATION: technical name, not domain event
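The naming rule can be linted mechanically. The regexp below is an assumption about what {aggregate}.{past-tense-event} allows (lowercase words, exactly one dot, underscores inside each part), not part of the rule set itself:

```go
package main

import (
	"fmt"
	"regexp"
)

// topicPattern encodes {aggregate}.{past-tense-event}: lowercase words,
// exactly one dot, underscores allowed inside each part.
var topicPattern = regexp.MustCompile(`^[a-z]+(_[a-z]+)*\.[a-z]+(_[a-z]+)*$`)

func validTopic(name string) bool {
	return topicPattern.MatchString(name)
}

func main() {
	for _, name := range []string{
		"training.scheduled",      // ok
		"hour.made_available",     // ok
		"create-training",         // CRUD name, rejected
		"events.training.created", // redundant prefix, rejected
		"TRAINING_QUEUE",          // technical name, rejected
	} {
		fmt.Println(name, validTopic(name))
	}
}
```

A check like this can run in CI over a central list of topic constants, so violations surface before review.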
WM-06: Event Structs Live in the Publishing Port or Adapter (INFO)
Event DTOs (the JSON payloads) are protocol-specific — they belong in ports/ or adapters/, NOT in domain/. Domain entities are the canonical model; events are a serialization concern.
Check procedure:
- Look for event structs (e.g., TrainingScheduledEvent)
- They MUST be in ports/ (if consumed by event handlers) or adapters/ (if produced by publisher adapters)
- They MUST NOT be in domain/
Correct:
// ports/event.go or adapters/training_event_publisher.go
type TrainingScheduledEvent struct {
	UUID string    `json:"uuid"`
	Hour time.Time `json:"hour"`
}
WM-07: Watermill Middleware in With* Option Only (WARNING)
Watermill middleware (retry, correlation ID, recoverer, throttle, etc.) MUST be configured exclusively inside the WithWatermillRouter option in internal/common/server/watermill.go — same principle as ARCH-06 for HTTP/gRPC middleware.
Check procedure:
- Scan for r.AddMiddleware or router.AddMiddleware calls
- All MUST be in internal/common/server/watermill.go (inside WithWatermillRouter)
- Flag any middleware setup in main.go, ports/, or service/
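One reason to configure the stack in exactly one place: middleware is function wrapping, and the order of wrapping changes behavior for every handler. A plain-Go sketch (generic, not Watermill's actual implementation; all names here are illustrative):

```go
package main

import "fmt"

type Handler func(msg string) error
type Middleware func(Handler) Handler

// wrap applies middlewares so that mws[0] is outermost, mirroring the
// idea that the factory decides the order once for all handlers.
func wrap(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func logging(next Handler) Handler {
	return func(msg string) error {
		fmt.Println("log:", msg)
		return next(msg)
	}
}

func recoverer(next Handler) Handler {
	return func(msg string) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered: %v", r)
			}
		}()
		return next(msg)
	}
}

func demo() error {
	h := wrap(func(msg string) error { panic("handler bug") }, logging, recoverer)
	return h("hello") // the recoverer converts the panic into an error
}

func main() {
	fmt.Println(demo())
}
```

If one handler were wired up in main.go with a different order (say, recoverer missing or innermost), its panics would behave differently from every other handler's, which is precisely what centralizing in WithWatermillRouter prevents.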
WM-08: Publisher Cleanup via OnShutdown or Composition Root (WARNING)
When a service publishes events, the publisher's close function MUST be invoked as part of the shutdown sequence. Two valid patterns:
Pattern A — cleanup in OnShutdown (preferred when using unified server):
server.New(
	server.WithHTTPHandler("api", createHandler),
	server.OnShutdown(
		server.Stop("api"),       // 1. drain HTTP (in-flight may publish)
		server.StopFunc(cleanup), // 2. close publisher + clients
	),
).Run(ctx)
Pattern B — cleanup via defer (simpler services):
app, cleanup := service.NewApplication(ctx)
defer cleanup() // runs after Run() returns

server.New(
	server.WithHTTPHandler("api", createHandler),
	server.OnShutdown(
		server.Stop("api"),
	),
).Run(ctx)
// cleanup() runs here via defer — publisher closes after server drained
Check procedure:
- If service/application.go creates a publisher, verify its close is either in OnShutdown or in the cleanup function
- Publisher close MUST happen after all transports that might publish are stopped
- Closing publisher before draining HTTP/gRPC = lost messages
Wrong:
// main.go — VIOLATION: publisher lifecycle in main, not ordered
func main() {
	pub, closePub, _ := client.NewWatermillPublisher()
	defer closePub() // VIOLATION: may close before HTTP drains

	app := service.NewApplication(ctx, pub) // VIOLATION: infra detail leaked
}
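Pattern B works because of defer semantics: a function deferred before the server runs executes only after the server has returned, i.e. after every transport has drained. A minimal sketch of that ordering guarantee (names illustrative, no server involved):

```go
package main

import "fmt"

// demo records the shutdown order: the deferred "publisher closed" step
// runs only after the inner function (standing in for server.Run) returns.
func demo() []string {
	var order []string
	func() {
		defer func() { order = append(order, "publisher closed") }()
		// server.Run would block here until all transports are drained
		order = append(order, "server drained")
	}()
	return order
}

func main() {
	fmt.Println(demo()) // prints [server drained publisher closed]
}
```

The same reasoning shows why the "Wrong" example is only a violation when ordering is not pinned down elsewhere: defer alone gives last-in-first-out cleanup within one function, but no coordination across components.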
WM-09: Named Components Replace SERVER_TO_RUN Switch (INFO)
With the unified server pattern (ARCH-08), the SERVER_TO_RUN environment variable switch is replaced by composing With* options. A service that needs HTTP + Watermill simply registers both.
Correct — unified server:
// All transports in one process, explicit shutdown order
server.New(
server.WithWatermillRouter("events", func(r *message.Router, sub message.Subscriber) {
ports.RegisterEventHandlers(r, sub, app)
}),
server.WithHTTPHandler("api", func(router chi.Router) http.Handler {
return ports.HandlerFromMux(ports.NewHttpServer(app), router)
}),
server.OnShutdown(
server.Stop("events"),
server.Stop("api"),
server.StopFunc(cleanup),
),
).Run(ctx)
Also acceptable — SERVER_TO_RUN for single-transport deployments:
// When deploying each transport as a separate container
switch serverType {
case "http":
	server.New(
		server.WithHTTPHandler("api", createHandler),
		server.OnShutdown(server.Stop("api")),
	).Run(ctx)
case "watermill":
	server.New(
		server.WithWatermillRouter("events", configureRouter),
		server.OnShutdown(server.Stop("events")),
	).Run(ctx)
}
WM-10: No Synchronous Side Effects Replaced by Fire-and-Forget (CRITICAL)
When replacing synchronous gRPC calls with async events, you MUST ensure the operation tolerates eventual consistency. If the caller needs confirmation that the action succeeded, keep it synchronous (gRPC) or use a saga/process manager — do NOT simply drop the response.
Check procedure:
- For each gRPC adapter being replaced by events, check if the calling command inspects the return value or error
- If the command makes decisions based on the result, it MUST remain synchronous or use a compensation pattern
- Fire-and-forget is only valid for notifications, projections, and truly independent side effects
Correct use of async:
// Notification — caller doesn't need the result
func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
	// ... create training ...

	// Fire event — consumer will send email, update dashboard, etc.
	return h.eventPublisher.TrainingScheduled(ctx, training)
}
Wrong use of async:
// VIOLATION: caller needs confirmation that hours were reserved
func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
	training, _ := training.NewTraining(...)

	h.eventPublisher.TrainingScheduled(ctx, training) // VIOLATION: no guarantee hours are available

	return h.repo.Save(ctx, training) // saved training without confirmed availability
}
// Previously this was a synchronous gRPC call that could fail and roll back