Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Computes daily trajectories. Trajectories are defined as
dated lists of locations. For timestamped lists
of events see feature subscriber_locations.
"""
from functools import reduce
from typing import List
from flowmachine.core import Query
from flowmachine.features.utilities.subscriber_locations import BaseLocation
from ..utilities.multilocation import MultiLocation
class DayTrajectories(MultiLocation, BaseLocation, Query):
"""
Class that defines day-dated trajectories (list of time-sorted DailyLocations per subscriber).
Examples
--------
>>> dt = DayTrajectories(
'2016-01-01',
'2016-01-04',
spatial_unit = AdminSpatialUnit(level=3),
method = 'last',
hours = (5,17),
)
>>> dt.head(4)
subscriber name date
0 038OVABN11Ak4W5P Dolpa 2016-01-01
1 038OVABN11Ak4W5P Baglung 2016-01-02
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import List
from flowmachine.core import Query
from flowmachine.features.utilities.validators import (
valid_median_window,
validate_column_present,
)
class IterativeMedianFilter(Query):
"""
Applies an iterative median filter
Parameters
----------
query_to_filter : Query
Query to apply iterated median filter to.
filter_window_size : int
Size of filter window - must be odd, positive and more than 1.
column_to_filter : str, default 'value'
Column to apply the filter to.
partition_column : str, default 'subscriber'
Column to use for partitioning. May be none, in which case no partitioning is applied.
order_column : str, default datetime
Column to use for ordering within partitions. May be none, in which case no ordering is applied.
"""
def from_query_id(cls, query_id, *, redis=None):
    """
    Build a QueryProxy from the query description stored in redis.

    Parameters
    ----------
    query_id : str
        Key under which the query description is stored.
    redis : optional
        Redis client to read from (anything with a ``get`` method). Falls back
        to ``Query.redis`` when not supplied (or falsy).

    Returns
    -------
    QueryProxy
        Proxy constructed from the stored ``query_kind`` and ``params``.

    Raises
    ------
    MissingQueryError
        If nothing is stored under ``query_id``.
    QueryProxyError
        If the stored description is not valid JSON.
    """
    redis = redis or Query.redis
    query_descr = redis.get(query_id)
    if query_descr is None:
        raise MissingQueryError(query_id, msg=f"Unknown query id: {query_id}")
    try:
        # Decode once and reuse the result instead of parsing the same JSON twice.
        query_spec = loads(query_descr)
    except JSONDecodeError as exc:
        # f-prefix so the offending description actually appears in the message;
        # chain the decode error so the original cause is preserved.
        raise QueryProxyError(
            f"Query description does not contain valid JSON: '{query_descr}'. This should never happen."
        ) from exc
    query_kind = query_spec["query_kind"]
    params = query_spec["params"]
    return QueryProxy(query_kind, params, redis=redis)
# to perform the spatial aggregation. Once we have this, the following
# code block should be removed and simply `q.md5` be returned as the
# query_id.
try:
# In addition to the actual query, also set an aggregated version running.
# This is the one which we return via the API so that we don't expose any
# individual-level data.
q_agg = q.aggregate()
q_agg.store()
query_id = q_agg.md5
# FIXME: this is a big hack to register the query info lookup also
# for the aggregated query. This should not be necessary any more once
# this whole code block is removed (see fixme comment above). To make
# it obvious that it is the lookup of an aggregated query we artificially
# insert two additional keys 'is_aggregate' and 'non_aggregated_query_id'.
q_agg_info_lookup = QueryInfoLookup(Query.redis)
query_params_agg = deepcopy(self.query_params)
query_params_agg["is_aggregate"] = True
query_params_agg["non_aggregated_query_id"] = self.query_id
q_agg_info_lookup.register_query(q_agg.md5, self.query_params)
except AttributeError:
# This can happen for flows, which doesn't support aggregation
query_id = q.md5
return query_id
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import warnings
from typing import List
from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin
from flowmachine.utils import parse_datestring
class JoinedSpatialAggregate(GeoDataMixin, Query):
"""
Creates spatially aggregated data from two objects, one of which is
a metric of subscribers, and the other of which represents the subscribers'
locations.
A general class that join metric information about a subscriber with location
information about a subscriber and aggregates to the geometric level.
Parameters
----------
metric : Query
A query object that represents a subscriber level metric such
as radius of gyration. The underlying data must have a 'subscriber'
column. All other columns must be numeric and will be aggregated.
locations : Query
A query object that represents the locations of subscribers.
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import List
from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin
from flowmachine.features.location.redacted_active_at_reference_location_counts import (
RedactedActiveAtReferenceLocationCounts,
)
from flowmachine.features.location.redacted_unique_subscriber_counts import (
RedactedUniqueSubscriberCounts,
)
from flowmachine.features.location.unique_visitor_counts import UniqueVisitorCounts
class RedactedUniqueVisitorCounts(GeoDataMixin, Query):
def __init__(
self, unique_visitor_counts: UniqueVisitorCounts,
):
"""
A count by location of how many unique visitors each location had.
This is the redacted variation - locations with 15 or fewer subscribers for
either the active or unique subqueries are redacted.
Parameters
----------
unique_visitor_counts : UniqueVisitorCounts
Visitor counts to redact
"""
self.active_at_reference_location_counts = RedactedActiveAtReferenceLocationCounts(
active_at_reference_location_counts=unique_visitor_counts.active_at_reference_location_counts
)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Optional, List
from flowmachine.core import Query, make_spatial_unit
from flowmachine.core.spatial_unit import AnySpatialUnit
from flowmachine.features.subscriber import MeaningfulLocations
from flowmachine.features.location.flows import FlowLike
class MeaningfulLocationsOD(FlowLike, Query):
"""
Calculates an OD matrix aggregated to a spatial unit between two individual
level meaningful locations. For subscribers with more than one cluster of either
label, counts are weight to `1/(n_clusters_label_a*n_clusters_label_b)`.
Parameters
----------
meaningful_locations_a, meaningful_locations_a : MeaningfulLocations
Per-subscriber meaningful locations objects calculate an OD between
spatial_unit : flowmachine.core.spatial_unit.*SpatialUnit, default admin3
Spatial unit to aggregate to
"""
def __init__(
self,
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import List
from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin
from flowmachine.features.location.redacted_location_metric import (
RedactedLocationMetric,
)
from flowmachine.features.location.spatial_aggregate import SpatialAggregate
class RedactedSpatialAggregate(RedactedLocationMetric, GeoDataMixin, Query):
"""
Class representing the result of spatially aggregating
a locations object, redacted so that results are not returned if counts are 15 or less..
A locations object represents the
location of multiple subscribers. This class represents the output
of aggregating that data spatially.
Parameters
----------
locations : subscriber location query
"""
def __init__(self, *, spatial_aggregate: SpatialAggregate):
self.redaction_target = spatial_aggregate
# self.spatial_unit is used in self._geo_augmented_query
import warnings
from typing import List
from flowmachine.core import Query
from flowmachine.core.mixins import GeoDataMixin
from flowmachine.features.location.joined_spatial_aggregate import (
JoinedSpatialAggregate,
)
from flowmachine.features.location.redacted_spatial_aggregate import (
RedactedSpatialAggregate,
)
from flowmachine.features.location.spatial_aggregate import SpatialAggregate
from flowmachine.utils import parse_datestring
class RedactedJoinedSpatialAggregate(GeoDataMixin, Query):
"""
Creates spatially aggregated data from two objects, one of which is
a metric of subscribers, and the other of which represents the subscribers'
locations.
A general class that join metric information about a subscriber with location
information about a subscriber and aggregates to the geometric level.
Parameters
----------
metric : Query
A query object that represents a subscriber level metric such
as radius of gyration. The underlying data must have a 'subscriber'
column. All other columns must be numeric and will be aggregated.
locations : Query
A query object that represents the locations of subscribers.
The modal daily location of a subscriber.
"""
from typing import List
from functools import reduce
from flowmachine.core import Query
from flowmachine.features.utilities.subscriber_locations import BaseLocation
from ..utilities.multilocation import MultiLocation
class ModalLocation(MultiLocation, BaseLocation, Query):
"""
ModalLocation is the mode of multiple DailyLocations (or other similar
location like objects.) It can be instantiated with either a date range
or a list of DailyLocations (the former is more common). It gives each
subscriber only one location.
"""
@property
def column_names(self) -> List[str]:
    """
    Column names produced by this query: ``"subscriber"`` followed by the
    location identifier columns of ``self.spatial_unit``.
    """
    location_columns = self.spatial_unit.location_id_columns
    return ["subscriber"] + location_columns
def _make_query(self):
"""
Default query method implemented in the
metaclass Query().
"""