(PECL amqp >= Unknown)
AMQPQueue::consume — Consume messages from a queue
Blocking function that will retrieve the next message from the queue as it becomes available and will pass it off to the callback.
callback
A callback function to which the consumed message will be passed. The function must accept at least one parameter, an AMQPEnvelope object, and may accept an optional second parameter, the AMQPQueue from which the message was consumed.
AMQPQueue::consume() will not return control to the PHP script until the callback function returns FALSE.
flags
A bitmask of any of the flags: AMQP_NOACK.
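The interaction between the callback's return value and the AMQP_NOACK flag can be sketched as follows. This is a minimal illustration, not part of the extension: makeLimitedConsumer() and its $limit and $manualAck parameters are hypothetical names, and the sketch assumes that without AMQP_NOACK each message must be acknowledged explicitly with AMQPQueue::ack().

```php
<?php
// Hypothetical helper (not part of the amqp extension): builds a callback
// that stops consume() after $limit messages.
function makeLimitedConsumer($limit, $manualAck)
{
    $seen = 0;
    return function ($envelope, $queue) use (&$seen, $limit, $manualAck) {
        $seen++;
        echo "Message $seen: " . $envelope->getBody() . "\n";
        if ($manualAck) {
            // Assumption: without AMQP_NOACK the broker waits for an explicit ack.
            $queue->ack($envelope->getDeliveryTag());
        }
        // Returning FALSE ends consume(); any other value keeps it running.
        return $seen < $limit;
    };
}
```

Given an already declared $queue, this could be used as $queue->consume(makeLimitedConsumer(10, true)) for manual acknowledgement, or as $queue->consume(makeLimitedConsumer(10, false), AMQP_NOACK) to let the broker treat messages as acknowledged on delivery.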
Throws AMQPChannelException if the channel is not open.
Throws AMQPConnectionException if the connection to the broker was lost.
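Since consume() blocks, both exceptions surface at the call site, so a long-running consumer will usually want to wrap the call. The following is a rough sketch under that assumption; consumeGuarded() and its status strings are hypothetical and not part of the extension.

```php
<?php
// Hypothetical helper: reports why consume() ended instead of letting
// the exception terminate the script.
function consumeGuarded($queue, $callback)
{
    try {
        $queue->consume($callback);
        return 'stopped';            // the callback returned FALSE
    } catch (AMQPChannelException $e) {
        error_log('Channel not open: ' . $e->getMessage());
        return 'channel-error';
    } catch (AMQPConnectionException $e) {
        error_log('Connection lost: ' . $e->getMessage());
        return 'connection-error';
    }
}
```

A caller could inspect the returned status and decide whether to reconnect and start consuming again or to exit.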
Example #1 AMQPQueue::consume() example
<?php
/* Create a connection using all default credentials: */
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);
/* Create a queue object */
$queue = new AMQPQueue($channel);
// Declare the queue
$queue->declare('myqueue');
$i = 0;
function processMessage($envelope, $queue) {
    global $i;
    echo "Message $i: " . $envelope->getBody() . "\n";
    $i++;
    if ($i >= 10) {
        // Bail after 10 messages
        return false;
    }
}
// Consume messages on queue
$queue->consume("processMessage");
?>
liuxiangchao at ometworks dot com (2013-01-28 09:15:57)
To consume ALL messages stored by DURABLE exchanges, you will need to set the channel's prefetch count to 0:
<?php $channel->setPrefetchCount(0); ?>
pinepain at gmail dot com (2012-12-27 18:00:20)
Be careful using this function with a non-zero amqp.timeout (you can check it with AMQPConnection::getTimeout). The timeout value appears to control how long consume() waits for a new message from the broker before dying like this:
Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Resource temporarily unavailable' in /path/to/your/file.php:12
Stack trace:
#0 /path/to/your/file.php(12): AMQPQueue->consume(Object(Closure), 128)
#1 {main}
thrown in /path/to/your/file.php on line 12
As for the notes about blocking, greedy use of system resources and so on, you can investigate how it works by looking in amqp_queue.c at the read_message_from_channel C function and the PHP_METHOD(amqp_queue_class, consume) method. For me it works perfectly, without any unusual resource usage or I/O performance degradation, under a load of 10k 64b messages per second with a delivery time of less than 0.001 sec.
OS: FreeBSD *** 8.2-RELEASE FreeBSD 8.2-RELEASE #0: Sat Mar ****** 2011 root@*****:**** amd64
PHP: PHP Version => 5.3.10, Suhosin Patch 0.9.10, Zend Engine v2.3.0
PHP AMQP extension:
amqp
Version => 1.0.9
Revision => $Revision: 327551 $
Compiled => Dec 2* 2012 @ *****
AMQP protocol version => 0-9-1
librabbitmq version => 0.2.0
Directive => Local Value => Master Value
amqp.auto_ack => 0 => 0
amqp.host => localhost => localhost
amqp.login => guest => guest
amqp.password => guest => guest
amqp.port => 5672 => 5672
amqp.prefetch_count => 3 => 3
amqp.timeout => 0 => 0
amqp.vhost => / => /
AMQP broker: RabbitMQ 3.0.1, Erlang R14B04
Such a loop will definitely block the main thread, but given PHP's single-threaded nature this is completely normal behavior. To exit the consumption loop, your callback function or method (I prefer to use closures, btw) should return FALSE.
The benefit of this function is that you don't have to iterate over the messages manually and, more importantly, if there are no unprocessed messages in the queue it will wait for them for you.
So you just have to run your consumer (one or many) and, optionally, check from time to time whether they are still alive, in case you are not sure about your callbacks or about memory-limit-critical stuff.
Laurent (2012-08-27 12:34:57)
Be careful using the consume() function on AMQP. It will catch every Exception and fall into an infinite loop (the message will not be marked as read and will be put back in the queue).
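One way to avoid the redelivery loop described in this note is to catch exceptions inside the callback itself and reject the failing message rather than leave it unacknowledged. The sketch below is only an illustration: makeGuardedConsumer() and $handler are hypothetical names, and it assumes AMQPQueue::nack() called without AMQP_REQUEUE does not put the message back on the queue.

```php
<?php
// Hypothetical wrapper: $handler is your own callable taking the message body.
function makeGuardedConsumer($handler)
{
    return function ($envelope, $queue) use ($handler) {
        $tag = $envelope->getDeliveryTag();
        try {
            call_user_func($handler, $envelope->getBody());
            $queue->ack($tag);
        } catch (Exception $e) {
            error_log('Handler failed: ' . $e->getMessage());
            // Assumption: nack() without AMQP_REQUEUE rejects the message
            // so the broker does not redeliver it.
            $queue->nack($tag);
        }
        return true; // keep consuming after both success and failure
    };
}
```

With an already declared $queue this would be passed as $queue->consume(makeGuardedConsumer($myHandler)). Whether a rejected message is dropped or dead-lettered depends on the broker configuration.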
peter dot colclough at toolstation dot com (2011-08-09 03:10:06)
Using AMQP_consume against a RabbitMQ server actually stuffs memory. It will work in a loop, or on a constant recall, so long as your exchange/queue and messages are set to durable. However, it will also make the system unusable within a couple of minutes.
Using get(), all is fine. I think this may be a bugette in the PHP access code... off to take a look.
hlopetz at gmail dot com (2011-05-10 23:32:01)
You shouldn't use AMQPQueue::consume() if you have to get _all_ incoming messages. You'll get only the "max" number of messages, and then the queue will be destroyed.
This weird behavior holds for my amqp.so v0.2.2.
Use AMQPQueue::get() with the "count" param instead.
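A polling alternative based on get() might look roughly like the sketch below. It does not use the "count" parameter from v0.2.2. Instead it assumes the later behavior in which get() returns a single AMQPEnvelope, or a falsy value when the queue is empty. drainQueue() and $handler are hypothetical names.

```php
<?php
// Hypothetical helper: fetches messages one at a time until the queue is empty.
function drainQueue($queue, $handler)
{
    $count = 0;
    // Assumption: get() returns an envelope object, or a falsy value
    // (FALSE or NULL, depending on the extension version) when nothing is waiting.
    while ($envelope = $queue->get()) {
        call_user_func($handler, $envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
        $count++;
    }
    return $count; // number of messages processed
}
```

Unlike consume(), this returns as soon as the queue is empty, so the caller decides when to poll again.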