Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return return_dict
class _AtlasUserBaseEvent(_AtlasBaseEvent):
    """Base class for events that carry a user id, a username and a remote address.

    On top of whatever ``_AtlasBaseEvent`` sets, the constructor reads the
    ``userId``, ``username`` and ``remoteAddress`` keys of the payload.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the user-related attributes from ``value_dict``.

        ``remote_address`` is the object returned by ``ipaddress.ip_address``;
        when that call raises ``ValueError`` the attribute is set to ``None``
        and an info-level message is logged.
        """
        super().__init__(value_dict)
        lookup = value_dict.get
        self.user_id = lookup('userId')  # type: str
        self.username = lookup('username')  # type: str
        try:
            parsed_address = ipaddress.ip_address(lookup('remoteAddress'))
        except ValueError:
            # A missing key ends up here too: ip_address(None) raises ValueError.
            logger.info('No IP address found')
            parsed_address = None
        self.remote_address = parsed_address
class AtlasEvent(_AtlasBaseEvent):
    """Event type that adds no attributes beyond those set by ``_AtlasBaseEvent``."""

    def __init__(self, value_dict: dict) -> None:
        """Forward ``value_dict`` unchanged to the base-class constructor."""
        super().__init__(value_dict)
class AtlasDataExplorerEvent(_AtlasUserBaseEvent):
    """User event that also records a database, a collection and an operation type.

    Reads the ``database``, ``collection`` and ``opType`` keys of the payload;
    a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the parent initialiser, then copy the three extra fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.database = lookup('database')  # type: str
        self.collection = lookup('collection')  # type: str
        self.op_type = lookup('opType')  # type: str
# Event that stores the payload's ``replicaSetName`` value and keeps a reference
# to the raw payload dict on ``event_dict``.
# NOTE(review): the other ``AtlasClusterEvent`` definitions in this file also read
# ``clusterName`` and do not assign ``event_dict``; this copy looks like two
# separate fragments joined together — confirm before relying on it.
class AtlasClusterEvent(_AtlasBaseEvent):
def __init__(self, value_dict: dict) -> None:
super().__init__(value_dict)
self.replica_set_name = value_dict.get('replicaSetName', None) # type: str
self.event_dict = value_dict # type: dict
def as_dict(self):
    """Return a plain-dict snapshot of this event's attributes.

    The result is a shallow copy of the instance ``__dict__`` with these
    adjustments:

    * ``event_dict`` (the raw payload) is left out;
    * ``created_date`` becomes its ISO-8601 string, or ``None`` when the
      attribute is ``None``;
    * ``event_type`` becomes the enum member's name and ``event_type_desc``
      its value;
    * a truthy ``remote_address`` is converted with ``str()``.

    The instance itself is not modified.

    Returns:
        dict: the adjusted copy of the instance attributes.
    """
    snapshot = copy(self.__dict__)
    # Tolerate instances that never stored the raw payload.
    snapshot.pop('event_dict', None)

    created = self.created_date
    # NOTE(review): created_date is assumed to be None when the timestamp could
    # not be parsed upstream; serialise that as None instead of raising.
    snapshot['created_date'] = None if created is None else datetime.isoformat(created)

    event_type = self.event_type
    snapshot['event_type'] = event_type.name
    snapshot['event_type_desc'] = event_type.value

    address = snapshot.get('remote_address')
    if address:
        snapshot['remote_address'] = str(address)
    return snapshot
class _AtlasUserBaseEvent(_AtlasBaseEvent):
    """Base class for events that carry a user id, a username and a remote address.

    On top of whatever ``_AtlasBaseEvent`` sets, the constructor reads the
    ``userId``, ``username`` and ``remoteAddress`` keys of the payload.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the user-related attributes from ``value_dict``.

        ``remote_address`` is the object returned by ``ipaddress.ip_address``;
        when that call raises ``ValueError`` the attribute is set to ``None``
        and an info-level message is logged.
        """
        super().__init__(value_dict)
        lookup = value_dict.get
        self.user_id = lookup('userId')  # type: str
        self.username = lookup('username')  # type: str
        try:
            parsed_address = ipaddress.ip_address(lookup('remoteAddress'))
        except ValueError:
            # A missing key ends up here too: ip_address(None) raises ValueError.
            logger.info('No IP address found')
            parsed_address = None
        self.remote_address = parsed_address
class AtlasEvent(_AtlasBaseEvent):
    """Event type that adds no attributes beyond those set by ``_AtlasBaseEvent``."""

    def __init__(self, value_dict: dict) -> None:
        """Forward ``value_dict`` unchanged to the base-class constructor."""
        super().__init__(value_dict)
class AtlasEvent(_AtlasBaseEvent):
    """Event type that adds no attributes beyond those set by ``_AtlasBaseEvent``."""

    def __init__(self, value_dict: dict) -> None:
        """Forward ``value_dict`` unchanged to the base-class constructor."""
        super().__init__(value_dict)
class AtlasDataExplorerEvent(_AtlasUserBaseEvent):
    """User event that also records a database, a collection and an operation type.

    Reads the ``database``, ``collection`` and ``opType`` keys of the payload;
    a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the parent initialiser, then copy the three extra fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.database = lookup('database')  # type: str
        self.collection = lookup('collection')  # type: str
        self.op_type = lookup('opType')  # type: str
class AtlasClusterEvent(_AtlasBaseEvent):
    """Event that records a replica set name and a cluster name.

    Reads the ``replicaSetName`` and ``clusterName`` keys of the payload;
    a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the base initialiser, then copy the two cluster fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.replica_set_name = lookup('replicaSetName')  # type: str
        self.cluster_name = lookup('clusterName')  # type: str
class AtlasHostEvent(_AtlasBaseEvent):
    """Event that records a hostname, a port and a replica set name.

    Reads the ``hostname``, ``port`` and ``replicaSetName`` keys of the
    payload; a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the base initialiser, then copy the three host fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.hostname = lookup('hostname')  # type: str
        self.port = lookup('port')  # type: int
        self.replica_set_name = lookup('replicaSetName')  # type: str
# NOTE(review): this ``AtlasFeatureEvent`` definition stops after the ``__init__``
# signature — no body follows before the next ``class`` statement. A complete
# definition of the same class appears later in the file; confirm whether this
# truncated copy should be removed.
class AtlasFeatureEvent(_AtlasUserBaseEvent):
def __init__(self, value_dict: dict) -> None:
class AtlasDataExplorerEvent(_AtlasUserBaseEvent):
    """User event that also records a database, a collection and an operation type.

    Reads the ``database``, ``collection`` and ``opType`` keys of the payload;
    a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the parent initialiser, then copy the three extra fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.database = lookup('database')  # type: str
        self.collection = lookup('collection')  # type: str
        self.op_type = lookup('opType')  # type: str
class AtlasClusterEvent(_AtlasBaseEvent):
    """Event that records a replica set name and a cluster name.

    Reads the ``replicaSetName`` and ``clusterName`` keys of the payload;
    a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the base initialiser, then copy the two cluster fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.replica_set_name = lookup('replicaSetName')  # type: str
        self.cluster_name = lookup('clusterName')  # type: str
class AtlasHostEvent(_AtlasBaseEvent):
    """Event that records a hostname, a port and a replica set name.

    Reads the ``hostname``, ``port`` and ``replicaSetName`` keys of the
    payload; a key that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the base initialiser, then copy the three host fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.hostname = lookup('hostname')  # type: str
        self.port = lookup('port')  # type: int
        self.replica_set_name = lookup('replicaSetName')  # type: str
class AtlasFeatureEvent(_AtlasUserBaseEvent):
    """User event that also records a hostname and a feature name.

    Reads the ``hostname`` and ``featureName`` keys of the payload; a key
    that is absent leaves the matching attribute set to ``None``.
    """

    def __init__(self, value_dict: dict) -> None:
        """Run the parent initialiser, then copy the two feature fields."""
        super().__init__(value_dict)
        lookup = value_dict.get
        self.hostname = lookup('hostname')  # type: str
        self.feature_name = lookup('featureName')  # type: str
def atlas_event_factory(value_dict: dict) -> Union[
        AtlasEvent, AtlasDataExplorerEvent, AtlasClusterEvent, AtlasHostEvent, AtlasFeatureEvent]:
    """Build the event object that matches the keys present in ``value_dict``.

    The payload is probed for marker keys in a fixed order and the first key
    holding a truthy value decides the class:

    1. ``featureName`` -> ``AtlasFeatureEvent``
    2. ``hostname``    -> ``AtlasHostEvent``
    3. ``clusterName`` -> ``AtlasClusterEvent``
    4. ``database``    -> ``AtlasDataExplorerEvent``

    When none of them is present (or all hold falsy values) a plain
    ``AtlasEvent`` is returned.
    """
    # Order matters: a payload may contain several marker keys, and the
    # earliest entry in this table wins.
    ordered_markers = (
        ("featureName", AtlasFeatureEvent),
        ("hostname", AtlasHostEvent),
        ("clusterName", AtlasClusterEvent),
        ("database", AtlasDataExplorerEvent),
    )
    for marker_key, event_cls in ordered_markers:
        if value_dict.get(marker_key):
            return event_cls(value_dict=value_dict)
    return AtlasEvent(value_dict=value_dict)
# Distinct type name for a list whose items are raw event dicts or event objects.
ListOfEvents = NewType(
    'ListOfEvents',
    List[Union[dict, _AtlasBaseEvent]],
)