Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import List
from .query import Query
class Join(Query):
"""
Class that results when joining two queries. Don't usually
call this directly, instead use the join method of query.
Parameters
----------
left : Query
Query object to join on.
right : Query
Query object to join on.
on_left : str or list of str
Name of the column on the left table on which to join, if a list
will join on the fact that each field is equal
on_right : str, optional
Name of the column on the right table on which to join, if not
specified will default to the same as on_left, if a list must be
cols_str = ",".join(cols)
group_cols_str = ",".join(group_cols)
sql = f"""
SELECT {group_cols_str}, COUNT(*) as value,
datetime FROM
(SELECT DISTINCT {group_cols_str}, {cols_str}, datetime FROM
(SELECT {group_cols_str}, {cols_str}, date_trunc('{self.total_by}', x.datetime) AS datetime
FROM ({self.joined.get_query()}) x) y) _
GROUP BY {group_cols_str}, datetime
ORDER BY {group_cols_str}, datetime
"""
return sql
class AggregateNetworkObjects(GeoDataMixin, Query):
"""
Class for calculating statistics about unique cells/sites
and aggregating them by period.
Parameters
----------
total_network_objects : TotalNetworkObjects
statistic : {'avg', 'max', 'min', 'median', 'mode', 'stddev', 'variance'}
Statistic to calculate, defaults to 'avg'.
aggregate_by : {'second', 'minute', 'hour', 'day', 'month', 'year', 'century'}
A period definition to calculate statistics over, defaults to the one
greater than total_network_objects.total_by.
Examples
def __init__(self, flowmachine_query):
    """
    Create a subsetter backed by the result of a flowmachine query.

    Parameters
    ----------
    flowmachine_query : flowmachine.Query
        The flowmachine query to be used for subsetting. The only requirement
        on it is that the result has a column called "subscriber" (it is fine
        for other columns to be present, too).

    Raises
    ------
    TypeError
        If `flowmachine_query` is not a `Query` instance.
    """
    # Validate explicitly instead of with `assert`: assertions are removed
    # when Python runs with -O, which would silently skip this check.
    if not isinstance(flowmachine_query, Query):
        raise TypeError(
            "flowmachine_query must be a Query instance, "
            f"got {type(flowmachine_query).__name__}"
        )
    self._verify_that_subscriber_column_is_present(flowmachine_query)
    self.flowmachine_query = flowmachine_query
    # This object's id is taken from the wrapped query's id.
    # NOTE(review): presumably read by the base initialiser, so it stays
    # ahead of super().__init__() - confirm.
    self._md5 = self.flowmachine_query.query_id
    super().__init__()
if isinstance(x, Query):
dependencies.add(x)
lists = []
for x in self.__dict__.values():
if isinstance(x, list) or isinstance(x, tuple):
lists.append(x)
else:
parent_classes = [cls.__name__ for cls in x.__class__.__mro__]
if "SubscriberSubsetterBase" in parent_classes:
# special case for subscriber subsetters, because they may contain
# attributes which are Query object but do not derive from Query
# themselves
lists.append(x.__dict__.values())
for l in lists:
for x in l:
if isinstance(x, Query):
dependencies.add(x)
return dependencies
def __init__(self, query: Query, **params):
    """
    Initialise through the cooperative ``super()`` chain and then ``Query``.

    Parameters
    ----------
    query : Query
        Passed on to the next ``__init__`` in the MRO as the ``query`` keyword.
    **params
        Further keyword arguments, passed on unchanged to the next
        ``__init__`` in the MRO.
    """
    # Let the next class in the MRO consume `query` and `params` first.
    super().__init__(query=query, **params)
    # Query.__init__ is then called explicitly, with no arguments.
    # NOTE(review): presumably the super() chain above does not reach
    # Query.__init__ on its own - confirm before reordering or removing.
    Query.__init__(self)
"""
Return an appropriate subsetter for the given input.
Parameters
----------
subset : "all" or None or list or tuple or flowmachine.Query or SubscriberSubsetterBase
This can be one of the following:
- "all" or None: represents the subset of "all subscribers" (i.e., no subsetting at all)
- list or tuple: represents a subset of an explicit list of subscribers
- flowmachine.Query: represents a subset given by the result of a flowmachine query
(where the resulting table must have a "subscriber" column)
If `subset` is already an instance of SubscriberSubsetterBase then it is returned unchanged.
"""
if isinstance(subset, SubscriberSubsetterBase):
return subset
elif isinstance(subset, Query):
return SubscriberSubsetterForFlowmachineQuery(subset)
elif isinstance(subset, (list, tuple, np.ndarray, pd.Series)):
return SubscriberSubsetterForExplicitSubset(subset)
elif subset == "all" or subset is None:
return SubscriberSubsetterForAllSubscribers()
elif isinstance(subset, str):
return SubscriberSubsetterForExplicitSubset([subset])
else:
raise ValueError(f"Invalid subscriber subset: {subset!r}")
from abc import abstractmethod
from .query import Query
class SubscriberSubsetBase(Query):
    """
    Common parent for the different kinds of subscriber subsets.

    Subclasses are expected to supply `is_proper_subset`; the version
    declared here is abstract and only raises.
    """

    @property
    @abstractmethod
    def is_proper_subset(self):
        """
        Abstract flag that each concrete subset type must define.

        Raises
        ------
        NotImplementedError
            Always, when this base implementation is reached; the message
            names the offending class.
        """
        message = f"Class {self.__class__.__name__} does not implement 'is_proper_subset'"
        raise NotImplementedError(message)
class AllSubscribers(SubscriberSubsetBase):
    """
    Subscriber subset whose `is_proper_subset` flag is False and whose
    query string is empty.
    """

    # Plain class attribute that takes the place of the abstract
    # `is_proper_subset` property declared on SubscriberSubsetBase.
    is_proper_subset = False

    def _make_query(self):
        """Return the empty string as this subset's query."""
        # NOTE(review): presumably empty because no restriction on
        # subscribers is needed for this subset - confirm with callers.
        return ""
return f"""
SELECT max({value_column})::numeric as upper,
min({value_column})::numeric as lower
FROM ({metric.get_query()}) AS to_agg
"""
@_get_bounds_clause.register
def _(bounds: tuple, value_column: str, metric: Query) -> str:
    """
    Build the bounds clause from an explicitly supplied tuple of bounds.

    Registered on `_get_bounds_clause` for a `tuple` first argument.

    Parameters
    ----------
    bounds : tuple
        Values whose maximum becomes `upper` and whose minimum becomes `lower`.
    value_column : str
        Not used by this overload; accepted so the signature matches the
        other registered implementations.
    metric : Query
        Not used by this overload.

    Returns
    -------
    str
        SQL selecting the two bounds as numeric literals.
    """
    # The literal bounds are interpolated directly; the text of the clause
    # is kept exactly as in the original so the emitted SQL is unchanged.
    clause = f"""
SELECT {max(bounds)}::numeric as upper,
{min(bounds)}::numeric as lower
"""
    return clause
class HistogramAggregation(Query):
"""
Compute the histogram of another query.
Parameters
----------
metric : Query
Query to build histogram over
bins : int, or list of float
Either an integer number of equally spaced bins, or a list of bin edges
range : tuple of float, default None
Optionally supply inclusive lower and upper bounds to build the histogram over. By default, the
histogram will cover the whole range of the data.
value_column : str, default "value"
Name of the column in `metric` to construct the histogram over
censor : bool, default True
Set to False to return results where there are bins with counts below 15
USING ({loc_cols_string})
"""
joined_query = f"""
SELECT
row_number() over() AS gid,
*
FROM ({agg_qry}) AS Q
LEFT JOIN ({self.spatial_unit.get_geom_query()}) AS G
USING ({loc_cols_string})
"""
return joined_query, loc_cols + ["outflows", "inflows", "geom", "gid"]
class Flows(FlowLike, Query):
"""
An object representing the difference in locations between two location
type objects.
Parameters
----------
loc1 : daily_location, or ModalLocation object
Object representing the locations of people within the
first time frame of interest
loc2 : daily_location, or ModalLocation object
As above for the second period
"""
def __init__(self, loc1, loc2):
if loc1.spatial_unit != loc2.spatial_unit:
raise InvalidSpatialUnitError(
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# -*- coding: utf-8 -*-
"""
Simple utility class that allows the user to define their
own custom query via a python string.
"""
from typing import List, Set, Union
from .utils import pretty_sql
from .query import Query
class CustomQuery(Query):
"""
Gives the user an interface to create any custom query by simply passing a
full SQL query.
Parameters
----------
sql : str
An sql query string
column_names : list of str or set of str
The column names to return
Examples
--------
>>> CQ = CustomQuery('SELECT * FROM events.calls', ["msisdn"])
>>> CQ.head()