How to use the stix2.core.parse function in stix2

To help you get started, we’ve selected a few stix2 examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github oasis-open / cti-python-stix2 / stix2 / datastore / memory.py View on Github external
if isinstance(stix_data, list):
        # STIX objects are in a list- recurse on each object
        for stix_obj in stix_data:
            _add(store, stix_obj, allow_custom, version)

    elif stix_data["type"] == "bundle":
        # adding a json bundle - so just grab STIX objects
        for stix_obj in stix_data.get("objects", []):
            _add(store, stix_obj, allow_custom, version)

    else:
        # Adding a single non-bundle object
        if isinstance(stix_data, _STIXBase):
            stix_obj = stix_data
        else:
            stix_obj = parse(stix_data, allow_custom, version)

        # Map ID directly to the object, if it is a marking.  Otherwise,
        # map to a family, so we can track multiple versions.
        if is_marking(stix_obj):
            store._data[stix_obj.id] = stix_obj

        else:
            if stix_obj.id in store._data:
                obj_family = store._data[stix_obj.id]
            else:
                obj_family = _ObjectFamily()
                store._data[stix_obj.id] = obj_family

            obj_family.add(stix_obj)
github oasis-open / cti-python-stix2 / stix2 / datastore / memory.py View on Github external
if isinstance(stix_data, list):
        # STIX objects are in a list- recurse on each object
        for stix_obj in stix_data:
            _add(store, stix_obj, allow_custom, version)

    elif stix_data["type"] == "bundle":
        # adding a json bundle - so just grab STIX objects
        for stix_obj in stix_data.get("objects", []):
            _add(store, stix_obj, allow_custom, version)

    else:
        # Adding a single non-bundle object
        if isinstance(stix_data, _STIXBase):
            stix_obj = stix_data
        else:
            stix_obj = parse(stix_data, allow_custom, version)

        # Map ID directly to the object, if it is a marking.  Otherwise,
        # map to a family, so we can track multiple versions.
        if _is_marking(stix_obj):
            store._data[stix_obj["id"]] = stix_obj

        else:
            if stix_obj["id"] in store._data:
                obj_family = store._data[stix_obj["id"]]
            else:
                obj_family = _ObjectFamily()
                store._data[stix_obj["id"]] = obj_family

            obj_family.add(stix_obj)
github oasis-open / cti-python-stix2 / stix2 / datastore / taxii.py View on Github external
# don't extract TAXII filters from query (to send to TAXII endpoint)
        # as directly retrieving a STIX object by ID
        try:
            stix_objs = self.collection.get_object(stix_id)['objects']
            stix_obj = list(apply_common_filters(stix_objs, query))

        except HTTPError as e:
            if e.response.status_code == 404:
                # if resource not found or access is denied from TAXII server,
                # return None
                stix_obj = []
            else:
                raise DataSourceError("TAXII Collection resource returned error", e)

        if len(stix_obj):
            stix_obj = parse(stix_obj[0], allow_custom=self.allow_custom)
            if stix_obj.id != stix_id:
                # check - was added to handle erroneous TAXII servers
                stix_obj = None
        else:
            stix_obj = None

        return stix_obj
github oasis-open / cti-python-stix2 / stix2 / datastore / filesystem.py View on Github external
``stix_data`` can be a Bundle object, but each object in it will be
            saved separately; you will be able to retrieve any of the objects
            the Bundle contained, but not the Bundle itself.

        """
        if isinstance(stix_data, (v20.Bundle, v21.Bundle)):
            # recursively add individual STIX objects
            for stix_obj in stix_data.get("objects", []):
                self.add(stix_obj, version=version)

        elif isinstance(stix_data, _STIXBase):
            # adding python STIX object
            self._check_path_and_write(stix_data)

        elif isinstance(stix_data, (str, dict)):
            stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
            self.add(stix_data, version=version)

        elif isinstance(stix_data, list):
            # recursively add individual STIX objects
            for stix_obj in stix_data:
                self.add(stix_obj)

        else:
            raise TypeError(
                "stix_data must be a STIX object (or list of), "
                "JSON formatted STIX (or list of), "
github oasis-open / cti-python-stix2 / stix2 / datastore / taxii.py View on Github external
bundle = parse(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
            elif 'spec_version' in stix_data:
                # If the spec_version is present, use new Bundle object...
                bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
            else:
                bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)

        elif isinstance(stix_data, list):
            # adding list of something - recurse on each
            for obj in stix_data:
                self.add(obj)
            return

        elif isinstance(stix_data, str):
            # adding json encoded string of STIX content
            stix_data = parse(stix_data, allow_custom=self.allow_custom)
            if stix_data['type'] == 'bundle':
                bundle = stix_data.serialize(encoding='utf-8', ensure_ascii=False)
            elif 'spec_version' in stix_data:
                # If the spec_version is present, use new Bundle object...
                bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
            else:
                bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)

        else:
            raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")

        self.collection.add_objects(bundle)
github oasis-open / cti-python-stix2 / stix2 / environment.py View on Github external
return self.factory.set_default_external_refs(*args, **kwargs)
    set_default_external_refs.__doc__ = ObjectFactory.set_default_external_refs.__doc__

    def set_default_object_marking_refs(self, *args, **kwargs):
        return self.factory.set_default_object_marking_refs(*args, **kwargs)
    set_default_object_marking_refs.__doc__ = ObjectFactory.set_default_object_marking_refs.__doc__

    def add_filters(self, *args, **kwargs):
        return self.source.filters.add(*args, **kwargs)

    def add_filter(self, *args, **kwargs):
        return self.source.filters.add(*args, **kwargs)

    def parse(self, *args, **kwargs):
        return _parse(*args, **kwargs)
    parse.__doc__ = _parse.__doc__

    def creator_of(self, obj):
        """Retrieve the Identity refered to by the object's `created_by_ref`.

        Args:
            obj: The STIX object whose `created_by_ref` property will be looked
                up.

        Returns:
            str: The STIX object's creator, or None, if the object contains no
                `created_by_ref` property or the object's creator cannot be
                found.

        """
        creator_id = obj.get('created_by_ref', '')
        if creator_id:
github oasis-open / cti-python-stix2 / stix2 / datastore / taxii.py View on Github external
# apply local (CompositeDataSource, TAXIICollectionSource and query) filters
            query.remove(taxii_filters)
            all_data = list(apply_common_filters(all_data, query))

        except HTTPError as e:
            # if resources not found or access is denied from TAXII server, return empty list
            if e.response.status_code == 404:
                raise DataSourceError(
                    "The requested STIX objects for the TAXII Collection resource defined in"
                    " the supplied TAXII Collection object are either not found or access is"
                    " denied. Received error: ", e,
                )

        # parse python STIX objects from the STIX object dicts
        stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom) for stix_obj_dict in all_data]

        return stix_objs