Skip to content

Symfony/Messenger async (queued messages)🔗

References🔗

Quick start with Doctrine transport🔗

Install & configure symfony/messenger🔗

Install symfony/messenger:

1
composer require messenger doctrine-messenger

Note

doctrine-messenger is required as of Symfony 5+, since the transports were moved into their own packages; symfony/doctrine-messenger will not be a hard dependency of symfony/messenger as of Symfony 6+.

Uncomment the Doctrine transport line in the .env file:

1
2
3
4
5
6
###> symfony/messenger ###
# Choose one of the transports below
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
###< symfony/messenger ###

Create the event bus🔗

Tip

See the Command bus cookbook for command & query buses

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.events: 
                default_middleware: true
                middleware:
                    # https://symfony.com/doc/current/messenger.html#middleware-for-doctrine
                    - doctrine_ping_connection
                    - doctrine_close_connection
                    - doctrine_transaction
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php

namespace App\Infra\Bus;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

/**
 * Application-level entry point for publishing events on the Messenger bus it wraps.
 */
class EventBus
{
    /** @var MessageBusInterface */
    private $eventBus;

    public function __construct(MessageBusInterface $eventBus)
    {
        $this->eventBus = $eventBus;
    }

    /**
     * Publishes an event on the wrapped bus.
     *
     * In transactional mode (the default) the event is wrapped in an envelope carrying a
     * DispatchAfterCurrentBusStamp, so that it is only dispatched once the message currently
     * being handled has been handled successfully. Otherwise the event is dispatched as is.
     *
     * @see https://symfony.com/doc/current/messenger/message-recorder.html about transactional dispatch
     *
     * @param object|Envelope $event         The event (or an envelope already wrapping it)
     * @param bool            $transactional Set it to false if you're not invoking the dispatch method from a handler
     */
    public function dispatch($event, bool $transactional = true): void
    {
        if (!$transactional) {
            // Plain dispatch: no stamp is added.
            $this->eventBus->dispatch($event);

            return;
        }

        // See https://symfony.com/doc/current/messenger/message-recorder.html
        $stamps = [new DispatchAfterCurrentBusStamp()];

        $this->eventBus->dispatch(Envelope::wrap($event, $stamps));
    }
}

Note

As of Symfony 5.2, the DispatchAfterCurrentBusStamp can always be added, regardless of the context (whether you are dispatching from a handler or not), because a logic exception is no longer thrown. See symfony/symfony/pull/37976

1
2
3
4
5
6
7
# config/services.yaml
services:
    event_bus:
        class: App\Infra\Bus\EventBus
        arguments: ['@messenger.bus.events']
    # Alias for event bus:
    App\Infra\Bus\EventBus: '@event_bus'

Doctrine as Transport🔗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# config/packages/messenger.yaml
framework:
    messenger:
        # Failed messages are sent to this transport (declared below as "failed_events") for later handling.
        failure_transport: failed_events

        # https://symfony.com/doc/current/messenger.html#transport-configuration
        transports:
            events:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: false # Do not automatically create messenger DB tables
                    queue_name: event
                    table_name: messenger_messages
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay - 10 seconds
                    delay: 10_000
                    # causes the delay to be higher before each retry
                    # e.g. 10 second delay, 20 seconds, 40 seconds, etc.
                    multiplier: 2
                    max_delay: 0
            failed_events:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: false
                    queue_name: failed_event
                    table_name: messenger_messages

Events and handlers🔗

An event🔗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php

namespace App\Application\MyNamespace\Event;

/**
 * Example event: an immutable message that only carries the identifier of the object it is about.
 */
class MyEvent
{
    /**
     * Identifier of the object concerned by the event.
     *
     * @see https://symfony.com/doc/current/messenger.html#doctrine-entities-in-messages Rather pass identifiers in the event, not the full object
     * @var string
     */
    private $objectUuid;

    /**
     * @param string $objectUuid Identifier of the object concerned by the event
     */
    public function __construct(string $objectUuid)
    {
        $this->objectUuid = $objectUuid;
    }

    /**
     * @return string The identifier given at construction time
     */
    public function getObjectUuid(): string
    {
        return $this->objectUuid;
    }
}

A handler🔗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<?php

namespace App\Application\MyNamespace\Handler;

use App\Application\MyNamespace\Event\MyEvent;

/**
 * Example handler for MyEvent.
 *
 * It is an invokable class: the type of the argument of __invoke() is the message it handles.
 */
class MyEventHandler
{
    /**
     * @param MyEvent $event The event to handle; it exposes the object identifier through getObjectUuid()
     */
    public function __invoke(MyEvent $event): void
    {
        // do something with $event->getObjectUuid();
    }
}

Tag handlers in DIC🔗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# config/services.yaml
services:
    _defaults:
        autowire: true
        autoconfigure: true
    event_handlers:
        namespace: App\Application\
        resource: '../src/Application/**/Handler/*EventHandler.php'
        public: true
        tags:
            - { name: 'messenger.message_handler', bus: 'messenger.bus.events' }

Route your event to the asynchronous transport🔗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            events:
                # ...
            failed_events: 
                # ... 
        routing:
            # Route your messages to the 'events' transport:
            'App\Application\MyNamespace\Event\MyEvent': events

Dispatch event & run workers🔗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
<?php

use App\Application\MyNamespace\Event\MyEvent;
use App\Infra\Bus\EventBus;

/**
 * Example service dispatching an event from outside of a message handler.
 */
class MyService
{
    /** @var EventBus */
    private $eventBus;

    // A constructor must not declare a return type (PHP raises a fatal error otherwise),
    // and the injected bus has to be stored before it can be used.
    public function __construct(EventBus $eventBus)
    {
        $this->eventBus = $eventBus;
    }

    /**
     * Dispatches a MyEvent for a sample object identifier.
     */
    public function doSomething(): void
    {
        $objectUuid = 'my-uuid';
        // This will trigger asynchronous handling by MyEventHandler.
        // Note: always set the second argument to false if you do not dispatch the event from a handler.
        $this->eventBus->dispatch(new MyEvent($objectUuid), false);
    }
}
1
bin/console messenger:consume events -vv

Finishing the job🔗

Setup transport in production🔗

Method #1: Doctrine migration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
    /**
     * Creates the `messenger_messages` table in three steps: the table itself, then its primary key
     * and indexes, then the AUTO_INCREMENT attribute on `id`.
     *
     * The migration aborts unless the database platform reports the name 'mysql'.
     *
     * NOTE(review): the indented closing `SQL;` markers rely on the flexible heredoc syntax (PHP 7.3+) — confirm the PHP version in use.
     * NOTE(review): AbstractPlatform::getName() is deprecated in recent doctrine/dbal releases — confirm against the DBAL version in use.
     */
    public function up(Schema $schema) : void
    {
            // The SQL below is MySQL-specific (backtick quoting, ENGINE=InnoDB, AUTO_INCREMENT).
            $this->abortIf($this->connection->getDatabasePlatform()->getName() !== 'mysql', 'Migration can only be executed safely on \'mysql\'.');

            // Step 1: create the table, without any key yet.
            $sql = <<<SQL
    CREATE TABLE `messenger_messages` (
      `id` bigint(20) NOT NULL,
      `body` longtext COLLATE utf8_unicode_ci NOT NULL,
      `headers` longtext COLLATE utf8_unicode_ci NOT NULL,
      `queue_name` varchar(255) COLLATE utf8_unicode_ci NOT NULL,
      `created_at` datetime NOT NULL,
      `available_at` datetime NOT NULL,
      `delivered_at` datetime DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
    SQL;

            $this->addSql($sql);

            // Step 2: primary key on `id`, plus one index each on `queue_name`, `available_at` and `delivered_at`.
            $sql = <<<SQL
    ALTER TABLE `messenger_messages`
      ADD PRIMARY KEY (`id`),
      ADD KEY `IDX_75EA56E0FB7336F0` (`queue_name`),
      ADD KEY `IDX_75EA56E0E3BD61CE` (`available_at`),
      ADD KEY `IDX_75EA56E016BA31DB` (`delivered_at`);
    SQL;

            $this->addSql($sql);

            // Step 3: make `id` auto-incremented, now that it is the primary key.
            $this->addSql('ALTER TABLE `messenger_messages` MODIFY id BIGINT(20) NOT NULL AUTO_INCREMENT;');
    }

    /**
     * Reverts the migration by dropping the `messenger_messages` table.
     *
     * The migration aborts unless the database platform reports the name 'mysql'.
     */
    public function down(Schema $schema) : void
    {
        $this->abortIf($this->connection->getDatabasePlatform()->getName() !== 'mysql', 'Migration can only be executed safely on \'mysql\'.');

        // Any message still stored in the table is dropped with it.
        $this->addSql('DROP TABLE `messenger_messages`');
    }

Method #2: On deploy

1
2
3
4
5
6
7
8
9
# Makefile

warmup@staging:
    # ...
    bin/console messenger:setup-transports --ansi --no-debug --no-interaction

warmup@production:
    # ...
    bin/console messenger:setup-transports --ansi --no-debug --no-interaction

Make messenger tables ignored by Doctrine's schema tools🔗

1
2
3
4
# config/packages/doctrine.yaml
doctrine:
    dbal:
        schema_filter: '~^(?!messenger_messages)~'

Configure in-memory transport in test env🔗

1
2
3
4
5
6
7
8
# config/packages/test/messenger.yaml
framework:
  messenger:
    transports:
      events:
        dsn: 'in-memory://'
        retry_strategy:
          max_retries: 0 # No retry in tests

Default environment variable for MESSENGER_TRANSPORT_DSN (Optional)🔗

Declaring a default value means you do not have to provision the environment variable on staging & production when your transport is Doctrine everywhere.

1
2
3
# config/services.yaml
parameters:
    'env(MESSENGER_TRANSPORT_DSN)': 'doctrine://default'

Bind event $eventBus in DIC (Optional)🔗

1
2
3
4
5
# config/services.yaml
services:
    _defaults:
        bind:
            App\Infra\Bus\EventBus $eventBus: '@event_bus'

Create an interface for your async events (Optional)🔗

Routing each of your events to your events transport one by one can be tedious. Instead, you can declare an interface that all of your events implement, and route that interface.

The async event interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
<?php

namespace App\Application\Common\Event;

/**
 * Marker for messages to dispatch as events (async handling).
 *
 * The interface declares no method: it only exists so that messages can be
 * routed to a transport by type rather than class by class.
 */
interface AsyncEventInterface
{
}

Update your event accordingly (to implement this new interface):

1
2
3
4
5
6
7
8
<?php

namespace App\Application\MyNamespace\Event;

use App\Application\Common\Event\AsyncEventInterface;

/**
 * Same event as before, now flagged as an async event through the marker interface.
 */
class MyEvent implements AsyncEventInterface
{
    // ...
}

And finally route the interface (not every event type) to your transport:

1
2
3
4
5
6
7
# config/packages/messenger.yaml
framework:
    messenger:
        routing:
            # Remove the per-event route (commented out below) and replace it with the interface route that follows it:
            # 'App\Application\MyNamespace\Event\MyEvent': events
            'App\Application\Common\Event\AsyncEventInterface': events

From now on, every event which implements AsyncEventInterface will systematically be routed to the events transport.

Supervisor to manage workers🔗

Ref: https://symfony.com/doc/current/messenger.html#supervisor-configuration

Supervisor must run in production in order to manage your workers.

You can also use it during development.

Provision Supervisor🔗

Update your VM provisioning configuration file:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# .manala.yaml

supervisor:
    configs:
        - file: app.conf
          programs:
              messenger-consume:
                  command: php bin/console messenger:consume events --time-limit=3600 --limit=100 --no-interaction -vv
                  directory: /srv/app
                  stdout_logfile: /srv/log/supervisor.messenger-consume.log
                  startsecs: 1
                  startretries: 10
                  # Dev
                  autostart: false

Then manala up and provision:

1
make provision.supervisor

Tip

Is Supervisor running? Check from inside your VM with: sudo service supervisor status

Then visit the URL of your VM on port 9001: http://<my-app>.vm:9001 to access the Supervisor administration board, stop your workers, restart them, view their logs, etc.

Supervisor admin

Restart workers on deploy🔗

You should also add a line to your .manala.yaml to restart Supervisor after deployments:

1
2
3
4
5
6
7
8
# .manala.yaml

releases:
  - &release_production:
    # …
    deploy_post_tasks:
      # …
      - shell: sudo /usr/bin/supervisorctl restart all

Common mistakes

If you use the startsecs option in your Supervisor configuration, make sure that the --limit option of the messenger:consume command is high enough that the worker does not exit before startsecs seconds have elapsed; otherwise Supervisor considers that the program failed to start.

Elao's projects implementing async event bus🔗

Timestamps & keywords🔗

  • Last update: June 2020
  • Messenger version: ^4.4
  • Keywords: asynchronous, symfony/messenger, queued messages, event bus

Last update: December 20, 2024