Skip to content

Commit 023ed34

Browse files
authored
Merge pull request #84 from robinlehrmann/transport-name-resolver
Added TransportNameResolver for sns and sqs for automatic transport r…
2 parents d9168e4 + 7598b6a commit 023ed34

12 files changed

+507
-35
lines changed

README.md

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,6 @@ services:
175175
public: true
176176
autowire: true
177177
arguments:
178-
# Pass the transport name used in config/packages/messenger.yaml
179-
$transportName: 'async'
180178
# true enables partial SQS batch failure
181179
# Enabling this without proper SQS config will consider all your messages successful
182180
# See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details.
@@ -296,9 +294,6 @@ services:
296294
Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
297295
public: true
298296
autowire: true
299-
arguments:
300-
# Pass the transport name used in config/packages/messenger.yaml
301-
$transportName: 'async'
302297
```
303298

304299
Now, anytime a message is dispatched to SNS, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed.
@@ -390,7 +385,6 @@ services:
390385
public: true
391386
autowire: true
392387
arguments:
393-
# Pass the transport name used in config/packages/messenger.yaml
394388
$transportName: 'async'
395389
# Optionnally, if you have different buses in config/packages/messenger.yaml, set $bus like below:
396390
# $bus: '@event.bus'
@@ -456,6 +450,34 @@ services:
456450
region: us-east-1
457451
```
458452

453+
### Automatic transport recognition
454+
455+
Automatic transport recognition is primarily handled by default through TransportNameResolvers for SNS and SQS,
456+
ensuring that the transport name is automatically passed to your message handlers.
457+
However, in scenarios where you need to manually specify the transport name or adjust the default behavior,
458+
you can do so by setting the `$transportName` parameter in your service definitions within the config/services.yaml file.
459+
This parameter should match the transport name defined in your config/packages/messenger.yaml.
460+
For instance, for a SNSConsumer, you would configure it as follows:
461+
462+
```yaml
463+
# config/packages/messenger.yaml
464+
framework:
465+
messenger:
466+
transports:
467+
async: '%env(MESSENGER_TRANSPORT_DSN)%'
468+
```
469+
470+
```yaml
471+
# config/services.yaml
472+
services:
473+
Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
474+
public: true
475+
autowire: true
476+
arguments:
477+
# Pass the transport name used in config/packages/messenger.yaml
478+
$transportName: 'async'
479+
```
480+
459481
### Disabling transports
460482

461483
By default, this package registers Symfony Messenger transports for SQS, SNS and EventBridge.
@@ -481,6 +503,5 @@ services:
481503
public: true
482504
autowire: true
483505
arguments:
484-
$transportName: 'async'
485506
$serializer: '@Happyr\MessageSerializer\Serializer'
486507
```

src/DependencyInjection/BrefMessengerExtension.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,26 @@
44

55
use Symfony\Component\Config\FileLocator;
66
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface;
78
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
89
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
910

10-
class BrefMessengerExtension extends Extension
11+
class BrefMessengerExtension extends Extension implements PrependExtensionInterface
1112
{
1213
public function load(array $configs, ContainerBuilder $container): void
1314
{
1415
$loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
1516
$loader->load('services.yaml');
1617
}
18+
19+
public function prepend(ContainerBuilder $container): void
20+
{
21+
$configs = $container->getExtensionConfig('framework');
22+
23+
foreach (array_reverse($configs) as $config) {
24+
if (array_key_exists('messenger', $config)) {
25+
$container->setParameter('messenger.transports', $config['messenger']['transports']);
26+
}
27+
}
28+
}
1729
}

src/Resources/config/services.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,26 @@ services:
44
arguments:
55
- '@logger'
66

7+
Bref\Symfony\Messenger\Service\MessengerTransportConfiguration:
8+
arguments:
9+
$messengerTransportsConfiguration: '%messenger.transports%'
10+
711
# SNS
12+
Bref\Symfony\Messenger\Service\Sns\SnsTransportNameResolver:
13+
arguments:
14+
- '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration'
815
Bref\Symfony\Messenger\Service\Sns\SnsTransportFactory:
916
tags: ['messenger.transport_factory']
1017
arguments:
1118
- '@bref.messenger.sns_client'
1219
bref.messenger.sns_client:
1320
class: AsyncAws\Sns\SnsClient
1421

22+
# SQS
23+
Bref\Symfony\Messenger\Service\Sqs\SqsTransportNameResolver:
24+
arguments:
25+
- '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration'
26+
1527
# EventBridge
1628
Bref\Symfony\Messenger\Service\EventBridge\EventBridgeTransportFactory:
1729
tags: ['messenger.transport_factory']
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
namespace Bref\Symfony\Messenger\Service;
4+
5+
use InvalidArgumentException;
6+
7+
/** @final */
8+
class MessengerTransportConfiguration
9+
{
10+
public function __construct(
11+
private array $messengerTransportsConfiguration
12+
) {
13+
}
14+
15+
/** @throws InvalidArgumentException */
16+
public function provideTransportFromEventSource(string $eventSourceWithProtocol): string
17+
{
18+
foreach ($this->messengerTransportsConfiguration as $messengerTransport => $messengerOptions) {
19+
$dsn = $this->extractDsnFromTransport($messengerOptions);
20+
21+
if ($dsn === $eventSourceWithProtocol) {
22+
return $messengerTransport;
23+
}
24+
}
25+
26+
throw new InvalidArgumentException(sprintf('No transport found for eventSource "%s".', $eventSourceWithProtocol));
27+
}
28+
29+
private function extractDsnFromTransport(string|array $messengerTransport): string
30+
{
31+
if (is_array($messengerTransport) && array_key_exists('dsn', $messengerTransport)) {
32+
return $messengerTransport['dsn'];
33+
}
34+
35+
return $messengerTransport;
36+
}
37+
}

src/Service/Sns/SnsConsumer.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
use Bref\Context\Context;
66
use Bref\Event\Sns\SnsEvent;
77
use Bref\Event\Sns\SnsHandler;
8+
use Bref\Event\Sns\SnsRecord;
89
use Bref\Symfony\Messenger\Service\BusDriver;
10+
use LogicException;
911
use Symfony\Component\Messenger\MessageBusInterface;
1012
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1113

@@ -15,21 +17,25 @@ final class SnsConsumer extends SnsHandler
1517
private $bus;
1618
/** @var SerializerInterface */
1719
protected $serializer;
18-
/** @var string */
20+
/** @var string|null */
1921
private $transportName;
2022
/** @var BusDriver */
2123
private $busDriver;
24+
/** @var SnsTransportNameResolver|null */
25+
private $transportNameResolver;
2226

2327
public function __construct(
2428
BusDriver $busDriver,
2529
MessageBusInterface $bus,
2630
SerializerInterface $serializer,
27-
string $transportName
31+
string $transportName = null,
32+
SnsTransportNameResolver $transportNameResolver = null,
2833
) {
2934
$this->busDriver = $busDriver;
3035
$this->bus = $bus;
3136
$this->serializer = $serializer;
3237
$this->transportName = $transportName;
38+
$this->transportNameResolver = $transportNameResolver;
3339
}
3440

3541
public function handleSns(SnsEvent $event, Context $context): void
@@ -39,7 +45,16 @@ public function handleSns(SnsEvent $event, Context $context): void
3945
$headers = isset($attributes['Headers']) ? $attributes['Headers']->getValue() : '[]';
4046
$envelope = $this->serializer->decode(['body' => $record->getMessage(), 'headers' => json_decode($headers, true)]);
4147

42-
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->transportName);
48+
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->resolveTransportName($record));
4349
}
4450
}
51+
52+
private function resolveTransportName(SnsRecord $record): string
53+
{
54+
if (null === $this->transportName && null === $this->transportNameResolver) {
55+
throw new LogicException('You need to set $transportNameResolver or $transportName.');
56+
}
57+
58+
return $this->transportName ?? ($this->transportNameResolver)($record);
59+
}
4560
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
namespace Bref\Symfony\Messenger\Service\Sns;
4+
5+
use Bref\Event\Sns\SnsRecord;
6+
use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
7+
use InvalidArgumentException;
8+
9+
/** @final */
10+
class SnsTransportNameResolver
11+
{
12+
private const TRANSPORT_PROTOCOL = 'sns://';
13+
14+
public function __construct(
15+
private MessengerTransportConfiguration $configurationProvider
16+
) {
17+
}
18+
19+
/** @throws InvalidArgumentException */
20+
public function __invoke(SnsRecord $snsRecord): string
21+
{
22+
if (!array_key_exists('EventSubscriptionArn', $snsRecord->toArray())) {
23+
throw new InvalidArgumentException('EventSubscriptionArn is missing in sns record.');
24+
}
25+
26+
$eventSourceArn = $snsRecord->getEventSubscriptionArn();
27+
28+
return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn);
29+
}
30+
}

src/Service/Sqs/SqsConsumer.php

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Bref\Event\Sqs\SqsHandler;
88
use Bref\Event\Sqs\SqsRecord;
99
use Bref\Symfony\Messenger\Service\BusDriver;
10+
use LogicException;
1011
use Psr\Log\LoggerInterface;
1112
use Psr\Log\NullLogger;
1213
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
@@ -23,7 +24,9 @@ final class SqsConsumer extends SqsHandler
2324
private $bus;
2425
/** @var SerializerInterface */
2526
protected $serializer;
26-
/** @var string */
27+
/** @var SqsTransportNameResolver */
28+
private $transportNameResolver;
29+
/** @var string|null */
2730
private $transportName;
2831
/** @var BusDriver */
2932
private $busDriver;
@@ -36,16 +39,18 @@ public function __construct(
3639
BusDriver $busDriver,
3740
MessageBusInterface $bus,
3841
SerializerInterface $serializer,
39-
string $transportName,
42+
string $transportName = null,
4043
LoggerInterface $logger = null,
41-
bool $partialBatchFailure = false
44+
bool $partialBatchFailure = false,
45+
SqsTransportNameResolver $transportNameResolver = null
4246
) {
4347
$this->busDriver = $busDriver;
4448
$this->bus = $bus;
4549
$this->serializer = $serializer;
4650
$this->transportName = $transportName;
4751
$this->logger = $logger ?? new NullLogger();
4852
$this->partialBatchFailure = $partialBatchFailure;
53+
$this->transportNameResolver = $transportNameResolver;
4954
}
5055

5156
public function handleSqs(SqsEvent $event, Context $context): void
@@ -93,7 +98,7 @@ public function handleSqs(SqsEvent $event, Context $context): void
9398
if ('' !== $context->getTraceId()) {
9499
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
95100
}
96-
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName);
101+
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->resolveTransportName($record));
97102
} catch (UnrecoverableExceptionInterface $exception) {
98103
$this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId()));
99104
$this->logger->error($exception);
@@ -116,4 +121,13 @@ private function readMessageGroupIdOfRecord(SqsRecord $record): ?string
116121
$recordAsArray = $record->toArray();
117122
return $recordAsArray['attributes']['MessageGroupId'] ?? null;
118123
}
124+
125+
private function resolveTransportName(SqsRecord $record): string
126+
{
127+
if (null === $this->transportName && null === $this->transportNameResolver) {
128+
throw new LogicException('You need to set $transportNameResolver or $transportName.');
129+
}
130+
131+
return $this->transportName ?? ($this->transportNameResolver)($record);
132+
}
119133
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Bref\Symfony\Messenger\Service\Sqs;
4+
5+
use Bref\Event\Sqs\SqsRecord;
6+
use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
7+
use InvalidArgumentException;
8+
9+
/** @final */
10+
class SqsTransportNameResolver
11+
{
12+
private const TRANSPORT_PROTOCOL = 'sqs://';
13+
14+
public function __construct(
15+
private MessengerTransportConfiguration $configurationProvider
16+
) {
17+
}
18+
19+
public function __invoke(SqsRecord $sqsRecord): string
20+
{
21+
if (!array_key_exists('eventSourceARN', $sqsRecord->toArray())) {
22+
throw new InvalidArgumentException('EventSourceArn is missing in sqs record.');
23+
}
24+
25+
$eventSourceArn = $sqsRecord->toArray()['eventSourceARN'];
26+
27+
return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn);
28+
}
29+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
namespace Bref\Symfony\Messenger\Test\Unit\Service;
4+
5+
use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
6+
use InvalidArgumentException;
7+
use PHPUnit\Framework\TestCase;
8+
9+
final class MessengerTransportConfigurationTest extends TestCase
10+
{
11+
public function test_existing_transport_will_be_found_with_existing_event_source_arn(): void
12+
{
13+
$messengerTransportConfiguration = new MessengerTransportConfiguration([
14+
'async_example_one' => 'sqs://arn:aws:sqs:us-east-1:123456789012:example_one',
15+
'async_example_two' => [
16+
'dsn' => 'sqs://arn:aws:sqs:us-east-1:123456789012:example_two',
17+
],
18+
]);
19+
20+
self::assertSame(
21+
'async_example_one',
22+
$messengerTransportConfiguration->provideTransportFromEventSource(
23+
'sqs://arn:aws:sqs:us-east-1:123456789012:example_one'
24+
)
25+
);
26+
27+
self::assertSame(
28+
'async_example_two',
29+
$messengerTransportConfiguration->provideTransportFromEventSource(
30+
'sqs://arn:aws:sqs:us-east-1:123456789012:example_two'
31+
)
32+
);
33+
}
34+
35+
public function test_non_existing_transport_for_event_source_arn_will_be_not_found(): void
36+
{
37+
$messengerTransportConfiguration = new MessengerTransportConfiguration([]);
38+
39+
$this->expectException(InvalidArgumentException::class);
40+
$this->expectExceptionMessage(
41+
'No transport found for eventSource "sqs://arn:aws:sqs:us-east-1:123456789012:missing".'
42+
);
43+
44+
$messengerTransportConfiguration->provideTransportFromEventSource(
45+
'sqs://arn:aws:sqs:us-east-1:123456789012:missing'
46+
);
47+
}
48+
}

0 commit comments

Comments
 (0)