Skip to content

Commit 74960ce

Browse files
committed
feat: add WithForcedPublicationSource
1 parent 3a58681 commit 74960ce

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

equeue.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ type HandlersChain []HandlerFunc
2121

2222
type OptionFunc func(*Engine)
2323

24+
func WithForcedPublicationSource(s string) OptionFunc {
25+
return func(e *Engine) {
26+
e.forcedPublicationSource = s
27+
}
28+
}
29+
2430
func WithForcedPublicationFormat(f format.Format) OptionFunc {
2531
return func(e *Engine) {
2632
e.forcedPublicationFormat = f
@@ -33,12 +39,12 @@ func WithForcedSubscriptionFormat(f format.Format) OptionFunc {
3339
}
3440
}
3541

36-
func New(d Driver, opts ...OptionFunc) *Engine {
42+
func New(driver Driver, opts ...OptionFunc) *Engine {
3743
engine := &Engine{
3844
RouterGroup: RouterGroup{
3945
root: true,
4046
},
41-
driver: d,
47+
driver: driver,
4248
tree: make(subsTree),
4349
consumers: make(map[*Consumer]struct{}),
4450
subscriptionActiveWorkersMap: make(map[*subscription]map[*worker]struct{}),
@@ -68,6 +74,7 @@ type Engine struct {
6874
RouterGroup
6975

7076
driver Driver
77+
forcedPublicationSource string
7178
forcedPublicationFormat format.Format
7279
forcedSubscriptionFormat format.Format
7380

@@ -126,6 +133,9 @@ func (e *Engine) newContext() *Context {
126133
}
127134

128135
func (e *Engine) Publish(ctx context.Context, topic string, event event.Event) error {
136+
if e.forcedPublicationSource != "" {
137+
event.SetSource(e.forcedPublicationSource)
138+
}
129139
if err := event.Validate(); err != nil {
130140
return err
131141
}

0 commit comments

Comments
 (0)