Commit c17e30a7 authored by Susanne Moog's avatar Susanne Moog Committed by Benni Mack
Browse files

[FEATURE] Adopt symfony messenger as message bus and queue

See the Feature.rst file for details.

The changes in workspaces are meant as a usage example.

The goal is to provide a basic implementation as a first step.
Further features (for example extending the capabilities of the
consume command or providing additional transportFactories or
multi-bus support) can be done in additional patches
(or even – in case of transports – in additional packages).

The default transport is synchronous to avoid forcing users to
configure the long running message consumer worker.

Command executed:
 composer req symfony/doctrine-messenger:^6.2 symfony/messenger:^6.2
 composer req symfony/doctrine-messenger:^6.2 symfony/messenger:^6.2 \
   -d typo3/sysext/core --no-update

Releases: main
Resolves: #97700
Change-Id: Ib034f70b8d6c0be974f807341df257c17dff42a1
Reviewed-on: https://review.typo3.org/c/Packages/TYPO3.CMS/+/77232


Tested-by: Benni Mack's avatarBenni Mack <benni@typo3.org>
Tested-by: Benjamin Franzke's avatarBenjamin Franzke <bfr@qbus.de>
Reviewed-by: ...
parent 883b98ce
......@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "5cd2a99964df27eb44eb8b259ca6dd48",
"content-hash": "3d6e7320b040250f0419ac3004dd8352",
"packages": [
{
"name": "bacon/bacon-qr-code",
......@@ -2612,6 +2612,78 @@
],
"time": "2022-02-25T11:15:52+00:00"
},
{
"name": "symfony/doctrine-messenger",
"version": "v6.2.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/doctrine-messenger.git",
"reference": "51f0c23a2b938d5ed4c9c859d502033c4cde3723"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/doctrine-messenger/zipball/51f0c23a2b938d5ed4c9c859d502033c4cde3723",
"reference": "51f0c23a2b938d5ed4c9c859d502033c4cde3723",
"shasum": ""
},
"require": {
"doctrine/dbal": "^2.13|^3.0",
"php": ">=8.1",
"symfony/messenger": "^5.4|^6.0",
"symfony/service-contracts": "^1.1|^2|^3"
},
"conflict": {
"doctrine/persistence": "<1.3"
},
"require-dev": {
"doctrine/persistence": "^1.3|^2|^3",
"symfony/property-access": "^5.4|^6.0",
"symfony/serializer": "^5.4|^6.0"
},
"type": "symfony-messenger-bridge",
"autoload": {
"psr-4": {
"Symfony\\Component\\Messenger\\Bridge\\Doctrine\\": ""
},
"exclude-from-classmap": [
"/Tests/"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Fabien Potencier",
"email": "fabien@symfony.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Symfony Doctrine Messenger Bridge",
"homepage": "https://symfony.com",
"support": {
"source": "https://github.com/symfony/doctrine-messenger/tree/v6.2.0"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2022-11-04T07:42:34+00:00"
},
{
"name": "symfony/event-dispatcher",
"version": "v6.2.0",
......@@ -3120,6 +3192,93 @@
],
"time": "2022-11-28T17:18:31+00:00"
},
{
"name": "symfony/messenger",
"version": "v6.2.2",
"source": {
"type": "git",
"url": "https://github.com/symfony/messenger.git",
"reference": "b8621076ecbef9ef7d53e1a92b1da3fdad3cc3f8"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/messenger/zipball/b8621076ecbef9ef7d53e1a92b1da3fdad3cc3f8",
"reference": "b8621076ecbef9ef7d53e1a92b1da3fdad3cc3f8",
"shasum": ""
},
"require": {
"php": ">=8.1",
"psr/log": "^1|^2|^3"
},
"conflict": {
"symfony/event-dispatcher": "<5.4",
"symfony/event-dispatcher-contracts": "<2",
"symfony/framework-bundle": "<5.4",
"symfony/http-kernel": "<5.4",
"symfony/serializer": "<5.4"
},
"require-dev": {
"psr/cache": "^1.0|^2.0|^3.0",
"symfony/console": "^5.4|^6.0",
"symfony/dependency-injection": "^5.4|^6.0",
"symfony/event-dispatcher": "^5.4|^6.0",
"symfony/http-kernel": "^5.4|^6.0",
"symfony/process": "^5.4|^6.0",
"symfony/property-access": "^5.4|^6.0",
"symfony/rate-limiter": "^5.4|^6.0",
"symfony/routing": "^5.4|^6.0",
"symfony/serializer": "^5.4|^6.0",
"symfony/service-contracts": "^1.1|^2|^3",
"symfony/stopwatch": "^5.4|^6.0",
"symfony/validator": "^5.4|^6.0"
},
"suggest": {
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."
},
"type": "library",
"autoload": {
"psr-4": {
"Symfony\\Component\\Messenger\\": ""
},
"exclude-from-classmap": [
"/Tests/"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Samuel Roze",
"email": "samuel.roze@gmail.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Helps applications send and receive messages to/from other applications or via message queues",
"homepage": "https://symfony.com",
"support": {
"source": "https://github.com/symfony/messenger/tree/v6.2.2"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2022-12-14T16:11:27+00:00"
},
{
"name": "symfony/mime",
"version": "v6.2.0",
......
<?php
declare(strict_types=1);
/*
* This file is part of the TYPO3 CMS project.
*
* It is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, either version 2
* of the License, or any later version.
*
* For the full copyright and license information, please read the
* LICENSE.txt file that was distributed with this source code.
*
* The TYPO3 project - inspiring people to share!
*/
namespace TYPO3\CMS\Core\Command;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Completion\CompletionInput;
use Symfony\Component\Console\Completion\CompletionSuggestions;
use Symfony\Component\Console\Exception\RuntimeException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Worker;
use TYPO3\CMS\Core\Messenger\EventListener\StopWorkerOnTimeLimitListener;
/**
* Heavily stripped-down version of the symfony command with the same name.
*/
class ConsumeMessagesCommand extends Command
{
public function __construct(
private readonly MessageBusInterface $messageBus,
private readonly ServiceLocator $receiverLocator,
private readonly StopWorkerOnTimeLimitListener $stopWorkerOnTimeLimitListener,
private readonly EventDispatcherInterface $eventDispatcher,
private readonly array $receiverNames = [],
private readonly array $busIds = [],
) {
parent::__construct();
}
protected function configure(): void
{
$defaultReceiverName = count($this->receiverNames) === 1 ? current($this->receiverNames) : null;
$this
->setDefinition(
[
new InputArgument(
'receivers',
InputArgument::IS_ARRAY,
'Names of the receivers/transports to consume in order of priority',
$defaultReceiverName ? [$defaultReceiverName] : []
),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('exit-code-on-limit', null, InputOption::VALUE_REQUIRED, 'Exit code when limits are reached', 0),
]
)
->setHelp(
<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
<info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
EOF
);
}
protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
if ($this->receiverNames && !$input->getArgument('receivers')) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
if (count($this->receiverNames) > 1) {
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
}
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
$question->setMultiselect(true);
$input->setArgument('receivers', $io->askQuestion($question));
}
if (!$input->getArgument('receivers')) {
throw new RuntimeException('Please pass at least one receiver.', 1605305001);
}
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$exitCodeOnLimit = (int)($input->getOption('exit-code-on-limit'));
$receivers = [];
$rateLimiters = [];
$receiverNames = $input->getArgument('receivers');
foreach ($receiverNames as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
}
throw new RuntimeException($message, 1605305002);
}
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));
$io->comment('Quit the worker with CONTROL-C.');
if ($output->getVerbosity() < OutputInterface::VERBOSITY_VERBOSE) {
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$worker = new Worker($receivers, $this->messageBus, $this->eventDispatcher, new ConsoleLogger($output), $rateLimiters);
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
];
$queues = $input->getOption('queues');
if ($queues) {
$options['queues'] = $queues;
}
$worker->run($options);
return $this->stopWorkerOnTimeLimitListener->hasStopped() ? $exitCodeOnLimit : 0;
}
public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
{
if ($input->mustSuggestArgumentValuesFor('receivers')) {
$suggestions->suggestValues(array_diff($this->receiverNames, array_diff($input->getArgument('receivers'), [$input->getCompletionValue()])));
return;
}
if ($input->mustSuggestOptionValuesFor('bus')) {
$suggestions->suggestValues($this->busIds);
}
}
}
<?php
declare(strict_types=1);
/*
* This file is part of the TYPO3 CMS project.
*
* It is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, either version 2
* of the License, or any later version.
*
* For the full copyright and license information, please read the
* LICENSE.txt file that was distributed with this source code.
*
* The TYPO3 project - inspiring people to share!
*/
namespace TYPO3\CMS\Core\DependencyInjection;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
use TYPO3\CMS\Core\Messenger\HandlersLocatorFactory;
use TYPO3\CMS\Core\Service\DependencyOrderingService;
use TYPO3\CMS\Core\Utility\GeneralUtility;
/**
* @internal
*/
final class MessageHandlerPass implements CompilerPassInterface
{
private readonly string $tagName;
private ContainerBuilder $container;
private DependencyOrderingService $orderingService;
public function __construct(string $tagName)
{
$this->tagName = $tagName;
$this->orderingService = new DependencyOrderingService();
}
public function process(ContainerBuilder $container): void
{
$this->container = $container;
$handlersLocatorFactory = $container->findDefinition(HandlersLocatorFactory::class);
foreach ($this->collectHandlers($container) as $message => $handlers) {
foreach ($this->orderingService->orderByDependencies($handlers) as $handler) {
$handlersLocatorFactory->addMethodCall('addHandler', [
$message,
$handler['service'],
$handler['method'] ?? '__invoke',
]);
}
}
}
/**
* Collects all handlers from the container.
*/
private function collectHandlers(ContainerBuilder $container): array
{
$unorderedHandlers = [];
foreach ($container->findTaggedServiceIds($this->tagName) as $serviceName => $tags) {
$service = $container->findDefinition($serviceName);
$service->setPublic(true);
foreach ($tags as $attributes) {
$messageHandler = $attributes['message'] ?? $this->getParameterType($serviceName, $service, $attributes['method'] ?? '__invoke');
if (!$messageHandler) {
throw new \InvalidArgumentException(
'Service tag "messenger.message_handler" requires a message attribute to be defined or the method must declare a parameter type. Missing in: ' . $serviceName,
1606732015
);
}
$messageIdentifier = $attributes['identifier'] ?? $serviceName;
$unorderedHandlers[$messageHandler][$messageIdentifier] = [
'service' => $serviceName,
'method' => $attributes['method'] ?? null,
'before' => GeneralUtility::trimExplode(',', $attributes['before'] ?? '', true),
'after' => GeneralUtility::trimExplode(',', $attributes['after'] ?? '', true),
];
}
}
return $unorderedHandlers;
}
/**
* Derives the class type of the first argument of a given method.
*/
private function getParameterType(string $serviceName, Definition $definition, string $method = '__invoke'): ?string
{
// A Reflection exception should never actually get thrown here, but linters want a try-catch just in case.
try {
if (!$definition->isAutowired()) {
throw new \InvalidArgumentException(
sprintf(
'Service "%s" has message handlers defined but does not declare a message to handle to and is not configured to autowire it from the handle method. Set autowire: true to enable auto-detection of the handled message.',
$serviceName
),
1606732016,
);
}
$params = $this->getReflectionMethod($serviceName, $definition, $method)->getParameters();
$rType = count($params) ? $params[0]->getType() : null;
if (!$rType instanceof \ReflectionNamedType) {
throw new \InvalidArgumentException(
sprintf(
'Service "%s" registers method "%s" as a message handler, but does not specify a message type and the method does not type a parameter. Declare a class type for the method parameter or specify a message class explicitly',
$serviceName,
$method
),
1606732017,
);
}
return $rType->getName();
} catch (\ReflectionException $e) {
// The collectHandlers() method will convert this to an exception.
return null;
}
}
/**
* @throws RuntimeException|\ReflectionException
* This method borrowed very closely from Symfony's AbstractRecursivePass (and the ListenerProviderPass).
* @see \TYPO3\CMS\Core\DependencyInjection\ListenerProviderPass::getReflectionMethod()
*/
private function getReflectionMethod(string $serviceName, Definition $definition, string $method): \ReflectionFunctionAbstract
{
if (!$class = $definition->getClass()) {
throw new RuntimeException(sprintf('Invalid service "%s": the class is not set.', $serviceName), 1606732018);
}
if (!$r = $this->container->getReflectionClass($class)) {
throw new RuntimeException(sprintf('Invalid service "%s": class "%s" does not exist.', $serviceName, $class), 1606732019);
}
if (!$r->hasMethod($method)) {
throw new RuntimeException(sprintf('Invalid service "%s": method "%s()" does not exist.', $serviceName, $class !== $serviceName ? $class . '::' . $method : $method), 1606732020);
}
$r = $r->getMethod($method);
if (!$r->isPublic()) {
throw new RuntimeException(sprintf('Invalid service "%s": method "%s()" must be public.', $serviceName, $class !== $serviceName ? $class . '::' . $method : $method), 1606732021);
}
return $r;
}
}
<?php
declare(strict_types=1);
/*
* This file is part of the TYPO3 CMS project.
*
* It is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, either version 2
* of the License, or any later version.
*
* For the full copyright and license information, please read the
* LICENSE.txt file that was distributed with this source code.
*
* The TYPO3 project - inspiring people to share!
*/
namespace TYPO3\CMS\Core\DependencyInjection;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;
use TYPO3\CMS\Core\Messenger\BusFactory;
use TYPO3\CMS\Core\Service\DependencyOrderingService;
use TYPO3\CMS\Core\Utility\GeneralUtility;
/**
* @internal
*/
final class MessengerMiddlewarePass implements CompilerPassInterface
{
private string $tagName;
private DependencyOrderingService $orderer;
public function __construct(string $tagName)
{
$this->tagName = $tagName;
$this->orderer = new DependencyOrderingService();
}
public function process(ContainerBuilder $container): void
{
$busFactory = $container->findDefinition(BusFactory::class);
$groupedMiddlewares = $this->collectMiddlewares($container);
$middlewares = [];
foreach ($groupedMiddlewares as $bus => $unorderedMiddlewares) {
$middlewares[$bus] = [];
foreach ($this->orderer->orderByDependencies($unorderedMiddlewares) as $middleware) {
$middlewares[$bus][] = new Reference($middleware['service']);
}
}
$busFactory->setArgument('$middlewares', array_map(
fn (array $busMiddlewares): IteratorArgument => new IteratorArgument($busMiddlewares),
$middlewares
));
}
/**
* Collects all messenger middlewares from the container and prepares them for ordering
*/
private function collectMiddlewares(ContainerBuilder $container): array
{
$unorderedMiddlewares = [];
foreach ($container->findTaggedServiceIds($this->tagName) as $serviceName => $tags) {
$service = $container->findDefinition($serviceName);
foreach ($tags as $attributes) {
$bus = $attributes['bus'] ?? 'default';
$unorderedMiddlewares[$bus][$serviceName] = [
'service' => $serviceName,
'before' => GeneralUtility::trimExplode(',', $attributes['before'] ?? '', true),
'after' => GeneralUtility::trimExplode(',', $attributes['after'] ?? '', true),
];
}
}
return $unorderedMiddlewares;
}
}