Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def query_state(self) -> "QueryState":
"""
Return the current query state.
Returns
-------
flowmachine.core.query_state.QueryState
The current query state
"""
state_machine = QueryStateMachine(get_redis(), self.query_id, get_db().conn_id)
return state_machine.current_query_state
if not self.is_stored:
try:
self._df
except AttributeError:
raise ValueError("Not computed yet.")
def write_model_result(query_ddl_ops: List[str], connection: Engine) -> float:
if store_dependencies:
store_all_unstored_dependencies(self)
self._df.to_sql(name, connection, schema=schema, index=False)
QueryStateMachine(get_redis(), self.query_id, get_db().conn_id).finish()
return self._runtime
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=lambda *x: [],
write_func=write_model_result,
)
return store_future
def write_model_result(query_ddl_ops: List[str], connection: Engine) -> float:
if store_dependencies:
store_all_unstored_dependencies(self)
self._df.to_sql(name, connection, schema=schema, index=False)
QueryStateMachine(get_redis(), self.query_id, get_db().conn_id).finish()
return self._runtime
) -> ZMQReply:
"""
Handler for the 'poll_query' action.
Returns the status of the query with the given `query_id`.
"""
query_kind = _get_query_kind_for_query_id(query_id)
# TODO: we should probably be able to use the QueryStateMachine to determine
# whether the query already exists.
if query_kind is None:
payload = {"query_id": query_id, "query_state": "awol"}
return ZMQReply(
status="error", msg=f"Unknown query id: '{query_id}'", payload=payload
)
else:
q_state_machine = QueryStateMachine(get_redis(), query_id, get_db().conn_id)
payload = {
"query_id": query_id,
"query_kind": query_kind,
"query_state": q_state_machine.current_query_state,
"progress": query_progress(
FlowmachineQuerySchema()
.load(QueryInfoLookup(get_redis()).get_query_params(query_id))
._flowmachine_query_obj
),
}
return ZMQReply(status="success", payload=payload)
QueryResetFailedException
If the query wasn't succesfully removed
Parameters
----------
name : str
Name of the table
schema : str
Schema of the table
cascade : bool
Set to false to remove only this table from cache
drop : bool
Set to false to remove the cache record without dropping the table
"""
q_state_machine = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
)
current_state, this_thread_is_owner = q_state_machine.reset()
if this_thread_is_owner:
con = get_db().engine
try:
table_reference_to_this_query = self.get_table()
if table_reference_to_this_query is not self:
table_reference_to_this_query.invalidate_db_cache(
cascade=cascade, drop=drop
) # Remove any Table pointing as this query
except (ValueError, NotImplementedError) as e:
pass # This cache record isn't actually stored
try:
deps = get_db().fetch(
"""SELECT obj FROM cache.cached LEFT JOIN cache.dependencies
ON cache.cached.query_id=cache.dependencies.query_id
plan = ddl_op_result[0][0][0] # Should be a query plan
plan_time += plan["Execution Time"]
except (IndexError, KeyError):
pass # Not an explain result
logger.debug("Executed queries.")
return plan_time
if store_dependencies:
store_queries_in_order(
unstored_dependencies_graph(self)
) # Need to ensure we're behind our deps in the queue
ddl_ops_func = self._make_sql
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=ddl_ops_func,
write_func=write_query,
)
return store_future
ddl_ops_func = self._make_sql
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=ddl_ops_func,
write_func=write_query,
)
return store_future
validation_errors_as_text = textwrap.indent(
json.dumps(validation_error_messages, indent=2), " "
)
error_msg = (
"Parameter validation failed.\n\n"
f"The action parameters were:\n{action_params_as_text}.\n\n"
f"Validation error messages:\n{validation_errors_as_text}.\n\n"
)
payload = {"validation_error_messages": validation_error_messages}
return ZMQReply(status="error", msg=error_msg, payload=payload)
q_info_lookup = QueryInfoLookup(get_redis())
try:
query_id = q_info_lookup.get_query_id(action_params)
qsm = QueryStateMachine(
query_id=query_id, redis_client=get_redis(), db_id=get_db().conn_id
)
if qsm.current_query_state in [
QueryState.CANCELLED,
QueryState.KNOWN,
]: # Start queries running even if they've been cancelled or reset
if qsm.is_cancelled:
reset = qsm.reset()
finish = qsm.finish_resetting()
raise QueryInfoLookupError
except QueryInfoLookupError:
try:
# Set the query running (it's safe to call this even if the query was set running before)
query_id = await asyncio.get_running_loop().run_in_executor(
executor=config.server_thread_pool,
func=partial(
copy_context().run,
return self._runtime
current_state, changed_to_queue = QueryStateMachine(
get_redis(), self.query_id, get_db().conn_id
).enqueue()
logger.debug(
f"Attempted to enqueue query '{self.query_id}', query state is now {current_state} and change happened {'here and now' if changed_to_queue else 'elsewhere'}."
)
# name, redis, query, connection, ddl_ops_func, write_func, schema = None, sleep_duration = 1
store_future = submit_to_executor(
write_query_to_cache,
name=name,
schema=schema,
query=self,
connection=get_db(),
redis=get_redis(),
ddl_ops_func=lambda *x: [],
write_func=write_model_result,
)
return store_future