Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_message(self):
    """Unpack a serialized PAExecFileInfo and verify every decoded field.

    The 29-byte payload is laid out as: a 4-byte filename length, the
    8-byte filename, an 8-byte last-write timestamp, two 4-byte version
    integers and a single copy-flag byte.
    """
    actual = PAExecFileInfo()
    payload = b"\x04\x00\x00\x00" \
        b"\x66\x00\x69\x00\x6c\x00\x65\x00" \
        b"\x00\x80\x3e\xd5\xde\xb1\x9d\x01" \
        b"\x0a\x00\x00\x00" \
        b"\x0a\x00\x00\x00" \
        b"\x01"

    # The value returned by unpack() is expected to be empty here
    # (presumably the unconsumed remainder of the input - confirm against
    # the Structure implementation).
    remaining = actual.unpack(payload)

    assert len(actual) == 29
    assert remaining == b""
    # The length is 4 for an 8-byte filename value.
    assert actual['filename_len'].get_value() == 4
    # Use the canonical codec name; the previous spelling 'utf-16-le-' only
    # worked because codec lookup normalises stray punctuation away.
    assert actual['filename'].get_value() == "file".encode('utf-16-le')
    # The 8 timestamp bytes are expected to decode to the Unix epoch.
    assert actual['file_last_write'].get_value() == \
        datetime.utcfromtimestamp(0)
    assert actual['file_version_ls'].get_value() == 10
    assert actual['file_version_ms'].get_value() == 10
    assert actual['copy_file'].get_value()
message['arguments'] = "arg1".encode('utf-16-le')
message['src_dir'] = "C:\\source".encode('utf-16-le')
message['dest_dir'] = "C:\\target".encode('utf-16-le')
src_file_info1 = PAExecFileInfo()
src_file_info1['filename'] = "src1".encode('utf-16-le')
src_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
src_file_info2 = PAExecFileInfo()
src_file_info2['filename'] = "src2".encode('utf-16-le')
src_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
message['src_files'] = [src_file_info1, src_file_info2]
dest_file_info1 = PAExecFileInfo()
dest_file_info1['filename'] = "dest1".encode('utf-16-le')
dest_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
dest_file_info2 = PAExecFileInfo()
dest_file_info2['filename'] = "dest2".encode('utf-16-le')
dest_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
message['dest_files'] = [dest_file_info1, dest_file_info2]
expected = b"\x01\x00\x00\x00" \
b"\x02\x00\x00\x00" \
b"\x01\x00\x00\x00\x02\x00\x00\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00\x00\x00\x00" \
b"\x01" \
b"\x00" \
b"\x00" \
message['password'] = "pass".encode('utf-16-le')
message['username'] = "user".encode('utf-16-le')
message['executable'] = "a.exe".encode('utf-16-le')
message['arguments'] = "arg1".encode('utf-16-le')
message['src_dir'] = "C:\\source".encode('utf-16-le')
message['dest_dir'] = "C:\\target".encode('utf-16-le')
src_file_info1 = PAExecFileInfo()
src_file_info1['filename'] = "src1".encode('utf-16-le')
src_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
src_file_info2 = PAExecFileInfo()
src_file_info2['filename'] = "src2".encode('utf-16-le')
src_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
message['src_files'] = [src_file_info1, src_file_info2]
dest_file_info1 = PAExecFileInfo()
dest_file_info1['filename'] = "dest1".encode('utf-16-le')
dest_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
dest_file_info2 = PAExecFileInfo()
dest_file_info2['filename'] = "dest2".encode('utf-16-le')
dest_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
message['dest_files'] = [dest_file_info1, dest_file_info2]
expected = b"\x01\x00\x00\x00" \
b"\x02\x00\x00\x00" \
b"\x01\x00\x00\x00\x02\x00\x00\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00" \
b"\x00\x00\x00\x00" \
def test_create_message(self):
message = PAExecSettingsBuffer()
message['processors'] = [1, 2]
message['interactive'] = True
message['password'] = "pass".encode('utf-16-le')
message['username'] = "user".encode('utf-16-le')
message['executable'] = "a.exe".encode('utf-16-le')
message['arguments'] = "arg1".encode('utf-16-le')
message['src_dir'] = "C:\\source".encode('utf-16-le')
message['dest_dir'] = "C:\\target".encode('utf-16-le')
src_file_info1 = PAExecFileInfo()
src_file_info1['filename'] = "src1".encode('utf-16-le')
src_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
src_file_info2 = PAExecFileInfo()
src_file_info2['filename'] = "src2".encode('utf-16-le')
src_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
message['src_files'] = [src_file_info1, src_file_info2]
dest_file_info1 = PAExecFileInfo()
dest_file_info1['filename'] = "dest1".encode('utf-16-le')
dest_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
dest_file_info2 = PAExecFileInfo()
dest_file_info2['filename'] = "dest2".encode('utf-16-le')
dest_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
message['dest_files'] = [dest_file_info1, dest_file_info2]
expected = b"\x01\x00\x00\x00" \
message['unique_id'] = 1
buffer = PAExecSettingsBuffer()
buffer['processors'] = [1, 2]
buffer['interactive'] = True
buffer['password'] = "pass".encode('utf-16-le')
buffer['username'] = "user".encode('utf-16-le')
buffer['executable'] = "a.exe".encode('utf-16-le')
buffer['arguments'] = "arg1".encode('utf-16-le')
buffer['src_dir'] = "C:\\source".encode('utf-16-le')
buffer['dest_dir'] = "C:\\target".encode('utf-16-le')
src_file_info1 = PAExecFileInfo()
src_file_info1['filename'] = "src1".encode('utf-16-le')
src_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
src_file_info2 = PAExecFileInfo()
src_file_info2['filename'] = "src2".encode('utf-16-le')
src_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
buffer['src_files'] = [src_file_info1, src_file_info2]
dest_file_info1 = PAExecFileInfo()
dest_file_info1['filename'] = "dest1".encode('utf-16-le')
dest_file_info1['file_last_write'] = datetime.utcfromtimestamp(0)
dest_file_info2 = PAExecFileInfo()
dest_file_info2['filename'] = "dest2".encode('utf-16-le')
dest_file_info2['file_last_write'] = datetime.utcfromtimestamp(0)
buffer['dest_files'] = [dest_file_info1, dest_file_info2]
message['buffer'] = buffer
expected = b"\x01\x00" \
b"\x01\x02\x03\x04" \
b"\x00\x06\x06\x0f\x08\x15\x16\x13" \
b"\x1d\x19\x1a\x27\x22\x2d\x2e\x2b" \
def test_create_message(self):
    """Pack a populated PAExecFileInfo and compare it with the wire bytes.

    The expected serialization is 29 bytes: filename length, filename,
    last-write timestamp, the two version integers and the copy flag.
    """
    file_info = PAExecFileInfo()
    field_values = (
        ('filename', "file".encode('utf-16-le')),
        ('file_last_write', datetime.utcfromtimestamp(0)),
        ('file_version_ls', 10),
        ('file_version_ms', 10),
        ('copy_file', True),
    )
    for field_name, field_value in field_values:
        file_info[field_name] = field_value

    packed = file_info.pack()

    # One entry per field, in the order the structure declares them.
    wire_bytes = b"".join((
        b"\x04\x00\x00\x00",
        b"\x66\x00\x69\x00\x6c\x00\x65\x00",
        b"\x00\x80\x3e\xd5\xde\xb1\x9d\x01",
        b"\x0a\x00\x00\x00",
        b"\x0a\x00\x00\x00",
        b"\x01",
    ))

    assert len(file_info) == 29
    assert packed == wire_bytes
def __init__(self):
    """Declare the ordered field layout of a PAExecFileInfo structure.

    Fields, in order: ``filename_len`` (4 bytes), ``filename`` (variable),
    ``file_last_write`` (8 bytes), ``file_version_ls`` and
    ``file_version_ms`` (4 bytes each) and ``copy_file`` (1 byte).
    """
    def _default_filename_len(s):
        # Defaults to half the length of the filename value.
        return int(len(s['filename']) / 2)

    def _filename_size(s):
        # The filename occupies twice the declared length, in bytes.
        return s['filename_len'].get_value() * 2

    layout = OrderedDict()
    layout['filename_len'] = IntField(
        size=4,
        default=_default_filename_len
    )
    layout['filename'] = BytesField(size=_filename_size)
    layout['file_last_write'] = DateTimeField(size=8)
    layout['file_version_ls'] = IntField(size=4)
    layout['file_version_ms'] = IntField(size=4)
    layout['copy_file'] = BoolField(size=1)

    self.fields = layout
    super(PAExecFileInfo, self).__init__()
def _get_file(self, data):
min_size = 21
filename_size = struct.unpack("
size=lambda s: s['src_dir_len'].get_value() * 2
)),
('dest_dir_len', IntField(
size=4,
default=lambda s: int(len(s['dest_dir']) / 2)
)),
('dest_dir', BytesField(
size=lambda s: s['dest_dir_len'].get_value() * 2
)),
('num_src_files', IntField(
size=4,
default=lambda s: len(s['src_files'].get_value())
)),
('src_files', ListField(
list_count=lambda s: s['num_src_files'].get_value(),
list_type=StructureField(structure_type=PAExecFileInfo),
unpack_func=lambda s, d:
self._unpack_file_list(s, d, 'num_src_files')
)),
('num_dest_files', IntField(
size=4,
default=lambda s: len(s['dest_files'].get_value())
)),
('dest_files', ListField(
list_count=lambda s: s['num_dest_files'].get_value(),
list_type=StructureField(structure_type=PAExecFileInfo),
unpack_func=lambda s, d:
self._unpack_file_list(s, d, 'num_dest_files')
)),
('timeout_seconds', IntField(size=4))
])
super(PAExecSettingsBuffer, self).__init__()