Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# set the last command to be the end of the statement
self.commands[-1].end_of_statement = True
for command in self.commands:
# need to use deepcopy as the values can be appended to multiple
# parents and in lxml that removes it from the original parent,
# whereas this will create a copy of the statement for each parent
current_statement.append(deepcopy(command))
if command.end_of_statement:
statements.append(current_statement)
current_statement = []
return statements
class Command(ComplexObject):
def __init__(self, protocol_version="2.3", **kwargs):
"""
[MS-PSRP] 2.2.3.12 Command
https://msdn.microsoft.com/en-us/library/dd339976.aspx
:param protocol_version: The negotiated protocol version of the remote
host. This determines what merge_* objects are added to the
serialized xml.
:param cmd: The cmdlet or script to run
:param is_script: Whether cmd is a script or not
:param use_local_scope: Use local or global scope to invoke commands
:param merge_my_result: Controls the behaviour of what stream to merge
to 'merge_to_result'. Only supports NONE or ERROR (only used in
protocol 2.1)
:param merge_to_result: Controls the behaviour of where to merge the
)),
('parameters', DictionaryMeta(
name="Parameters",
dict_key_meta=ObjectMeta("S"),
dict_value_meta=ObjectMeta("Obj", object=ParameterMetadata))
),
)
self.name = kwargs.get('name')
self.namespace = kwargs.get('namespace')
self.help_uri = kwargs.get('help_uri')
self.command_type = kwargs.get('command_type')
self.output_type = kwargs.get('output_type')
self.parameters = kwargs.get('parameters')
class ParameterMetadata(ComplexObject):
def __init__(self, **kwargs):
"""
[MS-PSRP] 2.2.3.23 ParameterMetadata
https://msdn.microsoft.com/en-us/library/ee175918.aspx
:param name: The name of a parameter
:param parameter_type: The type of the parameter
:param aliases: List of alternative names of the parameter
:param switch_parameter: True if param is a switch parameter
:param dynamic: True if param is included as a consequence of the data
specified in the ArgumentList property
"""
super(ParameterMetadata, self).__init__()
self.types = [
"System.Management.Automation.ParameterMetadata",
[MS-PSRP] 2.2.2.30 RUNSPACEPOOL_INIT_DATA Message
https://msdn.microsoft.com/en-us/library/hh537788.aspx
:param min_runspaces:
:param max_runspaces:
"""
super(RunspacePoolInitData, self).__init__()
self._extended_properties = (
('min_runspaces', ObjectMeta("I32", name="MinRunspaces")),
('max_runspaces', ObjectMeta("I32", name="MaxRunspaces")),
)
self.min_runspaces = min_runspaces
self.max_runspaces = max_runspaces
class ResetRunspaceState(ComplexObject):
    MESSAGE_TYPE = MessageType.RESET_RUNSPACE_STATE

    def __init__(self, ci=None):
        """
        [MS-PSRP] 2.2.2.31 RESET_RUNSPACE_STATE Message
        https://msdn.microsoft.com/en-us/library/mt224027.aspx

        Declares a single extended property, 'ci', described as an I64
        value, and stores the supplied call identifier on the instance.

        :param ci: The call identifier
        """
        super(ResetRunspaceState, self).__init__()

        # Build the metadata first so the property table reads as a plain
        # one-entry tuple of (attribute name, metadata).
        call_id_meta = ObjectMeta("I64", name="ci")
        self._extended_properties = (('ci', call_id_meta),)

        self.ci = ci
class ErrorRecordMessage(ErrorRecord):
    MESSAGE_TYPE = MessageType.ERROR_RECORD

    def __init__(self, **kwargs):
        """
        [MS-PSRP] 2.2.2.20 ERROR_RECORD Message
        https://msdn.microsoft.com/en-us/library/dd342423.aspx

        Adds no state of its own; it only tags the parent ErrorRecord with
        the ERROR_RECORD message type.

        :param kwargs: Keyword arguments passed through unchanged to the
            ErrorRecord initializer
        """
        super(ErrorRecordMessage, self).__init__(**kwargs)
class PipelineState(ComplexObject):
    MESSAGE_TYPE = MessageType.PIPELINE_STATE

    def __init__(self, state=None, error_record=None):
        """
        [MS-PSRP] 2.2.2.21 PIPELINE_STATE Message
        https://msdn.microsoft.com/en-us/library/dd304923.aspx

        :param state: The state of the pipeline
        :param error_record: Value for the optional 'ExceptionAsErrorRecord'
            property of the message, None when there is no error record
        """
        super(PipelineState, self).__init__()
        self._extended_properties = (
            ('state', ObjectMeta("I32", name="PipelineState")),
            ('error_record', ObjectMeta("Obj", name="ExceptionAsErrorRecord",
                                        optional=True)),
        )
        # Keep the constructor arguments on the instance under the attribute
        # names declared in _extended_properties above, the same way the
        # other message classes in this file store theirs.
        self.state = state
        self.error_record = error_record
super(SessionCapability, self).__init__()
self._extended_properties = (
('protocol_version', ObjectMeta("Version",
name="protocolversion")),
('ps_version', ObjectMeta("Version", name="PSVersion")),
('serialization_version', ObjectMeta("Version",
name="SerializationVersion")),
('time_zone', ObjectMeta("BA", name="TimeZone", optional=True)),
)
self.protocol_version = protocol_version
self.ps_version = ps_version
self.serialization_version = serialization_version
self.time_zone = time_zone
class InitRunspacePool(ComplexObject):
MESSAGE_TYPE = MessageType.INIT_RUNSPACEPOOL
def __init__(self, min_runspaces=None, max_runspaces=None,
thread_options=None, apartment_state=None, host_info=None,
application_arguments=None):
"""
[MS-PSRP] 2.2.2.2 INIT_RUNSPACEPOOL Message
https://msdn.microsoft.com/en-us/library/dd359645.aspx
:param min_runspaces: The minimum number of runspaces in the pool
:param max_runspaces: The maximum number of runspaces in the pool
:param thread_options: Thread options provided by the higher layer
:param apartment_state: Apartment state provided by the higher layer
:param host_info: The client's HostInfo details
:param application_arguments: Application arguments provided by a
higher layer, stored in the $PSSenderInfo variable in the pool
message = PipelineOutput()
message.data = message_data
elif message_type == MessageType.PIPELINE_INPUT:
message_data = serializer.deserialize(message_data)
message = PipelineInput()
message.data = message_data
elif message_type == MessageType.PUBLIC_KEY_REQUEST:
message = PublicKeyRequest()
else:
message_meta = ObjectMeta("Obj", object=message_obj)
message = serializer.deserialize(message_data, message_meta)
return Message(destination, rpid, pid, message, serializer)
class SessionCapability(ComplexObject):
MESSAGE_TYPE = MessageType.SESSION_CAPABILITY
def __init__(self, protocol_version=None, ps_version=None,
serialization_version=None, time_zone=None):
"""
[MS-PSRP] 2.2.2.1 SESSION_CAPABILITY Message
https://msdn.microsoft.com/en-us/library/dd340636.aspx
:param protocol_version: The PSRP version
:param ps_version: The PowerShell version
:param serialization_version: The serialization version
:param time_zone: Time Zone information of the host, should be a byte
string
"""
super(SessionCapability, self).__init__()
self._extended_properties = (
('args', ObjectMeta(name="PSEventArgs.SourceArgs")),
('data', ObjectMeta(name="PSEventArgs.MessageData")),
('computer', ObjectMeta("S", name="PSEventArgs.ComputerName")),
('runspace_id', ObjectMeta("G", name="PSEventArgs.RunspaceId")),
)
self.event_id = event_id
self.source_id = source_id
self.time = time
self.sender = sender
self.args = args
self.data = data
self.computer = computer
self.runspace_id = runspace_id
class ApplicationPrivateData(ComplexObject):
    MESSAGE_TYPE = MessageType.APPLICATION_PRIVATE_DATA

    def __init__(self, data=None):
        """
        [MS-PSRP] 2.2.2.13 APPLICATION_PRIVATE_DATA Message
        https://msdn.microsoft.com/en-us/library/dd644934.aspx

        Declares a single extended property, 'data', described by a
        DictionaryMeta named "ApplicationPrivateData".

        :param data: A dict that contains data to be sent to the PowerShell
            layer
        """
        super(ApplicationPrivateData, self).__init__()

        # Name the metadata separately so the property table is a plain
        # one-entry tuple of (attribute name, metadata).
        private_data_meta = DictionaryMeta(name="ApplicationPrivateData")
        self._extended_properties = (('data', private_data_meta),)

        self.data = data
@property
def _to_string(self):
    """
    Look up the string form of the current value in self._string_map.

    :return: The entry of self._string_map keyed by self.value
    :raises KeyError: If self.value is not a key of self._string_map; the
        message names the offending key, self._types[0] and the full map
    """
    try:
        label = self._string_map[self.value]
    except KeyError as err:
        # Re-raise with a message that also lists the accepted values.
        raise KeyError("%s is not a valid enum value for %s, valid values "
                       "are %s" % (err, self._types[0], self._string_map))
    return label

@_to_string.setter
def _to_string(self, value):
    # Assignments are accepted and discarded; the string form is always
    # derived from self.value by the getter.
    # NOTE(review): presumably this exists so generic code that assigns the
    # attribute does not fail - confirm against the callers.
    pass
# PSRP Complex Objects - https://msdn.microsoft.com/en-us/library/dd302883.aspx
class Coordinates(ComplexObject):
def __init__(self, **kwargs):
"""
[MS-PSRP] 2.2.3.1 Coordinates
https://msdn.microsoft.com/en-us/library/dd302883.aspx
:param x: The X coordinate (0 is the leftmost column)
:param y: The Y coordinate (0 is the topmost row)
"""
super(Coordinates, self).__init__()
self._adapted_properties = (
('x', ObjectMeta("I32", name="X")),
('y', ObjectMeta("I32", name="Y")),
)
self._types = [
"System.Management.Automation.Host.Coordinates",
elif value_type == str:
return "S"
elif value_type == bytes:
# This will only occur in Python 3 as a byte string in Python 2 is
# a str. If users on that platform want a BA then they need to
# explicitly set the metadata themselves
return "BA"
elif value_type == uuid:
return "G"
elif value_type == list:
return "LST"
elif value_type == dict:
return "DCT"
elif isinstance(value, GenericComplexObject):
return "ObjDynamic"
elif isinstance(value, ComplexObject):
return "Obj"
else:
# catch all, this probably isn't right but will not throw an
# error
return "S"
or Complex Object, Null for no value
"""
super(CommandParameter, self).__init__()
self._extended_properties = (
('name', ObjectMeta("S", name="N")),
('value', ObjectMeta(name="V")),
)
self.name = kwargs.get('name')
self.value = kwargs.get('value')
# The host default data is serialized quite differently from the normal rules;
# this contains some subclasses that are specific to the serialized form
class _HostDefaultData(ComplexObject):
class _DictValue(ComplexObject):
def __init__(self, **kwargs):
super(_HostDefaultData._DictValue, self).__init__()
self._extended_properties = (
('value_type', ObjectMeta("S", name="T")),
('value', ObjectMeta(name="V")),
)
self.value_type = kwargs.get('value_type')
self.value = kwargs.get('value')
class _Color(ComplexObject):
def __init__(self, color):
super(_HostDefaultData._Color, self).__init__()
self._extended_properties = (
('type', ObjectMeta("S", name="T")),