Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
str
SQL query string.
"""
try:
table_name = self.fully_qualified_table_name
schema, name = table_name.split(".")
state_machine = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
)
state_machine.wait_until_complete()
if state_machine.is_completed and get_db().has_table(
schema=schema, name=name
):
try:
touch_cache(get_db(), self.query_id)
except ValueError:
pass # Cache record not written yet, which can happen for Models
# which will call through to this method from their `_make_query` method while writing metadata.
# In that scenario, the table _is_ written, but won't be visible from the connection touch_cache uses
# as the cache metadata transaction isn't complete!
return "SELECT * FROM {}".format(table_name)
except NotImplementedError:
pass
return self._make_query()
to the database cache of this query if it exists.
Returns
-------
str
SQL query string.
"""
try:
table_name = self.fully_qualified_table_name
schema, name = table_name.split(".")
state_machine = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
)
state_machine.wait_until_complete()
if state_machine.is_completed and get_db().has_table(
schema=schema, name=name
):
try:
touch_cache(get_db(), self.query_id)
except ValueError:
pass # Cache record not written yet, which can happen for Models
# which will call through to this method from their `_make_query` method while writing metadata.
# In that scenario, the table _is_ written, but won't be visible from the connection touch_cache uses
# as the cache metadata transaction isn't complete!
return "SELECT * FROM {}".format(table_name)
except NotImplementedError:
pass
return self._make_query()
def get_stored(cls):
"""
Get a list of stored query objects of this type
Returns
-------
list
All cached instances of this Query type, or any if called with
Query.
"""
try:
get_db()
except:
raise NotConnectedError()
if cls is Query:
qry = "SELECT obj FROM cache.cached"
else:
qry = "SELECT obj FROM cache.cached WHERE class='{}'".format(cls.__name__)
logger.debug(qry)
objs = get_db().fetch(qry)
return (pickle.loads(obj[0]) for obj in objs)
ddl_ops_func = self._make_sql
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=ddl_ops_func,
write_func=write_query,
)
return store_future
list
All cached instances of this Query type, or any if called with
Query.
"""
try:
get_db()
except:
raise NotConnectedError()
if cls is Query:
qry = "SELECT obj FROM cache.cached"
else:
qry = "SELECT obj FROM cache.cached WHERE class='{}'".format(cls.__name__)
logger.debug(qry)
objs = get_db().fetch(qry)
return (pickle.loads(obj[0]) for obj in objs)
def __iter__(self):
con = get_db().engine
qur = self.get_query()
with con.begin():
self._query_object = con.execute(qur)
return self
# TODO: currently we can't use QueryStateMachine to determine whether
# the query_id belongs to a valid query object, so we need to check it
# manually. Would be good to add a QueryState.UNKNOWN so that we can
# avoid this separate treatment.
q_info_lookup = QueryInfoLookup(get_redis())
if not q_info_lookup.query_is_known(query_id):
msg = f"Unknown query id: '{query_id}'"
payload = {"query_id": query_id, "query_state": "awol"}
return ZMQReply(status="error", msg=msg, payload=payload)
query_state = QueryStateMachine(
get_redis(), query_id, get_db().conn_id
).current_query_state
if query_state == QueryState.COMPLETED:
q = get_query_object_by_id(get_db(), query_id)
try:
sql = q.geojson_query()
payload = {
"query_id": query_id,
"query_state": query_state,
"sql": sql,
"aggregation_unit": q.spatial_unit.canonical_name,
}
return ZMQReply(status="success", payload=payload)
except AttributeError:
msg = f"Query with id '{query_id}' has no geojson compatible representation." # TODO: This codepath is untested because all queries right now have geography
payload = {"query_id": query_id, "query_state": "errored"}
return ZMQReply(status="error", msg=msg, payload=payload)
else:
msg = f"Query with id '{query_id}' {query_state.description}."
payload = {"query_id": query_id, "query_state": query_state}
QueryStateMachine(get_redis(), self.query_id, get_db().conn_id).finish()
return self._runtime
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=lambda *x: [],
write_func=write_model_result,
)
return store_future