I’ve been away from writing blog posts — the last one was more than two years ago. I decided to write one on this topic, however; I found out that implementing a queue was harder than it should be (in my opinion). Although Magento’s shenanigans don’t really surprise me anymore, the complexity of getting a simple queue up and running caught me (and my estimate) by surprise. There’s a few other blog posts on this topic already, but I found they don’t quite get into the nitty gritty that you potentially can get stuck on.

That said, today I’m going to write about how to create a queue that does additional order processing using Magento 2 and MySQL only — without RabbitMQ.

Why asynchronous processing of orders?

One use case is when you want to synchronize orders to an external platform, do some post processing, or simply want to make some API requests related to a new order. There are plenty of ways to hook into the order placement flow. If you can, however, this should be done asynchronously.

Asynchronous processing of orders is preferred because it doesn’t delay the order flow from the customer’s viewpoint . Any latency or errors that happen during the order handling will not interfere with the customer’s experience — the task happens in a separate PHP process in the background, usually triggered by a cron-job.

RabbitMQ or MySQL?

Magento 2 supports RabbitMQ and it’s officially recommended to use RabbitMQ; it scales better. However, I think MySQL has plenty of “scale” for the vast majority of use-cases. Running a simple queue can be hardly considered a bottleneck. Especially if all it's doing is keeping track of new order IDs. If anything, a separate node can be used to consume the queue.

The reality is, it’s harder to set-up RabbitMQ, it’s another service to maintain, and it so happened that my Docker development environment didn’t come with RabbitMQ.

Edit: I now use Warden as my development environment which comes with RabbitMQ.

It’s probably above reasons that Magento added a MySQL driver for their queueing functionality.

As a side-note, Magento 1 didn’t have a queue/functionality to process events asynchronously. For M1, I’ve always used a ProxiBlue module.

Edit #2: Magento actually uses MySQL for 14 out of its 15 core queues. More on that below (see "Some futher comments on the two connection types").

Which packages are responsible for queueing functionality?

magento/framework-message-queue: Magento’s main message queue functionality.

magento/framework-amqp: Adds AMQP functionality to the above package; handles connections to external queueing systems.

magento/module-message-queue: Ties framework-message-queue into the Magento application, and adds some application functionality like CLI commands and crontabs.

magento/module-mysql-mq: Adds MySQL driver so you don’t need an external queueing system like RabbitMQ. We’ll talk more about this one.

In an ideal world framework packages can be potentially used as a standalone package, without the Magento application itself. Therefore, framework packages don’t contain XML configuration specific to Magento. There needs to be an entrypoint to the framework functionality through a module package (in this case module-message-queue).

Publishing events

First I created a standard event observer that listens to the event sales_order_place_after (triggered in \Magento\Sales\Model\Order). However, I had to change this to a plugin; at the time of dispatching sales_order_place_after the order has not been persisted to the database yet (and thus we cannot get its ID).

This right way to do this is to create an after plugin for the method OrderManagementInterface::place, as it fires after the order has been persisted to the database. On top of that, it’s marked with an API notation and our code therefore complies with the service contract.

My afterPlace method looks like this:

public function afterPlace(
    OrderManagementInterface $subject,
    OrderInterface $return
) {
    try {
        $this->publisher->publish(self::TOPIC_NAME, $return->getId());
    } catch (\Exception $e) {
        $this->logger->error($e);
    }

    return $return;
}

Publisher is an instance of Magento\Framework\MessageQueue\Publisher.

While not really relevant, logger is an instance of Psr\Log\LoggerInterface.

The topic name that I’ve picked is {vendor}.order.create. We’ll get to that soon.

The publish method lets you publish any data as long as it’s json encodable (any scalar, array, associative array or simple object that’s castable to any of those).

Add the correct configuration to di.xml. I use the global di.xml instead of limiting it to an area, that’s because I want to include orders placed through the API, frontend as well as the admin.

Now, if you run this, it’ll fail with error: "Topic vendor.order.create is not configured." That’s because we haven’t configured our message queue yet. We'll need about 4 XML files for that. Wait, What?

We need 4 XML files to create a simple queue?

You heard that right. The configuration is similar to an AMQP configuration, but not quite.

Magento’s configuration is written in a way that is compatible with AMQP, hence why it seems complicated to get a basic queue up. While AMQP has quite advanced capabilities, Magento’s XML configuration format (and Magento’s MySQL queue driver) don’t cover all of AMQP’s features.

An important note: In Magento Commerce 2.0 a configuration file with name queue.xml was used. That file is now deprecated. Instead the configuration has been split up into multiple files, also see the migration guide.

Since Magento 2.1, queueing functionality exists on both Commerce and Open Source editions.

So what’s AMQP?

This video is a great explanation; it helps wrapping your head around the following concepts:

  • Topic
  • Publisher
  • Exchange
  • Queue
  • Consumer

Configuring our topic

This will declare our topic, along with telling Magento about its datatype.

In your module’s etc directory, create an XML file named communication.xml with the following contents:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="vendor.order.create" request="string"/>
</config>

In these examples, replace "vendor" with your vendor name.

More information about the specifics can be found in the official documentation.

If we try to run this, it will fail with error "Publisher vendor.order.create is not declared.". That's correct, so let’s create our publisher next.

Configuring our publisher

In your module's etc directory, create a file queue_publisher.xml with contents:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="vendor.order.create">
        <connection name="db" exchange="vendor-order-exchange" />
    </publisher>
</config>

This file defines our publisher, along with the connection type and exchange for the topic.

Pick connection name db, and pick a unique name of your exchange.

When we run this, it should fail with “Message queue topic "vendor.order.create" is not configured.”. We’ve defined the publisher, but it doesn’t know where to route the event to — the exchange doesn’t exist and thus our event can’t be added to a queue.

It also might not fail, however the message won't be routed correctly so my guess is that it'll just stay there in limbo.

Configure the exchange

In your module's etc directory, create a queue_topology.xml file with contents:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="vendor-order-exchange" type="topic" connection="db">
        <binding id="VendorOrderCreate" topic="vendor.order.create" destinationType="queue" destination="vendor_order_create"/>
    </exchange>
</config>

The exchange name should match what was specified for the publisher. The type should be topic. This means that the exchange routes events to queues by matching the topic. Connection should be db.

At this point you’ll need to run bin/magento setup:upgrade for our queue to be installed. You can check the table queue to see if your queue was added correctly. The class responsible for this is a recurring setup script (vendor/magento/module-amqp/setup/recurring.php).

Keep in mind that any configuration can be overridden using global (app/etc/config.php) or environment config files (app/etc/env.php).

Patch for Magento 2.3

Before we go further, it’s critical to know that in Magento 2.3.1, 2.3.2 and 2.3.3 (the current latest version as of November 2019) there’s a bug with the MySQL queue driver when using the new XML format. The new XML format isn't recognized and doesn't actually work.

Luckily there's already an issue and pull request. You’ll have to create a composer patch from the PR (using cweagans/composer-patches):

"magento/module-mysql-mq": {
    "Fix: \\Magento\\MysqlMq\\Model\\Driver\\Exchange uses deprecated getQueuesByTopic, results in exception #21904": "patches/composer/21904-fix-message-queue-db-config.patch"
}

The root cause is that configuration was specified in queue.xml previously and now it’s split up into multiple files. Unfortunately the MySQL driver hasn’t been updated to reflect these changes.

After applying the patch, the functionality works correctly.

Interestingly enough, the official DevDocs mention that you need both the deprecated queue.xml file along with the new files if you want to use the MySQL driver. It seems that this bad workaround for this bug somehow made it in the official DevDocs, unintentionally. For more context, see my comment here.

Placing an order

Now that we've got our queue configured, go ahead and place an order. If no errors happened, you should see an entry appear in the queue_message and queue_message_status tables.

For convenience I’ve included the meaning of the status codes below:

  • MESSAGE_STATUS_NEW = 2;
  • MESSAGE_STATUS_IN_PROGRESS = 3;
  • MESSAGE_STATUS_COMPLETE= 4;
  • MESSAGE_STATUS_RETRY_REQUIRED = 5;
  • MESSAGE_STATUS_ERROR = 6;
  • MESSAGE_STATUS_TO_BE_DELETED = 7;

(Found in vendor/magento/module-mysql-mq/Model/QueueManagement.php.)

If all that worked correctly, let’s configure our queue consumer next.

Configuring the queue consumer

In your module's etc directory, create a file queue_consumer.xml with the following contents:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="VendorOrderCreate"
              queue="vendor_order_create"
              connection="db"
              handler="Vendor\Module\Model\Consumer::processOrder"/>
</config>

Our processOrder could look like this:

public function processOrder(string $orderId)
{
    try {
        $order = $this->orderRepository->get((int)$orderId);
        echo $order->getIncrementId() . PHP_EOL;
    } catch (\Exception $e) {
        $this->logger->error($e);
        return;
    }
 }

Check if your consumer is added correctly by flushing the cache, and running bin/magento queue:consumers:list. Your consumer should appear within that list.

How do we run our queue consumer?

It works as follows: a CLI process is started that polls the database to see if any new entries have been added to the queue. It’s basically an infinite loop that calls PHP’s sleep function.

This CLI process is started through the normal Magento cron. Whenever cron runs, it spawns a process (separate to the cron process) that, by default, runs almost forever — default is 10,000 consumed entries before it is terminated. This should be the main way that your queue consumers are triggered.

My cron doesn’t run in development mode. So I manually run bin/magento queue:consumers:start <task>.

If you don’t specify the option max-messages, it runs forever. So during development I typically run bin/magento queue:consumers:start --max-messages=1.

So in this case you would run bin/magento queue:consumers:start VendorOrderCreate --max-messages=1. If there is no message to consume, the process will just sit there until a message appears. It will automatically terminate once it has consumed n messages (n being the value of your max-messages argument.

You can’t have multiple consumer processes running at the same time; built-in locking exists to prevent this.

For Magento version 2.3.2 and below, the process will create a PID file inside the var directory named queueName.pid. Magento will check this file to see if there already is a consumer process running that’s polling. If there already is a running process, the new command will terminate immediately without consuming anything.

If you find that your command is terminated instantly without consuming your queue, you may find that you’ve got a process running in the background (triggered by a cronjob) that’s actually consuming the queue. So if you want to see output, just kill that off first (and delete the PID file).

Ensure that this PID file persists between deployments, because otherwise cron would be spawning multiple processes (thanks @cjnewbs for this tip).

Also check out Magento\MessageQueue\Model\Cron\ConsumersRunner for more information.

For Magento version 2.3.3 and above, this behavious has slightly changed. The use of the PID file based locking has been deprecated in favour of an abstraction called LockManagerInterface. It uses MySQL's locking functions by default, but can also be configured to use file, cache or Zookeeper based locking. The lock name is a md5 hash of the name of the queue. See this commit for more details: 1d9e07b.

Further configuration

Queues and consumers can be further configured in your main config (env.php).

There are two main keys. 'queue' lets you configure an AMQP host such as RabbitMQ (official documentation). 'cron_consumers_runner' lets you specify the behaviour of cron when it comes to consumers. For example, you can specify whether cron should initialise consumers or whether you want to run consumers manually instead ('cron_run' => false). You can also specify which consumers to run (if not all) and the maximum number of messages that a consumer should consume ('max_messages' => 20000).

The official documentation does a good job of explaining what each key does.

Some futher comments on the two connection types

We've learned that the connection type is configured on a queue-basis. This might be strange given that the specification of the queue shouldn't depend on the underlying service. The best case scenario would be to simply specify the connection type in your env.php file and be done with it. I assume this architectural decision was made because of the difference between AMQP and the simple MySQL queue implementation in Magento.

All but one queue use the db connection type. The only queue that uses AMQP out of the box is async.operations.all, which is used for the bulk asynchronous REST API. If you don't use that API, there's no need to configure RabbitMQ at all (except for getting rid of the log message Consumer "async.operations.all" skipped as required connection "amqp" is not configured. Unknown connection name amqp).

Although I haven't tested this myself, if you do want to use AMQP for other queues as well, you could overwrite the configuration of other queues. Also see my StackExchange answer here for more info.

Conclusion

I hope this provides a bit of understanding of creating queues using MySQL only. Of course this is applicable not just for processing orders, but for running any asynchronous task.

magento2 queue mysql amqp