----------------------------------
* Added ``BrokerConnection.drain_events()`` (only works for amqplib/pika)
``drain_events`` waits for events on all active channels.
* amqplib: Added timeout support to ``drain_events``.
Example usage:
>>> import socket
>>> c = Consumer()
>>> it = c.iterconsume()
>>> # wait for event on any channel
>>> try:
...     connection.drain_events(timeout=1)
... except socket.timeout:
...     pass
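The timeout lets a loop poll for events without blocking forever, for example to check a stop condition between passes. The following is a minimal sketch of such a loop, not code from the library: ``FakeConnection`` and ``drain_until`` are hypothetical names written for this illustration so the snippet runs without a broker. Only the ``drain_events(timeout=...)`` call and the ``socket.timeout`` exception are taken from the entry above.

```python
import socket


class FakeConnection:
    """Hypothetical stand-in for a broker connection (not part of the library)."""

    def __init__(self, events):
        # Each entry is either a callable to run (an "event") or None,
        # meaning nothing arrives before the timeout expires.
        self._events = list(events)

    def drain_events(self, timeout=None):
        event = self._events.pop(0) if self._events else None
        if event is None:
            # Mimic a wait that ran out of time without receiving anything.
            raise socket.timeout("timed out")
        event()


def drain_until(connection, should_stop, timeout=1):
    """Poll for events until should_stop() returns True.

    Returns the number of passes that timed out without an event.
    """
    idle_passes = 0
    while not should_stop():
        try:
            connection.drain_events(timeout=timeout)
        except socket.timeout:
            idle_passes += 1
    return idle_passes


received = []
conn = FakeConnection([lambda: received.append("a"),
                       None,
                       lambda: received.append("b")])
idle = drain_until(conn, should_stop=lambda: len(received) >= 2)
```

With a real connection, ``should_stop`` could check a shutdown flag, so the process can exit cleanly within roughly one timeout period.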
* Added ``Consumer.consume`` / ``ConsumerSet.consume``
We're slowly moving away from ``iterconsume`` because its flow doesn't
fit common usage: you often want to consume from several channels at
once. ``iterconsume`` will probably be deprecated at some point.
"new-style" consume example::
>>> connection = BrokerConnection(..)
>>> consumer = Consumer(connection, ...)
>>> consumer.register_callback(handle_message)
>>> consumer.consume()  # declares consumers
>>> while True:
...     connection.drain_events()
>>> consumer.cancel()  # Cancel consumer.
More elaborate example consuming from two channels,
where the first channel consumes from multiple queues::
>>> connection = BrokerConnection(..)
>>> # The first channel receives jobs from several queues.
>>> queues = {"video": {"exchange": "jobs",
...                     "queue": "video",
...                     "routing_key": "video"},
...           "image": {"exchange": "jobs",
...                     "queue": "image",
...                     "routing_key": "image"}}
>>> job_consumer = ConsumerSet(connection, from_dict=queues)
>>> job_consumer.register_callback(handle_job)
>>> job_consumer.consume()
>>> # The second channel receives remote control commands.
>>> remote_consumer = Consumer(connection, queue="control",
...                            exchange="control")
>>> remote_consumer.register_callback(handle_remote_command)
>>> remote_consumer.consume()
>>> # The event loop.
>>> # Receives a single message per pass and calls the appropriate
>>> # callback.
>>> try:
...     while True:
...         connection.drain_events()
... finally:
...     job_consumer.close()
...     remote_consumer.close()
...     connection.close()
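To illustrate why one ``drain_events()`` loop can serve several consumers, here is a toy in-memory model of the dispatch described above: each pass delivers one message to the callbacks of the consumer it belongs to. This is a sketch under stated assumptions, not the library's implementation. ``ToyConnection``, ``ToyConsumer`` and ``publish`` are hypothetical names, and the callbacks take a single argument as a simplification.

```python
from collections import deque


class ToyConnection:
    """Hypothetical in-memory connection; not the library's implementation."""

    def __init__(self):
        # (consumer, message) pairs waiting to be delivered, oldest first.
        self.pending = deque()

    def publish(self, consumer, message):
        # Hypothetical helper: queue a message for one consumer.
        self.pending.append((consumer, message))

    def drain_events(self):
        # Deliver exactly one message per call to the consumer that owns it.
        consumer, message = self.pending.popleft()
        consumer.receive(message)


class ToyConsumer:
    """Hypothetical consumer that only keeps a list of callbacks."""

    def __init__(self, connection):
        self.connection = connection
        self.callbacks = []

    def register_callback(self, callback):
        self.callbacks.append(callback)

    def receive(self, message):
        for callback in self.callbacks:
            callback(message)


jobs, commands = [], []
connection = ToyConnection()

job_consumer = ToyConsumer(connection)
job_consumer.register_callback(jobs.append)

remote_consumer = ToyConsumer(connection)
remote_consumer.register_callback(commands.append)

connection.publish(job_consumer, "encode video")
connection.publish(remote_consumer, "shutdown")
connection.publish(job_consumer, "resize image")

# One loop drives both consumers; each pass handles a single message.
while connection.pending:
    connection.drain_events()
```

The loop never needs to know which consumer a message is for. That is the property that makes a connection-level event loop more convenient than iterating over one consumer at a time.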
* amqplib: now raises ``KeyError`` if hostname isn't set.