# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def reply(error, response):
    """
    format messages to be sent back to parent process
    """
    # one JSON line per message: [iso-timestamp, error, response]
    message = [datetime.now().isoformat(), error, response]
    stdout.write(compact_json(message) + b'\n')
    stdout.flush()
def name_reader():
    """
    Generate (record_number, json_bytes) pairs for each id/name found
    in jsonl_input, honouring --start-record and --max-records.

    Extracts all ids or names from each line (e.g. package_search,
    package_show or package_list output).

    record numbers here correspond to names/ids extracted not lines

    NOTE(review): the ``def`` line and the opening docstring quotes of
    this generator were missing from the corrupted source; the header
    was reconstructed from the ``name_reader()`` call later in the file.
    ``arguments``, ``jsonl_input``, ``extract_ids_or_names`` and
    ``compact_json`` are provided by the enclosing scope — confirm
    against the original source.
    """
    start_record = int(arguments['--start-record'])
    max_records = arguments['--max-records']
    if max_records is not None:
        max_records = int(max_records)
    # enumerate from 1 so record numbers match the CLI's 1-based options
    for num, name in enumerate(chain.from_iterable(
            extract_ids_or_names(line) for line in jsonl_input), 1):
        if num < start_record:
            continue
        if max_records is not None and num >= start_record + max_records:
            break
        yield num, compact_json(name)
def reply(error, record=None):
    """
    format messages to be sent back to parent process
    """
    # emit a single JSON line: [iso-timestamp, error, record]
    payload = [datetime.now().isoformat(), error, record]
    stdout.write(compact_json(payload) + b'\n')
    stdout.flush()
# NOTE(review): fragment of a pooled-worker result loop — its enclosing
# function (and the indentation) is missing from this chunk.  It drains
# worker results from `pool`, echoes progress to stderr, appends to an
# optional `log`, then maps pipe/interrupt conditions to exit codes.
# `pool`, `stats`, `log`, `arguments`, `quiet_int_pipe`, `compact_json`
# and `stderr` are defined outside this view.
with quiet_int_pipe() as errors:
for job_ids, finished, result in pool:
if not result:
# child exited with traceback
return 1
# each worker result is one JSON line: [timestamp, error, response]
timestamp, error, response = json.loads(
result.decode('utf-8'))
if not arguments['--quiet']:
stderr.write(('%s %s %s %s %s\n' % (
finished,
job_ids,
next(stats),
error,
compact_json(response).decode('utf-8') if response else ''
)).encode('utf-8'))
if log:
log.write(compact_json([
timestamp,
finished,
error,
response,
]) + b'\n')
log.flush()
# conventional exit codes: 1 for broken pipe, 2 for keyboard interrupt
if 'pipe' in errors:
return 1
if 'interrupt' in errors:
return 2
# NOTE(review): the four lines below duplicate the record-iteration tail
# that appears earlier in this file (continue/break/yield) and are
# orphaned at this position — likely an extraction artifact; confirm
# against the original source before relying on them.
continue
if max_records is not None and num >= start_record + max_records:
break
yield num, compact_json(name)
# NOTE(review): worker-pool setup fragment; the enclosing function is not
# visible in this chunk.  `_worker_command_line`, `worker_pool`,
# `completion_stats`, `thing`, `ckan` and `arguments` come from elsewhere
# in the file.
cmd = _worker_command_line(thing, arguments)
processes = int(arguments['--processes'])
if hasattr(ckan, 'parallel_limit'):
# add your sites to ckanapi.remoteckan.MY_SITES instead of removing
# (cap requested concurrency at the site's declared parallel limit)
processes = min(processes, ckan.parallel_limit)
stats = completion_stats(processes)
if not arguments['ID_OR_NAME']:
# no ids on the command line: stream them via the name_reader generator
pool = worker_pool(cmd, processes, name_reader())
else:
# ids supplied directly: feed them to the pool as numbered JSON lines
pool = worker_pool(cmd, processes, enumerate(
(compact_json(n) + b'\n' for n in arguments['ID_OR_NAME']), 1))
with quiet_int_pipe() as errors:
for job_ids, finished, result in pool:
if not result:
# child exited with traceback
return 1
timestamp, error, response = json.loads(
result.decode('utf-8'))
if not arguments['--quiet']:
stderr.write(('%s %s %s %s %s\n' % (
finished,
job_ids,
next(stats),
error,
# NOTE(review): chunk is truncated mid-expression here — the
# closing parens and .encode(...) call are outside this view.
compact_json(response).decode('utf-8') if response else ''
# NOTE(review): fragment starts mid-`try` — the opening `try:` line is
# missing from this chunk.  It resolves KEY@FILE CLI arguments to open
# file objects, calls the CKAN action, then yields the result in the
# requested JSON flavour.  `fvalue`, `fkey`, `kv`, `file_args`,
# `action_args`, `ckan`, `CLIError`, `compact_json` and `pretty_json`
# are defined outside this view.
f = open(expanduser(fvalue), 'rb')
except IOError as e:
raise CLIError("Error opening %r: %s" %
(expanduser(fvalue), e.args[1]))
file_args[fkey] = f
else:
raise CLIError("argument not in the form KEY=STRING, "
"KEY:JSON or KEY@FILE %r" % kv)
result = ckan.call_action(arguments['ACTION_NAME'], action_args,
files=file_args)
if arguments['--output-jsonl']:
if isinstance(result, list):
# one compact JSON document per line for list results
for r in result:
yield compact_json(r) + b'\n'
else:
yield compact_json(result) + b'\n'
elif arguments['--output-json']:
yield compact_json(result) + b'\n'
else:
# default output: pretty-printed JSON for human reading
yield pretty_json(result) + b'\n'
# NOTE(review): tail of a dump-style worker loop.  The fragment starts
# mid-call — the `log.write(compact_json([` opener for the fields below
# is outside this view, as is the enclosing function.
timestamp,
finished,
error,
record.get('name', '') if record else None,
]) + b'\n')
datapackages_path = arguments['--datapackages']
if datapackages_path:
create_datapackage(record, datapackages_path, stderr)
# keep the output in the same order as names
while expecting_number in results:
record = results.pop(expecting_number)
if record:
# sort keys so we can diff output
jsonl_output.write(compact_json(record,
sort_keys=True) + b'\n')
expecting_number += 1
# conventional exit codes: 1 for broken pipe, 2 for keyboard interrupt
if 'pipe' in errors:
return 1
if 'interrupt' in errors:
return 2