Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# List files of torrents: resolve the column names and both filters, then
# hand them to `self.make_file_list`.
# NOTE(review): the original indentation is lost in this view and the final
# `else:` branch has no visible body -- confirm against the full source.
async def run(self, TORRENT_FILTER, FILE_FILTER, columns):
# Fall back to the configured 'columns.files' value when no columns were given.
columns = objects.localcfg['columns.files'] if columns is None else columns
# A ValueError raised while resolving columns or filters is re-raised as
# CmdError (see the `except` clause below).
try:
columns = self.get_file_columns(columns)
tfilter = self.select_torrents(TORRENT_FILTER,
allow_no_filter=False,
discover_torrent=True)
ffilter = self.select_files(FILE_FILTER,
allow_no_filter=True,
discover_file=False)
except ValueError as e:
raise CmdError(e)
log.debug('Listing %s files of %s torrents', ffilter, tfilter)
# `make_file_list` is awaited only if it is a coroutine function; what
# happens otherwise is in the `else:` branch, which is not visible here.
if asyncio.iscoroutinefunction(self.make_file_list):
await self.make_file_list(tfilter, ffilter, columns)
else:
async def _show_global_limits(self, directions):
    """
    Report the global rate limit for every item in `directions`

    For each direction, the method `get_limit_rate_<direction>` of
    `objects.srvapi.settings` is called and awaited, and the result is
    passed to `self._output` as part of a one-line message.
    """
    for direction in directions:
        settings = objects.srvapi.settings
        fetch_limit = getattr(settings, 'get_limit_rate_' + direction)
        current = await fetch_limit()
        message = 'Global %sload rate limit: %s' % (direction, current)
        self._output(message)
# Collect help text lines for every topic in TOPIC.
# NOTE(review): the original indentation is lost in this view, and the part
# of the function that consumes `lines`, `success` and `existing_topics` is
# not visible -- confirm nesting and return value against the full source.
def run(self, TOPIC):
topics = TOPIC
lines = []
success = True
existing_topics = []
# No topic requested: look up the 'overview' topic instead.
if len(topics) < 1:
lines = objects.helpmgr.find('overview')
else:
for topic in topics:
try:
l = objects.helpmgr.find(topic)
# A ValueError from find() is reported via self.error() and marks the run
# as unsuccessful; the remaining topics are still processed.
except ValueError as e:
self.error(e)
success = False
else:
# Each found topic is followed by TOPIC_DELIMITER.
lines.extend(l)
lines.extend(self.TOPIC_DELIMITER)
existing_topics.append(topic)
if lines:
# Remove last TOPIC_DELIMITER
# (one pop per element of TOPIC_DELIMITER)
for _ in range(len(self.TOPIC_DELIMITER)):
lines.pop(-1)
# If no requested topic was found, record the application name instead.
if not existing_topics:
existing_topics.append(__appname__)
def _handle_update(self, *_, **__):
    """
    Rebuild `self._data_dict` from the local and remote settings

    Every item of `objects.localcfg` is stored under its own name; every
    item of `objects.remotecfg` is stored under its name prefixed with
    "srv.". All positional and keyword arguments are ignored.
    `self._invalidate()` is called after the new dictionary is in place.
    """
    settings = {}

    localcfg = objects.localcfg
    for name, value in localcfg.items():
        settings[name] = {
            'id': name,
            'value': value,
            'default': localcfg.default(name),
            'description': localcfg.description(name),
        }

    remotecfg = objects.remotecfg
    for name, value in remotecfg.items():
        srv_name = 'srv.' + name
        settings[srv_name] = {
            'id': srv_name,
            'value': value,
            'default': '',
            # The description is looked up by the unprefixed name
            'description': remotecfg.description(name),
        }

    self._data_dict = settings
    self._invalidate()
def __init__(self):
    """
    Create the widget showing 'Not connected' and register RPC callbacks

    The text widget is wrapped in an `urwid.AttrMap` using the
    'topbar.host.disconnected' attribute, which is handed to the parent
    initializer. The four `_handle_*` methods are then registered with
    `objects.srvapi.rpc.on` for the events of the same name.
    """
    label = urwid.Text('Not connected')
    self._text = label
    wrapper = urwid.AttrMap(label, 'topbar.host.disconnected')
    self._attrmap = wrapper
    super().__init__(wrapper)

    rpc = objects.srvapi.rpc
    subscriptions = (
        ('connecting', self._handle_connecting),
        ('connected', self._handle_connected),
        ('disconnected', self._handle_disconnected),
        ('error', self._handle_error),
    )
    for event, callback in subscriptions:
        rpc.on(event, callback)
def get_tracker_columns(self, columns):
    """
    Validate `columns` as tracker list column names

    The check is delegated to `objects.localcfg.validate` for the
    'columns.trackers' setting.

    Return a new list of `columns` or raise ValueError if an item is not a
    valid tracker list column name.
    """
    validated = objects.localcfg.validate('columns.trackers', columns)
    return validated
async def run(self, TOPIC):
    """
    Refresh server settings if needed, then delegate to the parent `run`

    If any item in TOPIC starts with "srv.", the setting is managed by the
    server, so `objects.srvapi.settings.update()` is awaited once to fetch
    the config values and display the current value.

    A `ClientError` raised by the update is reported via `self.error` and
    does not prevent the parent `run` from being called. Any other
    exception (e.g. task cancellation) propagates to the caller.

    Return the return value of the parent class's `run(TOPIC)`.
    """
    # A single update covers every "srv." topic; any() stops at the first
    # match, so the settings are fetched at most once.
    needs_server_settings = any(topic.startswith('srv.') for topic in TOPIC)
    if needs_server_settings:
        # Only ClientError is handled here. Leaving a `finally` clause via
        # `break` would silently discard every other in-flight exception.
        try:
            await objects.srvapi.settings.update()
        except objects.srvapi.ClientError as e:
            self.error(e)
    return super().run(TOPIC)
def run(self, TOPIC):
topics = TOPIC
lines = []
success = True
existing_topics = []
if len(topics) < 1:
lines = objects.helpmgr.find('overview')
else:
for topic in topics:
try:
l = objects.helpmgr.find(topic)
except ValueError as e:
self.error(e)
success = False
else:
lines.extend(l)
lines.extend(self.TOPIC_DELIMITER)
existing_topics.append(topic)
if lines:
# Remove last TOPIC_DELIMITER
for _ in range(len(self.TOPIC_DELIMITER)):
lines.pop(-1)