def sov_dbm():

  # 1 - create one report for the client and one report for the peers
  client_name, peer_name = sov_create_reports()

  # 2 - download the reports and aggregate them into one table
  sov_rows = sov_process_client(client_name)
  sov_rows.extend(sov_process_peer(peer_name))

  # 3 - save the combined rows to the specified table
  rows_to_table(
    project.task['auth'],
    project.id,
    project.task['dataset'],
    project.task['table'],
    sov_rows,
    SCHEMA,
    0
  )
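# rows_to_table expects a BigQuery schema as a list of field dicts; a minimal
# sketch, assuming hypothetical field names (the real SCHEMA constant is
# defined elsewhere in this module):
#
# SCHEMA = [
#   { 'name': 'Report_Day', 'type': 'DATE', 'mode': 'NULLABLE' },
#   { 'name': 'Advertiser', 'type': 'STRING', 'mode': 'NULLABLE' },
#   { 'name': 'Impressions', 'type': 'INTEGER', 'mode': 'NULLABLE' }
# ]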
def get_owners():
  if project.verbose: print('GETTING OWNERS')

  owners = []

  if 'sheet' in project.task['owners']:
    owners = sheets_read(
      project.task['auth'],
      project.task['owners']['sheet']['url'],
      project.task['owners']['sheet']['tab'],
      project.task['owners']['sheet']['range']
    )
  elif 'bigquery' in project.task['owners']:
    owners = query_to_rows(
      project.task['auth'],
      project.id,
      project.task['owners']['bigquery']['dataset'],
      project.task['owners']['bigquery']['query']
    )
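  # Each owner row read above is positional; a sketch of the expected columns,
  # inferred from the indexes used below (anything past index 2 is an assumption):
  #
  #   [ 'Account Name', 'Account Owner', 'owner@example.com', ... ]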
  # group account owners by email, create easy lookup sets for ids
  owners_grouped = {}
  for owner in owners:
    try:
      owners_grouped.setdefault(owner[2], {
        'Account Name': owner[0],
        'Account Owner': owner[1],
        'Account Email': owner[2],
        'DCM Network ID': [],
        'DBM Partner ID': [],
        'DS Account ID': [],
      })
    except IndexError:
      # assumption: owner rows missing columns are skipped
      pass
"readOptions": {
"readConsistency":"STRONG"
},
"keys": [{
"path": [
{ 'kind': kind, 'name': k}
],
'partitionId': {
'projectId': project_id,
'namespaceId': namespace
}
} for k in key ]
}
response = API_Datastore(auth).projects().lookup(
projectId=project.id,
body=body
).execute()
# ignore missing, just do found for simplicity
for e in response.get('found', []):
yield _datastore_path(e['entity']['key']['path']), _datastore_p_to_v(e['entity']['properties'])
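  # For context, a Cloud Datastore projects.lookup response groups results into
  # "found", "missing", and "deferred"; the loop above reads only "found".
  # Sketch of the shape being consumed (values are illustrative):
  #
  # {
  #   "found": [
  #     { "entity": { "key": { "path": [ { "kind": "...", "name": "..." } ] },
  #                   "properties": { "field": { "stringValue": "..." } } } }
  #   ],
  #   "missing": [],
  #   "deferred": []
  # }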
def bigquery():

  if 'run' in project.task and 'query' in project.task.get('run', {}):
    if project.verbose: print("QUERY", project.task['run']['query'])
    run_query(
      project.task['auth'],
      project.id,
      project.task['run']['query'],
      project.task['run'].get('legacy', True),
      #project.task['run'].get('billing_project_id', None)
    )

  elif 'values' in project.task['from']:
    rows = get_rows(project.task['auth'], project.task['from'])
    rows_to_table(
      project.task['to'].get('auth', project.task['auth']),
      project.id,
      project.task['to']['dataset'],
      project.task['to']['table'],
      rows,
      project.task.get('schema', []),
      0
    )
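  # Sketch of the task configuration this branch consumes -- the keys come from
  # the code above, the values are illustrative:
  #
  # {
  #   "from": { "values": [ [ "a", 1 ], [ "b", 2 ] ] },
  #   "to":   { "dataset": "Example_Dataset", "table": "Example_Table" }
  # }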
  rows = report_to_rows(report)
  rows = report_clean(rows)

  if rows:
    if project.verbose: print("DYNAMIC COSTS WRITTEN:", table)

    # pull DCM schema automatically
    try:
      schema = report_schema(next(rows))
    except StopIteration: # report is empty
      raise ValueError("REPORT DID NOT RUN")

    # write report to bigquery
    rows_to_table(
      project.task['out']["auth"],
      project.id,
      project.task['out']["dataset"],
      table,
      rows,
      schema,
      0
    )

  else:
    if project.verbose: print("DYNAMIC COSTS REPORT NOT READY:", table)
        rows,
        destination['bigquery'].get('schema', []),
        destination['bigquery'].get('disposition', 'WRITE_TRUNCATE'),
      )

    elif destination['bigquery'].get('is_incremental_load', False) == True:
      incremental_rows_to_table(
        destination['bigquery'].get('auth', auth),
        destination['bigquery'].get('project_id', project.id),
        destination['bigquery']['dataset'],
        destination['bigquery']['table'] + variant,
        rows,
        destination['bigquery'].get('schema', []),
        destination['bigquery'].get('skip_rows', 1), #0 if 'schema' in destination['bigquery'] else 1),
        destination['bigquery'].get('disposition', 'WRITE_APPEND'),
        billing_project_id=project.id
      )

    else:
      rows_to_table(
        destination['bigquery'].get('auth', auth),
        destination['bigquery'].get('project_id', project.id),
        destination['bigquery']['dataset'],
        destination['bigquery']['table'] + variant,
        rows,
        destination['bigquery'].get('schema', []),
        destination['bigquery'].get('skip_rows', 1), #0 if 'schema' in destination['bigquery'] else 1),
        destination['bigquery'].get('disposition', 'WRITE_TRUNCATE'),
      )
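    # Note on dispositions (standard BigQuery semantics): WRITE_TRUNCATE
    # replaces any existing contents of the target table, while WRITE_APPEND,
    # the default in the incremental branch above, adds rows to it.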
  if 'sheets' in destination:
    if destination['sheets'].get('delete', False):
def iam():
  set_iam(project.task['auth'], project.id, project.task['role'], project.task['email'])
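# A minimal sketch of the task configuration this handler reads -- the keys
# come from the call above, while the wrapping "iam" key and the values are
# assumptions:
#
# {
#   "iam": {
#     "auth": "service",
#     "role": "roles/bigquery.dataViewer",
#     "email": "user@example.com"
#   }
# }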
    # NOT RECOMMENDED: determine schema if missing
    else:
      if project.verbose: print('SHEETS SCHEMA DETECT ( Not Recommended - Define Schema In JSON )')

      # cast rows to types ( for schema detection )
      rows = rows_to_type(rows)
      rows, schema = get_schema(
        rows,
        project.task.get('header', False),
        infer_type=project.task.get('infer_type', True)
      )

    # write to table ( not using put because no use cases for other destinations )
    rows_to_table(
      auth=project.task['out'].get('auth', project.task['auth']),
      project_id=project.id,
      dataset_id=project.task['out']['bigquery']['dataset'],
      table_id=project.task['out']['bigquery']['table'],
      rows=rows,
      schema=schema,
      skip_rows=1 if project.task.get('header', False) else 0,
      disposition=project.task['out']['bigquery'].get('disposition', 'WRITE_TRUNCATE')
    )

  else:
    print('SHEET EMPTY')
  # no rows to write: if the disposition is WRITE_TRUNCATE, create (or reset)
  # an empty table with the given schema
  elif disposition == 'WRITE_TRUNCATE':
    if project.verbose: print("BIGQUERY: No data, clearing table.")

    body = {
      "tableReference": {
        "projectId": project_id,
        "datasetId": dataset_id,
        "tableId": table_id
      },
      "schema": {
        "fields": schema
      }
    }

    # change project_id to be project.id, better yet project.cloud_id from JSON
    API_BigQuery(auth).tables().insert(projectId=project.id, datasetId=dataset_id, body=body).execute()
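  # Note: the BigQuery tables().insert call above creates a new, empty table
  # with the supplied schema; it returns an "already exists" error if the table
  # is present, so the surrounding code (not shown here) is assumed to check
  # for the table before reaching this branch.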