Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
value={
'type': 'URL',
'confidence': 100,
'share_level': 'green',
'sources': ['test.1']
}
)
# Look through the calls recorded on SR_mock for the first one whose name is
# '().pipeline().__enter__().hset'; the test fails if none was recorded.
# NOTE(review): indentation is lost in this view; the `else` presumably
# belongs to the `for` (for/else) — confirm against the original file.
for call in SR_mock.mock_calls:
name, args, kwargs = call
if name == '().pipeline().__enter__().hset':
break
else:
self.fail(msg='hset not found')
# The third positional argument of that call must start with the 'lz4' marker.
self.assertEqual(args[2].startswith('lz4'), True)
# Drop the 3-character marker, decompress the remainder and parse it as JSON.
stixdict = json.loads(lz4.decompress(args[2][3:]))
indicator = stixdict['indicators'][0]
cyboxprops = indicator['observable']['object']['properties']
# The first indicator's observable properties must carry the expected type
# and the expected non-ASCII value.
self.assertEqual(cyboxprops['type'], 'URL')
self.assertEqual(cyboxprops['value'], u'\u2603.net/p\xe5th')
b.stop()
value={
'type': 'URL',
'confidence': 100,
'share_level': 'green',
'sources': ['test.1']
}
)
# Look through the calls recorded on SR_mock for the first one whose name is
# '().pipeline().__enter__().hset'; the test fails if none was recorded.
# NOTE(review): indentation is lost in this view; the `else` presumably
# belongs to the `for` (for/else) — confirm against the original file.
for call in SR_mock.mock_calls:
name, args, kwargs = call
if name == '().pipeline().__enter__().hset':
break
else:
self.fail(msg='hset not found')
# The third positional argument of that call must start with the 'lz4' marker.
self.assertEqual(args[2].startswith('lz4'), True)
# Drop the 3-character marker, decompress the remainder and parse it as JSON.
stixdict = json.loads(lz4.decompress(args[2][3:]))
indicator = stixdict['indicators'][0]
cyboxprops = indicator['observable']['object']['properties']
# The first indicator's observable properties must carry the expected type
# and the expected non-ASCII value.
self.assertEqual(cyboxprops['type'], 'URL')
self.assertEqual(cyboxprops['value'], u'\u2603.net/p\xe5th')
# Clear the calls recorded on the mock, then stop `b`.
SR_mock.reset_mock()
b.stop()
value={
'type': 'domain',
'confidence': 100,
'share_level': 'green',
'sources': ['test.1']
}
)
# Look through the calls recorded on SR_mock for the first one whose name is
# '().pipeline().__enter__().hset'; the test fails if none was recorded.
# NOTE(review): indentation is lost in this view; the `else` presumably
# belongs to the `for` (for/else) — confirm against the original file.
for call in SR_mock.mock_calls:
name, args, kwargs = call
if name == '().pipeline().__enter__().hset':
break
else:
self.fail(msg='hset not found')
# The third positional argument of that call must start with the 'lz4' marker.
self.assertEqual(args[2].startswith('lz4'), True)
# Drop the 3-character marker, decompress the remainder and parse it as JSON.
stixdict = json.loads(lz4.decompress(args[2][3:]))
indicator = stixdict['indicators'][0]
cyboxprops = indicator['observable']['object']['properties']
# The first indicator's observable properties must hold 'example.com' with
# the type reported as 'FQDN'.
self.assertEqual(cyboxprops['value'], 'example.com')
self.assertEqual(cyboxprops['type'], 'FQDN')
# Clear the calls recorded on the mock, then stop `b`.
SR_mock.reset_mock()
b.stop()
(c.compress, lz4_decompress),
(c.compressHC, lz4_decompress),
(lz4_compress, c.decompress),
(lz4_compressHC, c.decompress)
], ids=('arctic/lz4',
'arcticHC/lz4',
'lz4/arctic',
'lz4HC/arctic'))
def test_roundtrip(compress, decompress):
_str = b"hello world"
cstr = compress(_str)
assert _str == decompress(cstr)
def roundtrip(size=None):
    """Cross-check ``pylz4`` and ``rustlz4`` on a random payload.

    ``size`` is the payload length in bytes. When it is ``None`` a random
    length is drawn from three random bytes read as a big-endian integer.
    Data compressed by one module (plain and HC variants) must decompress
    to the original payload with the other module.
    """
    if size is None:
        # A zero byte followed by three random bytes, read as big-endian ">I".
        (size,) = struct.unpack(">I", b"\0" + os.urandom(3))
    payload = os.urandom(size)

    # Plain compression, both directions.
    from_py = pylz4.compress(payload)
    assert rustlz4.decompress(from_py) == payload
    from_rust = buffer(rustlz4.compress(payload))
    assert pylz4.decompress(from_rust) == payload

    # High-compression variants, both directions.
    from_py_hc = pylz4.compressHC(payload)
    assert rustlz4.decompress(from_py_hc) == payload
    from_rust_hc = buffer(rustlz4.compresshc(payload))
    assert pylz4.decompress(from_rust_hc) == payload
def zip_decompress(compressed, magic):
    r"""Decompress ``compressed`` with the codec selected by ``magic``.

    * ``CISO_MAGIC``: ``zlib.decompress`` with ``wbits=-15``.
    * ``ZISO_MAGIC``: ``lz4.decompress`` after prepending the fixed 4-byte
      prefix ``b'\x00\x08\x00\x00'`` to the payload.

    Any other ``magic`` value yields ``None``.
    """
    if magic == CISO_MAGIC:
        return zlib.decompress(compressed, -15)
    if magic == ZISO_MAGIC:
        prefix = b'\x00\x08\x00\x00'
        return lz4.decompress(prefix + compressed)
    return None
# Read the next field from v011_aol; its second element is used as the
# database key for every command handled below.
# NOTE(review): read_command is defined outside this view; it presumably
# returns a (length, value) pair — confirm.
klen, key = read_command(v011_aol)
if cmd == 'JAR':
# JAR: two more fields follow (ctlen/ctype and originallen/original_size),
# then a single byte is read and discarded.
ctlen, ctype = read_command(v011_aol)
originallen, original_size = read_command(v011_aol)
_ = v011_aol.read(1)
# Collect bytes one at a time into `length` until a b':' is read; an empty
# read raises FuckOffException.
# NOTE(review): `char != None` — `is not None` is the idiomatic comparison.
char = b""
length = b""
while char != None and char != b':':
char = v011_aol.read(1)
if not char:
raise FuckOffException()
if char != b':':
length = length + char
# Read exactly int(length) bytes of compressed payload.
compressed = v011_aol.read(int(length))
# Prefix the payload with original_size packed via struct format "I"
# (native byte order, no explicit endianness) before calling lz4.decompress.
pre_compressed = struct.pack("I", int(original_size)) + compressed
decompressed = lz4.decompress(pre_compressed)
# NOTE(review): the "base64" codec for .encode() exists only on Python 2 —
# confirm the target interpreter.
database[key] = decompressed.encode("base64")
elif cmd == 'SPOIL':
# SPOIL: one more field follows and is parsed as a timestamp. Example record:
# :5:SPOIL:30:10391merveill.espage%3D15_root:20:2014-04-28T12:44:12Z
datesize, date = read_command(v011_aol)
expired = datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
# Remove the key when the parsed timestamp is not later than datetime.now();
# a key that is already absent is ignored.
if expired <= datetime.now():
try:
del database[key]
except KeyError:
pass
elif cmd == 'SCOOP':
# SCOOP: remove the key from the database. The body of the KeyError
# handler lies outside this view.
try:
del database[key]
except KeyError:
def decompress(s):
    """Return the object stored in the lz4-compressed pickle ``s``.

    ``s`` is passed to ``lz4.decompress`` and the result is unpickled.

    SECURITY: ``pickle.loads`` can execute arbitrary code while loading.
    Only call this on data that comes from a trusted source.
    """
    return pickle.loads(lz4.decompress(s))
def compress(o):
    """Pickle ``o`` and return the result compressed with ``lz4.compressHC``."""
    return lz4.compressHC(pickle.dumps(o))