# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_overall_stats_table(syn, project_id, overall_stats_df,
                               submission_df=None, team_stats_df=None):
    """
    Push the latest version of the challenge stats tables to Synapse.

    For each table -- 'OverallStats', and optionally
    'AllSubmissions_Annotated' and 'TeamStats' -- look the table up by
    name under ``project_id``; if it exists, upsert rows from the
    matching dataframe via ``_update_syn_table``, otherwise create and
    store a brand-new table built from the dataframe.

    Parameters
    ----------
    syn : synapseclient.Synapse
        logged-in Synapse client
    project_id : str
        Synapse ID of the project that holds the tables
    overall_stats_df : pandas.DataFrame
        data for the 'OverallStats' table (rows matched on 'team')
    submission_df : pandas.DataFrame, optional
        data for the 'AllSubmissions_Annotated' table (rows matched on
        'objectId'); the section is skipped when None
    team_stats_df : pandas.DataFrame, optional
        data for the 'TeamStats' table (rows matched on 'team'); the
        section is skipped when None

    NOTE(review): the original chunk referenced ``submission_df`` and
    ``team_stats_df`` without defining them and contained an ``except``
    with no matching ``try``; both sections are restored here behind
    optional, backward-compatible parameters.
    """
    try:
        print("Searching for existing 'OverallStats' table...")
        schema_id = [t for t in syn.getChildren(project_id, includeTypes=['table'])
                     if t['name'] == 'OverallStats'][0]['id']
        schema = syn.get(schema_id)
        overall_stats_table = syn.tableQuery('select * from {}'.format(schema_id))
        overall_stats_table.schema = schema
        print("Updating 'OverallStats' table...")
        update_table = synapseclient.Table(schema, overall_stats_df)
        overall_stats_table = _update_syn_table(overall_stats_table, update_table, 'team')
    except IndexError:
        # No table named 'OverallStats' yet -- create one from the dataframe.
        print("Creating 'OverallStats' table...")
        project = syn.get(project_id)
        cols = synapseclient.as_table_columns(overall_stats_df)
        schema = synapseclient.Schema(name='OverallStats', columns=cols, parent=project)
        overall_stats_table = synapseclient.Table(schema, overall_stats_df)
        print("Storing 'OverallStats' table...")
        overall_stats_table = syn.store(overall_stats_table)

    if submission_df is not None:
        try:
            schema_id = [t for t in syn.getChildren(project_id, includeTypes=['table'])
                         if t['name'] == 'AllSubmissions_Annotated'][0]['id']
            schema = syn.get(schema_id)
            all_subs_table = syn.tableQuery('select * from {}'.format(schema_id))
            # Informational only: the update below still runs so that
            # changed (not just new) rows get refreshed.
            if all_subs_table.asDataFrame().shape[0] == submission_df.shape[0]:
                print("No new submissions since last update.")
            all_subs_table.schema = schema
            print("Updating 'AllSubmissions' table...")
            update_table = synapseclient.Table(schema, submission_df)
            all_subs_table = _update_syn_table(all_subs_table, update_table, 'objectId')
        except IndexError:
            print("Creating 'AllSubmissions' table...")
            project = syn.get(project_id)
            cols = synapseclient.as_table_columns(submission_df)
            schema = synapseclient.Schema(name='AllSubmissions_Annotated', columns=cols, parent=project)
            all_subs_table = synapseclient.Table(schema, submission_df)
            print("Storing 'AllSubmissions' table...")
            all_subs_table = syn.store(all_subs_table)

    if team_stats_df is not None:
        try:
            print("Searching for existing 'TeamStats' table...")
            schema_id = [t for t in syn.getChildren(project_id, includeTypes=['table'])
                         if t['name'] == 'TeamStats'][0]['id']
            schema = syn.get(schema_id)
            team_stats_table = syn.tableQuery('select * from {}'.format(schema_id))
            team_stats_table.schema = schema
            print("Updating 'TeamStats' table...")
            update_table = synapseclient.Table(schema, team_stats_df)
            team_stats_table = _update_syn_table(team_stats_table, update_table, 'team')
        except IndexError:
            print("Creating 'TeamStats' table...")
            project = syn.get(project_id)
            cols = synapseclient.as_table_columns(team_stats_df)
            schema = synapseclient.Schema(name='TeamStats', columns=cols, parent=project)
            team_stats_table = synapseclient.Table(schema, team_stats_df)
            print("Storing 'TeamStats' table...")
            team_stats_table = syn.store(team_stats_table)
def update_validated_submissions_table(syn, project_id, valid_df):
    """
    Push the latest version of the combined validated submissions
    table to Synapse.

    Looks for a table named 'ValidatedSubmissions' under ``project_id``.
    If found, its rows are upserted from ``valid_df`` (matched on
    'objectId') via ``_update_syn_table``; otherwise a new table is
    created from the dataframe and stored.
    """
    print("Searching for existing 'ValidatedSubmissions' table...")
    matching = [child for child in syn.getChildren(project_id, includeTypes=['table'])
                if child['name'] == 'ValidatedSubmissions']
    try:
        table_id = matching[0]['id']
    except IndexError:
        # Table absent: build a schema from the dataframe and store it fresh.
        print("Creating 'ValidatedSubmissions' table...")
        parent_project = syn.get(project_id)
        columns = synapseclient.as_table_columns(valid_df)
        new_schema = synapseclient.Schema(name='ValidatedSubmissions',
                                          columns=columns, parent=parent_project)
        validated_subs_table = syn.store(
            synapseclient.Table(new_schema, valid_df)) if False else None
        # (store performed below to keep the original print ordering)
        print("Storing 'ValidatedSubmissions' table...")
        validated_subs_table = syn.store(synapseclient.Table(new_schema, valid_df))
        return

    stored_schema = syn.get(table_id)
    validated_subs_table = syn.tableQuery('select * from {}'.format(table_id))
    # Row-count parity is reported but does not short-circuit the update,
    # so edited (not just new) rows still get refreshed.
    if validated_subs_table.asDataFrame().shape[0] == valid_df.shape[0]:
        print("No new valid submissions since last update.")
    validated_subs_table.schema = stored_schema
    print("Updating 'ValidatedSubmissions' table...")
    fresh_rows = synapseclient.Table(stored_schema, valid_df)
    validated_subs_table = _update_syn_table(validated_subs_table, fresh_rows, 'objectId')
# Make a change to the view and store
# NOTE(review): this run of statements looks like the interior of an
# integration test whose enclosing `def` (and names such as `view_dict`,
# `entity_view`, `schedule_for_cleanup`, `QUERY_TIMEOUT_SEC`) lies outside
# this chunk -- code left byte-identical, comments only.
view_dict[0]['fileFormat'] = 'PNG'
# Write the mutated rows out as a temp CSV that Synapse can ingest.
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as temp:
schedule_for_cleanup(temp.name)
temp_filename = temp.name
with io.open(temp_filename, mode='w', encoding="utf-8", newline='') as temp_file:
dw = csv.DictWriter(temp_file, fieldnames=view_dict[0].keys(),
quoting=csv.QUOTE_NONNUMERIC,
lineterminator=str(os.linesep))
dw.writeheader()
dw.writerows(view_dict)
temp_file.flush()
# Push the CSV as an update to the entity view.
syn.store(synapseclient.Table(entity_view.id, temp_filename))
# Sanity-check the CSV round-trip before querying Synapse.
new_view_dict = list(csv.DictReader(io.open(temp_filename, encoding="utf-8", newline='')))
assert_equals(new_view_dict[0]['fileFormat'], 'PNG')
# query for the change
start_time = time.time()
new_view_results = syn.tableQuery("select * from %s" % entity_view.id)
schedule_for_cleanup(new_view_results.filepath)
new_view_dict = list(csv.DictReader(io.open(new_view_results.filepath, encoding="utf-8", newline='')))
# Poll until the change propagates; views are eventually consistent.
while new_view_dict[0]['fileFormat'] != 'PNG':
# fail the test if propagation exceeds the timeout
assert_less(time.time() - start_time, QUERY_TIMEOUT_SEC)
# query again
new_view_results = syn.tableQuery("select * from %s" % entity_view.id)
new_view_dict = list(csv.DictReader(io.open(new_view_results.filepath, encoding="utf-8", newline='')))
def _update_syn_table(syn_table, update_table, update_key):
    """
    Update a Synapse table object based on a local dataframe, using
    a specified key to match rows.

    Parameters
    ----------
    syn_table :
        table object as currently stored in Synapse (must support
        ``asDataFrame()`` and expose ``.schema``)
    update_table :
        table object built from the fresher local dataframe
    update_key : str
        column used to match rows between the two tables; assumed
        unique per row -- TODO confirm against callers

    Returns
    -------
    synapseclient.Table containing the refreshed existing rows plus any
    rows present only in the update.
    """
    # Local import keeps this chunk self-contained; pandas is already a
    # dependency of synapseclient environments.
    import pandas as pd

    syn_df = syn_table.asDataFrame()
    update_df = update_table.asDataFrame().set_index(update_key, drop=False)
    # Overwrite any existing row whose contents changed in the update.
    for idx in syn_df.index:
        update_idx = syn_df.loc[idx, update_key]
        if not syn_df.loc[idx].equals(update_df.loc[update_idx]):
            syn_df.loc[idx] = update_df.loc[update_idx]
    # Append rows present in the update but not yet in the stored table.
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    new_rows = update_df[~update_df[update_key].isin(syn_df[update_key])]
    syn_df = pd.concat([syn_df, new_rows])
    return synapseclient.Table(syn_table.schema, syn_df)
# NOTE(review): interior of a table-copy helper; `entity`, `destinationId`,
# `Schema` and `Table` are defined outside this chunk, and the bare
# `return` implies an enclosing `def` not visible here. Comments only.
print("Getting table %s" % entity)
myTableSchema = syn.get(entity)
# CHECK: if a table with the same name already exists at the destination,
# refuse to copy rather than silently clobbering it.
existingEntity = syn.findEntityId(myTableSchema.name, parent=destinationId)
if existingEntity is not None:
raise ValueError('An entity named "%s" already exists in this location. Table could not be copied'
% myTableSchema.name)
# Pull every row (without row id/version metadata) so the resulting CSV
# can seed the new table directly.
d = syn.tableQuery('select * from %s' % myTableSchema.id, includeRowIdAndRowVersion=False)
colIds = myTableSchema.columnIds
# Reuse the source table's column IDs under the destination project.
newTableSchema = Schema(name=myTableSchema.name, parent=destinationId, columns=colIds)
print("Created new table using schema %s" % newTableSchema.name)
newTable = Table(schema=newTableSchema, values=d.filepath)
newTable = syn.store(newTable)
return newTable.schema.id
from all the json files on synapseAnnotations. In the process also updates the table annotation to the latest release version.
"""
# NOTE(review): the opening of this docstring and the enclosing `def` are
# outside this chunk; `tableSynId`, `releaseVersion` and `newTable` are
# defined elsewhere. Code left byte-identical, comments only.
currentTable = syn.tableQuery("SELECT * FROM %s" % tableSynId)
# If the current table has rows, delete all of them before re-populating
if currentTable.asRowSet().rows:
deletedRows = syn.delete(currentTable.asRowSet())
# get the table schema and set its release-version annotation
schema = syn.get(tableSynId)
schema.annotations = {"annotationReleaseVersion": str(releaseVersion)}
updated_schema_release = syn.store(schema)
# store the new table on synapse
table = syn.store(synapseclient.Table(schema, newTable))