How to use the mocket.mocket.Mocket.register function in mocket

To help you get started, we’ve selected a few mocket examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github ampledata / kiss / tests / test_tcpkiss.py View on Github external
"""Tests writing-to and reading-from TCP Host."""
        frame = "%s>%s:%s" % (
            self.random(6),
            ','.join([self.random(6), self.random(6), self.random(6)]),
            ' '.join([
                self.random(), 'test_write_and_read', self.random()])
        )
        aprs_frame = aprs.Frame(frame)
        kiss_frame = aprs_frame.encode_kiss()

        ks = kiss.TCPKISS(host=self.random_host, port=self.random_port)
        a = (self.random_host, self.random_port)

        entry = MocketEntry(a, [kiss_frame])
        entry_1 = MocketEntry(('localhost', 80), True)
        Mocket.register(entry)

        ks.interface = create_connection(a)
        ks._write_handler = ks.interface.sendall

        def _pass(): pass
        ks.stop = _pass

        ks.write(kiss_frame)

        _read_data = ks.read(len(kiss_frame), readmode=False)

        self._logger.info(
            '_read_data(%s)="%s"', len(_read_data), _read_data)
github mindflayer / python-mocket / tests / test_mocket.py View on Github external
def test_getentry(self):
        entry = MocketEntry(('localhost', 80), True)
        Mocket.register(entry)
        self.assertEqual(Mocket.get_entry('localhost', 80, True), entry)
github ampledata / kiss / tests / test_tcpkiss.py View on Github external
def _test_write(self):
        frame = "%s>%s:%s" % (
            self.random(6),
            ','.join([self.random(6), self.random(6), self.random(6)]),
            ' '.join([
                self.random(), 'test_write', self.random()])
        )
        aprs_frame = aprs.Frame(frame)
        kiss_frame = aprs_frame.encode_kiss()

        ks = kiss.TCPKISS(host=self.random_host, port=self.random_port)
        a = (self.random_host, self.random_port)

        entry = MocketEntry(a, kiss_frame)
        Mocket.register(entry)
        self._logger.debug(a)
        self._logger.debug(entry.get_response())

        ks.interface = create_connection(a)
        ks._write_handler = ks.interface.sendall

        def _pass(): pass

        ks.stop = _pass

        ks.write(kiss_frame)
github mcuadros / pynats / tests / test_connection.py View on Github external
def __init__(self, expected, response):
        self.location = ('localhost', 4444)
        mocket.Mocket.register(self)
        self.expected = expected
        self.response = response
        self.calls = 0
github mindflayer / python-mocket / mocket / mockhttp.py View on Github external
def register(cls, method, uri, *responses, **config):

        default_config = dict(match_querystring=True, add_trailing_slash=True)
        default_config.update(config)
        config = default_config

        if config['add_trailing_slash'] and not urlsplit(uri).path:
            uri += '/'

        Mocket.register(cls(uri, method, responses, match_querystring=config['match_querystring']))
github mindflayer / python-mocket / mocket / mockredis.py View on Github external
def register(cls, addr, command, *responses):
        responses = [cls.response_cls(r) for r in responses]
        Mocket.register(cls(addr, command, responses))
github mindflayer / python-mocket / mocket / plugins / pook_mock_engine.py View on Github external
def single_register(cls, method, uri, body='', status=200, headers=None, match_querystring=True):
        entry = cls(
            uri, method, Response(
                body=body, status=status, headers=headers
            ), match_querystring=match_querystring
        )
        Mocket.register(entry)
        return entry