AMQP
在线手册:中文  英文

范例

Example #1 AMQP Example

<?php

// Create a connection
$cnn = new AMQPConnection();
$cnn->connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->declare('exchange1'AMQP_EX_TYPE_FANOUT);

// Create a new queue
$q = new AMQPQueue($ch);
$q->declare('queue1');

// Bind it on the exchange to routing.key
$ex->bind('queue1''routing.key');

// Publish a message to the exchange with a routing key
$ex->publish('message''routing.key');

// Read from the queue
$msg $q->consume();

?>

AMQP
在线手册:中文  英文

用户评论:

bradyjvitrano at gmail dot com (2012-08-18 17:41:04)

This example assumes no previous queue or exchange has been declared.

Send File
<?php
/** 
 * Filename: send.php
 * Purpose: 
 * Send messages to RabbitMQ server using AMQP extension
 * Exchange Name: exchange1
 * Exchange Type: fanout
 * Queue Name: queue1
 */
$connection = new AMQPConnection();
$connection->connect();
if (!
$connection->isConnected()) {
    die(
'Not connected :(' PHP_EOL);
}
// Open Channel
$channel    = new AMQPChannel($connection);
// Declare exchange
$exchange   = new AMQPExchange($channel);
$exchange->setName('exchange1');
$exchange->setType('fanout');
$exchange->declare();
// Create Queue
$queue      = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->declare();

$message    $exchange->publish('Custom Message (ts): '.time(), 'key1');
if (!
$message) {
    echo 
'Message not sent'PHP_EOL;
} else {
    echo 
'Message sent!'PHP_EOL;
}
?>

Receive File
<?php
/** 
 * Filename: receive.php
 * Purpose: 
 * Receive messages from RabbitMQ server using AMQP extension
 * Exchange Name: exchange1
 * Exchange Type: fanout
 * Queue Name: queue1
 */
$connection = new AMQPConnection();
$connection->connect();
if (!
$connection->isConnected()) {
    die(
'Not connected :('PHP_EOL);
}
// Open channel
$channel    = new AMQPChannel($connection);
// Open Queue and bind to exchange
$queue      = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->bind('exchange1''key1');
$queue->declare();
// Prevent message redelivery with AMQP_AUTOACK param
while ($envelope $queue->get(AMQP_AUTOACK)) {
    echo (
$envelope->isRedelivery()) ? 'Redelivery' 'New Message';
    echo 
PHP_EOL;
    echo 
$envelope->getBody(), PHP_EOL;
}

?>

Test It
$ php -f send.php
$ php -f receive.php

jean dot weisbuch at phpnet dot org (2012-06-14 01:17:31)

The official documentation is not up to date as of June 2012 for the latest stable version (1.0.3) so i had to find semi-blindly how i could use the extension (thanks to some snippets on User Contributed Notes).

Here is a simple way to publish and retrieve messages (tested using RabbitMQ as the broker) that works on the stable version of the extension :

<?php
function amqp_connection() {
    
$amqpConnection = new AMQPConnection();
    
$amqpConnection->setLogin("username");
    
$amqpConnection->setPassword("123456");
    
$amqpConnection->setVhost("virthost");
    
$amqpConnection->connect();

    if(!
$amqpConnection->isConnected()) {
        die(
"Cannot connect to the broker, exiting !\n");
    }
    return 
$amqpConnection;
}

function 
amqp_receive($exchangeName$routingKey$queueName) {
    
$amqpConnection amqp_connection();

    
$channel = new AMQPChannel($amqpConnection);
    
$queue = new AMQPQueue($channel);
    
$queue->setName($queueName);
    
$queue->bind($exchangeName$routingKey);

    while(
$message $queue->get()) {
        echo(
"Message #".$message->getDeliveryTag()." '".$message->getBody()."'");

        if(
$message->isRedelivery()) {
            echo(
"\t(this message has already been delivered)");
        }
        
// just for testing purpose, shows how to manually remove a message from queue
        
if(rand(0,6) > 4) {
            
$queue->ack($message->getDeliveryTag());
            echo(
"\t(this message has been removed from the queue)");
        }
        
print_r($message->getMessageId());
        echo 
"\n";
    }

    if(!
$amqpConnection->disconnect()) {
        throw new 
Exception("Could not disconnect !");
    }
}

function 
amqp_send($text$routingKey$exchangeName){
    
$amqpConnection amqp_connection();

    
$channel = new AMQPChannel($amqpConnection);
    
$exchange = new AMQPExchange($channel);
    
$exchange->setName($exchangeName);
    
$exchange->setType("fanout");
        
$message $exchange->publish($text$routingKey);
        if(!
$message) {
        echo 
"Error: Message '".$message."' was not sent.\n";
    } else {
        echo 
"Message '".$message."' sent.\n";
    }

    if (!
$amqpConnection->disconnect()) {
        throw new 
Exception("Could not disconnect !");
    }
}

// lets send a message with a "random" content (the date)
amqp_send("Message added at this date: ".date(DATE_RFC822), "action""amq.fanout");

// you need to sleep for 1 sec if you want to be able to receive the message you just sent ("limitation" must be on the broker side)
sleep(1);

// now we receive messages from the queue we just sent a message on
amqp_receive("amq.fanout","action","action");
?>

The rand() part of the script is used to make it more "lively" for testing when the script is run many times by randomly acknowledging messages (and show how to acknowledge non-automatically a message after it has been read).

info at eeasoftware dot com (2012-02-25 20:54:51)

I spent several hours looking for some examples of sending messages to worker queues vs. fan queues.  The documentation isn't too clear and I had to consult many sources, so here are simple methods for both types of 'producers'.  For brevity I've left out 'safety' checks in the code dealing with whether or not a connection was established, error checking, cleanup, etc.

As further clarification, I'm using rabbitmq as my service, with java daemons running on the server as 'consumers'.  These examples are current with the new 1.0.0 stable AMQP release in February of 2012.

<?php

//set your connection arguments and connect to the server
$conn_args = array('host' => 'your_host''port' => 'your_host_port''login' => 'your_username''password' => 'your_password');
$conn = new AMQPConnection($conn_args);
$conn->connect();

//create your message
$message 'Hello World!';

//create a channel and exchange
$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);

//Here is where the code splits for the different types of queues:

//-------------------------------------------------
//For Fan Type Queues (One message will be consumed by many consumers - each on a queue of their own)

//set the exchange name and publish the message
$exchange->setName('exchange_name');
$exchange->publish($message'your_routing_key');

//for Fan Queue calls not bound to a particular key, the 'routing_key' will simply be ignored

//-------------------------------------------------
//For Worker Type Queues (Many consumers are listening to a single queue, and a single message will be passed only to the next available consumer waiting in line)

//start a transaction
$channel->startTransaction();

//publish your message with 'your_queue_name' as the 'routing_key'
$exchange->publish($message'your_queue_name');

//commit your transaction
$channel->commitTransaction();

//-------------------------------------------------

//psuedo code: 
//clean up $channel; - probably just unset($channel) will do.
//clean up $exchange; - probably just unset($exchange) will do.

//disconnect from server
$conn->disconnect();

?>

kurt at surfmerchants dot com (2011-05-24 09:05:08)

The docs for this extension are a little vague, so I'm adding a few things I found helpful in getting this to work.

Helpful reading:
* http://en.wikipedia.org/wiki/AMQP
* http://www.rabbitmq.com/getstarted.html

Helpful tool:
/usr/sbin/rabbitmqctl list_queues

Here I've split the usage example into a sender and receiver, because that is much more useful (at least to me):

send.php :
#!/usr/bin/php -q
<?php
// config
$exchangeName 'myexchange';
$routingKey 'routing.key';
$message $argv[1];

// connect
$connection = new AMQPConnection();
$connection->connect();

// setup exchange
$ex = new AMQPExchange($connection);
$ex->declare($exchangeNameAMQP_EX_TYPE_FANOUT);

// Publish a message to the exchange with our routing key
$ex->publish($message$routingKey);

// disconnect
$connection->disconnect();

?>

receive.php:
#!/usr/bin/php -q
<?php
// config
$exchangeName 'myexchange';
$routingKey 'routing.key';
$queueName 'myqueue';

// connect
$connection = new AMQPConnection();
$connection->connect();

// setup our queue
$q = new AMQPQueue($connection);
$q->declare($queueName);

// Bind it on the exchange to routing.key
$q->bind($exchangeName$routingKey);

// show the message
print_r($q->get());

// disconnect
$connection->disconnect();

?>

The first time you run "./send.php 'hello world'", then "./receive.php", you will not get the message, because the receiver creates the queue. That's OK. While you *can* create the queue in the sender, this is not so useful, because it's really the receivers that read the queue, and I've found that various clients are very picky about the details of how the queue was created (for example, if I create the queue in my php sender, then try to listen with a python receiver, it throws errors, but if I let the python receiver create the queue, all is well).
So, the second time you send a message (after running receive.php once), the queue will exist, and you will then get the message the next time you run receive.

Another thing I noticed is that I'm using $q->get() instead of $q->consume(), because the latter seems to segfault (at least on my CentOS 5.5 system).

易百教程