Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def tearDown(self):
nx.write_gpickle = self.real_write_gpickle
dbt.utils.dependency_projects = self.real_dependency_projects
dbt.clients.system.find_matching = self.real_find_matching
dbt.clients.system.load_file_contents = self.real_load_file_contents
self.mock_content = {}
def mock_find_matching(root_path, relative_paths_to_search,
file_pattern):
if 'sql' not in file_pattern:
return []
to_return = []
if 'models' in relative_paths_to_search:
to_return = to_return + self.mock_models
return to_return
self.real_find_matching = dbt.clients.system.find_matching
dbt.clients.system.find_matching = MagicMock(
side_effect=mock_find_matching)
def mock_load_file_contents(path):
return self.mock_content[path]
self.real_load_file_contents = dbt.clients.system.load_file_contents
dbt.clients.system.load_file_contents = MagicMock(
side_effect=mock_load_file_contents)
def get_addendum(self, project_name, profiles_path):
open_cmd = dbt.clients.system.open_dir_cmd()
return ON_COMPLETE_MESSAGE.format(
open_cmd=open_cmd,
project_name=project_name,
profiles_path=profiles_path,
docs_url=DOCS_URL
)
def get_result_from_cursor(cls, cursor):
data = []
column_names = []
if cursor.description is not None:
column_names = [col[0] for col in cursor.description]
raw_results = cursor.fetchall()
data = [dict(zip(column_names, row))
for row in raw_results]
return dbt.clients.agate_helper.table_from_data(data, column_names)
if tags is None:
tags = []
context = {}
# change these to actual kwargs
base_node = UnparsedMacro(
path=macro_file_path,
original_file_path=macro_file_path,
package_name=package_name,
raw_sql=macro_file_contents,
root_path=root_path,
)
try:
template = dbt.clients.jinja.get_template(
macro_file_contents, context, node=base_node)
except dbt.exceptions.CompilationException as e:
e.node = base_node
raise e
for key, item in template.module.__dict__.items():
if type(item) != jinja2.runtime.Macro:
continue
node_type = None
if key.startswith(dbt.utils.MACRO_PREFIX):
node_type = NodeType.Macro
name = key.replace(dbt.utils.MACRO_PREFIX, '')
elif key.startswith(dbt.utils.OPERATION_PREFIX):
node_type = NodeType.Operation
if tags is None:
tags = []
context = {}
# change these to actual kwargs
base_node = UnparsedMacro(
path=macro_file_path,
original_file_path=macro_file_path,
package_name=package_name,
raw_sql=macro_file_contents,
root_path=root_path,
)
try:
template = dbt.clients.jinja.get_template(
macro_file_contents, context, node=base_node)
except dbt.exceptions.CompilationException as e:
e.node = base_node
raise e
for key, item in template.module.__dict__.items():
if type(item) != jinja2.runtime.Macro:
continue
node_type = None
if key.startswith(dbt.utils.MACRO_PREFIX):
node_type = NodeType.Macro
name = key.replace(dbt.utils.MACRO_PREFIX, '')
elif key.startswith(dbt.utils.OPERATION_PREFIX):
node_type = NodeType.Operation
def initialize(self):
dbt.clients.system.make_directory(self.config.target_path)
dbt.clients.system.make_directory(self.config.modules_path)
table_name = os.path.basename(abspath)[:-4]
node = UnparsedNode(
path=file_match['relative_path'],
name=table_name,
root_path=root_dir,
resource_type=NodeType.Seed,
# Give this raw_sql so it conforms to the node spec,
# use dummy text so it doesn't look like an empty node
raw_sql='-- csv --',
package_name=package_name,
original_file_path=os.path.join(file_match.get('searched_path'),
file_match.get('relative_path')),
)
if should_parse:
try:
table = dbt.clients.agate_helper.from_csv(abspath)
except ValueError as e:
dbt.exceptions.raise_compiler_error(str(e), node)
else:
table = dbt.clients.agate_helper.empty_table()
table.original_abspath = abspath
return node, table
def compile_target(self, target_cfg):
ctx = self.base_context()
compiled = {}
for (key, value) in target_cfg.items():
is_str = isinstance(value, dbt.compat.basestring)
if is_str:
compiled_val = dbt.clients.jinja.get_rendered(value, ctx)
else:
compiled_val = value
compiled[key] = compiled_val
if self.args and hasattr(self.args, 'threads') and self.args.threads:
compiled['threads'] = self.args.threads
return compiled