How to use the atlasapi.events._AtlasBaseEvent class in atlasapi

To help you get started, we’ve selected a few atlasapi examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github mgmonteleone / python-atlasapi / atlasapi / events.py View on Github external
return return_dict


class _AtlasUserBaseEvent(_AtlasBaseEvent):
    """Base class for events that carry user details.

    Attributes:
        user_id: Value of the ``userId`` key, or None when absent.
        username: Value of the ``username`` key, or None when absent.
        remote_address: Result of ``ipaddress.ip_address`` applied to the
            ``remoteAddress`` key, or None when that call raises ValueError
            (which includes the case where the key is missing).
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the user attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the base class initializer.
        """
        super().__init__(value_dict)
        self.user_id = value_dict.get('userId')  # type: str
        self.username = value_dict.get('username')  # type: str

        raw_address = value_dict.get('remoteAddress')
        try:
            parsed_address = ipaddress.ip_address(raw_address)
        except ValueError:
            # A missing or unparsable address is tolerated: log it and
            # store None instead of failing the whole event.
            logger.info('No IP address found')
            parsed_address = None
        self.remote_address = parsed_address


class AtlasEvent(_AtlasBaseEvent):
    """Event that adds no attributes of its own on top of the base class."""

    def __init__(self, value_dict: dict) -> None:
        """Initialize the event by forwarding ``value_dict`` to the base class.

        Args:
            value_dict: Dictionary describing the event; passed on unchanged.
        """
        super().__init__(value_dict)


class AtlasDataExplorerEvent(_AtlasUserBaseEvent):
    """User event that additionally records a database operation target.

    Attributes:
        database: Value of the ``database`` key, or None when absent.
        collection: Value of the ``collection`` key, or None when absent.
        op_type: Value of the ``opType`` key, or None when absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the parent class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.database = value_dict.get('database')  # type: str
        self.collection = value_dict.get('collection')  # type: str
        self.op_type = value_dict.get('opType')  # type: str


class AtlasClusterEvent(_AtlasBaseEvent):
    def __init__(self, value_dict: dict) -> None:
        super().__init__(value_dict)
        self.replica_set_name = value_dict.get('replicaSetName', None)  # type: str
github mgmonteleone / python-atlasapi / atlasapi / events.py View on Github external
self.event_dict = value_dict  # type: dict

    def as_dict(self):
        """Return the instance attributes as a plain dictionary.

        The result is a shallow copy of ``self.__dict__`` with these changes:
        ``event_dict`` is removed, ``created_date`` is replaced by its ISO
        format string, ``event_type`` is replaced by the enum member's name,
        ``event_type_desc`` is added with the enum member's value, and a
        truthy ``remote_address`` is replaced by its string form.

        Returns:
            dict: The converted copy; the instance itself is not modified.
        """
        serializable = copy(self.__dict__)
        # Drop the raw source dictionary from the output.
        serializable.pop('event_dict')
        serializable.update(
            created_date=datetime.isoformat(self.created_date),
            event_type=self.event_type.name,
            event_type_desc=self.event_type.value,
        )

        address = serializable.get('remote_address')
        if address:
            serializable['remote_address'] = str(address)
        return serializable


class _AtlasUserBaseEvent(_AtlasBaseEvent):
    """Base class for events that carry user details.

    Attributes:
        user_id: Value of the ``userId`` key, or None when absent.
        username: Value of the ``username`` key, or None when absent.
        remote_address: Result of ``ipaddress.ip_address`` applied to the
            ``remoteAddress`` key, or None when that call raises ValueError
            (which includes the case where the key is missing).
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the user attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the base class initializer.
        """
        super().__init__(value_dict)
        self.user_id = value_dict.get('userId')  # type: str
        self.username = value_dict.get('username')  # type: str

        raw_address = value_dict.get('remoteAddress')
        try:
            parsed_address = ipaddress.ip_address(raw_address)
        except ValueError:
            # A missing or unparsable address is tolerated: log it and
            # store None instead of failing the whole event.
            logger.info('No IP address found')
            parsed_address = None
        self.remote_address = parsed_address


class AtlasEvent(_AtlasBaseEvent):
    """Event that adds no attributes of its own on top of the base class."""

    def __init__(self, value_dict: dict) -> None:
        """Initialize the event by forwarding ``value_dict`` to the base class.

        Args:
            value_dict: Dictionary describing the event; passed on unchanged.
        """
        super().__init__(value_dict)
github mgmonteleone / python-atlasapi / atlasapi / events.py View on Github external
class AtlasEvent(_AtlasBaseEvent):
    """Event that adds no attributes of its own on top of the base class."""

    def __init__(self, value_dict: dict) -> None:
        """Initialize the event by forwarding ``value_dict`` to the base class.

        Args:
            value_dict: Dictionary describing the event; passed on unchanged.
        """
        super().__init__(value_dict)


class AtlasDataExplorerEvent(_AtlasUserBaseEvent):
    """User event that additionally records a database operation target.

    Attributes:
        database: Value of the ``database`` key, or None when absent.
        collection: Value of the ``collection`` key, or None when absent.
        op_type: Value of the ``opType`` key, or None when absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the parent class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.database = value_dict.get('database')  # type: str
        self.collection = value_dict.get('collection')  # type: str
        self.op_type = value_dict.get('opType')  # type: str


class AtlasClusterEvent(_AtlasBaseEvent):
    """Event that additionally records which cluster it refers to.

    Attributes:
        replica_set_name: Value of the ``replicaSetName`` key, or None when
            absent.
        cluster_name: Value of the ``clusterName`` key, or None when absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the base class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.replica_set_name = value_dict.get('replicaSetName')  # type: str
        self.cluster_name = value_dict.get('clusterName')  # type: str


class AtlasHostEvent(_AtlasBaseEvent):
    """Event that additionally records which host it refers to.

    Attributes:
        hostname: Value of the ``hostname`` key, or None when absent.
        port: Value of the ``port`` key, or None when absent.
        replica_set_name: Value of the ``replicaSetName`` key, or None when
            absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the base class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.hostname = value_dict.get('hostname')  # type: str
        self.port = value_dict.get('port')  # type: int
        self.replica_set_name = value_dict.get('replicaSetName')  # type: str


class AtlasFeatureEvent(_AtlasUserBaseEvent):
    def __init__(self, value_dict: dict) -> None:
github mgmonteleone / python-atlasapi / atlasapi / events.py View on Github external
class AtlasDataExplorerEvent(_AtlasUserBaseEvent):
    """User event that additionally records a database operation target.

    Attributes:
        database: Value of the ``database`` key, or None when absent.
        collection: Value of the ``collection`` key, or None when absent.
        op_type: Value of the ``opType`` key, or None when absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the parent class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.database = value_dict.get('database')  # type: str
        self.collection = value_dict.get('collection')  # type: str
        self.op_type = value_dict.get('opType')  # type: str


class AtlasClusterEvent(_AtlasBaseEvent):
    """Event that additionally records which cluster it refers to.

    Attributes:
        replica_set_name: Value of the ``replicaSetName`` key, or None when
            absent.
        cluster_name: Value of the ``clusterName`` key, or None when absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the base class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.replica_set_name = value_dict.get('replicaSetName')  # type: str
        self.cluster_name = value_dict.get('clusterName')  # type: str


class AtlasHostEvent(_AtlasBaseEvent):
    """Event that additionally records which host it refers to.

    Attributes:
        hostname: Value of the ``hostname`` key, or None when absent.
        port: Value of the ``port`` key, or None when absent.
        replica_set_name: Value of the ``replicaSetName`` key, or None when
            absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the base class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.hostname = value_dict.get('hostname')  # type: str
        self.port = value_dict.get('port')  # type: int
        self.replica_set_name = value_dict.get('replicaSetName')  # type: str


class AtlasFeatureEvent(_AtlasUserBaseEvent):
    """User event that additionally records a host and a feature name.

    Attributes:
        hostname: Value of the ``hostname`` key, or None when absent.
        feature_name: Value of the ``featureName`` key, or None when absent.
    """

    def __init__(self, value_dict: dict) -> None:
        """Populate the attributes from ``value_dict``.

        Args:
            value_dict: Dictionary describing the event; it is also handed
                to the parent class initializer.
        """
        super().__init__(value_dict)
        # dict.get already yields None for a missing key.
        self.hostname = value_dict.get('hostname')  # type: str
        self.feature_name = value_dict.get('featureName')  # type: str


def atlas_event_factory(value_dict: dict) -> Union[
                            AtlasEvent, AtlasDataExplorerEvent, AtlasClusterEvent, AtlasHostEvent, AtlasFeatureEvent]:
github mgmonteleone / python-atlasapi / atlasapi / events.py View on Github external
if value_dict.get("featureName", None):
        return AtlasFeatureEvent(value_dict=value_dict)

    elif value_dict.get("hostname", None):
        return AtlasHostEvent(value_dict=value_dict)

    elif value_dict.get("clusterName", None):
        return AtlasClusterEvent(value_dict=value_dict)

    elif value_dict.get("database", None):
        return AtlasDataExplorerEvent(value_dict=value_dict)
    else:
        return AtlasEvent(value_dict=value_dict)


# Named type for a list whose items are either raw event dictionaries or
# event objects derived from _AtlasBaseEvent.
ListOfEvents = NewType('ListOfEvents', List[Union[dict, _AtlasBaseEvent]])