Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def publish_date_query(start_date, end_date, start_date_inclusive=True, end_date_inclusive=False,
                       field='publish_day'):
    """Build a Solr clause restricting results to a publish-date range.

    `field` selects the date granularity to filter on; anything outside the
    known publish_* fields raises an MCException rather than producing a
    silently-wrong query.
    """
    if field not in ('publish_day', 'publish_week', 'publish_month', 'publish_year'):
        raise mediacloud.error.MCException("Not a valid date field {}".format(field))
    date_range = _solr_date_range(start_date, end_date, start_date_inclusive, end_date_inclusive)
    return field + ':' + date_range
def story_counts_by_snapshot(topics_id):
    """Return, per snapshot of a topic, how many stories are in its overall
    timespan, how many of those were discovered by spidering, and how many
    were seeded (total - spidered).

    Snapshots that failed to generate have no timespans; those report zeros
    instead of erroring.
    """
    user_mc = user_mediacloud_client(user_mediacloud_key())
    snapshots = user_mc.topicSnapshotList(topics_id)
    counts = {}
    for s in snapshots:
        # get the count of stories in the overall timespan for this snapshot
        timespans = apicache.cached_topic_timespan_list(user_mediacloud_key(), topics_id,
                                                        snapshots_id=s['snapshots_id'], foci_id=None)
        try:
            total = timespans[0]['story_count']
        except (mediacloud.error.MCException, IndexError):
            # MCException: API failure; IndexError: no timespans at all
            # (ie. the snapshot failed to generate correctly)
            total = 0
        # search by tag to find out how many stories were spidered
        spidered = 0
        try:
            spidered = apicache.topic_story_count(user_mediacloud_key(), topics_id,
                                                  snapshots_id=s['snapshots_id'], foci_id=None,
                                                  timespans_id=timespans[0]['timespans_id'],
                                                  q="* AND tags_id_stories:{}".format(TAG_SPIDERED_STORY))['count']
        except (mediacloud.error.MCException, IndexError):
            # BUG FIX: the original reset `total` to 0 in the IndexError case,
            # wiping a valid overall count; the value that failed here is `spidered`.
            spidered = 0
        seeded = total - spidered
        counts[s['snapshots_id']] = {'total': total, 'spidered': spidered, 'seeded': seeded}
    return jsonify(counts)
# NOTE(review): truncated fragment — the enclosing `def`, its `try:` block, and
# the start of this call expression are not visible here, so this span is not
# independently parseable. Comments only; code left untouched.
media_tags_ids=tag_ids_to_add, **optional_args)['topics'][0]
topics_id = topic_result['topics_id']
logger.info("Created new topic \"{}\" as {}".format(name, topics_id))
# if this includes any of the US-centric collections, add the retweet partisanship subtopic by default
if set(tag_ids_to_add).intersection(US_COLLECTIONS):
add_retweet_partisanship_to_topic(topic_result['topics_id'],
'Retweet Partisanship',
'Subtopics driven by our analysis of Twitter followers of Trump and Clinton during the 2016 election season. Each media soure is scored based on the ratio of retweets of their stories in those two groups.')
# client will either make a empty snapshot, or a spidering one
return topic_summary(topics_id)
# NOTE(review): the broad `except Exception` below precedes the more specific
# `except mediacloud.error.MCException`; assuming MCException subclasses
# Exception (the normal Python convention), the MCException handler is
# unreachable and its status-code-aware error response is never produced.
# The two handlers should be swapped — confirm against the full function.
except Exception as e:
logging.error("Topic creation failed {}".format(name))
logging.exception(e)
return json_error_response(str(e), 500)
except mediacloud.error.MCException as e:
logging.error("Topic creation failed {}".format(name))
logging.exception(e)
return json_error_response(e.message, e.status_code)
def _geotagcount_handler(api_key, keywords, media, start, end):
    """Run a geotag-count query and serialize the outcome as compact JSON.

    Returns the JSON result string on success. On failure returns a
    (json_error_body, status_code) pair: the MediaCloud error's own status
    code for MCException, otherwise 400.
    """
    try:
        results = _geotagcount(api_key, keywords, media, start, end)
    except mcerror.MCException as err:
        app.core.logger.error("Query failed: " + str(err))
        body = json.dumps({'error': str(err)}, separators=(',', ':'))
        return body, err.status_code
    except Exception as err:
        app.core.logger.error("Query failed: " + str(err))
        return json.dumps({'error': str(err)}, separators=(',', ':')), 400
    return json.dumps(results, separators=(',', ':'))
# NOTE(review): orphan body fragment — its enclosing `def` is not visible here
# (a fuller copy of the same function appears later in this file). Comments
# only; code left untouched.
tag_story_counts = []
partisanship_tags = _cached_media_tags(TAG_SETS_ID_RETWEET_PARTISANSHIP_2016)
# grab the total stories
try:
total_stories = topic_story_count(user_mediacloud_key(), topics_id)['count']
except mediacloud.error.MCException:
total_stories = 0
# make a count for each tag
for tag in partisanship_tags:
try:
tagged_story_count = topic_story_count(user_mediacloud_key(), topics_id, q=tag['query'])['count']
# ZeroDivisionError fires when total_stories is 0 (the fallback above)
pct = float(tagged_story_count)/float(total_stories)
except ZeroDivisionError:
tagged_story_count = 0
pct = 0
except mediacloud.error.MCException:
tagged_story_count = 0
pct = 0
tag_story_counts.append({
'label': tag['label'],
'tags_id': tag['tags_id'],
'count': tagged_story_count,
'pct': pct
})
# order them in the way a person would expect ( left to center to right)
# NOTE(review): the five tags_id constants below are hardcoded; presumably the
# 2016 retweet-partisanship quintile tags — confirm. If any id is missing from
# tag_story_counts the [0] index raises IndexError.
ordered_tag_story_counts = list()
ordered_tag_story_counts.append([t for t in tag_story_counts if t['tags_id'] == 9360520][0])
ordered_tag_story_counts.append([t for t in tag_story_counts if t['tags_id'] == 9360521][0])
ordered_tag_story_counts.append([t for t in tag_story_counts if t['tags_id'] == 9360522][0])
ordered_tag_story_counts.append([t for t in tag_story_counts if t['tags_id'] == 9360523][0])
ordered_tag_story_counts.append([t for t in tag_story_counts if t['tags_id'] == 9360524][0])
return jsonify({'story_counts': ordered_tag_story_counts})
def _cached_topic_story_count(user_mc_key, topics_id, **kwargs):
    '''
    Internal helper - don't call this; call topic_story_count instead. This needs user_mc_key in the
    function signature to make sure the caching is keyed correctly.
    Returns the topicStoryCount API response, or {'count': 0} if the call errors.
    '''
    # the shared tool key uses the module-level client; everyone else gets a per-user client
    if user_mc_key == TOOL_API_KEY:
        local_mc = mc
    else:
        local_mc = user_mediacloud_client()
    try:
        # removed the unused `as mce` binding from the handler below
        return local_mc.topicStoryCount(topics_id, **kwargs)
    except mediacloud.error.MCException:
        # when there is no timespan (ie. an ungenerated version you are adding subtopics to)
        # the API throws; report zero stories instead of propagating the error
        return {'count': 0}
# NOTE(review): orphan body fragment — the enclosing `def` and the `for s in
# snapshots:` loop header it clearly belongs to (it reads `s`, `counts`,
# `topics_id`) are not visible here. Comments only; code left untouched.
timespans = apicache.cached_topic_timespan_list(user_mediacloud_key(), topics_id,
snapshots_id=s['snapshots_id'], foci_id=None)
try:
total = timespans[0]['story_count']
except mediacloud.error.MCException:
total = 0
except IndexError: # this doesn't have any snapshots (ie. it failed to generate correctly)
total = 0
# search by tag to find out how many stories were spidered
spidered = 0
try:
spidered = apicache.topic_story_count(user_mediacloud_key(), topics_id,
snapshots_id=s['snapshots_id'], foci_id=None,
timespans_id=timespans[0]['timespans_id'],
q="* AND tags_id_stories:{}".format(TAG_SPIDERED_STORY))['count']
except mediacloud.error.MCException:
spidered = 0
# NOTE(review): BUG — this IndexError handler guards the *spidered* lookup but
# resets `total`, wiping a valid overall count; it should set `spidered = 0`.
except IndexError: # this doesn't have any snapshots (ie. it failed to generate correctly)
total = 0
seeded = total - spidered
counts[s['snapshots_id']] = {'total': total, 'spidered': spidered, 'seeded': seeded}
return jsonify(counts)
def verifyAuthToken(self):
    """Report whether this client's auth token works.

    Probes the API with a minimal tagSetList call; any MediaCloud error or
    other failure means the token is treated as invalid.
    """
    try:
        self.tagSetList(0, 1)
    except mediacloud.error.MCException:
        # API-level rejection: token is not valid
        return False
    except Exception as exception:
        logger.warning(u"AuthToken verify failed: %s", exception)
        return False
    else:
        return True
def topic_focal_sets_list(user_mc_key, topics_id, snapshots_id):
    """List the focal sets defined for one snapshot of a topic.

    user_mc_key is part of the signature so any result caching is keyed per
    user. Returns an empty list if the API call fails.
    """
    user_mc = user_mediacloud_client(user_mc_key)
    try:
        return user_mc.topicFocalSetList(topics_id, snapshots_id=snapshots_id)
    except mediacloud.error.MCException:
        # a topic that failed while generating its snapshot can have no overall
        # timespan, which makes this endpoint throw; better to fail by
        # returning no focal sets than to propagate the error
        return []
def retweet_partisanship_story_counts(topics_id):
# NOTE(review): this function is truncated at the end of the visible source
# (the appended dict literal is cut off mid-construction). Comments only;
# code left untouched.
# TODO: add in overall timespan id here so it works in different snapshots
tag_story_counts = []
partisanship_tags = _cached_media_tags(TAG_SETS_ID_RETWEET_PARTISANSHIP_2016)
# grab the total stories
try:
total_stories = topic_story_count(user_mediacloud_key(), topics_id)['count']
except mediacloud.error.MCException:
total_stories = 0
# make a count for each tag
for tag in partisanship_tags:
try:
tagged_story_count = topic_story_count(user_mediacloud_key(), topics_id, q=tag['query'])['count']
# ZeroDivisionError fires when total_stories is 0 (the fallback above)
pct = float(tagged_story_count)/float(total_stories)
except ZeroDivisionError:
tagged_story_count = 0
pct = 0
except mediacloud.error.MCException:
tagged_story_count = 0
pct = 0
tag_story_counts.append({
'label': tag['label'],
'tags_id': tag['tags_id'],
'count': tagged_story_count,