threedotslab/references/rules-watermill.md

Watermill Rules (WM-01..10)

WM-01: Watermill as a Named Component in Unified Server (CRITICAL)

Watermill router MUST be registered as a named component via server.WithWatermillRouter(name, configure) — same pattern as WithHTTPHandler and WithGRPCServer. The With* option owns AMQP connection, middleware, and router lifecycle. The caller provides only handler registration via callback.

This ensures:

  • Middleware stack (retry, correlation, recovery) is consistent across all services
  • Broker config is centralized — swapping AMQP for Kafka changes one file
  • Shutdown ordering is explicit via server.OnShutdown(server.Stop(name))

Check procedure:

  1. Scan main.go for direct Watermill router creation (message.NewRouter, amqp.NewSubscriber)
  2. Flag any middleware setup outside server/watermill.go
  3. Verify Watermill component appears in OnShutdown with correct ordering

Correct:

// internal/common/server/watermill.go
func WithWatermillRouter(
    name string,
    configure func(*message.Router, message.Subscriber),
) Option {
    return func(s *Server) {
        wmLogger := watermill.NewStdLoggerWithOut(os.Stdout, true, false)
        amqpURI := os.Getenv("AMQP_URI")
        amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

        sub, err := amqp.NewSubscriber(amqpConfig, wmLogger)
        if err != nil {
            panic(err)
        }

        r, err := message.NewRouter(message.RouterConfig{}, wmLogger)
        if err != nil {
            panic(err)
        }

        r.AddMiddleware(
            wmMiddleware.CorrelationID,
            wmMiddleware.Recoverer,
            wmMiddleware.Retry{MaxRetries: 3}.Middleware,
        )
        configure(r, sub)

        s.addComponent(name, component{
            name: name,
            start: func(ctx context.Context) error {
                return r.Run(ctx)
            },
            stop: func(ctx context.Context) error {
                return r.Close()
            },
        })
    }
}

// main.go — registered as named component
server.New(
    server.WithWatermillRouter("events", func(r *message.Router, sub message.Subscriber) {
        ports.RegisterEventHandlers(r, sub, application)
    }),
    server.WithHTTPHandler("api", createHandler),
    server.OnShutdown(
        server.Stop("events"),      // 1. stop consuming
        server.Stop("api"),         // 2. drain HTTP
        server.StopFunc(cleanup),   // 3. close clients
    ),
).Run(ctx)

Wrong:

// main.go — VIOLATION: infrastructure in main
func main() {
    sub, _ := amqp.NewSubscriber(amqpConfig, logger)           // VIOLATION
    r, _ := message.NewRouter(message.RouterConfig{}, logger)   // VIOLATION
    r.AddMiddleware(wmMiddleware.Recoverer)                      // VIOLATION
    r.Run(context.Background())
}

// main.go — VIOLATION: standalone RunWatermillRouter without unified server
server.RunWatermillRouter(func(r *message.Router, sub message.Subscriber) { ... })
// Cannot coordinate shutdown with other transports

WM-02: Publisher Factory Returns (Publisher, Close, Error) Triple (CRITICAL)

Publisher creation MUST follow the same (client, closeFunc, error) triple-return pattern as client.NewTrainerClient() and client.NewUsersClient(). Config comes from environment variables.

Check procedure:

  1. Verify publisher factory in internal/common/client/watermill.go
  2. Must return (message.Publisher, func() error, error)
  3. Must read AMQP_URI from env
  4. Error case must return a no-op close function, never nil

Correct:

// internal/common/client/watermill.go
func NewWatermillPublisher() (message.Publisher, func() error, error) {
    amqpURI := os.Getenv("AMQP_URI")
    if amqpURI == "" {
        return nil, func() error { return nil }, errors.New("empty env AMQP_URI")
    }

    logger := watermill.NewStdLoggerWithOut(os.Stdout, true, false)
    config := amqp.NewDurableQueueConfig(amqpURI)

    publisher, err := amqp.NewPublisher(config, logger)
    if err != nil {
        return nil, func() error { return nil }, errors.Wrap(err, "cannot create watermill publisher")
    }

    return publisher, publisher.Close, nil
}

Wrong:

// VIOLATION: returns raw connection, no close function
func NewPublisher() *amqp.Publisher {
    pub, _ := amqp.NewPublisher(config, logger)
    return pub
}

// VIOLATION: nil close function on error path
func NewPublisher() (message.Publisher, func() error, error) {
    // ...
    return nil, nil, err  // nil close panics on defer
}

WM-03: Event Handlers Live in Ports (CRITICAL)

Watermill event handlers are inbound adapters — they are ports, just like HTTP and gRPC handlers. They MUST:

  • Live in ports/
  • Hold app.Application
  • Delegate to command/query handlers
  • Contain NO business logic

Check procedure:

  1. Scan for message.HandlerFunc or func(*message.Message) error signatures
  2. These MUST be in ports/ package
  3. Must import app/, app/command/, or app/query/ — not domain/ directly
  4. Must follow the same delegation pattern as HTTP/gRPC handlers

Correct:

// ports/event.go
type EventHandlers struct {
    app app.Application
}

func RegisterEventHandlers(r *message.Router, sub message.Subscriber, application app.Application) {
    handlers := EventHandlers{app: application}

    r.AddNoPublisherHandler(
        "OnTrainingScheduled",
        "training.scheduled",
        sub,
        handlers.OnTrainingScheduled,
    )
}

func (h EventHandlers) OnTrainingScheduled(msg *message.Message) error {
    var event TrainingScheduledEvent
    if err := json.Unmarshal(msg.Payload, &event); err != nil {
        return err
    }
    return h.app.Commands.ScheduleTraining.Handle(
        msg.Context(),
        command.ScheduleTraining{Hour: event.Hour},
    )
}

Wrong:

// adapters/event_handler.go — VIOLATION: handler in adapters/
func HandleTrainingScheduled(msg *message.Message) error {
    repo.Save(ctx, training)  // VIOLATION: direct repo access
}

// app/command/schedule_training.go — VIOLATION: message parsing in app layer
func (h handler) Handle(ctx context.Context, msg *message.Message) error { ... }

WM-04: Event Publisher Adapter Implements Domain Interface (WARNING)

Publishing events MUST go through an adapter that implements an interface defined in the app or domain layer. The app layer defines what events to publish; the adapter knows how.

This keeps Watermill as a swappable infrastructure detail.

Check procedure:

  1. Look for message.Publisher usage — it MUST NOT appear in app/ or domain/
  2. An interface like EventPublisher should be in app/command/services.go or similar
  3. The concrete adapter in adapters/ implements it using Watermill

Correct:

// app/command/services.go
type TrainingEventPublisher interface {
    TrainingScheduled(ctx context.Context, t training.Training) error
    TrainingCancelled(ctx context.Context, trainingUUID string) error
}

// adapters/training_event_publisher.go
type WatermillTrainingEventPublisher struct {
    pub message.Publisher
}

func NewWatermillTrainingEventPublisher(pub message.Publisher) WatermillTrainingEventPublisher {
    return WatermillTrainingEventPublisher{pub: pub}
}

func (p WatermillTrainingEventPublisher) TrainingScheduled(ctx context.Context, t training.Training) error {
    payload, err := json.Marshal(TrainingScheduledEvent{UUID: t.UUID(), Hour: t.Time()})
    if err != nil {
        return err
    }
    msg := message.NewMessage(watermill.NewUUID(), payload)
    // Give the outgoing message a correlation ID; SetCorrelationID only
    // writes when the message does not already carry one.
    middleware.SetCorrelationID(watermill.NewUUID(), msg)
    return p.pub.Publish("training.scheduled", msg)
}
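
For contrast, the app-layer side of the inversion can be sketched like this — the command handler sees only the interface, and the Watermill adapter is injected in the composition root. The handler fields and the training.NewTraining signature are illustrative, not taken from the codebase:

```go
// app/command/schedule_training.go — no Watermill imports
type ScheduleTrainingHandler struct {
    repo           training.Repository
    eventPublisher TrainingEventPublisher // interface from services.go
}

func (h ScheduleTrainingHandler) Handle(ctx context.Context, cmd ScheduleTraining) error {
    t, err := training.NewTraining(cmd.UUID, cmd.Hour)
    if err != nil {
        return err
    }
    if err := h.repo.Save(ctx, t); err != nil {
        return err
    }
    // "What" is decided here; "how" (Watermill, AMQP, topics) lives in the adapter
    return h.eventPublisher.TrainingScheduled(ctx, t)
}
```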

Wrong:

// app/command/schedule_training.go — VIOLATION: Watermill in app layer
import "github.com/ThreeDotsLabs/watermill/message"

func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
    msg := message.NewMessage(watermill.NewUUID(), payload)  // VIOLATION
    h.publisher.Publish("topic", msg)                         // VIOLATION: infra detail
}

WM-05: Topic Naming Uses Domain Language (WARNING)

Topic/queue names MUST use domain language with dot notation: {aggregate}.{past-tense-event}. No CRUD names, no technical prefixes.

Correct:

training.scheduled
training.cancelled
training.reschedule_requested
hour.made_available

Wrong:

create-training          // VIOLATION: CRUD name
events.training.created  // VIOLATION: redundant "events" prefix, CRUD
TRAINING_QUEUE           // VIOLATION: technical name, not domain event
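
The convention is mechanical enough to lint. A self-contained sketch of a hypothetical check — the regexp and the IsDomainTopic name are illustrative, not part of Watermill:

```go
package main

import (
	"fmt"
	"regexp"
)

// topicPattern enforces {aggregate}.{past_tense_event}: lowercase words,
// underscores within a segment, exactly one dot between the two segments.
var topicPattern = regexp.MustCompile(`^[a-z]+(_[a-z]+)*\.[a-z]+(_[a-z]+)*$`)

// IsDomainTopic reports whether a topic name follows the WM-05 convention.
func IsDomainTopic(topic string) bool {
	return topicPattern.MatchString(topic)
}

func main() {
	fmt.Println(IsDomainTopic("training.scheduled"))  // true
	fmt.Println(IsDomainTopic("hour.made_available")) // true
	fmt.Println(IsDomainTopic("create-training"))     // false: CRUD name
	fmt.Println(IsDomainTopic("TRAINING_QUEUE"))      // false: technical name
}
```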

WM-06: Event Structs Live in the Publishing Port or Adapter (INFO)

Event DTOs (the JSON payloads) are protocol-specific — they belong in ports/ or adapters/, NOT in domain/. Domain entities are the canonical model; events are a serialization concern.

Check procedure:

  1. Look for event structs (e.g., TrainingScheduledEvent)
  2. They MUST be in ports/ (if consumed by event handlers) or adapters/ (if produced by publisher adapters)
  3. They MUST NOT be in domain/

Correct:

// ports/event.go or adapters/training_event_publisher.go
type TrainingScheduledEvent struct {
    UUID string    `json:"uuid"`
    Hour time.Time `json:"hour"`
}

WM-07: Watermill Middleware in With* Option Only (WARNING)

Watermill middleware (retry, correlation ID, recoverer, throttle, etc.) MUST be configured exclusively inside the WithWatermillRouter option in internal/common/server/watermill.go — same principle as ARCH-06 for HTTP/gRPC middleware.

Check procedure:

  1. Scan for r.AddMiddleware or router.AddMiddleware calls
  2. All MUST be in internal/common/server/watermill.go (inside WithWatermillRouter)
  3. Flag any middleware setup in main.go, ports/, or service/
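
What check 3 flags, sketched — the registration function reuses WM-03's example and the retry values are illustrative:

```go
// ports/event.go — VIOLATION: middleware configured outside WithWatermillRouter
func RegisterEventHandlers(r *message.Router, sub message.Subscriber, application app.Application) {
    r.AddMiddleware(wmMiddleware.Retry{MaxRetries: 5}.Middleware) // VIOLATION: per-service retry policy
    // ... only handler registration belongs here (WM-03)
}
```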

WM-08: Publisher Cleanup via OnShutdown or Composition Root (WARNING)

When a service publishes events, the publisher's close function MUST be called as part of the shutdown sequence. Two valid patterns:

Pattern A — cleanup in OnShutdown (preferred when using unified server):

server.New(
    server.WithHTTPHandler("api", createHandler),
    server.OnShutdown(
        server.Stop("api"),         // 1. drain HTTP (in-flight may publish)
        server.StopFunc(cleanup),   // 2. close publisher + clients
    ),
).Run(ctx)

Pattern B — cleanup via defer (simpler services):

app, cleanup := service.NewApplication(ctx)
defer cleanup()  // runs after Run() returns

server.New(
    server.WithHTTPHandler("api", createHandler),
    server.OnShutdown(
        server.Stop("api"),
    ),
).Run(ctx)
// cleanup() runs here via defer — publisher closes after server drained

Check procedure:

  1. If service/application.go creates a publisher, verify close is either in OnShutdown or in the cleanup function
  2. Publisher close MUST happen after all transports that might publish are stopped
  3. Closing publisher before draining HTTP/gRPC = lost messages
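
A sketch of check item 1 satisfied, assuming service.NewApplication owns the publisher (names are illustrative): the returned cleanup is the only thing that closes it, and that cleanup is wired as the last shutdown step (Pattern A) or deferred around Run (Pattern B).

```go
// service/application.go — hypothetical wiring
func NewApplication(ctx context.Context) (app.Application, func()) {
    pub, closePub, err := client.NewWatermillPublisher()
    if err != nil {
        panic(err)
    }

    application := newApplication(ctx, adapters.NewWatermillTrainingEventPublisher(pub))

    cleanup := func() {
        _ = closePub() // runs only after transports that might publish are stopped
    }
    return application, cleanup
}
```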

Wrong:

// main.go — VIOLATION: publisher lifecycle in main, not ordered
func main() {
    pub, closePub, _ := client.NewWatermillPublisher()
    defer closePub()                                    // VIOLATION: may close before HTTP drains
    app := service.NewApplication(ctx, pub)             // VIOLATION: infra detail leaked
}

WM-09: Named Components Replace SERVER_TO_RUN Switch (INFO)

With the unified server pattern (ARCH-08), the SERVER_TO_RUN environment variable switch is replaced by composing With* options. A service that needs HTTP + Watermill simply registers both.

Correct — unified server:

// All transports in one process, explicit shutdown order
server.New(
    server.WithWatermillRouter("events", func(r *message.Router, sub message.Subscriber) {
        ports.RegisterEventHandlers(r, sub, app)
    }),
    server.WithHTTPHandler("api", func(router chi.Router) http.Handler {
        return ports.HandlerFromMux(ports.NewHttpServer(app), router)
    }),
    server.OnShutdown(
        server.Stop("events"),
        server.Stop("api"),
        server.StopFunc(cleanup),
    ),
).Run(ctx)

Also acceptable — SERVER_TO_RUN for single-transport deployments:

// When deploying each transport as a separate container
switch serverType {
case "http":
    server.New(
        server.WithHTTPHandler("api", createHandler),
        server.OnShutdown(server.Stop("api")),
    ).Run(ctx)
case "watermill":
    server.New(
        server.WithWatermillRouter("events", configureRouter),
        server.OnShutdown(server.Stop("events")),
    ).Run(ctx)
}

WM-10: No Synchronous Side Effects Replaced by Fire-and-Forget (CRITICAL)

When replacing synchronous gRPC calls with async events, you MUST ensure the operation tolerates eventual consistency. If the caller needs confirmation that the action succeeded, keep it synchronous (gRPC) or use a saga/process manager — do NOT simply drop the response.

Check procedure:

  1. For each gRPC adapter being replaced by events, check if the calling command inspects the return value or error
  2. If the command makes decisions based on the result, it MUST remain synchronous or use a compensation pattern
  3. Fire-and-forget is only valid for notifications, projections, and truly independent side effects

Correct use of async:

// Notification — caller doesn't need the result
func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
    // ... create training ...
    // Fire event — consumer will send email, update dashboard, etc.
    return h.eventPublisher.TrainingScheduled(ctx, training)
}

Wrong use of async:

// VIOLATION: caller needs confirmation that hours were reserved
func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
    training, _ := training.NewTraining(...)
    h.eventPublisher.TrainingScheduled(ctx, training)  // VIOLATION: no guarantee hours are available
    return h.repo.Save(ctx, training)                   // saved training without confirmed availability
}
// Previously this was a synchronous gRPC call that could fail and roll back
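
When confirmation is required, the synchronous shape looks like this — a sketch assuming a trainerService gRPC adapter behind an app-layer interface (all names are illustrative):

```go
// Caller needs confirmation — keep the gRPC call and check it before saving
func (h handler) Handle(ctx context.Context, cmd ScheduleTraining) error {
    t, err := training.NewTraining(cmd.UUID, cmd.Hour)
    if err != nil {
        return err
    }
    // Synchronous reservation: a failure here aborts the command before
    // any state is persisted — nothing to compensate
    if err := h.trainerService.ScheduleTraining(ctx, t.Time()); err != nil {
        return err
    }
    return h.repo.Save(ctx, t)
}
```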