Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with Timer() as timer:
if len(sql.strip()) > 0:
status, _ = adapter.execute(sql, auto_begin=False,
fetch=False)
self.ran_hooks.append(hook)
with finishctx, DbtModelState({'node_status': 'passed'}):
print_hook_end_line(
hook_text, status, idx, num_hooks, timer.elapsed
)
self._total_executed += len(ordered_hooks)
with TextOnly():
print_timestamped_line("")
def run_hooks(self, adapter, hook_type: RunHookType, extra_context):
ordered_hooks = self.get_hooks_by_type(hook_type)
# on-run-* hooks should run outside of a transaction. This happens
# b/c psycopg2 automatically begins a transaction when a connection
# is created.
adapter.clear_transaction()
if not ordered_hooks:
return
num_hooks = len(ordered_hooks)
plural = 'hook' if num_hooks == 1 else 'hooks'
with TextOnly():
print_timestamped_line("")
print_timestamped_line(
'Running {} {} {}'.format(num_hooks, hook_type, plural)
)
startctx = TimestampNamed('node_started_at')
finishctx = TimestampNamed('node_finished_at')
for idx, hook in enumerate(ordered_hooks, start=1):
sql = self.get_hook_sql(adapter, hook, idx, num_hooks,
extra_context)
hook_text = '{}.{}.{}'.format(hook.package_name, hook_type,
hook.index)
hook_meta_ctx = HookMetadata(hook, self.index_offset(idx))
running_ctx = DbtModelState({'node_status': 'running'})
with UniqueID(hook.unique_id):
with hook_meta_ctx, startctx, running_ctx:
def print_results_line(self, results, execution_time):
    """Print the closing "Finished running ..." summary line.

    The ``.node`` of every item in ``results`` is combined with
    ``self.ran_hooks`` and handed to ``get_counts`` to build the summary
    text. When ``execution_time`` is not ``None`` it is appended as
    seconds with two decimal places; otherwise nothing is appended.
    """
    executed = [result.node for result in results]
    summary = get_counts(executed + self.ran_hooks)

    if execution_time is None:
        elapsed = ""
    else:
        elapsed = " in {execution_time:0.2f}s".format(
            execution_time=execution_time
        )

    # Only the blank separator line is emitted under the TextOnly context.
    with TextOnly():
        print_timestamped_line("")

    template = "Finished running {stat_line}{execution}."
    print_timestamped_line(
        template.format(stat_line=summary, execution=elapsed)
    )
def execute_nodes(self):
    """Run the queued nodes on a thread pool and return the node results.

    Prints a concurrency banner built from ``self.config.threads`` and
    ``self.config.target_name``, then passes a ``ThreadPool`` of that
    size to ``self.run_queue``.

    On ``KeyboardInterrupt`` the pool is closed and terminated. If the
    adapter reports that it is not cancelable, a warning is printed and
    the interrupt is re-raised immediately. Otherwise every connection
    name returned by ``adapter.cancel_open_connections()`` is printed,
    the pool is joined, the run-end messages are printed with
    ``early_exit=True``, and the interrupt is re-raised.

    Returns:
        ``self.node_results`` when the queue finishes without interruption.

    Raises:
        KeyboardInterrupt: always re-raised after the cleanup above.
    """
    num_threads = self.config.threads
    target_name = self.config.target_name

    text = "Concurrency: {} threads (target='{}')"
    concurrency_line = text.format(num_threads, target_name)
    with NodeCount(self.num_nodes):
        dbt.ui.printer.print_timestamped_line(concurrency_line)
    with TextOnly():
        dbt.ui.printer.print_timestamped_line("")

    pool = ThreadPool(num_threads)
    try:
        self.run_queue(pool)
    except KeyboardInterrupt:
        pool.close()
        pool.terminate()

        adapter = get_adapter(self.config)

        if not adapter.is_cancelable():
            msg = ("The {} adapter does not support query "
                   "cancellation. Some queries may still be "
                   "running!".format(adapter.type()))

            yellow = dbt.ui.printer.COLOR_FG_YELLOW
            dbt.ui.printer.print_timestamped_line(msg, yellow)
            # Nothing can be cancelled, so propagate the interrupt now.
            raise

        for conn_name in adapter.cancel_open_connections():
            dbt.ui.printer.print_cancel_line(conn_name)

        pool.join()

        dbt.ui.printer.print_run_end_messages(self.node_results,
                                              early_exit=True)

        raise

    pool.close()
    pool.join()

    return self.node_results
def execute_nodes(self):
num_threads = self.config.threads
target_name = self.config.target_name
text = "Concurrency: {} threads (target='{}')"
concurrency_line = text.format(num_threads, target_name)
dbt.ui.printer.print_timestamped_line(concurrency_line)
dbt.ui.printer.print_timestamped_line("")
pool = ThreadPool(num_threads)
try:
self.run_queue(pool)
except KeyboardInterrupt:
pool.close()
pool.terminate()
adapter = get_adapter(self.config)
if not adapter.is_cancelable():
msg = ("The {} adapter does not support query "
"cancellation. Some queries may still be "
"running!".format(adapter.type()))
def execute_nodes(self):
num_threads = self.config.threads
target_name = self.config.target_name
text = "Concurrency: {} threads (target='{}')"
concurrency_line = text.format(num_threads, target_name)
with NodeCount(self.num_nodes):
dbt.ui.printer.print_timestamped_line(concurrency_line)
with TextOnly():
dbt.ui.printer.print_timestamped_line("")
pool = ThreadPool(num_threads)
try:
self.run_queue(pool)
except KeyboardInterrupt:
pool.close()
pool.terminate()
adapter = get_adapter(self.config)
if not adapter.is_cancelable():
msg = ("The {} adapter does not support query "
"cancellation. Some queries may still be "