3.2 KiB
3.2 KiB
Watermill Router Option + Publisher Client Scaffold Template
Generate the WithWatermillRouter server option in internal/common/server/ and the publisher client factory in internal/common/client/. Requires the unified server scaffold (/threedots scaffold unified_server) to be in place first.
Placeholders
{{module_common}}— Go module path tointernal/common(e.g.,github.com/example/myproject/internal/common)
File 1: internal/common/server/watermill.go
package server
import (
"context"
"os"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
wmMiddleware "github.com/ThreeDotsLabs/watermill/message/router/middleware"
)
func WithWatermillRouter(
name string,
configure func(*message.Router, message.Subscriber),
) Option {
return func(s *Server) {
wmLogger := watermill.NewStdLoggerWithOut(os.Stdout, true, false)
amqpURI := os.Getenv("AMQP_URI")
if amqpURI == "" {
amqpURI = "amqp://guest:guest@rabbitmq:5672/"
}
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
sub, err := amqp.NewSubscriber(amqpConfig, wmLogger)
if err != nil {
panic("cannot create watermill subscriber: " + err.Error())
}
r, err := message.NewRouter(message.RouterConfig{}, wmLogger)
if err != nil {
panic("cannot create watermill router: " + err.Error())
}
r.AddMiddleware(
wmMiddleware.CorrelationID,
wmMiddleware.Recoverer,
wmMiddleware.Retry{MaxRetries: 3}.Middleware,
)
configure(r, sub)
s.addComponent(name, component{
name: name,
start: func(ctx context.Context) error {
return r.Run(ctx)
},
stop: func(ctx context.Context) error {
return r.Close()
},
})
}
}
File 2: internal/common/client/watermill.go
package client
import (
"os"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)
func NewWatermillPublisher() (pub message.Publisher, close func() error, err error) {
amqpURI := os.Getenv("AMQP_URI")
if amqpURI == "" {
return nil, func() error { return nil }, errors.New("empty env AMQP_URI")
}
logger := watermill.NewStdLoggerWithOut(os.Stdout, true, false)
config := amqp.NewDurableQueueConfig(amqpURI)
publisher, err := amqp.NewPublisher(config, logger)
if err != nil {
return nil, func() error { return nil }, errors.Wrap(err, "cannot create watermill publisher")
}
return publisher, publisher.Close, nil
}
Post-Creation Instructions
After creating the Watermill option and publisher:
- Add
github.com/ThreeDotsLabs/watermillandgithub.com/ThreeDotsLabs/watermill-amqp/v3togo.mod - Add
AMQP_URIto.env,.test.env, anddocker-compose.yml - Add a RabbitMQ service to
docker-compose.yml:rabbitmq: image: rabbitmq:3-management ports: - "5672:5672" - "15672:15672" - Use
/threedots scaffold event_handler <Name>to create event handlers in a service - Use
/threedots scaffold event_publisher <Name>to create a publisher adapter - Add
server.WithWatermillRouter("events", ...)and include"events"inOnShutdown