Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
# NOTE(review): fragment of a unit test for DocumentationParser.load_file —
# the enclosing `def test_...(self)` header is not visible in this chunk, and
# indentation has been stripped. `system`, `docs`, `TEST_DOCUMENTATION_FILE`,
# and `NodeType` are presumably supplied by the surrounding test module —
# confirm against the full file.
# Stub the filesystem search to report exactly one markdown file.
system.find_matching.return_value = [{
'relative_path': 'test_file.md',
'absolute_path': '/test_root/test_subdir/test_file.md',
'searched_path': '/test_root/test_subdir',
}]
results = list(docs.DocumentationParser.load_file(
'some_package', '/test_root', ['test_subdir'])
)
# Exactly one parsed documentation node should come back, with its fields
# echoing the stubbed file entry above.
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result.package_name, 'some_package')
self.assertEqual(result.file_contents, TEST_DOCUMENTATION_FILE)
self.assertEqual(result.original_file_path,
'/test_root/test_subdir/test_file.md')
self.assertEqual(result.root_path, '/test_root')
self.assertEqual(result.resource_type, NodeType.Documentation)
self.assertEqual(result.path, 'test_file.md')
from dbt.node_types import NodeType
# NOTE(review): fragment of a generator (it ends in `yield`) whose `def`
# header is not visible here; `package`, `normalize`, and `ParsedMacro` come
# from the enclosing scope — confirm against the full file. Indentation has
# been stripped from the paste.
# Build one generate_<component>_name macro per naming component. Each macro
# returns the explicit `value` when given, otherwise a fallback: node.name
# for the alias, target.<component> for database/schema.
name_sql = {}
for component in ('database', 'schema', 'alias'):
if component == 'alias':
source = 'node.name'
else:
source = f'target.{component}'
name = f'generate_{component}_name'
sql = f'{{% macro {name}(value, node) %}} {{% if value %}} {{{{ value }}}} {{% else %}} {{{{ {source} }}}} {{% endif %}} {{% endmacro %}}'
name_sql[name] = sql
# Every ParsedMacro shares the same raw_sql blob (all macros joined) but
# carries only its own macro definition in macro_sql.
all_sql = '\n'.join(name_sql.values())
for name, sql in name_sql.items():
pm = ParsedMacro(
name=name,
resource_type=NodeType.Macro,
unique_id=f'macro.{package}.{name}',
package_name=package,
original_file_path=normalize('macros/macro.sql'),
root_path='./dbt_modules/root',
path=normalize('macros/macro.sql'),
raw_sql=all_sql,
macro_sql=sql,
)
yield pm
def generate_source_node(
self, block: TargetBlock, refs: ParserRef
) -> ParsedSourceDefinition:
"""Build a ParsedSourceDefinition from a schema-file source table block.

NOTE(review): this chunk is truncated — the ParsedSourceDefinition(...)
call continues past the last line shown here — and indentation has been
stripped from the paste.
"""
assert isinstance(block.target, SourceTarget)
source = block.target.source
table = block.target.table
# Unique id shaped like "source.<project>.<source>.<table>".
unique_id = '.'.join([
NodeType.Source, self.project.project_name, source.name, table.name
])
description = table.description or ''
source_description = source.description or ''
collect_docrefs(source, refs, None, description, source_description)
# Table-level settings win over source-level ones where both are set.
loaded_at_field = table.loaded_at_field or source.loaded_at_field
freshness = self._calculate_freshness(source, table)
quoting = source.quoting.merged(table.quoting)
path = block.path.original_file_path
return ParsedSourceDefinition(
package_name=self.project.project_name,
database=(source.database or self.default_database),
# Falls back to the source *name* (not a schema attribute) when no
# schema is configured — looks intentional, but worth confirming.
schema=(source.schema or source.name),
identifier=(table.identifier or table.name),
def find_macro_by_name(self, name, package):
    """Look up a macro node in the graph.

    Searches the 'macros' subgraph for a macro called ``name``; pass
    ``package`` as None to accept a match from any package.
    """
    macro_types = [NodeType.Macro]
    return self._find_by_name(name, package, 'macros', macro_types)
def get_project_config(self, runtime_config):
# most configs are overwritten by a more specific config, but pre/post
# hooks are appended!
# NOTE(review): this chunk is truncated — the body of the final
# `if level_config is None:` (and anything after it) is not visible —
# and indentation has been stripped from the paste.
# Seed empty accumulators for the append/extend-style keys so the merge
# helpers below always have something to extend.
config = {}
for k in self.AppendListFields:
config[k] = []
for k in self.ExtendDictFields:
config[k] = {}
# Pick the runtime-config subtree matching this parser's node type;
# snapshots get an empty dict (no project-level config source here).
if self.node_type == NodeType.Seed:
model_configs = runtime_config.seeds
elif self.node_type == NodeType.Snapshot:
model_configs = {}
else:
model_configs = runtime_config.models
if model_configs is None:
return config
# mutates config
self.smart_update(config, model_configs)
# Walk down the fqn, merging each successively more specific level.
fqn = self.fqn[:]
for level in fqn:
level_config = model_configs.get(level, None)
if level_config is None:
def find_source_by_name(self, source_name, table_name, package):
    """Look up a valid target for "source()" in the graph.

    The lookup key is "<source_name>.<table_name>"; pass ``package`` as
    None to accept a match from any package.
    """
    qualified_name = f'{source_name}.{table_name}'
    source_types = [NodeType.Source]
    return self._find_by_name(qualified_name, package, 'nodes', source_types)
# NOTE(review): fragment — the enclosing function's header and the
# definitions of `parsed`, `with_stmt`, and `ctes` are not visible here, and
# indentation has been stripped. This appears to splice the CTE definitions
# into an existing sqlparse statement immediately after its WITH token, then
# render the modified statement back to SQL text.
token = sqlparse.sql.Token(
sqlparse.tokens.Keyword,
", ".join(c.sql for c in ctes)
)
parsed.insert_after(with_stmt, token)
return str(parsed)
# Registry mapping a parsed node's resource type to its compiled counterpart.
# Resource types with no entry here (e.g. sources, macros, documentation)
# have no compiled form; compiled_type_for() falls back to the parsed class.
COMPILED_TYPES: Dict[NodeType, Type[CompiledNode]] = {
NodeType.Analysis: CompiledAnalysisNode,
NodeType.Model: CompiledModelNode,
NodeType.Operation: CompiledHookNode,
NodeType.RPCCall: CompiledRPCNode,
NodeType.Seed: CompiledSeedNode,
NodeType.Snapshot: CompiledSnapshotNode,
NodeType.Test: CompiledTestNode,
}
def compiled_type_for(parsed: 'ParsedNode'):
    """Return the compiled node class for *parsed*.

    Looks up ``parsed.resource_type`` in COMPILED_TYPES; resource types
    with no registered compiled counterpart fall back to the parsed node's
    own class.
    """
    # Single dict lookup via dict.get with a default, replacing the LBYL
    # membership test followed by a second subscript lookup.
    return COMPILED_TYPES.get(parsed.resource_type, type(parsed))
def parsed_instance_for(compiled: CompiledNode) -> ParsedNode:
# NOTE(review): truncated — the ValueError(...) call continues past the last
# visible line (the format argument and closing parens are missing), and
# indentation has been stripped from the paste. Intent: map a compiled node
# back to its parsed class via PARSED_TYPES, failing loudly on an unknown
# resource type.
cls = PARSED_TYPES.get(compiled.resource_type)
if cls is None:
# how???
raise ValueError('invalid resource_type: {}'
# Union of every parsed top-level resource kind dbt produces.
ParsedResource = Union[
ParsedMacro, ParsedNode, ParsedDocumentation, ParsedSourceDefinition
]
# Registry mapping each resource type to the parsed-node class representing
# it; used e.g. by parsed_instance_for() to pick the right class.
PARSED_TYPES: Dict[NodeType, Type[ParsedResource]] = {
NodeType.Analysis: ParsedAnalysisNode,
NodeType.Documentation: ParsedDocumentation,
NodeType.Macro: ParsedMacro,
NodeType.Model: ParsedModelNode,
NodeType.Operation: ParsedHookNode,
NodeType.RPCCall: ParsedRPCNode,
NodeType.Seed: ParsedSeedNode,
NodeType.Snapshot: ParsedSnapshotNode,
NodeType.Source: ParsedSourceDefinition,
NodeType.Test: ParsedTestNode,
}
# NOTE(review): fragment of a schema-test parsing method — the `def` header
# is not visible; `full_name`, `package_name`, `root_dir`, `hashed_path`,
# `path`, `raw_sql`, `full_path`, `source_package`, `column_name`, and
# `test_path` come from the missing surrounding code, the final exception
# message is cut off mid-string, and indentation has been stripped.
# Wrap the generated test SQL in an UnparsedNode so the generic node parser
# can process it like any other node.
unparsed = UnparsedNode(
name=full_name,
resource_type=NodeType.Test,
package_name=package_name,
root_path=root_dir,
path=hashed_path,
original_file_path=path,
raw_sql=raw_sql
)
# supply our own fqn which overrides the hashed version from the path
# TODO: is this necessary even a little bit for tests?
fqn_override = self.get_fqn(unparsed.incorporate(path=full_path),
source_package)
node_path = self.get_path(NodeType.Test, unparsed.package_name,
unparsed.name)
result = self.parse_node(unparsed,
node_path,
source_package,
tags=['schema'],
fqn_extra=None,
fqn=fqn_override,
column_name=column_name)
# Sanity check: the block parser must accept anything parse_node accepted;
# a rejection here means a good file was marked invalid.
parse_ok = self.check_block_parsing(full_name, test_path, raw_sql)
if not parse_ok:
# if we had a parse error in parse_node, we would not get here. So
# this means we rejected a good file :(
raise dbt.exceptions.InternalException(
'the block parser rejected a good node: {} was marked invalid '
def _load_nodes(self):
# Runs each node parser in turn, merging parsed nodes into self.nodes
# (the hook/archive parsers visibly do so via .update(); presumably
# _load_sql_nodes/_load_seeds do the same — confirm in the full file).
# NOTE(review): this chunk ends at the last visible line, so the method
# may continue beyond it; indentation has been stripped from the paste.
self._load_sql_nodes(ModelParser, NodeType.Model, 'source_paths')
self._load_sql_nodes(AnalysisParser, NodeType.Analysis,
'analysis_paths')
# Data tests are ordinary SQL test nodes tagged 'data'.
self._load_sql_nodes(DataTestParser, NodeType.Test, 'test_paths',
tags=['data'])
hook_parser = HookParser(self.root_project, self.all_projects,
self.macro_manifest)
self.nodes.update(hook_parser.load_and_parse())
archive_parser = ArchiveParser(self.root_project, self.all_projects,
self.macro_manifest)
self.nodes.update(archive_parser.load_and_parse())
self._load_seeds()