Skip to content

Commit 3c2b8ab

Browse files
committed
[amqp] add spec for message timestamp, and pre fetch count.
1 parent 66fc535 commit 3c2b8ab

7 files changed

+154
-0
lines changed

src/Amqp/BasicConsumeBreakOnFalseSpec.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public function tearDown()
3030
public function test()
3131
{
3232
$this->context = $context = $this->createContext();
33+
$context->setQos(0, 5, false);
34+
3335
$fooQueue = $this->createQueue($context, 'foo_basic_consume_break_on_false_spec');
3436
$barQueue = $this->createQueue($context, 'bar_basic_consume_break_on_false_spec');
3537

src/Amqp/BasicConsumeFromAllSubscribedQueuesSpec.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public function tearDown()
3030
public function test()
3131
{
3232
$this->context = $context = $this->createContext();
33+
$context->setQos(0, 5, false);
34+
3335
$fooQueue = $this->createQueue($context, 'foo_basic_consume_from_all_subscribed_queues_spec');
3436
$barQueue = $this->createQueue($context, 'bar_basic_consume_from_all_subscribed_queues_spec');
3537

src/Amqp/BasicConsumeShouldAddConsumerTagOnSubscribeSpec.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public function tearDown()
3030
public function test()
3131
{
3232
$this->context = $context = $this->createContext();
33+
$context->setQos(0, 5, false);
34+
3335
$queue = $this->createQueue($context, 'basic_consume_should_add_consumer_tag_on_subscribe_spec');
3436

3537
$consumer = $context->createConsumer($queue);

src/Amqp/BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public function tearDown()
3030
public function test()
3131
{
3232
$this->context = $context = $this->createContext();
33+
$context->setQos(0, 5, false);
34+
3335
$queue = $this->createQueue($context, 'basic_consume_should_remove_consumer_tag_on_unsubscribe_spec');
3436

3537
$consumer = $context->createConsumer($queue);

src/Amqp/BasicConsumeUntilUnsubscribedSpec.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public function tearDown()
3030
public function test()
3131
{
3232
$this->context = $context = $this->createContext();
33+
$context->setQos(0, 5, false);
34+
3335
$fooQueue = $this->createQueue($context, 'foo_basic_consume_until_unsubscribed_spec');
3436
$barQueue = $this->createQueue($context, 'bar_basic_consume_until_unsubscribed_spec');
3537

src/Amqp/PreFetchCountSpec.php

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpConsumer;
6+
use Interop\Amqp\AmqpContext;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
use PHPUnit\Framework\TestCase;
10+
11+
/**
12+
* @group functional
13+
*/
14+
abstract class PreFetchCountSpec extends TestCase
15+
{
16+
/**
17+
* @var AmqpContext
18+
*/
19+
private $context;
20+
21+
public function tearDown()
22+
{
23+
if ($this->context) {
24+
$this->context->close();
25+
}
26+
27+
parent::tearDown();
28+
}
29+
30+
public function test()
31+
{
32+
$this->context = $context = $this->createContext();
33+
$queue = $this->createQueue($context, 'pre_fetch_count_spec');
34+
35+
$context->createProducer()->send($queue, $context->createMessage());
36+
$context->createProducer()->send($queue, $context->createMessage());
37+
$context->createProducer()->send($queue, $context->createMessage());
38+
$context->createProducer()->send($queue, $context->createMessage());
39+
$context->createProducer()->send($queue, $context->createMessage());
40+
41+
$this->context->setQos(0, 3, false);
42+
43+
$consumer = $context->createConsumer($queue);
44+
45+
$consumedMessages = 0;
46+
$context->subscribe($consumer, function() use (&$consumedMessages) {
47+
$consumedMessages++;
48+
});
49+
$context->consume(100);
50+
51+
$this->assertEquals(3, $consumedMessages);
52+
}
53+
54+
/**
55+
* @return AmqpContext
56+
*/
57+
abstract protected function createContext();
58+
59+
/**
60+
* @param AmqpContext $context
61+
* @param string $queueName
62+
*
63+
* @return AmqpQueue
64+
*/
65+
protected function createQueue(AmqpContext $context, $queueName)
66+
{
67+
$queue = $context->createQueue($queueName);
68+
$context->declareQueue($queue);
69+
$context->purgeQueue($queue);
70+
71+
return $queue;
72+
}
73+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpContext;
6+
use Interop\Amqp\AmqpMessage;
7+
use Interop\Amqp\AmqpQueue;
8+
use PHPUnit\Framework\TestCase;
9+
10+
/**
11+
* @group functional
12+
*/
13+
abstract class SendAndReceiveTimestampAsIntegerSpec extends TestCase
14+
{
15+
/**
16+
* @var AmqpContext
17+
*/
18+
private $context;
19+
20+
public function tearDown()
21+
{
22+
if ($this->context) {
23+
$this->context->close();
24+
}
25+
26+
parent::tearDown();
27+
}
28+
29+
public function test()
30+
{
31+
$this->context = $context = $this->createContext();
32+
33+
$queue = $this->createQueue($context, 'send_and_receive_timestamp_as_integer_spec');
34+
35+
$expectedTime = time();
36+
$expectedBody = __METHOD__.time();
37+
38+
$message = $context->createMessage($expectedBody);
39+
$message->setTimestamp($expectedTime);
40+
41+
$context->createProducer()->send($queue, $message);
42+
43+
$consumer = $context->createConsumer($queue);
44+
45+
$receivedMessage = $consumer->receive(100);
46+
47+
$this->assertInstanceOf(AmqpMessage::class, $receivedMessage);
48+
$this->assertSame($expectedBody, $receivedMessage->getBody());
49+
$this->assertSame($expectedTime, $receivedMessage->getTimestamp());
50+
}
51+
52+
/**
53+
* @return AmqpContext
54+
*/
55+
abstract protected function createContext();
56+
57+
/**
58+
* @param AmqpContext $context
59+
* @param string $queueName
60+
*
61+
* @return AmqpQueue
62+
*/
63+
protected function createQueue(AmqpContext $context, $queueName)
64+
{
65+
$queue = $context->createQueue($queueName);
66+
$context->declareQueue($queue);
67+
$context->purgeQueue($queue);
68+
69+
return $queue;
70+
}
71+
}

0 commit comments

Comments
 (0)