Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _save_files(self, images=None):
    """Save image metadata, then signal whether unsaved changes remain.

    Save options are read from the ``files`` section of
    ``self.app.config_store``. Once saving is done, ``self.new_metadata``
    is emitted with ``True`` if any image in ``self.images`` still reports
    changed metadata, and ``False`` otherwise.

    :param images: the images to save. When omitted or empty, every image
        in ``self.images`` is saved.
    """
    # NOTE(review): eval() executes whatever text the config store returns.
    # It is kept because the stored value format cannot be confirmed from
    # this block; if only literals such as 'True'/'False' are ever stored,
    # ast.literal_eval would be a safer replacement.
    if_mode = eval(self.app.config_store.get('files', 'image', 'True'))
    sc_mode = self.app.config_store.get('files', 'sidecar', 'auto')
    force_iptc = eval(
        self.app.config_store.get('files', 'force_iptc', 'False'))
    keep_time = eval(
        self.app.config_store.get('files', 'preserve_timestamps', 'False'))
    if not images:
        images = self.images
    with Busy():
        for image in images:
            # only pass the image's file times on when the user asked for
            # timestamps to be preserved
            file_times = image.file_times if keep_time else None
            image.metadata.save(
                if_mode=if_mode, sc_mode=sc_mode,
                force_iptc=force_iptc, file_times=file_times)
    # check all images, not just the ones saved in this call
    unsaved = any(image.metadata.changed() for image in self.images)
    self.new_metadata.emit(unsaved)
# Query the OpenCage geocoding web service.
#
# ``params`` is a dict of request parameters. ``params['q']`` (with
# ``params['bounds']`` appended when present) forms the key used to look up
# ``self.geocode_cache``; a cached entry is returned without any request.
# Otherwise the API key, the 'abbrv' and 'no_annotations' flags and, when
# available, the default locale's language are added to ``params`` in
# place, so the caller's dict is modified.
#
# Exceptions raised by the request and HTTP status codes >= 400 are logged
# and produce an empty list.
#
# NOTE(review): this listing appears to stop before the end of the
# function - nothing visible stores a result in ``geocode_cache`` or
# returns the decoded results - and its indentation has been lost.
# Confirm against the complete source before changing the code.
def do_geocode(self, params):
cache_key = params['q']
if 'bounds' in params:
cache_key += params['bounds']
if cache_key in self.geocode_cache:
return self.geocode_cache[cache_key]
params['key'] = self.api_key
params['abbrv'] = '1'
params['no_annotations'] = '1'
# NOTE(review): locale.getdefaultlocale() is deprecated since Python 3.11
lang, encoding = locale.getdefaultlocale()
if lang:
params['language'] = lang
with Busy():
# self.rate_limit() is called before every request
self.rate_limit()
try:
rsp = requests.get(
'https://api.opencagedata.com/geocode/v1/json',
params=params, timeout=5)
except Exception as ex:
logger.error(str(ex))
return []
if rsp.status_code >= 400:
logger.error('Search error %d', rsp.status_code)
return []
# from here on ``rsp`` is the decoded JSON body, not the response object
rsp = rsp.json()
# the body carries its own status code, checked separately from the
# HTTP status above
status = rsp['status']
if status['code'] != 200:
logger.error(
'Search error %d: %s', status['code'], status['message'])
# Send a GET request to ``url`` with ``params`` plus the API key.
#
# ``self.api_key`` is stored in ``params['key']`` in place, so the caller's
# dict is modified. Exceptions raised by the request and HTTP status codes
# >= 400 are logged and produce an empty list.
#
# NOTE(review): this listing is cut off in the middle of the final
# ``logger.error(...)`` call and its indentation has been lost, so the
# rest of the error handling and the return value are not visible here.
# Confirm against the complete source before changing the code.
def query(self, url, params):
params['key'] = self.api_key
with Busy():
# self.rate_limit() is called before every request
self.rate_limit()
try:
rsp = requests.get(url, params=params, timeout=5)
except Exception as ex:
logger.error(str(ex))
return []
if rsp.status_code >= 400:
logger.error('Search error %d', rsp.status_code)
return []
# NOTE(review): indexing the headers directly raises KeyError if the
# response has no 'X-MS-BM-WS-INFO' header - confirm it is always sent
if rsp.headers['X-MS-BM-WS-INFO'] == '1':
logger.error(translate(
'MapTabBing', 'Server overload, please try again'))
# presumably block_timer holds off further requests for 5000 ms - verify
self.block_timer.start(5000)
# from here on ``rsp`` is the decoded JSON body, not the response object
rsp = rsp.json()
if rsp['statusCode'] != 200:
logger.error('Search error %d: %s',
def connection_changed(self, connected):
    """Refresh the user/album display and widget states after a
    connection change.

    :param connected: truthy when the session is now connected. User
        details and the album list are then fetched from ``self.session``;
        otherwise both displays are cleared.
    """
    if not connected:
        # nothing to fetch: blank the user details and the album list
        self.show_user(None, None)
        self.show_album_list([])
    else:
        with Busy():
            user_details = self.session.get_user()
            self.show_user(*user_details)
            albums = self.session.get_albums()
            self.show_album_list(albums)
    self.user_connect.set_checked(connected)
    # both controls are disabled while self.upload_worker is set
    can_configure = connected and not self.upload_worker
    self.upload_config.setEnabled(can_configure)
    self.user_connect.setEnabled(not self.upload_worker)
    self.enable_upload_button()
def list_files(self):
    """Fetch file data from the current source and pass it on.

    With no source set, an empty dict is passed to
    ``self._new_file_list``. If the source returns ``None`` for its file
    data, ``self._fail`` is called and no file list is passed on.
    """
    if not self.source:
        self._new_file_list({})
        return
    with Busy():
        file_data = self.source.get_file_data()
    if file_data is None:
        self._fail()
        return
    self._new_file_list(file_data)
# Begin the full authentication procedure using a temporary local HTTP
# server.
#
# An HTTPServer is bound to 127.0.0.1 on port 0 (any free port), and its
# actual port is used to build the redirect URI passed to
# ``self.session.get_auth_url``. If no auth URL is obtained, an error is
# logged, the HTTP server is closed and the method returns. Otherwise an
# AuthServer object is moved to a new QThread and its signals are
# connected.
#
# NOTE(review): this listing appears to end before the thread is started
# or ``auth_url`` is used, and its indentation has been lost, so the
# extent of the ``with Busy()`` block is not visible. Confirm against the
# complete source before changing the code.
def authorise(self):
with Busy():
# do full authentication procedure
http_server = HTTPServer(('127.0.0.1', 0), AuthRequestHandler)
redirect_uri = 'http://127.0.0.1:' + str(http_server.server_port)
auth_url = self.session.get_auth_url(redirect_uri)
if not auth_url:
logger.error('Failed to get auth URL')
# release the listening socket before giving up
http_server.server_close()
return
server = AuthServer()
thread = QtCore.QThread(self)
server.moveToThread(thread)
server.server = http_server
# the server's response signal is delivered to self.auth_response
server.response.connect(self.auth_response)
# starting the thread runs server.handle_requests
thread.started.connect(server.handle_requests)
# when the server finishes, quit the thread and schedule server deletion
server.finished.connect(thread.quit)
server.finished.connect(server.deleteLater)
def upload_finished(self):
    """Reload the current album after an upload has finished.

    Setting the current album again reloads its metadata, which updates
    the thumbnail.
    """
    with Busy():
        album_id = self.current_album.id.text
        self.set_current_album(album_id)
def select_album(self, index):
    """Show the details of the album chosen in the album selector.

    :param index: index of the chosen entry in the 'album_choose' widget.

    Without 'read' authorisation the display is force-refreshed instead.
    The 'me' entry is shown as an empty album with no picture; any other
    entry is fetched from ``self.session`` and then shown.
    """
    if not self.authorise('read'):
        self.refresh(force=True)
    else:
        chooser = self.upload_config.widgets['album_choose']
        album_id = chooser.itemData(index)
        if album_id == 'me':
            # no session request is made for the 'me' entry
            self.upload_config.show_album({}, None)
        else:
            with Busy():
                fields = 'cover_photo,description,location,name'
                album, picture = self.session.get_album(album_id, fields)
                self.upload_config.show_album(album, picture)
def regenerate_thumbnail(self):
    """Create a new thumbnail, store it in the metadata and reload it.

    FFmpeg is tried first. If it yields no data, the image is loaded via
    ``self.get_qt_image`` and scaled with PIL when available, else with
    Qt. If no image can be loaded, nothing is changed.
    """
    # The DCF specification requires a 160 x 120 thumbnail, so images with
    # any other aspect ratio get padded with black.
    with Busy():
        image_data, image_format, width, height = self.make_thumb_ffmpeg()
        if not image_data:
            # FFmpeg produced nothing, so fall back to PIL or Qt
            source_image = self.get_qt_image()
            if not source_image:
                return
            make_thumb = self.make_thumb_PIL if PIL else self.make_thumb_Qt
            image_data, image_format, width, height = make_thumb(
                source_image)
        # store the new thumbnail, then display it
        self.metadata.thumbnail = image_data, image_format, width, height
        self.load_thumbnail()
def find_photos(self, min_taken_date, max_taken_date):
    """Generate the user's Flickr photos taken within a date range.

    Result pages are requested one at a time, starting at page 1, and
    each photo entry of a page is yielded in turn. Iteration ends when a
    response's 'stat' is not 'ok' or a page has no photos. If a request
    raises an exception, it is logged, ``self.disconnect`` is called and
    iteration ends.

    :param min_taken_date: earliest taken date; must provide ``strftime``.
    :param max_taken_date: latest taken date; must provide ``strftime``.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    page = 1
    while True:
        with Busy():
            try:
                response = self.api.people.getPhotos(
                    user_id='me', page=page, extras='date_taken,url_t',
                    min_taken_date=min_taken_date.strftime(timestamp_format),
                    max_taken_date=max_taken_date.strftime(timestamp_format))
                if response['stat'] != 'ok':
                    return
                photos = response['photos']['photo']
                if not photos:
                    # an empty page marks the end of the results
                    return
            except Exception as ex:
                logger.error(str(ex))
                self.disconnect()
                return
        for photo in photos:
            yield photo
        page += 1