Skip to content

Commit c3f00f1

Browse files
authored
Merge pull request #73 from tyx/fix/unrecoverable-unpack
2 parents 99f5415 + 7753b08 commit c3f00f1

File tree

2 files changed

+95
-8
lines changed

2 files changed

+95
-8
lines changed

src/Service/SimpleBusDriver.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Psr\Log\LoggerInterface;
66
use Symfony\Component\Messenger\Envelope;
7+
use Symfony\Component\Messenger\Exception\HandlerFailedException;
78
use Symfony\Component\Messenger\MessageBusInterface;
89
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
910
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
@@ -20,14 +21,22 @@ public function __construct(LoggerInterface $logger)
2021

2122
public function putEnvelopeOnBus(MessageBusInterface $bus, Envelope $envelope, string $transportName): void
2223
{
23-
$envelope = $envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp);
24-
$bus->dispatch($envelope);
24+
try {
25+
$envelope = $envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp);
26+
$bus->dispatch($envelope);
2527

26-
$message = $envelope->getMessage();
27-
$this->logger->info('{class} was handled successfully.', [
28-
'class' => get_class($message),
29-
'message' => $message,
30-
'transport' => $transportName,
31-
]);
28+
$message = $envelope->getMessage();
29+
$this->logger->info('{class} was handled successfully.', [
30+
'class' => get_class($message),
31+
'message' => $message,
32+
'transport' => $transportName,
33+
]);
34+
} catch (HandlerFailedException $e) {
35+
while ($e instanceof HandlerFailedException) {
36+
$e = $e->getPrevious() ?? $e;
37+
}
38+
39+
throw $e;
40+
}
3241
}
3342
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<?php
2+
namespace Bref\Symfony\Messenger\Test\Unit\Service;
3+
4+
use Bref\Symfony\Messenger\Service\SimpleBusDriver;
5+
use PHPUnit\Framework\TestCase;
6+
use Prophecy\PhpUnit\ProphecyTrait;
7+
use Psr\Log\NullLogger;
8+
use stdClass;
9+
use Symfony\Component\Messenger\Envelope;
10+
use Symfony\Component\Messenger\Exception\HandlerFailedException;
11+
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
12+
use Symfony\Component\Messenger\MessageBusInterface;
13+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
14+
use Symfony\Component\Messenger\Stamp\HandledStamp;
15+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
16+
use Throwable;
17+
18+
final class SimpleBusDriverTest extends TestCase
19+
{
20+
use ProphecyTrait;
21+
22+
private $messengerBus;
23+
24+
/** @before */
25+
public function prepare()
26+
{
27+
$this->messengerBus = $this->prophesize(MessageBusInterface::class);
28+
}
29+
30+
public function test_it_dispatch_message_on_specified_transport_ready_to_be_consumed()
31+
{
32+
$sut = new SimpleBusDriver(new NullLogger());
33+
$envelopeToBeConsumed = new Envelope(new stdClass());
34+
$this->messengerWillSucceedInConsumingOnTransport($envelopeToBeConsumed, 'my_transport');
35+
36+
$sut->putEnvelopeOnBus($this->messengerBus->reveal(), $envelopeToBeConsumed, 'my_transport');
37+
}
38+
39+
public function test_it_unpack_handler_failed_exception()
40+
{
41+
$sut = new SimpleBusDriver(new NullLogger());
42+
$envelopeToBeConsumed = new Envelope(new stdClass());
43+
$this->messengerWillFailToConsumeOnTransport(
44+
$envelopeToBeConsumed,
45+
'async',
46+
new UnrecoverableMessageHandlingException('boum')
47+
);
48+
$this->expectException(UnrecoverableMessageHandlingException::class);
49+
$sut->putEnvelopeOnBus($this->messengerBus->reveal(), $envelopeToBeConsumed, 'async');
50+
}
51+
52+
private function messengerWillSucceedInConsumingOnTransport(Envelope $envelope, string $transport)
53+
{
54+
$this->messengerBus
55+
->dispatch($envelope->with(new ReceivedStamp($transport), new ConsumedByWorkerStamp))
56+
->willReturn(
57+
$envelope->with(new HandledStamp('result', 'my.handler'))
58+
)
59+
->shouldBeCalled()
60+
;
61+
}
62+
63+
private function messengerWillFailToConsumeOnTransport(Envelope $envelope, string $transport, Throwable $failure)
64+
{
65+
$this->messengerBus
66+
->dispatch($envelope->with(new ReceivedStamp($transport), new ConsumedByWorkerStamp))
67+
->willThrow(
68+
new HandlerFailedException(
69+
$envelope->with(new HandledStamp('result', 'my.handler')),
70+
[
71+
$failure
72+
]
73+
)
74+
)
75+
->shouldBeCalled()
76+
;
77+
}
78+
}

0 commit comments

Comments
 (0)