Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@concurrent
def _reddit_url_submissions_for(url):
data = _cached_reddit_submission_search(url=_sanitize_url_for_reddit(url), aggs='created_utc', frequency='5y')
if len(data['created_utc']) == 0:
return 0
return data['created_utc'][0]['doc_count']
if sNSURI[0] == '"': sNSURI = sNSURI[1:]
if sNSURI[-1] == '"': sNSURI = sNSURI[0:-1]
print "- NameSpace: %s=%s"%(sNSName, sNSURI)
self.ltNSNameURI.append( (sNSName, sNSURI) )
#Now loop over other sections, which should define a set of decorations
decos = self.cfg.get(self.sSectionGeneral, "decos")
lsDeco = decos.split()
self.lDeco = [] #a list
# for sect in self.cfg.sections(): #random order!!! :-(
for sect in lsDeco:
if sect in lReservedSections: continue
sType = self.cfg.get(sect, "type") #get the name of the associated Python class
try:
cls=deco.Deco.getDecoClass(sType) #get the associated Python class
obj = cls(self.cfg, sect, None) #let the class creates an instance using the details in the config file
self.lDeco.append(obj)
#print "-", str(obj)
except Exception, e:
print "ERROR: ", e
print "--- Done ---"
return 1
def lookForConfig(sFile, lsPath):
for sPath in lsPath:
s = sPath+os.sep+sFile
print "Looking for ", s,
if os.path.exists(s):
print "OK"
return s
else:
print "FAILED"
print "No config file %s in %s" % (sFile, lsPath)
sys.exit(1)
if __name__ == "__main__":
deco.setEncoding(sEncoding)
sFile = None
print sys.argv
try:
sPath = sys.argv[1]
try:
sFile = sys.argv[2]
if not os.path.exists(sFile) and len(sys.argv)>3:
#maybe we have a space in the file name... :-( and it is a mess with windows and .bat
if os.path.exists(sys.argv[2] + " " + sys.argv[3]):
sFile = sys.argv[2] + " " + sys.argv[3]
except IndexError:
pass
except IndexError:
sConfigFileName = "wxvisu.ini"
@concurrent
def _update_source_worker(source_info):
# worker function to help update sources in parallel
user_mc = user_admin_mediacloud_client()
media_id = source_info['media_id']
logger.debug("Updating media {}".format(media_id))
source_no_metadata_no_id = {k: v for k, v in list(source_info.items()) if k != 'media_id'
and k not in SOURCE_LIST_CSV_METADATA_PROPS}
response = user_mc.mediaUpdate(media_id, source_no_metadata_no_id)
return response
@concurrent(processes=args.proc_number)
def run_concurrent_optimize(trader: RLTrader, args):
trader.optimize(args.trials, args.trials, args.parallel_jobs)
@synchronized
def _update_sources_in_parallel(sources):
for m in sources:
m['response'] = _update_source_worker(m)
return sources
@synchronized
def reddit_url_submission_counts(story_list):
results = defaultdict(dict)
for s in story_list:
results[s['stories_id']] = _reddit_url_submissions_for(s['url'])
return results
def OnMenu_ImgFolder(self, event):
curdir = os.path.dirname(self.doc.getFilename())
if not curdir: curdir = os.getcwd()
dlg = wx.DirDialog (None, "Select the image folder", "",
wx.DD_DEFAULT_STYLE | wx.DD_DIR_MUST_EXIST)
dlg.CenterOnScreen()
val = dlg.ShowModal()
if val == wx.ID_OK:
DecoImage.sImageFolder = dlg.GetPath()
dlg.Destroy()
self.bModified = True
self.display_page()