Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_daily_location_1_df(get_dataframe, diff_reporter):
    """Simple daily location query returns the expected data."""
    # Approval test: render the query result as CSV and diff it against
    # the stored reference output.
    query = daily_location("2016-01-01", "2016-01-02")
    verify(get_dataframe(query).to_csv(), diff_reporter)
def test_daily_location_5_sql(diff_reporter):
    """
    Daily location query with non-default parameters returns the expected SQL string.
    """
    # Fixed, explicit subscriber subset so the approved SQL is stable.
    subset_query = CustomQuery(
        "SELECT DISTINCT msisdn AS subscriber FROM events.calls WHERE msisdn in ('GNLM7eW5J5wmlwRa', 'e6BxY8mAP38GyAQz', '1vGR8kp342yxEpwY')"
    )
    dl = daily_location(
        "2016-01-05",
        level="cell",
        hours=(23, 5),
        method="last",
        subscriber_subset=subset_query,
    )
    # Approval test: pretty-print the generated SQL and diff against the
    # stored reference.
    sql = pretty_sql(dl.get_query())
    verify(sql, diff_reporter)
def test_daily_location_2_df(get_dataframe, diff_reporter):
    """
    Daily location query with non-default parameters returns the expected data.
    """
    # Fixed subscriber subset so the approved dataframe is stable.
    dl = daily_location(
        "2016-01-04",
        level="admin2",
        hours=(3, 9),
        method="most-common",
        ignore_nulls=False,
        subscriber_subset=[
            "2GJxeNazvlgZbqj6",
            "7qKmzkeMbmk5nOa0",
            "8dpPLR15XwR7jQyN",
            "1NqnrAB9bRd597x2",
        ],
    )
    # Approval test: render the result as CSV and diff against the
    # stored reference output.
    df = get_dataframe(dl)
    verify(df.to_csv(), diff_reporter)
def test_daily_location_1_sql(diff_reporter):
    """Simple daily location query returns the expected SQL string."""
    # Approval test: pretty-print the generated SQL and diff it against
    # the stored reference.
    query = daily_location("2016-01-01", "2016-01-02")
    verify(pretty_sql(query.get_query()), diff_reporter)
def test_daily_location_4_sql(diff_reporter):
    """
    Regression test; this verifies the SQL statement for the test below (which checks the resulting dataframe)
    """
    # Single-subscriber subset expressed as an inline VALUES query.
    subscribers = CustomQuery(
        "SELECT * FROM (VALUES ('dr9xNYK006wykgXj')) as tmp (subscriber)"
    )
    query = daily_location(
        "2016-01-05",
        table="events.calls",
        hours=(22, 6),
        subscriber_subset=subscribers,
    )
    verify(pretty_sql(query.get_query()), diff_reporter)
        # NOTE(review): this chunk starts mid-signature — the opening
        # ``def __init__(self, start, stop, location, spatial_unit, ...``
        # line lies outside the visible span. Parameters below are the
        # tail of that signature.
        table="all",
        subscriber_identifier="msisdn",
        ignore_nulls=True,
        subscriber_subset=None,
    ):
        """
        Validate the location/spatial-unit combination, normalise the
        date bounds, and build the underlying SubscriberLocations query,
        whose resolved table and subscriber identifier are re-exposed on
        this instance.
        """
        # location='any' is only meaningful at cell granularity.
        if location == "any" and spatial_unit != make_spatial_unit("cell"):
            raise ValueError(
                "Invalid parameter combination: location='any' can only be used with cell spatial unit."
            )
        self.start = standardise_date(start)
        self.stop = standardise_date(stop)
        self.location = location
        # Delegate the actual per-subscriber location lookup.
        self.ul = SubscriberLocations(
            self.start,
            self.stop,
            spatial_unit=spatial_unit,
            hours=hours,
            table=table,
            subscriber_identifier=subscriber_identifier,
            ignore_nulls=ignore_nulls,
            subscriber_subset=subscriber_subset,
        )
        # Mirror the delegate's resolved values for callers of this object.
        self.table = self.ul.table
        self.subscriber_identifier = self.ul.subscriber_identifier
def __init__(
self,
start,
stop,
spatial_unit: AnySpatialUnit = make_spatial_unit("cell"),
hours="all",
table="all",
):
self.start = standardise_date(start)
self.stop = standardise_date(stop)
self.spatial_unit = spatial_unit
self.hours = hours
self.table = table
self.ul = UniqueLocations(
SubscriberLocations(
start=self.start,
stop=self.stop,
spatial_unit=self.spatial_unit,
hours=self.hours,
table=self.table,
)
)
super().__init__()
        # NOTE(review): this chunk starts mid-signature — the opening
        # ``def __init__(self, start,`` line lies outside the visible
        # span, and the trailing ``)`` of the SubscriberLocations call at
        # the bottom is also cut off by the chunk boundary.
        stop,
        spatial_unit: Optional[AnySpatialUnit] = None,
        hours="all",
        table="all",
        subscriber_identifier="msisdn",
        *,
        ignore_nulls=True,
        subscriber_subset=None,
    ):
        """
        Normalise the date bounds, default the spatial unit to admin
        level 3 when none is supplied, and build the underlying
        SubscriberLocations query.
        """
        self.start = standardise_date(start)
        self.stop = standardise_date(stop)
        # Default aggregation unit when the caller did not choose one.
        if spatial_unit is None:
            self.spatial_unit = make_spatial_unit("admin", level=3)
        else:
            self.spatial_unit = spatial_unit
        self.hours = hours
        self.table = table
        self.subscriber_identifier = subscriber_identifier
        # Underlying per-subscriber location query.
        self.subscriber_locs = SubscriberLocations(
            start=self.start,
            stop=self.stop,
            spatial_unit=self.spatial_unit,
            hours=self.hours,
            table=self.table,
            subscriber_identifier=self.subscriber_identifier,
            ignore_nulls=ignore_nulls,
            subscriber_subset=subscriber_subset,
        str
            SQL query string.
        """
        # NOTE(review): the ``def`` line and the head of this docstring
        # lie outside the visible span of this chunk.
        try:
            # If this query already has a cached table, serve it straight
            # from that table instead of regenerating the SQL.
            table_name = self.fully_qualified_table_name
            schema, name = table_name.split(".")
            state_machine = QueryStateMachine(
                get_redis(), self.query_id, get_db().conn_id
            )
            # Wait for any in-flight execution of this query to settle
            # before checking its completion state.
            state_machine.wait_until_complete()
            if state_machine.is_completed and get_db().has_table(
                schema=schema, name=name
            ):
                try:
                    # Record the cache hit for eviction bookkeeping.
                    touch_cache(get_db(), self.query_id)
                except ValueError:
                    pass  # Cache record not written yet, which can happen for Models
                    # which will call through to this method from their `_make_query` method while writing metadata.
                    # In that scenario, the table _is_ written, but won't be visible from the connection touch_cache uses
                    # as the cache metadata transaction isn't complete!
                return "SELECT * FROM {}".format(table_name)
        except NotImplementedError:
            # No fully-qualified table name defined for this query type;
            # fall through and build the SQL directly.
            pass
        return self._make_query()
        to the database cache of this query if it exists.
        Returns
        -------
        str
            SQL query string.
        """
        # NOTE(review): the ``def`` line and the head of this docstring
        # lie outside the visible span of this chunk.
        try:
            # Prefer reading from the cached table for this query, when
            # one exists, over regenerating the SQL.
            table_name = self.fully_qualified_table_name
            schema, name = table_name.split(".")
            state_machine = QueryStateMachine(
                get_redis(), self.query_id, get_db().conn_id
            )
            # Block until this query's state machine reaches a terminal
            # state, then check whether it completed successfully.
            state_machine.wait_until_complete()
            if state_machine.is_completed and get_db().has_table(
                schema=schema, name=name
            ):
                try:
                    # Mark the cache entry as recently used.
                    touch_cache(get_db(), self.query_id)
                except ValueError:
                    pass  # Cache record not written yet, which can happen for Models
                    # which will call through to this method from their `_make_query` method while writing metadata.
                    # In that scenario, the table _is_ written, but won't be visible from the connection touch_cache uses
                    # as the cache metadata transaction isn't complete!
                return "SELECT * FROM {}".format(table_name)
        except NotImplementedError:
            # Query type does not define a cache table name; build the
            # SQL from scratch instead.
            pass
        return self._make_query()