# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _check_dates(self):
# NOTE(review): indentation appears to have been stripped from this whole file,
# and this method is truncated here (a fuller copy of the same logic appears
# later in the file) -- restore from version control before editing.
# Handle the logic for dealing with missing dates.
# If there are no dates present, then we raise an error
# if some are present, but some are missing we raise a
# warning.
# If the subscriber does not pass a start or stop date, then we take
# the min/max date in the events.calls table
if self.start is None:
# No explicit start: fall back to the earliest date available for the
# table (the name after the schema in the fully qualified name).
d1 = (
get_db()
.min_date(self.table_ORIG.fully_qualified_table_name.split(".")[1])
.strftime("%Y-%m-%d")
)
else:
# Keep only the date part of a "YYYY-mm-dd HH:MM:SS"-style string.
d1 = self.start.split()[0]
if self.stop is None:
# Same fallback for the stop date, using the latest available date.
d2 = (
get_db()
.max_date(self.table_ORIG.fully_qualified_table_name.split(".")[1])
.strftime("%Y-%m-%d")
)
else:
d2 = self.stop.split()[0]
# All calendar dates in the [d1, d2] range.
all_dates = list_of_dates(d1, d2)
def __init__(
self,
start=None,
stop=None,
*,
table="all",
total_by="day",
network_object: AnySpatialUnit = make_spatial_unit("cell"),
spatial_unit: Optional[AnySpatialUnit] = None,
hours="all",
subscriber_subset=None,
subscriber_identifier="msisdn",
):
# NOTE(review): this constructor is truncated here and repeated (also
# truncated) further down the file -- corruption from a bad merge/paste;
# restore from version control before editing.
# Default the date range to the full extent of data available for `table`.
self.start = standardise_date(
get_db().min_date(table=table) if start is None else start
)
self.stop = standardise_date(
get_db().max_date(table=table) if stop is None else stop
)
self.table = table
if isinstance(self.table, str):
# Normalise the table name to lower case and qualify it with the
# events schema unless it is the special value "all".
self.table = self.table.lower()
if self.table != "all" and not self.table.startswith("events"):
self.table = "events.{}".format(self.table)
# Ensure the unit passed as network_object can act as a network object.
network_object.verify_criterion("is_network_object")
self.network_object = network_object
if spatial_unit is None:
# Default aggregation level: admin level 0.
self.spatial_unit = make_spatial_unit("admin", level=0)
# NOTE(review): duplicated fragment of the constructor defined above,
# beginning mid-signature (the `def` header is missing) -- almost certainly
# corruption from a bad merge/paste; restore from version control.
start=None,
stop=None,
*,
table="all",
total_by="day",
network_object: AnySpatialUnit = make_spatial_unit("cell"),
spatial_unit: Optional[AnySpatialUnit] = None,
hours="all",
subscriber_subset=None,
subscriber_identifier="msisdn",
):
# Default the date range to the full extent of data available for `table`.
self.start = standardise_date(
get_db().min_date(table=table) if start is None else start
)
self.stop = standardise_date(
get_db().max_date(table=table) if stop is None else stop
)
self.table = table
if isinstance(self.table, str):
# Normalise table names to lower case and qualify with the events schema.
self.table = self.table.lower()
if self.table != "all" and not self.table.startswith("events"):
self.table = "events.{}".format(self.table)
network_object.verify_criterion("is_network_object")
self.network_object = network_object
if spatial_unit is None:
self.spatial_unit = make_spatial_unit("admin", level=0)
else:
self.spatial_unit = spatial_unit
# No sense in aggregating network object to network object
# NOTE(review): from here the body of _check_dates (seen near the top of this
# file) is repeated with no enclosing `def` -- duplicated fragment, likely
# corruption; restore from version control before editing.
# if some are present, but some are missing we raise a
# warning.
# If the subscriber does not pass a start or stop date, then we take
# the min/max date in the events.calls table
if self.start is None:
# No explicit start: use the earliest date available for the table.
d1 = (
get_db()
.min_date(self.table_ORIG.fully_qualified_table_name.split(".")[1])
.strftime("%Y-%m-%d")
)
else:
# Keep only the date part of a "YYYY-mm-dd HH:MM:SS"-style string.
d1 = self.start.split()[0]
if self.stop is None:
d2 = (
get_db()
.max_date(self.table_ORIG.fully_qualified_table_name.split(".")[1])
.strftime("%Y-%m-%d")
)
else:
d2 = self.stop.split()[0]
# All calendar dates in the [d1, d2] range.
all_dates = list_of_dates(d1, d2)
# Slightly annoying feature, but if the subscriber passes a date such as '2016-01-02'
# this will be interpreted as midnight, so we don't want to include this in our
# calculations. Check for this here, an if this is the case pop the final element
# of the list
if (self.stop is not None) and (
len(self.stop) == 10 or self.stop.endswith("00:00:00")
):
all_dates.pop(-1)
# This will be a true false list for whether each of the dates
# NOTE(review): fragment of a row-count helper; the enclosing `def` and the
# opening of the first SQL string literal are missing -- presumably the query
# walks pg_inherits/pg_class to sum reltuples over a table and its child
# tables (TODO confirm against version control before editing).
JOIN pg_class as p ON (inhparent=p.oid)
JOIN pg_namespace pn ON pn.oid = p.relnamespace
JOIN pg_namespace cn ON cn.oid = c.relnamespace
WHERE p.relname = '{tn}' and pn.nspname = '{sc}'
UNION"""
# Include the table itself (not only its children) in the oid set.
qur += """
SELECT oid
FROM pg_class
WHERE oid='{sc}.{tn}'::regclass
)
SELECT SUM(reltuples::bigint) FROM pg_class, counts
WHERE pg_class.oid=counts.oid
"""
# NOTE(review): schema and table name are interpolated directly into the SQL
# via str.format -- assumed to be trusted internal values, not user input;
# verify upstream validation.
ct = get_db().fetch(qur.format(sc=self.schema, tn=self.name))[0][0]
return int(ct)
# NOTE(review): another duplicated copy of the _check_dates logic; the
# warnings.warn call at the end is cut off mid-argument-list (corruption) --
# restore from version control before editing.
all_dates = list_of_dates(d1, d2)
# Slightly annoying feature, but if the subscriber passes a date such as '2016-01-02'
# this will be interpreted as midnight, so we don't want to include this in our
# calculations. Check for this here, an if this is the case pop the final element
# of the list
if (self.stop is not None) and (
len(self.stop) == 10 or self.stop.endswith("00:00:00")
):
all_dates.pop(-1)
# This will be a true false list for whether each of the dates
# is present in the database
try:
db_dates = [
d.strftime("%Y-%m-%d")
for d in get_db().available_dates[self.table_ORIG.name]
]
except KeyError: # No dates at all for this table
raise MissingDateError
dates_present = [d in db_dates for d in all_dates]
logger.debug(
f"Data for {sum(dates_present)}/{len(dates_present)} calendar dates."
)
# All dates are missing
if not any(dates_present):
raise MissingDateError
# Some dates are missing, others are present
elif not all(dates_present):
present_dates = [d for p, d in zip(dates_present, all_dates) if p]
# stacklevel=2 attributes the warning to the caller of this method.
warnings.warn(
f"{len(dates_present) - sum(dates_present)} of {len(dates_present)} calendar dates missing. Earliest date is {present_dates[0]}, latest is {present_dates[-1]}.",
stacklevel=2,
def to_geojson(self, crs=None):
    """
    Return this query as a GeoJson FeatureCollection.

    Parameters
    ----------
    crs : int or str, optional
        Optionally give an integer srid, or valid proj4 string to transform output to

    Returns
    -------
    dict
        This query as a GeoJson FeatureCollection in dict form.
    """
    proj4_string = proj4string(get_db(), crs)
    # Lazily create the per-projection cache dict on first use (the previous
    # version did this via an AttributeError handler).
    if not hasattr(self, "_geojson"):
        self._geojson = {}
    try:
        js = self._geojson[proj4_string]
    except KeyError:
        # Cache miss: build the geojson. The previous implementation used
        # dict.get(key, self._get_geojson(...)), which eagerly recomputed the
        # geojson on every call -- even on a cache hit -- and never stored
        # results after the first call. Compute only on a miss, and store
        # whenever caching is enabled.
        js = self._get_geojson(proj4_string)
        if self._cache:
            self._geojson[proj4_string] = js
    # Return a copy so callers cannot mutate the cached dict.
    return js.copy()
# NOTE(review): fragment of a query-constructor tail (no enclosing `def`
# visible) -- part of this corrupted file; restore from version control.
self.columns = set(columns)
try:
# Alias the subscriber identifier column to "subscriber" if requested.
self.columns.remove(subscriber_identifier)
self.columns.add(f"{subscriber_identifier} AS subscriber")
except KeyError:
# Identifier column was not requested; warn if a subscriber subset
# will still be applied to the data.
if self.subscriber_subsetter.is_proper_subset:
warnings.warn(
f"No subscriber column requested, did you mean to include {subscriber_identifier} in columns? "
"Since you passed a subscriber_subset the data will still be subset by your subscriber subset, "
"but the subscriber column will not be present in the output.",
stacklevel=2,
)
# Sort for a deterministic column order.
self.columns = sorted(self.columns)
self.sqlalchemy_table = get_sqlalchemy_table_definition(
self.table_ORIG.fully_qualified_table_name, engine=get_db().engine,
)
if self.start == self.stop:
raise ValueError("Start and stop are the same.")
super().__init__()
# This needs to happen after the parent classes init method has been
# called as it relies upon the connection object existing
self._check_dates()
# NOTE(review): fragment of Table.__init__-style logic. It starts mid-branch
# (`extracted_schema` is not defined in what is visible here) and is cut off
# at the trailing logger.debug( -- corruption; restore from version control.
schema = extracted_schema
elif schema is None:
# No schema given or extracted: default to the public schema.
schema = "public"
self.name = name
self.schema = schema
# Fully qualified table name, e.g. "events.calls".
self.fqn = "{}.{}".format(schema, name) if schema else name
if "." not in self.fqn:
raise ValueError("{} is not a valid table.".format(self.fqn))
if not self.is_stored:
raise ValueError("{} is not a known table.".format(self.fqn))
# Get actual columns of this table from the database
# NOTE(review): name/schema are interpolated directly into SQL via an
# f-string -- assumed to be trusted internal values, not user input; verify.
db_columns = list(
zip(
*get_db().fetch(
f"""SELECT column_name from INFORMATION_SCHEMA.COLUMNS
WHERE table_name = '{self.name}' AND table_schema='{self.schema}'"""
)
)
)[0]
if (
columns is None or columns == []
): # No columns specified, setting them from the database
columns = db_columns
else:
self.parent_table = Table(
schema=self.schema, name=self.name
) # Point to the full table
if isinstance(columns, str): # Wrap strings in a list
columns = [columns]
logger.debug(