socket.py 2.22 KB
Newer Older
Stelios Karozis's avatar
Stelios Karozis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
""" Defines a dummy socket implementing (part of) the zmq.Socket interface. """

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import abc
import warnings
try:
    from queue import Queue  # Py 3
except ImportError:
    from Queue import Queue  # Py 2

import zmq

from traitlets import HasTraits, Instance, Int
from ipython_genutils.py3compat import with_metaclass

#-----------------------------------------------------------------------------
# Generic socket interface
#-----------------------------------------------------------------------------

class SocketABC(with_metaclass(abc.ABCMeta, object)):

    @abc.abstractmethod
    def recv_multipart(self, flags=0, copy=True, track=False):
        raise NotImplementedError

    @abc.abstractmethod
    def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        raise NotImplementedError

    @classmethod
    def register(cls, other_cls):
        if other_cls is not DummySocket:
            warnings.warn("SocketABC is deprecated since ipykernel version 4.5.0.",
                    DeprecationWarning, stacklevel=2)
        abc.ABCMeta.register(cls, other_cls)

#-----------------------------------------------------------------------------
# Dummy socket class
#-----------------------------------------------------------------------------

class DummySocket(HasTraits):
    """ A dummy socket implementing (part of) the zmq.Socket interface. """

    queue = Instance(Queue, ())
    message_sent = Int(0) # Should be an Event
    context = Instance(zmq.Context)
    def _context_default(self):
        return zmq.Context()

    #-------------------------------------------------------------------------
    # Socket interface
    #-------------------------------------------------------------------------

    def recv_multipart(self, flags=0, copy=True, track=False):
        return self.queue.get_nowait()

    def send_multipart(self, msg_parts, flags=0, copy=True, track=False):
        msg_parts = list(map(zmq.Message, msg_parts))
        self.queue.put_nowait(msg_parts)
        self.message_sent += 1

    def flush(self, timeout=1.0):
        """no-op to comply with stream API"""
        pass

SocketABC.register(DummySocket)