Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pack(compress):
message = {'version': '1.1', 'short_message': 'test pack', 'foo': _ObjWithStr()}
expected = json.loads(json.dumps(message, default=str))
packed_message = gelf.pack(message, compress, default=str)
unpacked_message = zlib.decompress(packed_message) if compress else packed_message
unpacked_message = json.loads(unpacked_message.decode('utf-8'))
assert expected == unpacked_message
def makePickle(self, record):
message = gelf.make(record, self.debug, self.additional_fields)
packed = gelf.pack(message)
""" if you send the message over tcp, it should always be null terminated or the input will reject it """
return packed + b'\x00'
def convert_record_to_gelf(self, record):
return gelf.pack(
gelf.make(record, self.domain, self.debug, self.version, self.additional_fields, self.include_extra_fields),
self.compress, self.json_default
)