Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Tests writing-to and reading-from TCP Host."""
frame = "%s>%s:%s" % (
self.random(6),
','.join([self.random(6), self.random(6), self.random(6)]),
' '.join([
self.random(), 'test_write_and_read', self.random()])
)
aprs_frame = aprs.Frame(frame)
kiss_frame = aprs_frame.encode_kiss()
ks = kiss.TCPKISS(host=self.random_host, port=self.random_port)
a = (self.random_host, self.random_port)
entry = MocketEntry(a, [kiss_frame])
entry_1 = MocketEntry(('localhost', 80), True)
Mocket.register(entry)
ks.interface = create_connection(a)
ks._write_handler = ks.interface.sendall
def _pass(): pass
ks.stop = _pass
ks.write(kiss_frame)
_read_data = ks.read(len(kiss_frame), readmode=False)
self._logger.info(
'_read_data(%s)="%s"', len(_read_data), _read_data)
def test_getentry(self):
entry = MocketEntry(('localhost', 80), True)
Mocket.register(entry)
self.assertEqual(Mocket.get_entry('localhost', 80, True), entry)
def _test_write(self):
frame = "%s>%s:%s" % (
self.random(6),
','.join([self.random(6), self.random(6), self.random(6)]),
' '.join([
self.random(), 'test_write', self.random()])
)
aprs_frame = aprs.Frame(frame)
kiss_frame = aprs_frame.encode_kiss()
ks = kiss.TCPKISS(host=self.random_host, port=self.random_port)
a = (self.random_host, self.random_port)
entry = MocketEntry(a, kiss_frame)
Mocket.register(entry)
self._logger.debug(a)
self._logger.debug(entry.get_response())
ks.interface = create_connection(a)
ks._write_handler = ks.interface.sendall
def _pass(): pass
ks.stop = _pass
ks.write(kiss_frame)
def __init__(self, expected, response):
self.location = ('localhost', 4444)
mocket.Mocket.register(self)
self.expected = expected
self.response = response
self.calls = 0
def register(cls, method, uri, *responses, **config):
default_config = dict(match_querystring=True, add_trailing_slash=True)
default_config.update(config)
config = default_config
if config['add_trailing_slash'] and not urlsplit(uri).path:
uri += '/'
Mocket.register(cls(uri, method, responses, match_querystring=config['match_querystring']))
def register(cls, addr, command, *responses):
responses = [cls.response_cls(r) for r in responses]
Mocket.register(cls(addr, command, responses))
def single_register(cls, method, uri, body='', status=200, headers=None, match_querystring=True):
entry = cls(
uri, method, Response(
body=body, status=status, headers=headers
), match_querystring=match_querystring
)
Mocket.register(entry)
return entry