def test_keypath_and_pagesize_can_be_set(self, mock_build):
    config_dict = {
        'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.PROJECT_ID_KEY):
            'your-project-here',
        'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.PAGE_SIZE_KEY):
            200,
        'extractor.bigquery_table_metadata.{}'.format(BigQueryMetadataExtractor.KEY_PATH_KEY):
            '/tmp/doesnotexist',
    }
    conf = ConfigFactory.from_dict(config_dict)

    mock_build.return_value = MockBigQueryClient(ONE_DATASET, ONE_TABLE, TABLE_DATA)
    extractor = BigQueryMetadataExtractor()
    with self.assertRaises(FileNotFoundError):
        extractor.init(Scoped.get_scoped_conf(conf=conf,
                                              scope=extractor.get_scope()))
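For context on what the test's Scoped.get_scoped_conf call resolves, here is a minimal sketch using plain pyhocon, under the assumption that scoping amounts to fetching the subtree at the extractor's scope; the dotted keys mirror the test above, and 'project_id'/'page_size' stand in for the extractor's key constants:

from pyhocon import ConfigFactory

conf = ConfigFactory.from_dict({
    'extractor.bigquery_table_metadata.project_id': 'your-project-here',
    'extractor.bigquery_table_metadata.page_size': 200,
})

# pyhocon expands dotted keys into nested trees, so the scoped view is
# simply the subtree at the extractor's scope (assumption, for illustration).
scoped = conf.get('extractor.bigquery_table_metadata')
assert scoped.get_string('project_id') == 'your-project-here'
assert scoped.get_int('page_size') == 200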
def test_transform_with_invalid_model_class_conf(self):
    # type: () -> None
    """
    Test that a non-existent model_class conf raises an error
    """
    config_dict = {'transformer.elasticsearch.index': self.elasticsearch_index,
                   'transformer.elasticsearch.doc_type': self.elasticsearch_type,
                   'transformer.elasticsearch.model_class':
                       'databuilder.models.table_elasticsearch_document.NonExistingESDocument'}

    transformer = ElasticsearchDocumentTransformer()
    with self.assertRaises(Exception) as context:
        transformer.init(conf=Scoped.get_scoped_conf(conf=ConfigFactory.from_dict(config_dict),
                                                     scope=transformer.get_scope()))
    self.assertTrue("'module' object has no attribute 'NonExistingESDocument'"
                    in str(context.exception))
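The failure above comes from resolving model_class dynamically. The mechanism boils down to an import plus getattr, roughly like this sketch (the rsplit/importlib approach is an illustration, not necessarily the transformer's exact code):

import importlib

model_class = 'databuilder.models.table_elasticsearch_document.NonExistingESDocument'
module_name, class_name = model_class.rsplit('.', 1)
mod = importlib.import_module(module_name)
cls = getattr(mod, class_name)  # raises AttributeError: no NonExistingESDocument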
def test_from_dict_with_nested_dict(self):
    d = OrderedDict()
    d['banana'] = 3
    d['apple'] = 4
    d['pear'] = 1
    d['tree'] = {
        'a': 'abc\ntest\n',
        'b': [1, 2, 3]
    }
    config = ConfigFactory.from_dict(d)
    assert config == d
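Worth noting what from_dict produces for the nested entry: nested dicts become nested ConfigTrees that are reachable through dotted paths as well as plain mapping access. A small sketch using the values defined above:

config = ConfigFactory.from_dict(d)

assert config['banana'] == 3                          # mapping-style access
assert config.get_string('tree.a') == 'abc\ntest\n'   # dotted-path access
assert config.get_list('tree.b') == [1, 2, 3]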
def setUp(self):
    # type: () -> None
    logging.basicConfig(level=logging.INFO)
    self.cluster_key = "not_master"

    config_dict = {
        SnowflakeMetadataExtractor.CLUSTER_KEY: self.cluster_key,
        'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
            'TEST_CONNECTION',
        SnowflakeMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME: False
    }
    self.conf = ConfigFactory.from_dict(config_dict)
import boto3

from pyhocon import ConfigFactory, ConfigTree  # noqa: F401
from typing import Iterator, Union, Dict, Any  # noqa: F401

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata


class GlueExtractor(Extractor):
    """
    Extracts tables and columns metadata from AWS Glue metastore
    """

    CLUSTER_KEY = 'cluster'
    FILTER_KEY = 'filters'
    DEFAULT_CONFIG = ConfigFactory.from_dict({CLUSTER_KEY: 'gold', FILTER_KEY: None})

    def init(self, conf):
        conf = conf.with_fallback(GlueExtractor.DEFAULT_CONFIG)
        self._cluster = '{}'.format(conf.get_string(GlueExtractor.CLUSTER_KEY))
        self._filters = conf.get(GlueExtractor.FILTER_KEY)
        self._glue = boto3.client('glue')
        self._extract_iter = None  # type: Union[None, Iterator]

    def extract(self):
        # type: () -> Union[TableMetadata, None]
        if not self._extract_iter:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None
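A minimal way to drive the extractor above, assuming valid AWS credentials in the environment and that _get_extract_iter pages through the Glue catalog as in the full source; the config values here are illustrative:

from pyhocon import ConfigFactory

extractor = GlueExtractor()
extractor.init(ConfigFactory.from_dict({
    GlueExtractor.CLUSTER_KEY: 'gold',   # defaults to 'gold' via DEFAULT_CONFIG
    GlueExtractor.FILTER_KEY: None,      # no server-side filtering
}))

# extract() returns one TableMetadata per call and None when exhausted.
record = extractor.extract()
while record:
    print(record)
    record = extractor.extract()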
if self._conf.get_string(
        HiveTableLastUpdatedExtractor.NON_PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY, None):
    where_clause_suffix = """
    {}
    AND {}
    """.format(self._conf.get_string(
        HiveTableLastUpdatedExtractor.NON_PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY),
        HiveTableLastUpdatedExtractor.ADDTIONAL_WHERE_CLAUSE)
else:
    where_clause_suffix = 'WHERE {}'.format(HiveTableLastUpdatedExtractor.ADDTIONAL_WHERE_CLAUSE)

sql_stmt = HiveTableLastUpdatedExtractor.NON_PARTITIONED_TABLE_SQL_STATEMENT.format(
    where_clause_suffix=where_clause_suffix)

LOGGER.info('SQL for non-partitioned table against Hive metastore: {}'.format(sql_stmt))

sql_alchemy_extractor = SQLAlchemyExtractor()
sql_alchemy_conf = Scoped.get_scoped_conf(self._conf, sql_alchemy_extractor.get_scope()) \
    .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: sql_stmt}))
sql_alchemy_extractor.init(sql_alchemy_conf)
return sql_alchemy_extractor
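The Scoped/with_fallback composition at the end is the key pattern: explicitly scoped settings win, and the computed SQL statement is injected as a fallback default. Isolated into a sketch with made-up keys:

from pyhocon import ConfigFactory

user_conf = ConfigFactory.from_dict({'conn_string': 'hive://metastore'})
defaults = ConfigFactory.from_dict({'extract_sql': 'SELECT 1'})

merged = user_conf.with_fallback(defaults)
assert merged.get_string('conn_string') == 'hive://metastore'  # explicit value kept
assert merged.get_string('extract_sql') == 'SELECT 1'          # fallback filled in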
if config_file is not None:
    if not os.path.exists(config_file):
        Log.error('JSON path {} does not exist!'.format(config_file))
        exit(1)
    self.params_root = ConfigFactory.parse_file(config_file)
elif 'config_file' in args_parser and args_parser.config_file is not None:
    if not os.path.exists(args_parser.config_file):
        Log.error('JSON path {} does not exist!'.format(args_parser.config_file))
        exit(1)
    self.params_root = ConfigFactory.parse_file(args_parser.config_file)
else:
    Log.warn('Base settings not set!')
    self.params_root = ConfigFactory.from_dict({})

if args_parser is not None:
    for key, value in args_parser.__dict__.items():
        # Skip arguments outside the namespace we are allowed to touch.
        if valid_flag is not None and key.split('.')[0] != valid_flag:
            continue
        if key not in self.params_root:
            self.add(key, value)
        elif value is not None:
            self.update(key, value)
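The precedence logic above (file config first, CLI values layered on top) reduces to a few pyhocon primitives. A self-contained sketch with a hypothetical helper name; ConfigTree.put accepts the same dotted keys that from_dict expands:

import os
from pyhocon import ConfigFactory

def load_params(config_file=None, cli_overrides=None):
    # Hypothetical helper: file config as the base, CLI values override it.
    if config_file is not None and os.path.exists(config_file):
        params = ConfigFactory.parse_file(config_file)
    else:
        params = ConfigFactory.from_dict({})
    for key, value in (cli_overrides or {}).items():
        if value is not None:
            params.put(key, value)  # dotted keys create nested entries
    return params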
def create_bq_job(metadata_type, gcloud_project):
    tmp_folder = '/var/tmp/amundsen/{metadata_type}'.format(metadata_type=metadata_type)
    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    bq_usage_extractor = BigQueryTableUsageExtractor()
    csv_loader = FsNeo4jCSVLoader()

    task = DefaultTask(extractor=bq_usage_extractor,
                       loader=csv_loader,
                       transformer=BigqueryUsageTransformer())

    job_config = ConfigFactory.from_dict({
        'extractor.bigquery_table_usage.{}'.format(BigQueryTableUsageExtractor.PROJECT_ID_KEY):
            gcloud_project,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR):
            True,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
    })
    # Wiring follows the usual databuilder sample: publish loaded CSVs to Neo4j
    # (closing lines reconstructed from that pattern; neo4j_password is assumed
    # to be defined alongside neo4j_endpoint and neo4j_user).
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    return job
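Launching the assembled job follows the usual databuilder pattern; 'your-project-here' and 'bigquery_usage' are placeholders, and DefaultJob.launch() runs extract-transform-load plus publish:

if __name__ == "__main__":
    job = create_bq_job(metadata_type='bigquery_usage',
                        gcloud_project='your-project-here')
    job.launch()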
def create_last_updated_job():
    # loader saves data to these folders and publisher reads it from here
    tmp_folder = '/var/tmp/amundsen/last_updated_data'
    node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)

    task = DefaultTask(extractor=Neo4jEsLastUpdatedExtractor(),
                       loader=FsNeo4jCSVLoader())

    job_config = ConfigFactory.from_dict({
        'extractor.neo4j_es_last_updated.model_class':
            'databuilder.models.neo4j_es_last_updated.Neo4jESLastUpdated',
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
    })
    # Closing lines reconstructed from the standard databuilder sample pattern.
    return DefaultJob(conf=job_config,
                      task=task,
                      publisher=Neo4jCsvPublisher())
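Usage is the same as the other samples; the scope prefix 'extractor.neo4j_es_last_updated' in the config has to match the extractor's get_scope() for the model_class setting to be picked up (the assert encodes that assumption):

extractor = Neo4jEsLastUpdatedExtractor()
assert extractor.get_scope() == 'extractor.neo4j_es_last_updated'  # assumed scope string

job = create_last_updated_job()
job.launch()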
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                   extractor=Neo4jSearchDataExtractor(),
                   transformer=NoopTransformer())

# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
# related to mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
elasticsearch_new_index_key_type = 'table'
# alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
elasticsearch_index_alias = 'table_search_index'

job_config = ConfigFactory.from_dict({
    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY):
        'databuilder.models.table_elasticsearch_document.TableESDocument',
    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
        extracted_search_data_path,
    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
        extracted_search_data_path,
    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
        elasticsearch_client,
    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
        elasticsearch_new_index_key,
    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
        elasticsearch_new_index_key_type,
    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
        elasticsearch_index_alias,
})
# Closing lines reconstructed from the standard databuilder sample pattern.
job = DefaultJob(conf=job_config,
                 task=task,
                 publisher=ElasticsearchPublisher())
job.launch()
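The unique index name plus stable alias is the zero-downtime pattern here: every run publishes into a fresh 'tables<uuid>' index, then the 'table_search_index' alias that the search service reads is repointed. A sketch of the equivalent raw alias swap with elasticsearch-py, assuming a local cluster; the publisher handles this internally, so this is illustration only:

import uuid
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')  # assumed local instance
new_index = 'tables' + str(uuid.uuid4())     # must already hold the published docs
alias = 'table_search_index'

# Collect indices currently behind the alias, then swap atomically.
old_indices = list(es.indices.get_alias(name=alias).keys()) \
    if es.indices.exists_alias(name=alias) else []
actions = [{'add': {'index': new_index, 'alias': alias}}]
actions += [{'remove_index': {'index': old}} for old in old_indices]
es.indices.update_aliases(body={'actions': actions})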