Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Query
"""
return RedactedConsecutiveTripsODMatrix(
consecutive_trips_od_matrix=ConsecutiveTripsODMatrix(
SubscriberLocations(
self.start_date,
self.end_date,
spatial_unit=self.aggregation_unit,
table=self.event_types,
subscriber_subset=self.subscriber_subset,
)
)
)
class ConsecutiveTripsODMatrixSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema describing the parameters of a ``consecutive_trips_od_matrix`` query.

    Both ``start_date`` and ``end_date`` are mandatory. ``__model__`` associates
    this schema with ``ConsecutiveTripsODMatrixExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["consecutive_trips_od_matrix"]),
    )

    # Mandatory date bounds of the query.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)

    # Optional restrictions on the events / subscribers considered.
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = ConsecutiveTripsODMatrixExposed
def _flowmachine_query_obj(self):
    """
    Build the flowmachine object for this query.

    The flowmachine query of ``self.locations`` is wrapped in ``Unmoving``,
    then ``UnmovingCounts``, and finally ``RedactedUnmovingCounts``.

    Returns
    -------
    Query
    """
    locations_query = self.locations._flowmachine_query_obj
    unmoving_subscribers = Unmoving(locations_query)
    counts = UnmovingCounts(unmoving_subscribers)
    return RedactedUnmovingCounts(unmoving_counts=counts)
class UnmovingCountsSchema(BaseSchema):
    """
    Schema describing the parameters of an ``unmoving_counts`` query.

    ``__model__`` associates this schema with ``UnmovingCountsExposed``.
    """

    # query_kind parameter is required here for claims validation
    query_kind = fields.String(validate=OneOf(["unmoving_counts"]))

    # ``locations`` is mandatory, matching the other nested-input schemas in
    # this file (which all pass ``required=True``). The flowmachine object is
    # presumably built by dereferencing ``locations`` unconditionally, so a
    # payload without it should be rejected at validation time with a clear
    # message rather than failing later.
    locations = fields.Nested(UniqueLocationsSchema, required=True)

    __model__ = UnmovingCountsExposed
Query
"""
return RedactedTotalEvents(
total_events=TotalLocationEvents(
start=self.start_date,
stop=self.end_date,
interval=self.interval,
direction=self.direction,
table=self.event_types,
spatial_unit=self.aggregation_unit,
subscriber_subset=self.subscriber_subset,
)
)
class LocationEventCountsSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema describing the parameters of a ``location_event_counts`` query.

    ``start_date``, ``end_date``, ``interval`` and ``direction`` are mandatory.
    ``__model__`` associates this schema with ``LocationEventCountsExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["location_event_counts"]),
    )

    # Mandatory date bounds of the query.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)

    # Must be one of the intervals TotalLocationEvents declares as allowed.
    interval = fields.String(
        validate=OneOf(TotalLocationEvents.allowed_intervals),
        required=True,
    )

    # TODO: use a globally defined enum for this
    direction = fields.String(
        validate=OneOf(["in", "out", "both"]),
        required=True,
    )

    # Optional restrictions on the events / subscribers considered.
    event_types = EventTypes()
    subscriber_subset = SubscriberSubset()

    __model__ = LocationEventCountsExposed
class DummyQueryExposed(BaseExposedQuery):
    """
    Exposed-query wrapper around ``DummyQuery``.

    Parameters
    ----------
    dummy_param
        Forwarded to ``DummyQuery`` as ``dummy_param``.
    aggregation_unit
        Stored on the instance; not used when building the flowmachine object.
    dummy_delay
        Passed to ``sleep`` every time the flowmachine object is built.
    """

    def __init__(self, dummy_param, aggregation_unit, dummy_delay):
        # Marshmallow can only serialise the object correctly if each input
        # parameter is available as an attribute on `self`.
        self.dummy_param = dummy_param
        self.aggregation_unit = aggregation_unit
        self.dummy_delay = dummy_delay

    @property
    def _flowmachine_query_obj(self):
        """
        Sleep for ``dummy_delay``, then build the ``DummyQuery``.

        Returns
        -------
        Query
        """
        delay = self.dummy_delay
        sleep(delay)
        dummy_query = DummyQuery(dummy_param=self.dummy_param)
        return dummy_query
class DummyQuerySchema(AggregationUnitMixin, BaseSchema):
    """
    Dummy query useful for testing.

    ``dummy_param`` is mandatory; ``dummy_delay`` is optional and falls back to
    0 when absent from the input. ``__model__`` associates this schema with
    ``DummyQueryExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["dummy_query"]),
    )
    dummy_param = fields.String(required=True)
    dummy_delay = fields.Integer(required=False, missing=0)

    __model__ = DummyQueryExposed
Return the underlying flowmachine object.
Returns
-------
Query
"""
return TotalNetworkObjects(
start=self.start_date,
stop=self.end_date,
spatial_unit=self.aggregation_unit,
total_by=self.total_by,
table=self.event_types,
)
class TotalNetworkObjectsSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema describing the parameters of a ``total_network_objects`` query.

    ``start_date`` and ``end_date`` are mandatory; ``total_by`` is optional and
    falls back to ``"day"`` when absent from the input. ``__model__`` associates
    this schema with ``TotalNetworkObjectsExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["total_network_objects"]),
    )

    # Mandatory date bounds of the query.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)

    total_by = TotalBy(missing="day", required=False)
    event_types = EventTypes()

    __model__ = TotalNetworkObjectsExposed
Return the underlying flowmachine object.
Returns
-------
Query
"""
locations = self.locations._flowmachine_query_obj
metric = self.metric._flowmachine_query_obj
return RedactedJoinedSpatialAggregate(
joined_spatial_aggregate=JoinedSpatialAggregate(
locations=locations, metric=metric, method=self.method
)
)
class JoinedSpatialAggregateSchema(BaseSchema):
# query_kind parameter is required here for claims validation
query_kind = fields.String(validate=OneOf(["joined_spatial_aggregate"]))
locations = fields.Nested(InputToSpatialAggregate, required=True)
metric = fields.Nested(JoinableMetrics, required=True)
method = fields.String(validate=OneOf(JoinedSpatialAggregate.allowed_methods))
@pre_load
def validate_method(self, data, **kwargs):
continuous_metrics = [
"radius_of_gyration",
"unique_location_counts",
"topup_balance",
"subscriber_degree",
"topup_amount",
"event_count",
"nocturnal_events",
@property
def _flowmachine_query_obj(self):
    """
    Build the flowmachine object for this query.

    The flowmachine query of ``self.locations`` is wrapped in
    ``SpatialAggregate`` and then in ``RedactedSpatialAggregate``.

    Returns
    -------
    Query
    """
    aggregate = SpatialAggregate(
        locations=self.locations._flowmachine_query_obj,
    )
    return RedactedSpatialAggregate(spatial_aggregate=aggregate)
class SpatialAggregateSchema(BaseSchema):
    """
    Schema describing the parameters of a ``spatial_aggregate`` query.

    The nested ``locations`` input is mandatory. ``__model__`` associates this
    schema with ``SpatialAggregateExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["spatial_aggregate"]),
    )
    locations = fields.Nested(
        InputToSpatialAggregate,
        required=True,
    )

    __model__ = SpatialAggregateExposed
self.range = range if range is None else tuple(range)
@property
def _flowmachine_query_obj(self):
    """
    Build the flowmachine object for this query.

    The flowmachine query of ``self.metric`` is passed to
    ``HistogramAggregation`` together with ``self.bins`` and ``self.range``.

    Returns
    -------
    Query
    """
    return HistogramAggregation(
        metric=self.metric._flowmachine_query_obj,
        bins=self.bins,
        range=self.range,
    )
class HistogramAggregateSchema(BaseSchema):
    """
    Schema describing the parameters of a ``histogram_aggregate`` query.

    The nested ``metric`` input is mandatory; ``range`` and ``bins`` are not
    marked as required. ``__model__`` associates this schema with
    ``HistogramAggregateExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["histogram_aggregate"]),
    )
    metric = fields.Nested(
        HistogrammableMetrics,
        required=True,
    )
    range = fields.Nested(Bounds)
    bins = fields.Nested(HistogramBins)

    __model__ = HistogramAggregateExposed
Returns
-------
Query
"""
return RedactedUniqueSubscriberCounts(
unique_subscriber_counts=UniqueSubscriberCounts(
start=self.start_date,
stop=self.end_date,
spatial_unit=self.aggregation_unit,
table=self.event_types,
)
)
class UniqueSubscriberCountsSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema describing the parameters of a ``unique_subscriber_counts`` query.

    Both ``start_date`` and ``end_date`` are mandatory. ``__model__`` associates
    this schema with ``UniqueSubscriberCountsExposed``.
    """

    # Claims validation needs the query_kind parameter to be declared here.
    query_kind = fields.String(
        validate=OneOf(["unique_subscriber_counts"]),
    )

    # Mandatory date bounds of the query.
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)

    event_types = EventTypes()

    __model__ = UniqueSubscriberCountsExposed
Returns
-------
Query
"""
return RedactedLocationIntroversion(
location_introversion=LocationIntroversion(
start=self.start_date,
stop=self.end_date,
spatial_unit=self.aggregation_unit,
direction=self.direction,
table=self.event_types,
)
)
class LocationIntroversionSchema(AggregationUnitMixin, BaseSchema):
    """
    Schema describing the parameters of a ``location_introversion`` query.

    ``start_date`` and ``end_date`` are mandatory; ``direction`` is optional
    and falls back to ``"both"``. ``__model__`` associates this schema with
    ``LocationIntroversionExposed``.
    """

    # query_kind parameter is required here for claims validation
    query_kind = fields.String(validate=OneOf(["location_introversion"]))
    start_date = ISODateTime(required=True)
    end_date = ISODateTime(required=True)

    # `missing` is what supplies a value when `direction` is absent from the
    # input being loaded (the convention used by the other optional fields in
    # this file); `default` on its own only takes effect when dumping. Both are
    # set so that loading gets the documented fallback and dumping behaves
    # exactly as before.
    direction = fields.String(
        required=False,
        validate=OneOf(["in", "out", "both"]),
        missing="both",
        default="both",
    )  # TODO: use a globally defined enum for this

    event_types = EventTypes()

    __model__ = LocationIntroversionExposed