Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test fixture setup: sets dbt flags and starts building a profile dict.
# NOTE(review): this excerpt is truncated -- the profile_data literal is never
# closed in the visible source, so the rest of the fixture is unknown.
def setUp(self):
# Set both the strict-mode and warn-error flags to True for these tests.
dbt.flags.STRICT_MODE = True
dbt.flags.WARN_ERROR = True
# presumably a unittest.TestCase, where maxDiff = None lifts the cap on
# assertion diff output -- confirm against the enclosing class.
self.maxDiff = None
# Profile whose default target is 'test': a redshift output on localhost.
profile_data = {
'target': 'test',
'quoting': {},
'outputs': {
'test': {
'type': 'redshift',
'host': 'localhost',
'schema': 'analytics',
'user': 'test',
'pass': 'test',
'dbname': 'test',
'port': 1,
# Test fixture setup: sets strict mode and starts building a raw profile.
# NOTE(review): this excerpt is truncated -- the raw_profile literal is never
# closed in the visible source.
def setUp(self):
flags.STRICT_MODE = True
# Two bigquery outputs that differ by auth method ('oauth' and
# 'service-account'); the visible keys give both the same project and schema.
self.raw_profile = {
'outputs': {
'oauth': {
'type': 'bigquery',
'method': 'oauth',
'project': 'dbt-unit-000000',
'schema': 'dummy_schema',
'threads': 1,
},
'service_account': {
'type': 'bigquery',
'method': 'service-account',
'project': 'dbt-unit-000000',
'schema': 'dummy_schema',
'keyfile': '/tmp/dummy-service-account.json',
import socket
import time
from base64 import standard_b64encode as b64
from datetime import datetime
import requests
from pytest import mark
from test.integration.base import DBTIntegrationTest, use_profile
import dbt.flags
from dbt.version import __version__
from dbt.logger import log_manager
from dbt.main import handle_and_check
class ServerProcess(dbt.flags.MP_CONTEXT.Process):
    """Child process that runs the dbt ``rpc`` command via ``handle_and_check``.

    Args:
        port: Port forwarded as ``--port``; also stored on ``self.port``.
        profiles_dir: Directory forwarded as ``--profiles-dir``.
        cli_vars: Optional value forwarded as ``--vars``; omitted when falsy.
    """

    def __init__(self, port, profiles_dir, cli_vars=None):
        self.port = port
        handle_and_check_args = [
            '--strict', 'rpc', '--log-cache-events',
            '--port', str(self.port),
            '--profiles-dir', profiles_dir,
        ]
        if cli_vars:
            handle_and_check_args.extend(['--vars', cli_vars])
        # The argument list is wrapped in a 1-tuple so the target receives it
        # as a single positional argument.
        super().__init__(
            target=handle_and_check,
            args=(handle_and_check_args,),
            name='ServerProcess')

    def run(self):
        """Reset log handlers in the child, then run the configured target."""
        log_manager.reset_handlers()
        # Overriding run() replaces the default behaviour of calling
        # ``target(*args)``; delegate to the parent so the target passed to
        # __init__ is actually invoked.
        return super().run()
# Test fixture setup: sets strict mode off and starts building a project config.
# NOTE(review): this excerpt is truncated -- the project_cfg literal is never
# closed in the visible source.
def setUp(self):
# Strict mode is explicitly disabled for these tests.
flags.STRICT_MODE = False
# Project config named 'X' using the 'test' profile; quoting is turned off
# for both identifiers and schemas.
self.project_cfg = {
'name': 'X',
'version': '0.1',
'profile': 'test',
'project-root': '/tmp/dbt/does-not-exist',
'quoting': {
'identifier': False,
'schema': False,
}
# Test fixture setup: creates patchers and captures the graph handed to
# networkx.write_gpickle.
# NOTE(review): this excerpt is truncated -- the profile literal is never
# closed in the visible source.
def setUp(self):
dbt.flags.STRICT_MODE = True
# Filled in by mock_write_gpickle below with the graph it receives.
self.graph_result = None
# Five patchers are created here; within the visible lines only the
# get_adapter and write_gpickle patchers are started.
self.write_gpickle_patcher = patch('networkx.write_gpickle')
self.load_projects_patcher = patch('dbt.loader._load_projects')
self.find_matching_patcher = patch('dbt.clients.system.find_matching')
self.load_file_contents_patcher = patch('dbt.clients.system.load_file_contents')
self.get_adapter_patcher = patch('dbt.context.parser.get_adapter')
self.factory = self.get_adapter_patcher.start()
# Side effect for the patched write_gpickle: store the graph on the test
# instance; outfile is ignored.
def mock_write_gpickle(graph, outfile):
self.graph_result = graph
self.mock_write_gpickle = self.write_gpickle_patcher.start()
self.mock_write_gpickle.side_effect = mock_write_gpickle
self.profile = {
'outputs': {
# NOTE(review): tail of a function whose `def` line is not visible in this
# excerpt. model, adapter, project, schema, dist_qualifier, sort_qualifier,
# pre_hooks, post_hooks, injected_graph and context are bound outside the
# visible lines.
rendered_query = model['injected_sql']
profile = project.run_environment()
db_wrapper = DatabaseWrapper(model, adapter, profile)
# Options dict handed to do_wrap below.
opts = {
"materialization": get_materialization(model),
"model": model,
"schema": schema,
"dist": dist_qualifier,
"sort": sort_qualifier,
"pre_hooks": pre_hooks,
"post_hooks": post_hooks,
"sql": rendered_query,
"flags": dbt.flags,
"adapter": db_wrapper
}
# Merge in whatever the wrapper exposes; presumably a mapping of names to
# callables, whose keys would override same-named entries above -- confirm.
opts.update(db_wrapper.get_context_functions())
return do_wrap(model, opts, injected_graph, context, project)
def get_model_identifier(model):
    """Return the identifier to use for *model*.

    When ``dbt.flags.NON_DESTRUCTIVE`` is truthy the model's own name is
    returned unchanged; otherwise the name is suffixed with ``__dbt_tmp``.
    """
    if not dbt.flags.NON_DESTRUCTIVE:
        return "{}__dbt_tmp".format(model['name'])
    return model['name']
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.postgres import PostgresCredentials
from dbt.logger import GLOBAL_LOGGER as logger # noqa
import dbt.exceptions
import dbt.flags
import boto3
from hologram import FieldEncoder, JsonSchemaMixin
from hologram.helpers import StrEnum
from dataclasses import dataclass, field
from typing import Optional
# Module-level lock obtained from dbt.flags.MP_CONTEXT.
# NOTE(review): what it guards is not visible in this excerpt; the name
# suggests drop operations -- confirm against its users.
drop_lock: Lock = dbt.flags.MP_CONTEXT.Lock()
# Distinct int-based type for IAM durations; IAMDurationEncoder supplies its
# JSON schema.
IAMDuration = NewType('IAMDuration', int)
class IAMDurationEncoder(FieldEncoder):
    """Field encoder whose JSON schema is a bounded, non-negative integer."""

    @property
    def json_schema(self):
        """Schema fragment: an integer between 0 and 65535 inclusive."""
        return dict(type='integer', minimum=0, maximum=65535)
# Register an IAMDurationEncoder instance as the field encoder for the
# IAMDuration type.
JsonSchemaMixin.register_field_encoders({IAMDuration: IAMDurationEncoder()})
# String-valued enum of connection methods.
# NOTE(review): only DATABASE is visible in this excerpt; the class may
# define further members beyond the visible lines.
class RedshiftConnectionMethod(StrEnum):
DATABASE = 'database'
def load_and_parse(self):
    """Parse the run hooks of every hook type into one dict.

    In strict mode, ``self.all_projects`` is first passed to
    ``dbt.contracts.project.ProjectList``; the constructed object is
    discarded. Each entry of ``RunHookType.Both`` is then parsed with
    ``load_and_parse_run_hook_type`` and the results are merged, later
    hook types overwriting earlier ones on key collisions.
    """
    if dbt.flags.STRICT_MODE:
        dbt.contracts.project.ProjectList(**self.all_projects)

    parsed_hooks = {}
    for kind in RunHookType.Both:
        parsed_hooks.update(self.load_and_parse_run_hook_type(kind))
    return parsed_hooks
def _partial_parse_enabled(self):
    """Return the effective partial-parse setting.

    Precedence: the CLI flag (``dbt.flags.PARTIAL_PARSE``) when it is not
    ``None``, then the root project's ``config.partial_parse`` when it is
    not ``None``, and finally ``DEFAULT_PARTIAL_PARSE``.
    """
    # An explicit CLI value always wins.
    cli_setting = dbt.flags.PARTIAL_PARSE
    if cli_setting is not None:
        return cli_setting

    # Otherwise fall back to the project configuration, if it sets one.
    config_setting = self.root_project.config.partial_parse
    if config_setting is not None:
        return config_setting

    return DEFAULT_PARTIAL_PARSE