amqp.py

Tornado Coroutine interface to AMQP (pika)

0

Votes

#!/usr/bin/python
import datetime
import logging
import threading
import weakref

import tornado
import tornado.gen
import pika

'''
Pika-Tornado: a Tornado coroutine-based abstraction layer for Pika.
'''

# (C) 2015 Stuart Longland <stuartl@longlandclan.yi.org>
# Released under the terms of the Mozilla Public License v2.0

class AMQPObject(object):
    @classmethod
    def _get_log(cls, *name):
        return logging.getLogger('.'.join(
            (cls.__module__, cls.__name__) + name))

class AMQPConnection(AMQPObject):
    '''
    Connection to AMQP, this object handles unexpected disconnections (and
    re-connects accordingly) and wraps the channel opening process in a
    coroutine-friendly manner.
    Support for multiple IOLoops is provided and thread-safety is attempted.
    Callbacks may be specified in a number of ways:
    - as a reference to the function directly
    - as a tuple:
        - first element gives the IOLoop (or None for the client's IOLoop)
        - second element gives the reference to the function
        - third element if present gives the initial args
        - forth element if present gives the initial kwargs
    '''

    CONNECTION_CLOSED   = pika.connection.Connection.CONNECTION_CLOSED
    CONNECTION_INIT     = pika.connection.Connection.CONNECTION_INIT
    CONNECTION_PROTOCOL = pika.connection.Connection.CONNECTION_PROTOCOL
    CONNECTION_START    = pika.connection.Connection.CONNECTION_START
    CONNECTION_TUNE     = pika.connection.Connection.CONNECTION_TUNE
    CONNECTION_OPEN     = pika.connection.Connection.CONNECTION_OPEN
    CONNECTION_CLOSING  = pika.connection.Connection.CONNECTION_CLOSING

    def __init__(self, parameters, on_open_callback=None,
            on_open_error_callback=None, on_close_callback=None,
            on_giveup_callback=None, reconnect_delay=5.0, reconnect_max=-1,
            io_loop=None, io_thread=None):

        '''
        Initialise a connection to AMQP.
        :param: pika.connection.Parameters parameters:
            Connection parameters, see Pika documentation.
        :param method on_open_callback:
            Called when the connection is opened.
        :param method on_open_error_callback:
            Called if the connection can't be opened.
        :param method on_close_callback:
            Called when the connection is closed.
        :param method on_giveup_callback:
            Called when the connection attempts have been exhausted
            and this library "gives up".  This might be used to log
            an alarm and shut down the daemon.
        :param float reconnect_delay:
            The amount of time to wait before attempting to reconnect.
            Set to 0 to disable reconnect.
        :param int reconnect_max:
            The maximum number of reconnection attempts.  Set to 0 to
            permit unlimited reconnections.
        :param tornado.ioloop.IOLoop io_loop:
            IOLoop instance to use for the AMQP client itself.  By default,
            the current IOLoop is used.
        :param threading.Thread io_thread:
            Thread used for AMQP communications.  Since Pika itself is not
            thread-safe, it is imperative that all AMQP operations take place
            in the same thread as the AMQP IOLoop instance.
            If this is different to the thread the constructor is running in
            for whatever reason, the reference to that thread can be given
            here.
        '''

        if io_loop is None:
            io_loop     = tornado.ioloop.IOLoop.current()
        if io_thread is None:
            io_thread   = threading.current_thread()

        self._io_loop           = io_loop
        self._io_thread         = io_thread
        self._reconnect_delay   = datetime.timedelta(seconds=reconnect_delay)
        self._reconnect_max     = reconnect_max
        self._reconnect_rem     = reconnect_max
        self._reconnect         = reconnect_delay > 0
        self._giveup            = False

        self._on_open_cb        = []
        self._on_open_error_cb  = []
        self._on_close_cb       = []
        self._on_giveup_cb      = []
        if on_open_callback is not None:
            self._on_open_cb.append(
                    self._get_callback(on_open_callback))
        if on_open_error_callback is not None:
            self._on_open_error_cb.append(
                    self._get_callback(on_open_error_callback))
        if on_close_callback is not None:
            self._on_close_cb.append(
                    self._get_callback(on_close_callback))
        if on_giveup_callback is not None:
            self._on_giveup_cb.append(
                    self._get_callback(on_giveup_callback))

        self._amqp_parameters   = parameters
        self._amqp_connection   = None
        self._channels          = []
        self._replace_connection()

    # ------------------------------------------------------------------------
    # Connection state
    # ------------------------------------------------------------------------

    connection_state    = property(lambda s : \
            s._amqp_connection.connection_state)
    is_closed           = property(lambda s : s._amqp_connection.is_closed)
    is_closing          = property(lambda s : s._amqp_connection.is_closing)
    is_open             = property(lambda s : s._amqp_connection.is_open)

    # ------------------------------------------------------------------------
    # Connection callbacks
    # ------------------------------------------------------------------------

    def add_on_open_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        self._on_open_cb.append(
                self._get_callback(callback, callback_args, callback_kwargs))

    def add_on_open_error_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        self._on_open_error_cb.append(
                self._get_callback(callback, callback_args, callback_kwargs))

    def add_on_close_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        self._on_close_cb.append(
                self._get_callback(callback, callback_args, callback_kwargs))

    def add_on_giveup_callback(self, callback, callback_args=None,
            callback_kwargs=None):
        self._on_giveup.append(
                self._get_callback(callback, callback_args, callback_kwargs))

    # ------------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------------

    def _replace_connection(self):
        log      = self._get_log('_replace_connection')
        try:
            log.debug('Gathering a list of existing channels')
            # Grab a list of channels
            channels = self._get_channels()
            log.debug(  'We have %s channels to reopen.  Opening '\
                        'connection to AMQP with parameters %s.',
                        len(channels), self._amqp_parameters)

            # Replace the connection
            self._amqp_connection   = pika.adapters.TornadoConnection(
                parameters              = self._amqp_parameters,
                on_open_callback        = self._handle_on_open_callback,
                on_open_error_callback  = self._handle_on_open_error_callback,
                on_close_callback       = self._handle_on_close_callback,
                custom_ioloop           = self._io_loop)
            log.debug('AMQP connection started')

            # Mark these channels as stale
            for ch in channels:
                try:
                    log.debug('Re-opening %s', ch)
                    ch._mark_stale()
                except:
                    pass
        except:
            log.exception('Failed to replace connection')

    def connect(self, timeout=None):
        '''
        Connect to AMQP.
        '''
        return self._run_async(
                self._amqp_connection.connect,
                (),
                {},
                timeout,
                None,
                None)

    def close(self, *args, **kwargs):
        '''
        Disconnect from AMQP.  This disables reconnections.
        See pika.BaseConnection.close for details on arguments.
        '''
        timeout         = kwargs.pop('timeout',None)
        self._giveup    = False
        self._reconnect = False
        return self._run_async(
                async_func  = self._amqp_connection.close,
                args        = args,
                kwargs      = kwargs,
                timeout     = timeout,
                cb_func     = None,
                future      = None)

    def set_backpressure_multiplier(self, *args, **kwargs):
        return self._run_async(
                async_func  = self._amqp_connection.set_backpressure_multiplier,
                args        = args,
                kwargs      = kwargs,
                timeout     = timeout,
                cb_func     = None,
                future      = None)

    # ------------------------------------------------------------------------
    # Connection callbacks
    # ------------------------------------------------------------------------

    def add_backpressure_callback(self, callback):
        return self._run_async(
                self._amqp_connection.add_backpressure_callback,
                    args=(self._get_callback(callback),))

    def add_on_close_callback(self, callback):
        return self._run_async(
                self._amqp_connection.add_on_close_callback,
                    args=(self._get_callback(callback),))

    def add_on_open_callback(self, callback):
        return self._run_async(
                self._amqp_connection.add_on_open_callback,
                    args=(self._get_callback(callback),))

    def add_on_open_error_callback(self, callback):
        return self._run_async(
                self._amqp_connection.add_on_open_error_callback,
                    args=(self._get_callback(callback),))

    # ------------------------------------------------------------------------
    # Channel management
    # ------------------------------------------------------------------------

    def _cleanup_channels(self):
        self._channels  = filter(lambda cr : cr() is not None, self._channels)

    def _get_channels(self):
        self._cleanup_channels()
        return [ch() for ch in self._channels]

    @tornado.gen.coroutine
    def channel(self, channel_number=None, timeout=None):
        '''
        Open a new channel on the AMQP connection.
        '''
        ch = AMQPChannel(self, channel_number)
        yield ch._init_channel(timeout)
        self._cleanup_channels()
        self._channels.append(weakref.ref(ch))
        raise tornado.gen.Return(ch)

    # ------------------------------------------------------------------------
    # Connection handling callbacks
    # ------------------------------------------------------------------------

    def _handle_on_open_callback(self, *args, **kwargs):
        '''
        What to do when the connection is opened?
        '''
        log = self._get_log('_handle_on_open_callback')
        try:
            log.info('Connection open')
            self._reconnect     = self._reconnect_delay.total_seconds() > 0
            self._reconnect_rem = self._reconnect_max
            self._giveup        = False

            # Grab a list of channels
            channels = self._get_channels()
            # Re-open those channels
            for ch in channels:
                log.debug('Re-opening %s', ch)
                yield ch._init_channel()

            log.debug('Calling open callbacks')
            for cb in self._on_open_cb:
                self._io_loop.add_callback(cb, *args, **kwargs)
        except:
            log.exception('Failed to handle connection open')

    def _handle_on_close_callback(self, *args, **kwargs):
        '''
        What to do when the connection is closed?
        '''
        log = self._get_log('_handle_on_close_callback')
        try:
            log.info('Connection closed')
            for cb in self._on_close_cb:
                try:
                    cb(*args, **kwargs)
                except:
                    log.exception('Exception in callback %s(*%s, **%s)',
                            cb, args, kwargs)
        except:
            log.exception('Exception in close callback')
        self._reconnect()

    def _handle_on_open_error_callback(self, connection, error,
                *args, **kwargs):
        '''
        What to do if we fail to open the connection?
        '''
        log = self._get_log('_handle_on_open_error_callback')
        try:
            log.error('Received %s whilst attempting connection', error)

            # Pass it through to the callback handler we were given
            if self._on_open_error_cb is not None:
                for cb in self._on_open_error_cb:
                    try:
                        cb(connection, error, *args, **kwargs)
                    except:
                        log.exception('Error in callback %s(*%s, **%s)',
                                cb, args, kwargs)
        except:
            log.exception('Failed in open-error callback')
        self._reconnect()

    def _reconnect(self):
        '''
        Perform a re-connection, if enabled.
        '''
        log = self._get_log('_reconnect')
        try:
            # Sanity check, ensure we're running in the correct thread for this.
            # add_timeout is *NOT* thread-safe.
            if not self._is_own_thread:
                # This should do the trick!
                self._io_loop.add_callback(_reconnect)
                return

            # Reconnect?
            if self._reconnect:
                if self._reconnect_max > 0:
                    self._reconnect_rem -= 1
                    self._reconnect = self._reconnect_rem > 0
                    self._giveup = not self._reconnect
                    if self._giveup:
                        log.error('This is our last connection attempt!')

                # Schedule a reconnect.  We do this by completely replacing the
                # AMQP connection object.
                self._io_loop.add_timeout(
                        self._reconnect_delay, self._replace_connection)
                log.error('Re-connecting in %s.',
                        self._reconnect_delay)
            elif self._giveup:
                log.fatal('Giving up!')
                for cb in self._on_giveup_cb:
                    try:
                        cb()
                    except:
                        log.exception('Exception in callback %s', cb)
        except:
            log.exception('Failure in recnnection')

    # ------------------------------------------------------------------------
    # Callback and thread-safety utilities
    # ------------------------------------------------------------------------

    @property
    def _is_own_thread(self):
        '''
        Return True if we're running in our own thread.
        '''
        return self._io_thread is threading.current_thread()

    def _schedule_timeout(self, timeout, future, cb_func=None,
            cb_args=None, cb_kwargs=None):
        '''
        Place a TimeoutException into the future if the future is not
        completed in time.
        '''
        assert self._is_own_thread, 'This isn\'t my thread!'
        if cb_args is None:
            cb_args = ()
        if cb_kwargs is None:
            cb_kwargs = {}

        def _timeout():
            if not future.done():
                future.set_exception(TimeoutError())
            if cb_func is not None:
                cb_func(*cb_args, **cb_kwargs)
        return self._io_loop.add_timeout(timeout, _timeout)

    @tornado.gen.coroutine
    def _run_async(self, async_func, args=None, kwargs=None, timeout=None,
            cb_func=None, callback_name='callback', future=None):
        '''
        Run a method in our own thread and return its result.
        args and kwargs are passed to async_func, which will be run in the
        AMQP IOLoop.
        timeout specifies a time limit on the operation, and will result in a
        TimeoutError if the operation does not complete.
        callback, if given, causes a callback keyword argument to be passed to
        async_func which it should run when the result comes back.
        return_in_future specifies
        '''
        log = self._get_log('_run_async')
        try:
            #log.debug('Run %s\n\twith args %s\n\tkwargs %s'\
            #                '\n\ttimeout %s\n\tstore result in future %s'\
            #                '\n\tand using callback %s (named %s)',
            #                async_func, args, kwargs, timeout, future,
            #                cb_func, callback_name)
            if args is None:
                args = ()
            if kwargs is None:
                kwargs = {}

            timeout_ref         = None
            return_result       = future is None

            if self._is_own_thread:
                #log.debug('We are running in our own thread')

                # We're in our own thread.  Is there a callback involved?
                if cb_func is None:
                    # There isn't.  This is easy then, just call the function and
                    # return the result.  Since the function is synchronous, we
                    # ignore the timeout.
                    if return_result:
                        #log.debug('Running directly')
                        result = async_func(*args, **kwargs)
                        raise tornado.gen.Return(result)
                    else:
                        try:
                            result = async_func(*args, **kwargs)
                            future.set_result(result)
                        except Exception,e:
                            log.exception('Exception running %s',
                                    async_func)
                            future.set_exception(e)
                else:
                    #log.debug('Running via callback')
                    # The result (or an exception) will be placed in this future.
                    future = future or tornado.concurrent.Future()
                    # Create a timeout object if we're given a time limit.
                    if timeout is not None:
                        timeout_ref = self._schedule_timeout(timeout, future)

                    # The function takes a callback argument which provides the
                    # result we're after.  We've been given the function that will
                    # rat through what was given to us and will provide the result.

                    # The first argument given to the callback is the future.
                    cb_func = self._get_callback(cb_func)
                    def _callback(*args, **kwargs):
                        log = self._get_log('_run_async._callback')
                        #log.debug('Got response args=%s, kwargs=%s',
                        #        args, kwargs)
                        try:
                            cb_func(future, *args, **kwargs)
                        except TimeoutError:
                            timeout_ref = None
                        except Exception, e:
                            if not future.done():
                                future.set_exception(e)
                            else:
                                log.exception('Exception in callback')
                    kwargs = kwargs.copy()
                    kwargs[callback_name] = _callback

                    # Perform the action given...
                    #log.debug('Performing asynchronous operation')
                    try:
                        #log.debug('Executing %s(*%s, **%s)',
                        #        async_func, args, kwargs)
                        async_func(*args, **kwargs)
                    except Exception, e:
                        if not return_result:
                            future.set_exception(e)
                        else:
                            raise
                    if timeout_ref is not None:
                        self._io_loop.remove_timeout(timeout_ref)
            else:
                assert self._io_thread.is_alive(), 'IO Thread is dead'
                #log.debug('Scheduling to run in our own IOLoop (we are '\
                #                'in a foreign thread; current=%s vs ours=%s)',
                #                threading.current_thread(), self._io_thread)
                # We're in the wrong thread, so schedule this to take place in
                # our own IOLoop.  First, create a future to store this in.
                future = future or tornado.concurrent.Future()
                # Then call ourselves with the new future.
                self._io_loop.add_callback(
                        self._run_async,
                        async_func=async_func,
                        args=args,
                        kwargs=kwargs,
                        timeout=timeout,
                        cb_func=cb_func,
                        callback_name=callback_name,
                        future=future)

            #log.debug('Return result? %s', return_result)
            if return_result:
                # Wait for the result
                #log.debug('Waiting for result')
                result = yield future
                if timeout_ref is not None:
                    self._io_loop.remove_timeout(timeout_ref)
                #log.debug('Result = %s', result)
                # Return the result
                raise tornado.gen.Return(result)
            else:
                # Return nothing to prevent the IOLoop from waiting.
                raise tornado.gen.Return(None)
        except tornado.gen.Return:
            raise
        except:
            log.exception('Failed to execute')
            raise

    def _get_callback(self, callback_ref, callback_args=None,
            callback_kwargs=None):
        '''
        Return a callback function from the given reference.
        '''
        #log = self._get_log('_get_callback')
        if callback_ref is None:
            return None

        if callback_args is None:
            callback_args = ()
        if callback_kwargs is None:
            callback_kwargs = {}

        if isinstance(callback_ref, tuple):
            (callback_ioloop, callback_func) = callback_ref[0:2]
            if len(callback_ref) > 2:
                callback_args = callback_args + (callback_ref[2] or ())
            if len(callback_ref) > 3:
                callback_kwargs.update(callback_ref[3] or {})
        else:
            callback_ioloop = None
            callback_func   = callback_ref

        if callback_ioloop is None:
            callback_ioloop = self._io_loop

        def _callback(*args, **kwargs):
            log = self._get_log('_get_callback','_callback')
            #log.debug('Called with\n\targs=%s,\n\tkwargs=%s', args, kwargs)

            # TODO: why is this happening?
            if (len(args) == 1) and isinstance(args[0], tuple):
                args = args[0]

            try:
                cb_args     = callback_args + args
                cb_kwargs   = callback_kwargs.copy()
                cb_kwargs.update(kwargs)
                if self._io_loop is callback_ioloop:
                    # We're running this ourselves
                    #log.debug('Calling function in own IOLoop')
                    cb_res  = callback_func(*args, **kwargs)
                    if isinstance(cb_res, tornado.concurrent.Future):
                        #log.debug(  'Function is asynchronous, '\
                        #            'waiting for result')
                        @tornado.gen.coroutine
                        def _get_result(future):
                            #log.debug('Getting result')
                            try:
                                result = yield future
                                #log.debug('Callback result: %s', result)
                            except:
                                log.exception('Callback failed')
                        self._io_loop.add_future(cb_res, _get_result)
                    #else:
                        #log.debug('Callback result: %s', cb_res)
                else:
                    #log.debug('Scheduling function in external IOLoop')
                    callback_ioloop.add_callback(
                            callback_func, *cb_args, **cb_kwargs)
                    #log.debug('Callback scheduled')
            except:
                log.exception('Callback failed: %s', callback_func)
        #log.debug('Wrapping function %s in callback %s',
        #        callback_func, _callback)
        return _callback

class AMQPChannel(AMQPObject):
    '''
    An abstraction for the AMQP Channel.
    '''

    def __init__(self, connection, channel_number):
        '''
        Initialise a new channel object.
        '''
        log = self._get_log('__init__')
        log.debug('New channel')
        self._connection    = connection
        self._channel_number= channel_number
        self._channel       = None

        # Queues and exchanges to re-establish
        self._exchanges     = []
        self._queues        = []

    _amqp_connection        = property(lambda s : \
            s._connection._amqp_connection)

    def _mark_stale(self):
        self._get_log('_mark_stale').debug(
                'Channel is now stale')
        self._channel       = None

    @tornado.gen.coroutine
    def _init_channel(self, timeout=None):
        log = self._get_log('channel')
        def _callback(future, channel):
            log.debug('Opened new channel %s', channel)
            if not future.done():
                future.set_result(channel)
            else:
                log.error('Future already done, cannot return channel')
                channel.close()
        log.debug('Opening new channel (number=%s)', self._channel_number)
        try:
            channel     = yield self._connection._run_async(
                    async_func      = self._amqp_connection.channel,
                    args            = None,
                    kwargs          = {'channel_number': self._channel_number},
                    timeout         = timeout,
                    cb_func         = _callback,
                    callback_name   = 'on_open_callback',
                    future          = None)
        except:
            log.exception('Failed to open channel')
            raise

        self._channel = channel

        # If we had any exchanges, declare those now
        for ex in self._get_exchanges():
            try:
                log.debug('Declaring exchange %s', ex.exchange)
                yield ex.declare(timeout)
            except:
                log.exception('Failed to declare exchange %s', ex.exchange)

        # If we had any queues, declare those now
        for q in self._get_queues():
            try:
                log.debug('Declaring queue %s', q.routing_key)
                yield q.declare(timeout)
            except:
                log.exception('Failed to declare exchange %s', q.routing_key)

    # ------------------------------------------------------------------------
    # Channel status
    # ------------------------------------------------------------------------

    is_closed   = property(lambda s : s._channel.is_closed)
    is_closing  = property(lambda s : s._channel.is_closing)
    is_open     = property(lambda s : s._channel.is_open)

    # ------------------------------------------------------------------------
    # Channel callbacks
    # ------------------------------------------------------------------------

    def add_callback(self, callback, replies, one_shot=True):
        return self._connection._run_async(
                async_func=self._channel.add_callback,
                args=(self._connection._get_callback(callback),
                    replies, one_shot),
                kwargs={}, timeout=None, callback=None, future=None)

    def add_on_cancel_callback(self, callback):
        return self._connection._run_async(
                self._channel.add_on_cancel_callback,
                    args=(self._connection._get_callback(callback),))

    def add_on_close_callback(self, callback):
        return self._connection._run_async(
                self._channel.add_on_close_callback,
                    args=(self._connection._get_callback(callback),))

    def add_on_flow_callback(self, callback):
        return self._connection._run_async(
                self._channel.add_on_flow_callback,
                    args=(self._connection._get_callback(callback),))

    def add_on_return_callback(self, callback):
        return self._connection._run_async(
                self._channel.add_on_return_callback,
                    args=(self._connection._get_callback(callback),))

    # ------------------------------------------------------------------------
    # Channel operations
    # ------------------------------------------------------------------------

    def basic_ack(self, delivery_tag=0, multiple=False):
        return self._connection._run_async(
                self._channel.basic_ack, args=(delivery_tag, multiple))

    def basic_cancel(self, callback=None, consumer_tag='', nowait=False):
        return self._connection._run_async(
                self._channel.basic_cancel,
                    args=(self._connection._get_callback(callback),
                    consumer_tag, nowait))

    def basic_consume(self, consumer_callback, queue='', no_ack=False,
            exclusive=False, consumer_tag=None, arguments=None):
        return self._connection._run_async(
            self._channel.basic_consume,
                args=(self._connection._get_callback(consumer_callback),
                queue, no_ack, exclusive, consumer_tag, arguments))

    def basic_get(self, callback, queue='', no_ack=False):
        return self._connection._run_async(
                self._channel.basic_get,
                    args=(self._connection._get_callback(callback),
                    queue, no_ack))

    def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
        return self._connection._run_async(
            self._channel.basic_nack,
                args=(delivery_tag, multiple, requeue))

    def basic_publish(self, exchange, routing_key, body, properties=None,
            mandatory=False, immediate=False):
        return self._connection._run_async(
            self._channel.basic_publish,
                args=(exchange, routing_key, body, properties,
                    mandatory, immediate))

    def basic_qos(self, callback=None, prefetch_size=0, prefetch_count=0,
            all_channels=False):
        return self._connection._run_async(
            self._channel.basic_qos,
                args=(self._connection._get_callback(callback),
                prefetch_size, prefetch_count, all_channels))

    def basic_reject(self, delivery_tag=None, requeue=True):
        return self._connection._run_async(
            self._channel.basic_reject,
                args=(delivery_tag, requeue))

    def basic_recover(self, callback=None, prefetch_size=0, prefetch_count=0,
            all_channels=False):
        return self._connection._run_async(
            self._channel.basic_qos,
                args=(self._connection._get_callback(callback),
                prefetch_size, prefetch_count, all_channels))

    def close(self, *args, **kwargs):
        return self._connection._run_async(
            self._channel.close,
                args=args, kwargs=kwargs)

    def confirm_delivery(self, callback=None, nowait=False):
        return self._connection._run_async(
            self._channel.confirm_delivery,
                args=(self._connection._get_callback(callback), nowait))

    def open(self):
        return self._connection._run_async(self._channel.open)

    # ------------------------------------------------------------------------
    # Exchange operations
    # ------------------------------------------------------------------------

    def _cleanup_exchanges(self):
        self._exchanges = filter(lambda er : er() is not None, self._exchanges)
    def _get_exchanges(self):
        return [er() for er in self._exchanges]

    @tornado.gen.coroutine
    def exchange(self, exchange=None, exchange_type='direct',
            passive=False, durable=False, auto_delete=False, internal=False,
            nowait=False, arguments=None, timeout=None):

        e   = AMQPExchange(self, exchange, exchange_type, passive, durable,
                auto_delete, internal, nowait, arguments)

        yield e.declare(timeout)
        self._cleanup_exchanges()
        self._exchanges.append(weakref.ref(e))
        raise tornado.gen.Return(e)

    def exchange_bind(self, destination=None, source=None,
                      routing_key='', nowait=False, arguments=None,
                      timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.exchange_bind,
                args=(destination, source, routing_key, nowait, arguments),
                cb_func=_get_response,
                timeout=timeout)

    def exchange_declare(self, exchange=None, exchange_type='direct',
            passive=False, durable=False, auto_delete=False, internal=False,
            nowait=False, arguments=None, timeout=None):

        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.exchange_declare,
                kwargs={
                    'exchange':         exchange,
                    'exchange_type':    exchange_type,
                    'passive':          passive,
                    'durable':          durable,
                    'auto_delete':      auto_delete,
                    'internal':         internal,
                    'nowait':           nowait,
                    'arguments':        arguments,
                },
                cb_func=_get_response,
                timeout=timeout)

    def exchange_delete(self, exchange=None, if_unused=False, nowait=False,
            timeout=None):

        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.exchange_delete,
                kwargs={
                    'exchange':     exchange,
                    'if_unused':    if_unused,
                    'nowait':       nowait,
                },
                cb_func=_get_response,
                timeout=timeout)

    def exchange_unbind(self, destination=None, source=None,
                        routing_key='', nowait=False, arguments=None,
                        timeout=None):

        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.exchange_unbind,
                kwargs={
                    'destination':  destination,
                    'source':       source,
                    'routing_key':  routing_key,
                    'nowait':       nowait,
                    'arguments':    arguments
                },
                cb_func=_get_response,
                timeout=timeout)

    # ------------------------------------------------------------------------
    # Flow operations
    # ------------------------------------------------------------------------

    def flow(self, callback, active):
        return self._connection._run_async(
            self._channel.flow,
                args=(self._connection._get_callback(callback), active))

    # ------------------------------------------------------------------------
    # Queue operations
    # ------------------------------------------------------------------------

    def _cleanup_queues(self):
        self._queues    = filter(lambda qr : qr() is not None, self._queues)
    def _get_queues(self):
        return [qr() for qr in self._queues]

    @tornado.gen.coroutine
    def queue(self, queue=None, passive=False, durable=False,
                exclusive=False, auto_delete=False, nowait=False,
                arguments=None, timeout=None):
        q   = AMQPQueue(self, queue, passive, durable, exclusive, auto_delete,
                nowait, arguments)
        yield q.declare(timeout)
        self._cleanup_queues()
        self._queues.append(weakref.ref(q))
        raise tornado.gen.Return(q)

    def queue_bind(self, queue, exchange, routing_key=None,
                   nowait=False, arguments=None, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.queue_bind,
                kwargs={
                    'queue':        queue,
                    'exchange':     exchange,
                    'routing_key':  routing_key,
                    'nowait':       nowait,
                    'arguments':    arguments,
                },
                cb_func=_get_response,
                timeout=timeout)

    def queue_declare(self, queue='', passive=False, durable=False,
                      exclusive=False, auto_delete=False, nowait=False,
                      arguments=None, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.queue_declare,
                kwargs={
                    'queue':        queue,
                    'passive':      passive,
                    'durable':      durable,
                    'exclusive':    exclusive,
                    'auto_delete':  auto_delete,
                    'nowait':       nowait,
                    'arguments':    arguments,
                },
                cb_func=_get_response,
                timeout=timeout)

    def queue_delete(self, queue='', if_unused=False,
                     if_empty=False, nowait=False, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.queue_delete,
                kwargs={
                    'queue':        queue,
                    'if_unused':    if_unused,
                    'if_empty':     if_empty,
                    'nowait':       nowait
                },
                cb_func=_get_response,
                timeout=timeout)

    def queue_purge(self, queue, nowait=False, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.queue_purge,
                kwargs={
                    'queue':        queue,
                    'nowait':       nowait
                },
                cb_func=_get_response,
                timeout=timeout)

    def queue_unbind(self, queue='', exchange=None,
                     routing_key=None, arguments=None, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.queue_unbind,
                kwargs={
                    'queue':        queue,
                    'exchange':     exchange,
                    'routing_key':  routing_key,
                    'arguments':    arguments
                },
                cb_func=_get_response,
                timeout=timeout)

    # ------------------------------------------------------------------------
    # Transaction operations
    # ------------------------------------------------------------------------

    def tx_commit(self, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.tx_commit,
                cb_func=_get_response,
                timeout=timeout)

    def tx_rollback(self, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.tx_rollback,
                cb_func=_get_response,
                timeout=timeout)

    def tx_select(self, timeout=None):
        def _get_response(future, response):
            future.set_result(response)
        return self._connection._run_async(
                self._channel.tx_select,
                cb_func=_get_response,
                timeout=timeout)

class AMQPMessageDestination(AMQPObject):
    '''
    A AMQPMessageDestination object is a base class for Queues and Exchanges:
    places you can send messages to.  This base class handles the transmission
    of a message.
    '''

    def __init__(self, channel, exchange, routing_key):
        self._channel       = channel
        self._exchange      = exchange
        self._routing_key   = routing_key
        self._declared      = False

    _connection             = property(lambda s : s._channel._connection)
    _amqp_connection        = property(lambda s : s._channel._amqp_connection)

    exchange    = property(lambda s : s._exchange)
    routing_key = property(lambda s : s._routing_key)

    @tornado.gen.coroutine
    def basic_publish(self, body, properties=None, mandatory=False,
            immediate=False, exchange=None, routing_key=None):

        if exchange is None:
            exchange    = self.exchange
        if routing_key is None:
            routing_key = self.routing_key

        raise tornado.gen.Return((
            yield self._channel.basic_publish(
                exchange, routing_key, body, properties, mandatory,
                immediate)
        ))

class AMQPQueue(AMQPMessageDestination):
    '''
    A representation of a message queue.
    '''
    def __init__(self, channel, queue=None, passive=False, durable=False,
            exclusive=False, auto_delete=False, nowait=False, arguments=None):

        super(AMQPQueue, self).__init__(channel, '', None)
        self._queue         = queue
        self._passive       = passive
        self._durable       = durable
        self._exclusive     = exclusive
        self._auto_delete   = auto_delete
        self._nowait        = nowait
        self._arguments     = arguments

        # Our bindings to instate
        # Format:
        # {
        #   exchange_name: {
        #       routing_key:    {
        #           args:           kwargs for queue_bind,
        #           bound:          boolean indicating bind state,
        #       }
        #   },
        # }
        self._bindings      = {}

        # A list of consumers
        self._consumers     = []

    # ------------------------------------------------------------------------
    # Queue operations
    # ------------------------------------------------------------------------

    @tornado.gen.coroutine
    def declare(self, timeout=None):
        log = self._get_log('declare')
        # Are we declaring passively?
        if self._passive:
            # We are, declare using a temporary channel
            log.debug('Using temporary channel')
            ch  = yield self._channel._connection.channel()
        else:
            # We're not, use our own channel
            log.debug('Using own channel')
            ch  = self._channel

        # Declare the queue in the channel
        try:
            log.debug('Declaring queue')
            queue_res = yield ch.queue_declare(
                    queue       = self._queue or '',
                    passive     = self._passive,
                    durable     = self._durable,
                    exclusive   = self._exclusive,
                    auto_delete = self._auto_delete,
                    nowait      = self._nowait,
                    arguments   = self._arguments,
                    timeout     = timeout)
            self._routing_key   = queue_res.method.queue
            log.debug('Queue is %s', self._routing_key)
        except:
            log.exception('Failed to declare queue (queue=%s)', self._queue)

        # If we opened a temporary channel, close it
        if self._passive and ch.is_open:
            yield ch.close()

        # If we were bound to things, re-bind
        for exchange, bindings in self._bindings.copy().iteritems():
            for routing_key, state in bindings.copy().iteritems():
                yield self.bind(
                        exchange    = exchange,
                        routing_key = routing_key,
                        nowait      = state['args']['nowait'],
                        arguments   = state['args']['arguments'],
                        timeout     = timeout)

        # If we have any consumers, re-start those
        for c in self._get_consumers():
            yield c.consume(timeout)

    @tornado.gen.coroutine
    def bind(self, exchange, routing_key=None,
            nowait=False, arguments=None, timeout=None):
        log = self._get_log('bind')

        if isinstance(exchange, AMQPExchange):
            exchange    = exchange.exchange

        state = {
                'args': {
                    'nowait':       nowait,
                    'arguments':    arguments,
                },
                'bound':            False,
        }
        try:
            log.debug('Binding to %s:%s', exchange, routing_key)

            result = yield self._channel.queue_bind(
                    queue       = self.routing_key,
                    exchange    = exchange or '',
                    routing_key = routing_key,
                    nowait      = nowait,
                    arguments   = arguments,
                    timeout     = timeout)
            # We should be bound now.
            state['bound'] = True
        except:
            log.debug('Failed to bind to %s:%s', exchange, routing_key)

        if exchange not in self._bindings:
            self._bindings[exchange] = {}
        self._bindings[exchange][routing_key] = state
        raise tornado.gen.Return(state['bound'])

    @tornado.gen.coroutine
    def delete(self, if_unused=False, if_empty=False, nowait=False,
            timeout=None):
        log = self._get_log('delete')
        try:
            log.debug('Deleting queue %s', self.routing_key)
            yield self._channel.queue_delete(
                    queue       = self.routing_key,
                    if_unused   = if_unused,
                    if_empty    = if_empty,
                    nowait      = nowait,
                    timeout     = timeout
            )
            # We're deleted
            self._routing_key   = None
        except:
            log.exception('Failed to delete queue %s', self.routing_key)
            raise

    @tornado.gen.coroutine
    def purge(self, queue, nowait=False, timeout=None):
        yield self._channel.queue_purge(
                queue           = self.routing_key,
                nowait          = nowait,
                timeout         = timeout)

    @tornado.gen.coroutine
    def unbind(self, exchange=None, routing_key=None,
            arguments=None, timeout=None):

        if isinstance(exchange, AMQPExchange):
            exchange    = exchange.exchange

        yield self._channel.queue_unbind(
                queue       = self.routing_key,
                exchange    = exchange,
                routing_key = routing_key,
                arguments   = arguments,
                timeout     = timeout)
        # If we got here, then we're unbound
        try:
            del(self._bindings[exchange][routing_key])
            if len(self._bindings[exchange]) == 0:
                del(self._bindings[exchange])
        except KeyError:
            pass

    def _cleanup_consumers(self):
        self._consumers = filter(lambda cr : cr() is not None, self._consumers)
    def _get_consumers(self):
        return [cr() for cr in self._consumers]

    @tornado.gen.coroutine
    def consume(self, consumer_callback, no_ack=False, exclusive=False,
            consumer_tag=None, arguments=None, channel=None, timeout=None):

        c   = AMQPConsumer(channel or self._channel, consumer_callback, self,
                no_ack, exclusive, consumer_tag, arguments)

        yield c.consume(timeout)
        self._cleanup_consumers()
        self._consumers.append(weakref.ref(c))
        raise tornado.gen.Return(c)

class AMQPExchange(AMQPMessageDestination):
    '''
    A representation of a message exchange.
    '''
    def __init__(self, channel, exchange=None, exchange_type='direct',
            passive=False, durable=False, auto_delete=False, internal=False,
            nowait=False, arguments=None):

        super(AMQPExchange, self).__init__(channel, exchange, None)
        self._exchange_type = exchange_type
        self._passive       = passive
        self._durable       = durable
        self._auto_delete   = auto_delete
        self._internal      = internal
        self._nowait        = nowait
        self._arguments     = arguments
        self._declared      = False

        # Our bindings to instate
        # Format:
        # {
        #   exchange_name: {
        #       routing_key:    {
        #           args:           kwargs for queue_bind,
        #           bound:          boolean indicating bind state,
        #       }
        #   },
        # }
        self._bindings      = {}

    # ------------------------------------------------------------------------
    # Exchange operations
    # ------------------------------------------------------------------------

    @tornado.gen.coroutine
    def declare(self, timeout=None):
        log = self._get_log('declare')
        # Are we declaring passively?
        if self._passive:
            # We are, declare using a temporary channel
            log.debug('Using temporary channel')
            ch  = yield self._channel._connection.channel()
        else:
            # We're not, use our own channel
            log.debug('Using own channel')
            ch  = self._channel

        # Declare the queue in the channel
        try:
            log.debug('Declaring exchange')
            queue_res = yield ch.exchange_declare(
                    exchange        = self._exchange,
                    exchange_type   = self._exchange_type,
                    passive         = self._passive,
                    durable         = self._durable,
                    auto_delete     = self._auto_delete,
                    internal        = self._internal,
                    nowait          = self._nowait,
                    arguments       = self._arguments,
                    timeout         = timeout)
            self._declared          = True
            log.debug('Exchange declared')
        except:
            log.exception('Failed to declare exchange %s', self._exchange)

        # If we opened a temporary channel, close it
        if self._passive and ch.is_open:
            yield ch.close()

        # If we were bound to things, re-bind
        for exchange, bindings in self._bindings.copy().iteritems():
            for routing_key, state in bindings.copy().iteritems():
                yield self.bind(
                        exchange    = exchange,
                        routing_key = routing_key,
                        nowait      = state['args']['nowait'],
                        arguments   = state['args']['arguments'],
                        timeout     = timeout)


    @tornado.gen.coroutine
    def bind(self, exchange, routing_key=None,
            nowait=False, arguments=None, timeout=None):
        log = self._get_log('bind')

        if isinstance(exchange, AMQPExchange):
            exchange    = exchange.exchange

        state = {
                'args': {
                    'nowait':       nowait,
                    'arguments':    arguments,
                },
                'bound':            False,
        }
        try:
            log.debug('Binding to %s:%s', exchange, routing_key)

            result = yield self._channel.queue_bind(
                    destination = self.exchange,
                    source      = exchange,
                    routing_key = routing_key or '',
                    nowait      = nowait,
                    arguments   = arguments,
                    timeout     = timeout)
            # We should be bound now.
            state['bound'] = True
        except:
            log.debug('Failed to bind to %s:%s', exchange, routing_key)

        if exchange not in self._bindings:
            self._bindings[exchange] = {}
        self._bindings[exchange][routing_key] = state
        raise tornado.gen.Return(state['bound'])

    @tornado.gen.coroutine
    def delete(self, if_unused=False, nowait=False, timeout=None):
        log = self._get_log('delete')
        try:
            log.debug('Deleting exchange %s', self.exchange)
            yield self._channel.exchange_delete(
                    exchange    = self.exchange,
                    if_unused   = if_unused,
                    nowait      = nowait,
                    timeout     = timeout
            )
            self._declared      = False
        except:
            log.exception('Failed to delete exchange %s', self.exchange)
            raise

    @tornado.gen.coroutine
    def unbind(self, exchange=None, routing_key=None,
            arguments=None, timeout=None):

        if isinstance(exchange, AMQPExchange):
            exchange    = exchange.exchange

        yield self._channel.exchange_unbind(
                destination = self.exchange,
                source      = exchange,
                routing_key = routing_key,
                arguments   = arguments,
                timeout     = timeout)
        # If we got here, then we're unbound
        try:
            del(self._bindings[exchange][routing_key])
            if len(self._bindings[exchange]) == 0:
                del(self._bindings[exchange])
        except KeyError:
            pass

class AMQPConsumer(AMQPObject):
    '''
    An abstraction for a consumer.
    '''

    def __init__(self, channel, consumer_callback, queue=None, no_ack=False,
            exclusive=False, consumer_tag=None, arguments=None):

        self._channel           = channel
        self._consumer_callback = consumer_callback
        self._queue             = queue
        self._no_ack            = no_ack
        self._exclusive         = exclusive
        self._consumer_tag_given= consumer_tag
        self._arguments         = arguments
        self._consumer_tag      = None

    consumer_tag                = property(lambda s : s._consumer_tag)

    @tornado.gen.coroutine
    def consume(self, timeout=None):
        log = self._get_log('_consume')
        try:
            log.debug('Consuming queue %s', self._queue.routing_key)
            result = yield self._channel.basic_consume(
                    consumer_callback   = self._consumer_callback,
                    queue               = self._queue.routing_key,
                    no_ack              = self._no_ack,
                    exclusive           = self._exclusive,
                    consumer_tag        = self._consumer_tag_given,
                    arguments           = self._arguments)
            self._consumer_tag          = result
            log.debug('Consumer tag is %s', self._consumer_tag)
        except:
            log.exception('Failed to consume %s', self._queue.routing_key)
            self._consumer_tag          = None
            raise

    @tornado.gen.coroutine
    def cancel(self, nowait=False, arguments=None, timeout=None):
        log = self._get_log('cancel')
        try:
            log.debug('Cancelling consumer of queue %s', self._queue.routing_key)
            yield self._channel.basic_cancel(
                    consumer_tag        = self._consumer_tag_given,
                    nowait              = nowait,
                    arguments           = arguments)
            self._consumer_tag          = None
            log.debug('Consumer is cancelled')
        except:
            log.exception('Failed to cancel consuming %s',
                    self._queue.routing_key)
            raise

class TimeoutError(Exception):
    '''
    Exception raised when an operation times out.
    '''
    pass

if __name__ == '__main__':
    # The following is a simple example that tries to
    # create two channels (transmit and receive), declare an exchange,
    # bind a new queue to it, then send a message to the exchange.
    logging.basicConfig(
            level=logging.DEBUG,
            format= '%(asctime)s %(name)s[%(filename)s:%(lineno)4d] '\
                    '%(process)d/%(threadName)s %(levelname)s %(message)s')
    io_loop = tornado.ioloop.IOLoop.instance()
    msg_received = tornado.concurrent.Future()

    @tornado.gen.coroutine
    def receive(channel, method, properties, body):
        logging.info(
                ('\n' + ('-'*80) +
                '\nReceived\n\tchannel=%s,\n\tmethod=%s,'\
                '\n\tproperties=%s, body=%s\n' +
                ('-'*80)),
                channel, method, properties, body)
        msg_received.set_result(True)
        raise tornado.gen.Return(None)

    @tornado.gen.coroutine
    def on_connect(*args, **kwargs):
        try:
            logging.info('Opening a transmit channel')
            tx_ch = yield amqp_conn.channel()
            logging.debug('tx_ch = %s', tx_ch)
            logging.info('Opening a receive channel')
            rx_ch = yield amqp_conn.channel()
            logging.info('Declaring a test exchange on the receive channel')
            ex = yield rx_ch.exchange_declare(exchange='test',
                    exchange_type='fanout')
            logging.info('Declaring a receive queue on the receive channel')
            rx_q  = yield rx_ch.queue_declare(exclusive=True)
            logging.info('Binding the queue to the exchange')
            bind  = yield rx_ch.queue_bind( queue=rx_q.method.queue,
                                            exchange='test', routing_key='#')
            logging.info('Beginning consumation of queue')
            consume = yield rx_ch.basic_consume(
                    consumer_callback = receive,
                    queue = rx_q.method.queue,
                    no_ack=True)
            logging.info('Transmitting a test message')
            tx = yield tx_ch.basic_publish(
                    exchange = 'test',
                    routing_key = 'test',
                    body = 'this is a test')
            logging.info('Waiting to receive something')
            yield msg_received
            logging.info('Disconnecting')
            yield amqp_conn.close()
            logging.info('Stopping')
        except:
            logging.exception('on_connect failed')
        io_loop.stop()
        raise tornado.gen.Return(None)

    amqp_conn = AMQPConnection(
            pika.URLParameters('amqp://guest:guest@localhost:5672/%2f'),
            io_loop=io_loop,
            on_open_callback=on_connect)
    io_loop.start()

Vote Here

You must earn at least 1 vote on your snippets to be allowed to vote

Terms Of Use

Privacy Policy

Featured snippets are MIT license

Gears & Masters

Advertise

DevOpsnipp.com © 2020

medium.png