Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_query(self):
    """
    Return an SQL query string for this query.

    When the query has completed and its cache table exists in the
    database, the returned string selects directly from that table.

    Returns
    -------
    str
        ``SELECT * FROM <schema>.<name>`` pointing at the cache table.

    Notes
    -----
    NOTE(review): in the code visible here nothing follows the
    ``except NotImplementedError`` clause, so every path that does not hit
    the cache table returns ``None`` implicitly, which contradicts the
    documented ``str`` return. Confirm whether a fallback return is missing.
    """
    try:
        table_name = self.fully_qualified_table_name
        # Expects exactly one "." separating schema and table name; any
        # other shape raises ValueError from the unpacking below.
        schema, name = table_name.split(".")
        state_machine = QueryStateMachine(
            get_redis(), self.query_id, get_db().conn_id
        )
        # Presumably blocks while the query is still being run elsewhere,
        # so the completion check below sees a settled state - TODO confirm.
        state_machine.wait_until_complete()
        if state_machine.is_completed and get_db().has_table(
            schema=schema, name=name
        ):
            try:
                touch_cache(get_db(), self.query_id)
            except ValueError:
                pass  # Cache record not written yet, which can happen for Models
                # which will call through to this method from their `_make_query` method while writing metadata.
                # In that scenario, the table _is_ written, but won't be visible from the connection touch_cache uses
                # as the cache metadata transaction isn't complete!
            return f"SELECT * FROM {table_name}"
    except NotImplementedError:
        # Deliberately ignored: treated as "no cache table to point at".
        # Presumably raised by `fully_qualified_table_name` - TODO confirm.
        pass
)
validation_errors_as_text = textwrap.indent(
json.dumps(validation_error_messages, indent=2), " "
)
error_msg = (
"Parameter validation failed.\n\n"
f"The action parameters were:\n{action_params_as_text}.\n\n"
f"Validation error messages:\n{validation_errors_as_text}.\n\n"
)
payload = {"validation_error_messages": validation_error_messages}
return ZMQReply(status="error", msg=error_msg, payload=payload)
q_info_lookup = QueryInfoLookup(get_redis())
try:
query_id = q_info_lookup.get_query_id(action_params)
qsm = QueryStateMachine(
query_id=query_id, redis_client=get_redis(), db_id=get_db().conn_id
)
if qsm.current_query_state in [
QueryState.CANCELLED,
QueryState.KNOWN,
]: # Start queries running even if they've been cancelled or reset
if qsm.is_cancelled:
reset = qsm.reset()
finish = qsm.finish_resetting()
raise QueryInfoLookupError
except QueryInfoLookupError:
try:
# Set the query running (it's safe to call this even if the query was set running before)
query_id = await asyncio.get_running_loop().run_in_executor(
executor=config.server_thread_pool,
func=partial(
Handler for the 'get_sql' action.
Returns a SQL string which can be run against flowdb to obtain
the result of the query with given `query_id`.
"""
# TODO: currently we can't use QueryStateMachine to determine whether
# the query_id belongs to a valid query object, so we need to check it
# manually. Would be good to add a QueryState.UNKNOWN so that we can
# avoid this separate treatment.
q_info_lookup = QueryInfoLookup(get_redis())
if not q_info_lookup.query_is_known(query_id):
msg = f"Unknown query id: '{query_id}'"
payload = {"query_id": query_id, "query_state": "awol"}
return ZMQReply(status="error", msg=msg, payload=payload)
query_state = QueryStateMachine(
get_redis(), query_id, get_db().conn_id
).current_query_state
if query_state == QueryState.COMPLETED:
q = get_query_object_by_id(get_db(), query_id)
sql = q.get_query()
payload = {"query_id": query_id, "query_state": query_state, "sql": sql}
return ZMQReply(status="success", payload=payload)
else:
msg = f"Query with id '{query_id}' {query_state.description}."
payload = {"query_id": query_id, "query_state": query_state}
return ZMQReply(status="error", msg=msg, payload=payload)
used in the actual running of a query, only to be referenced by it.
"""
deps = []
if not query_obj.is_stored:
openlist = list(
zip([query_obj] * len(query_obj.dependencies), query_obj.dependencies)
)
while openlist:
y, x = openlist.pop()
if y is query_obj:
# We don't want to include this query in the graph, only its dependencies.
y = None
# Wait for query to complete before checking whether it's stored.
q_state_machine = QueryStateMachine(
get_redis(), x.query_id, get_db().conn_id
)
q_state_machine.wait_until_complete()
if not x.is_stored:
deps.append((y, x))
openlist += list(zip([x] * len(x.dependencies), x.dependencies))
def get_node_attrs(q):
    """
    Build the attribute dictionary describing ``q`` as a graph node.

    Parameters
    ----------
    q : object
        Object the node represents; it is stored in the result as-is.

    Returns
    -------
    dict
        Keys, in insertion order: ``query_object`` (``q`` itself), ``name``
        (the class name of ``q``), ``stored`` (always ``False``), ``shape``
        (always ``"rect"``) and ``label`` (the class name followed by ``"."``).
    """
    name = q.__class__.__name__
    return {
        "query_object": q,
        "name": name,
        "stored": False,
        "shape": "rect",
        "label": f"{name}.",
    }
"""
if not self.is_stored:
try:
self._df
except AttributeError:
raise ValueError("Not computed yet.")
def write_model_result(query_ddl_ops: List[str], connection: Engine) -> float:
    """
    Write this result's dataframe to a database table and mark the query finished.

    This is a closure: ``self``, ``name``, ``schema`` and
    ``store_dependencies`` are read from the enclosing scope.

    Parameters
    ----------
    query_ddl_ops : list of str
        Not used by this function body.
    connection : Engine
        Passed straight through to ``self._df.to_sql``.

    Returns
    -------
    float
        The value of ``self._runtime``.
    """
    if store_dependencies:
        store_all_unstored_dependencies(self)
    # `_df` is presumably a pandas DataFrame (the call matches
    # `DataFrame.to_sql`) - TODO confirm.
    self._df.to_sql(name, connection, schema=schema, index=False)
    QueryStateMachine(get_redis(), self.query_id, get_db().conn_id).finish()
    return self._runtime
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=lambda *x: [],
write_func=write_model_result,
)
def query_status(self, query_id: str) -> QueryState:
    """
    Return the current state of the query with the given id.

    Parameters
    ----------
    query_id : str
        Identifier of the query to look up.

    Returns
    -------
    QueryState
        The ``current_query_state`` reported by a ``QueryStateMachine``
        built from ``self._redis`` and ``query_id``.
    """
    # NOTE(review): the other QueryStateMachine call sites in this file pass a
    # third argument (a database connection id); confirm the two-argument
    # form is valid for the QueryStateMachine in scope here.
    return QueryStateMachine(self._redis, query_id).current_query_state
try:
plan = ddl_op_result[0][0][0] # Should be a query plan
plan_time += plan["Execution Time"]
except (IndexError, KeyError):
pass # Not an explain result
logger.debug("Executed queries.")
return plan_time
if store_dependencies:
store_queries_in_order(
unstored_dependencies_graph(self)
) # Need to ensure we're behind our deps in the queue
ddl_ops_func = self._make_sql
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=ddl_ops_func,
write_func=write_query,
)