# NOTE(review): scraped snippet-site boilerplate, not part of the original module:
# "Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately."
# --- Fragment (mid-function, Python 2): load the combined person-course CSV
# into BigQuery, annotate the table, and publish it as "person_course_latest".
# NOTE(review): `ofn`, `gsfn`, `project_id`, `dataset`, `course_id_set`,
# `output_project_id`, and `extract_subset_tables` are bound upstream, outside
# this excerpt; indentation appears flattened by extraction (see the `if` below).
table = ofn[:-4].replace('-','_')  # table name from filename: drop 4-char extension (presumably ".csv"), '-' -> '_' for BQ
print "Importing into BigQuery as %s:%s.%s" % (project_id, dataset, table)
sys.stdout.flush()
mypath = os.path.dirname(os.path.realpath(__file__))
# schema JSON ships alongside this module; only its 'person_course' entry is used
SCHEMA_FILE = '%s/schemas/schema_person_course.json' % mypath
the_schema = json.loads(open(SCHEMA_FILE).read())['person_course']
# load the CSV from Google Storage (gsfn), skipping the single header row
bqutil.load_data_to_table(dataset, table, gsfn, the_schema, format='csv', skiprows=1, project_id=output_project_id)
# build a provenance message (source course ids + download link) for the table description
msg = ''
msg += "Combined person-course dataset, with data from:\n"
msg += str(course_id_set)
msg += "\n\n"
msg += "="*100 + "\n"
msg += "CSV download link: %s" % gsutil.gs_download_link(gsfn)
bqutil.add_description_to_table(dataset, table, msg, append=True, project_id=output_project_id)
# copy the new table (which has a specific date in its name) to a generically named "person_course_latest"
# so that future SQL queries can simply use this as the latest person course table
print "-> Copying %s to %s.person_course_latest" % (table, dataset)
bqutil.copy_bq_table(dataset, table, "person_course_latest")
# optionally derive subset tables from the freshly loaded table
# (NOTE(review): the next line must be indented under the `if` in the original source)
if extract_subset_tables:
do_extract_subset_person_course_tables(dataset, table)
print "Done"
sys.stdout.flush()
# --- Fragment (mid-function): closing keyword arguments of a bqutil query call
# plus its error handler. The opening call and matching `try:` are outside this
# excerpt.
sql_for_description=sql_for_description or the_sql,  # description falls back to the SQL actually executed
allowLargeResults=allowLargeResults,
maximumBillingTier=maximumBillingTier,
)
except Exception as err:
# print the failing SQL for debugging, then re-raise the original exception
print "ERROR! Failed on SQL="
print the_sql
raise
# --- Fragment (mid-method): export a BigQuery table to Google Storage as a
# single uncompressed CSV and record the download link in the table description.
gsfn = "%s/%s.csv" % (self.gsbucket, tablename)
# wait=False: the extract job is started asynchronously; completion is not checked here
bqutil.extract_table_to_gs(the_dataset, tablename, gsfn,
format='csv',
do_gzip=False,
wait=False)
msg = "CSV download link: %s" % gsutil.gs_download_link(gsfn)
print msg
# append the link to the table's BQ description so it is discoverable later
bqutil.add_description_to_table(the_dataset, tablename, msg, append=True, project_id=self.output_project_id)
msg = "[researchData]: CSV download link: %s" % gsutil.gs_download_link( gsfilename )
print msg
sys.stdout.flush()
except Exception as err:
print str(err)
if ('BQ Error creating table' in str(err) ):
msg = "[researchData]: Retrying... by sharding."
print msg
sys.stdout.flush()
gsfilename = "%s/%s-*.csv.gz" % ( self.gsp, tablename )
print gsfilename
sys.stdout.flush()
ret = bqutil.extract_table_to_gs( the_dataset, tablename, gsfilename, format=rdp_format, do_gzip=True, wait=True)
msg = "[researchData]: CSV download link: %s" % gsutil.gs_download_link( gsfilename )
print msg
sys.stdout.flush()
# Copy from Google Storage to Secure Data Warehouse for archiving
archiveLocation = find_course_sql_dir(course_id=course_id, basedir=basedir, datedir=datedir, use_dataset_latest=True)
#time.sleep( CFG.TIME_TO_WAIT_30s ) # delay needed to allow for GS to upload file fully (This should be size dependent, and may not be enough time)
msg = "[researchData]: Archiving Research Data table %s from %s to %s" % ( tablename, gsfilename, archiveLocation )
print msg
sys.stdout.flush()
gsutil.upload_file_to_gs(src=gsfilename, dst=archiveLocation, verbose=True)
pass
# --- Fragment (mid-method): build the Google Storage destination for a
# research-data product, then extract the BigQuery table there, retrying with
# sharded output when the single-file export fails.
# NOTE(review): the `if` branch matching the dangling `else:` below, the
# bindings of `course_id`, `output_bucket`, `rdp`, `tablename`, `the_dataset`,
# and `rdp_format`, and the original indentation are all outside this excerpt.
self.gsp = gsutil.gs_path_from_course_id( course_id=course_id, gsbucket=output_bucket, use_dataset_latest=True )
# destination filename comes from the RESEARCH_DATA_PRODUCTS lookup for this product key
gsfilename = "%s/%s" % ( self.gsp, RESEARCH_DATA_PRODUCTS[ rdp ] )
else:
# guard: no course_id(s) supplied — nothing to export
print "ERROR! Must specify course_id's. Aborting."
return
try:
# Copy to Google Storage
msg = "[researchData]: Copying Research Data table %s to %s" % ( tablename, gsfilename )
print msg
#gsfilename = "%s/%s-*.csv.gz" % ( self.gsp, tablename ) # temp
# single gzipped CSV; note this overrides the product filename computed above
gsfilename = "%s/%s.csv.gz" % ( self.gsp, tablename ) # temp
ret = bqutil.extract_table_to_gs( the_dataset, tablename, gsfilename, format=rdp_format, do_gzip=True, wait=True)
msg = "[researchData]: CSV download link: %s" % gsutil.gs_download_link( gsfilename )
print msg
sys.stdout.flush()
except Exception as err:
print str(err)
# retry only on "BQ Error creating table" (presumably export too large for one file — TODO confirm)
if ('BQ Error creating table' in str(err) ):
msg = "[researchData]: Retrying... by sharding."
print msg
sys.stdout.flush()
# wildcard URI lets BigQuery shard the export into multiple gzipped CSV files
gsfilename = "%s/%s-*.csv.gz" % ( self.gsp, tablename )
print gsfilename
sys.stdout.flush()
ret = bqutil.extract_table_to_gs( the_dataset, tablename, gsfilename, format=rdp_format, do_gzip=True, wait=True)
msg = "[researchData]: CSV download link: %s" % gsutil.gs_download_link( gsfilename )
print msg