Skip to content

plugins #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
super1t opened this issue May 26, 2024 · 4 comments
Open

plugins #25

super1t opened this issue May 26, 2024 · 4 comments

Comments

@super1t
Copy link

super1t commented May 26, 2024

when i use plugin: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange, publish return error :queue not bound

here is my code:

package mq

import (
	"context"
	"fmt"
	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
	"testing"
	"time"
)

func TestProductDelayMessage(t *testing.T) {
	ctx := context.Background()

	url := "amqp://admin:admin@127.0.0.1:5672/test"
	routingKey := "test.delay.routingKey"
	exchange := "test.delay.exchange"
	queue := "test.delay.queue"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		//ReconnectAttempts: 20,
		// How long wait between reconnect
		Wait: 1 * time.Second,
	})

	pool := rabbitroutine.NewPool(conn)
	ensurePub := rabbitroutine.NewEnsurePublisher(pool)
	pub := rabbitroutine.NewRetryPublisher(
		ensurePub,
		rabbitroutine.PublishMaxAttemptsSetup(3),
		rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)),
	)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			panic(fmt.Sprintf("failed to establish RabbitMQ connection: %v", err))
		}
	}()

	ch, err := conn.Channel(ctx)
	if err != nil {
		panic(fmt.Sprintf("failed to create channel: %v", err))
	}

	err = ch.ExchangeDeclare(exchange,
		"x-delayed-message",
		true,
		false,
		false,
		false,
		amqp.Table{
			"x-delayed-type": "direct",
		})
	if err != nil {
		panic(fmt.Sprintf("failed to declare exchange: %v", err))
	}

	_, err = ch.QueueDeclare(queue,
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
		panic(fmt.Sprintf("failed to declare queue: %v", err))
	}

	err = ch.QueueBind(queue,
		routingKey,
		exchange,
		false, nil)
	if err != nil {
		panic(fmt.Sprintf("failed to bind queue: %v", err))
	}

	msg := amqp.Publishing{
		Body:         []byte("test"),
		DeliveryMode: 2,
		Headers: amqp.Table{
			"x-delay": 5000, // 设置消息的延时时间
		},
	}

	err = pub.Publish(ctx, exchange, routingKey, msg)
	fmt.Println("result:", err)

}

Reproduction steps

  • use docker rabbitmq:3.13.0-management
  • install rabbitmq_delayed_message_exchange
  • use the code above

Expected behavior
the delay message publish success, but Publish func return error

@furdarius
Copy link
Owner

furdarius commented May 27, 2024

Thanks for asking @super1t!

In your case you do conn.Dial is initiated in a goroutine, which means you don't wait for the connection to be established before publishing.

So, instead of

go func() {
	err := conn.Dial(ctx, url)
	if err != nil {
		panic(fmt.Sprintf("failed to establish RabbitMQ connection: %v", err))
	}
}()

You could just do

err := conn.Dial(ctx, url)
if err != nil {
	panic(fmt.Sprintf("failed to establish RabbitMQ connection: %v", err))
}

@super1t
Copy link
Author

super1t commented May 28, 2024

Thanks for asking @super1t!

In your case you do conn.Dial is initiated in a goroutine, which means you don't wait for the connection to be established before publishing.

So, instead of

go func() {
	err := conn.Dial(ctx, url)
	if err != nil {
		panic(fmt.Sprintf("failed to establish RabbitMQ connection: %v", err))
	}
}()

You could just do

err := conn.Dial(ctx, url)
if err != nil {
	panic(fmt.Sprintf("failed to establish RabbitMQ connection: %v", err))
}

It seems that this is not the reason. If I change it to not using coroutines, it will be blocked at this step and cannot be executed further. There should be a problem with the publish method.

@furdarius
Copy link
Owner

@super1t Right, my bad! Was wondering if you tried without rabbitroutine as a wrapper. Would that work in that case?

@super1t
Copy link
Author

super1t commented May 29, 2024

@super1t Right, my bad! Was wondering if you tried without rabbitroutine as a wrapper. Would that work in that case?

if i use https://github.com/streadway/amqp, it works

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants