How to use the flowmachine.flowmachine.core.server.query_schemas.custom_fields.ISODateTime function in flowmachine

To help you get started, we’ve selected a few flowmachine examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/nocturnal_events.py (view on GitHub)
Returns
        -------
        Query
        """
        return NocturnalEvents(
            start=self.start,
            stop=self.stop,
            hours=self.hours,
            tables=self.event_types,
            subscriber_subset=self.subscriber_subset,
        )


class NocturnalEventsSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``nocturnal_events`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``NocturnalEventsExposed``; how the base schema uses it is not visible
    in this excerpt.
    """

    # Only the literal string "nocturnal_events" passes validation.
    query_kind = fields.String(validate=OneOf(["nocturnal_events"]))
    # Both `start` and `stop` are required.
    start = ISODateTime(required=True)
    stop = ISODateTime(required=True)
    # The night-time window is given as two separate integer fields, each
    # validated with Range(0, 23), rather than as a single tuple field.
    night_start_hour = fields.Integer(
        validate=Range(0, 23)
    )  # Tuples aren't supported by apispec https://github.com/marshmallow-code/apispec/issues/399
    night_end_hour = fields.Integer(validate=Range(0, 23))
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = NocturnalEventsExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/subscriber_degree.py (view on GitHub)
Returns
        -------
        Query
        """
        return SubscriberDegree(
            start=self.start,
            stop=self.stop,
            direction=self.direction,
            tables=self.event_types,
            subscriber_subset=self.subscriber_subset,
        )


class SubscriberDegreeSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``subscriber_degree`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``SubscriberDegreeExposed``; how the base schema uses it is not visible
    in this excerpt.
    """

    # Only the literal string "subscriber_degree" passes validation.
    query_kind = fields.String(validate=OneOf(["subscriber_degree"]))
    # Both `start` and `stop` are required.
    start = ISODateTime(required=True)
    stop = ISODateTime(required=True)
    # `direction` is optional. In marshmallow, `default` only applies when
    # serializing, so `missing` is also set to make an omitted direction load
    # as "both". `default` is kept so serialization behaviour is unchanged.
    direction = fields.String(
        required=False,
        validate=OneOf(["in", "out", "both"]),
        default="both",
        missing="both",
    )  # TODO: use a globally defined enum for this
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = SubscriberDegreeExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/pareto_interactions.py (view on GitHub)
Returns
        -------
        Query
        """
        return ParetoInteractions(
            start=self.start,
            stop=self.stop,
            proportion=self.proportion,
            tables=self.event_types,
            subscriber_subset=self.subscriber_subset,
        )


class ParetoInteractionsSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``pareto_interactions`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``ParetoInteractionsExposed``; how the base schema uses it is not
    visible in this excerpt.
    """

    # Only the literal string "pareto_interactions" passes validation.
    query_kind = fields.String(validate=OneOf(["pareto_interactions"]))
    # Both `start` and `stop` are required.
    start = ISODateTime(required=True)
    stop = ISODateTime(required=True)
    # Required float, validated with Range(min=0.0, max=1.0).
    proportion = fields.Float(required=True, validate=Range(min=0.0, max=1.0))
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = ParetoInteractionsExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/displacement.py (view on GitHub)
-------
        Query
        """
        return Displacement(
            start=self.start,
            stop=self.stop,
            statistic=self.statistic,
            reference_location=self.reference_location._flowmachine_query_obj,
            table=self.event_types,
            subscriber_subset=self.subscriber_subset,
        )


class DisplacementSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``displacement`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``DisplacementExposed``; how the base schema uses it is not visible
    in this excerpt.
    """

    # Only the literal string "displacement" passes validation.
    query_kind = fields.String(validate=OneOf(["displacement"]))
    # Both `start` and `stop` are required.
    start = ISODateTime(required=True)
    stop = ISODateTime(required=True)
    statistic = Statistic()
    # A single nested object (many=False) described by InputToDisplacementSchema.
    reference_location = fields.Nested(InputToDisplacementSchema, many=False)
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = DisplacementExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/subscriber_degree.py (view on GitHub)
-------
        Query
        """
        return SubscriberDegree(
            start=self.start,
            stop=self.stop,
            direction=self.direction,
            tables=self.event_types,
            subscriber_subset=self.subscriber_subset,
        )


class SubscriberDegreeSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``subscriber_degree`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``SubscriberDegreeExposed``; how the base schema uses it is not visible
    in this excerpt.
    """

    # Only the literal string "subscriber_degree" passes validation.
    query_kind = fields.String(validate=OneOf(["subscriber_degree"]))
    # Both `start` and `stop` are required.
    start = ISODateTime(required=True)
    stop = ISODateTime(required=True)
    # `direction` is optional. In marshmallow, `default` only applies when
    # serializing, so `missing` is also set to make an omitted direction load
    # as "both". `default` is kept so serialization behaviour is unchanged.
    direction = fields.String(
        required=False,
        validate=OneOf(["in", "out", "both"]),
        default="both",
        missing="both",
    )  # TODO: use a globally defined enum for this
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = SubscriberDegreeExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/pareto_interactions.py (view on GitHub)
-------
        Query
        """
        return ParetoInteractions(
            start=self.start,
            stop=self.stop,
            proportion=self.proportion,
            tables=self.event_types,
            subscriber_subset=self.subscriber_subset,
        )


class ParetoInteractionsSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``pareto_interactions`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``ParetoInteractionsExposed``; how the base schema uses it is not
    visible in this excerpt.
    """

    # Only the literal string "pareto_interactions" passes validation.
    query_kind = fields.String(validate=OneOf(["pareto_interactions"]))
    # Both `start` and `stop` are required.
    start = ISODateTime(required=True)
    stop = ISODateTime(required=True)
    # Required float, validated with Range(min=0.0, max=1.0).
    proportion = fields.Float(required=True, validate=Range(min=0.0, max=1.0))
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = ParetoInteractionsExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/unique_subscriber_counts.py (view on GitHub)
"""
        return RedactedUniqueSubscriberCounts(
            unique_subscriber_counts=UniqueSubscriberCounts(
                start=self.start_date,
                stop=self.end_date,
                spatial_unit=self.aggregation_unit,
                table=self.event_types,
            )
        )


class UniqueSubscriberCountsSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema declaring the parameters accepted for a ``unique_subscriber_counts`` query.

    Fields declared here are combined with whatever ``AggregationUnitMixin``
    contributes (presumably the aggregation unit; the mixin is not visible in
    this excerpt). ``__model__`` is set to ``UniqueSubscriberCountsExposed``.
    """

    # query_kind parameter is required here for claims validation
    query_kind = fields.String(validate=OneOf(["unique_subscriber_counts"]))
    # Both `start_date` and `end_date` are required.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)
    event_types = EventTypes()

    __model__ = UniqueSubscriberCountsExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/trips_od_matrix.py (view on GitHub)
trips=TripsODMatrix(
                SubscriberLocations(
                    self.start_date,
                    self.end_date,
                    spatial_unit=self.aggregation_unit,
                    table=self.event_types,
                    subscriber_subset=self.subscriber_subset,
                )
            )
        )


class TripsODMatrixSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema declaring the parameters accepted for a ``trips_od_matrix`` query.

    Fields declared here are combined with whatever ``AggregationUnitMixin``
    contributes (presumably the aggregation unit; the mixin is not visible in
    this excerpt). ``__model__`` is set to ``TripsODMatrixExposed``.
    """

    # query_kind parameter is required here for claims validation
    query_kind = fields.String(validate=OneOf(["trips_od_matrix"]))
    # Both `start_date` and `end_date` are required.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = TripsODMatrixExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/topup_balance.py (view on GitHub)
Returns
        -------
        Query
        """
        return TopUpBalance(
            start=self.start_date,
            stop=self.end_date,
            statistic=self.statistic,
            subscriber_subset=self.subscriber_subset,
        )


class TopUpBalanceSchema(BaseQueryWithSamplingSchema):
    """
    Schema declaring the parameters accepted for a ``topup_balance`` query.

    Each parameter is declared as a class-level field. ``__model__`` is set
    to ``TopUpBalanceExposed``; how the base schema uses it is not visible
    in this excerpt.
    """

    # Only the literal string "topup_balance" passes validation.
    query_kind = fields.String(validate=OneOf(["topup_balance"]))
    # Both `start_date` and `end_date` are required.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)
    statistic = Statistic()
    subscriber_subset = SubscriberSubset()

    __model__ = TopUpBalanceExposed
Source: Flowminder/FlowKit — flowmachine/flowmachine/core/server/query_schemas/total_network_objects.py (view on GitHub)
-------
        Query
        """
        return TotalNetworkObjects(
            start=self.start_date,
            stop=self.end_date,
            spatial_unit=self.aggregation_unit,
            total_by=self.total_by,
            table=self.event_types,
        )


class TotalNetworkObjectsSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema declaring the parameters accepted for a ``total_network_objects`` query.

    Fields declared here are combined with whatever ``AggregationUnitMixin``
    contributes (presumably the aggregation unit; the mixin is not visible in
    this excerpt). ``__model__`` is set to ``TotalNetworkObjectsExposed``.
    """

    # query_kind parameter is required here for claims validation
    query_kind = fields.String(validate=OneOf(["total_network_objects"]))
    # Both `start_date` and `end_date` are required.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)
    # Optional; declared with missing="day" so an omitted value is expected
    # to load as "day" (assuming TotalBy forwards `missing` to the base field).
    total_by = TotalBy(required=False, missing="day")
    event_types = EventTypes()

    __model__ = TotalNetworkObjectsExposed