from approvaltests.approvals import verify

from flowmachine.core import CustomQuery
from flowmachine.features import daily_location

# `get_dataframe` and `diff_reporter` are pytest fixtures provided by the test
# suite; `pretty_sql` is a suite helper that formats SQL strings for comparison.


def test_daily_location_1_df(get_dataframe, diff_reporter):
    """
    Simple daily location query returns the expected data.
    """
    dl = daily_location("2016-01-01", "2016-01-02")
    df = get_dataframe(dl)
    verify(df.to_csv(), diff_reporter)


def test_daily_location_5_sql(diff_reporter):
    """
    Daily location query with non-default parameters returns the expected SQL string.
    """
    subset_query = CustomQuery(
        "SELECT DISTINCT msisdn AS subscriber FROM events.calls WHERE msisdn in ('GNLM7eW5J5wmlwRa', 'e6BxY8mAP38GyAQz', '1vGR8kp342yxEpwY')"
    )
    dl = daily_location(
        "2016-01-05",
        level="cell",
        hours=(23, 5),
        method="last",
        # subscriber_identifier="imei",
        # column_name="admin2pcod",
        # ignore_nulls=False,
        subscriber_subset=subset_query,
    )
    sql = pretty_sql(dl.get_query())
    verify(sql, diff_reporter)


def test_daily_location_2_df(get_dataframe, diff_reporter):
    """
    Daily location query with non-default parameters returns the expected data.
    """
    dl = daily_location(
        "2016-01-04",
        level="admin2",
        hours=(3, 9),
        method="most-common",
        # subscriber_identifier="imei",
        # column_name="admin2pcod",
        ignore_nulls=False,
        subscriber_subset=[
            "2GJxeNazvlgZbqj6",
            "7qKmzkeMbmk5nOa0",
            "8dpPLR15XwR7jQyN",
            "1NqnrAB9bRd597x2",
        ],
    )
    df = get_dataframe(dl)
    verify(df.to_csv(), diff_reporter)


def test_daily_location_1_sql(diff_reporter):
    """
    Simple daily location query returns the expected SQL string.
    """
    dl = daily_location("2016-01-01", "2016-01-02")
    sql = pretty_sql(dl.get_query())
    verify(sql, diff_reporter)


def test_daily_location_4_sql(diff_reporter):
    """
    Regression test; verifies the SQL statement for the corresponding dataframe test.
    """
    subset_query = CustomQuery(
        "SELECT * FROM (VALUES ('dr9xNYK006wykgXj')) as tmp (subscriber)"
    )
    dl = daily_location(
        "2016-01-05",
        table="events.calls",
        hours=(22, 6),
        subscriber_subset=subset_query,
    )
    sql = pretty_sql(dl.get_query())
    verify(sql, diff_reporter)
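

# Illustrative sketch (an assumption, not the suite's real conftest): the
# `get_dataframe` fixture used above could be as simple as evaluating a
# flowmachine Query to a pandas DataFrame, given an active connection.
import pytest


@pytest.fixture
def get_dataframe():
    def _get_dataframe(query):
        # flowmachine Query objects expose get_dataframe() once connected
        return query.get_dataframe()

    return _get_dataframe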


# The two methods below come from a FlowKit query-schema class (hence the
# `self` parameter): each builds the underlying flowmachine query object for an
# exposed API query. They are shown here outside their class context.
def _flowmachine_query_obj(self):
    """
    Return the underlying flowmachine daily_location object.

    Returns
    -------
    Query
    """
    from flowmachine.features import daily_location

    return daily_location(
        date=self.date,
        level=self.aggregation_unit,
        method=self.method,
        subscriber_subset=self.subscriber_subset,
    )


def _unsampled_query_obj(self):
    """
    Return the underlying flowmachine daily_location object.

    Returns
    -------
    Query
    """
    from flowmachine.features import daily_location

    return daily_location(
        date=self.date,
        spatial_unit=self.aggregation_unit,
        method=self.method,
        table=self.event_types,
        subscriber_subset=self.subscriber_subset,
    )


import concurrent.futures

import pandas as pd

import flowmachine
from flowmachine.core.errors import UnstorableQueryError

flowmachine.connect()

print("Constructing query objects")
admin1_spatial_unit = flowmachine.core.make_spatial_unit("admin", level=1)
admin3_spatial_unit = flowmachine.core.make_spatial_unit("admin", level=3)
vsite_spatial_unit = flowmachine.core.make_spatial_unit("versioned-site")
vcell_spatial_unit = flowmachine.core.make_spatial_unit("versioned-cell")

# FlowClient example usage
example_usage_queries = [
    flowmachine.features.utilities.spatial_aggregates.SpatialAggregate(
        locations=flowmachine.features.daily_location(
            date="2016-01-01", spatial_unit=admin3_spatial_unit, method="last"
        )
    ),
    flowmachine.features.utilities.spatial_aggregates.SpatialAggregate(
        locations=flowmachine.features.ModalLocation(
            *[
                flowmachine.features.daily_location(
                    date=dl_date, spatial_unit=admin3_spatial_unit, method="last"
                )
                for dl_date in pd.date_range("2016-01-01", "2016-01-03", freq="D")
            ]
        )
    ),
    flowmachine.features.Flows(
        flowmachine.features.daily_location(
            date="2016-01-01", spatial_unit=admin1_spatial_unit, method="last"
        ),
        flowmachine.features.daily_location(
            date="2016-01-07", spatial_unit=admin1_spatial_unit, method="last"
        ),
    ),
    flowmachine.features.TotalLocationEvents(
        start="2016-01-01",
        stop="2016-01-08",
        spatial_unit=admin3_spatial_unit,
        interval="hour",
    ),
]
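

# Illustrative sketch (not part of the original script): the concurrent.futures
# and UnstorableQueryError imports above suggest these queries get stored in
# parallel later on. Assuming flowmachine's Query.store(), which returns a
# concurrent.futures.Future, that pattern could look like this:
def store_all(queries):
    futures = [q.store() for q in queries]
    for fut in concurrent.futures.as_completed(futures):
        try:
            fut.result()
        except UnstorableQueryError:
            pass  # some query types cannot be stored in the cache
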
# Flows above normal
date_ranges = {
"benchmark": pd.date_range("2016-01-01", "2016-01-21", freq="D"),
"comparison": pd.date_range("2016-01-21", "2016-02-10", freq="D"),
"focal": pd.date_range("2016-02-10", "2016-02-28", freq="D"),
}
flows_above_normal_queries = [
    flowmachine.features.utilities.spatial_aggregates.SpatialAggregate(
        locations=flowmachine.features.ModalLocation(
            *[
                flowmachine.features.daily_location(
                    date=dl_date.strftime("%Y-%m-%d"),
                    spatial_unit=admin3_spatial_unit,
                    method="last",
                )
                for dl_date in dates
            ]
        )
    )
    for dates in date_ranges.values()
] + [
    flowmachine.features.Flows(
        flowmachine.features.ModalLocation(
            *[
                flowmachine.features.daily_location(
                    date=dl_date.strftime("%Y-%m-%d"),
                    spatial_unit=admin3_spatial_unit,