Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _owner_list(self):
    """
    Turns the owner_email property into a list. This should not be overridden.

    ``self.owner_email`` may be:

    * ``None`` -> ``[]``
    * a comma-separated string -> list of addresses. Whitespace around each
      address is stripped and empty entries are dropped, so
      ``"a@x.com, b@x.com,"`` gives ``['a@x.com', 'b@x.com']`` and ``""`` gives ``[]``.
    * anything else -> returned unchanged (assumed to already be a sequence
      of addresses; not validated here).
    """
    owner_email = self.owner_email
    if owner_email is None:
        return []
    if isinstance(owner_email, six.string_types):
        # Strip so "a, b" does not produce " b", and drop blanks so "" or a
        # trailing comma does not produce an empty recipient.
        return [address.strip() for address in owner_email.split(',') if address.strip()]
    return owner_email
def _create_request(self, full_url, body=None):
    """
    Build a ``Request`` for ``full_url``, moving any URL credentials into a header.

    When the URL carries ``user[:password]@`` userinfo, the userinfo is removed
    from the URL. It is sent instead as an HTTP Basic ``Authorization`` header;
    a missing password is treated as empty. If ``body`` is truthy, it is passed
    through ``urlencode`` and attached as UTF-8 request data.

    :param full_url: URL to request, optionally containing userinfo.
    :param body: optional value accepted by ``urlencode`` (presumably a mapping
        of form fields; confirm against callers).
    :return: the configured ``Request`` object.
    """
    url = urlparse(full_url)
    if url.username:
        # base64 encoding of username:password
        auth = base64.b64encode(six.b('{}:{}'.format(url.username, url.password or '')))
        if six.PY3:
            auth = auth.decode('utf-8')
        # Strip the userinfo from the netloc. urlparse separates userinfo from
        # the host at the *last* '@', so do the same here. Splitting at the
        # first '@' would leak part of a password that contains a literal '@'
        # into the host (e.g. 'user:p@ss@host' -> 'ss@host').
        full_url = url._replace(netloc=url.netloc.rpartition('@')[2]).geturl()
        req = Request(full_url)
        req.add_header('Authorization', 'Basic {}'.format(auth))
    else:
        req = Request(full_url)
    # add the request body
    if body:
        req.data = urlencode(body).encode('utf-8')
    return req
def run(self):
    """
    Count how many input lines mention each artist and write the totals.

    Each line of every target in ``self.input()`` is expected to hold at least
    three whitespace-separated fields; only the second one (the artist) is
    used. The third field may itself contain whitespace, and blank lines are
    skipped. Lines with fewer than three fields still raise ``ValueError``.

    Writes one ``artist<TAB>count`` line per distinct artist to ``self.output()``.
    """
    artist_count = defaultdict(int)
    for t in self.input():
        with t.open('r') as in_file:
            for line in in_file:
                # maxsplit=2 so whitespace inside the last (track) field does
                # not break the 3-way unpacking below.
                fields = line.split(None, 2)
                if not fields:
                    # Tolerate blank lines (e.g. a trailing newline) instead of
                    # failing the whole task.
                    continue
                _, artist, _track = fields
                artist_count[artist] += 1
    with self.output().open('w') as out_file:
        for artist, count in artist_count.items():
            out_file.write('{}\t{}\n'.format(artist, count))
def _create_request(self, full_url, body=None):
    """
    Build a ``Request`` for ``full_url``, moving any URL credentials into a header.

    When the URL carries ``user[:password]@`` userinfo, the userinfo is removed
    from the URL. It is sent instead as an HTTP Basic ``Authorization`` header;
    a missing password is treated as empty. If ``body`` is truthy, it is passed
    through ``urlencode`` and attached as UTF-8 request data.

    :param full_url: URL to request, optionally containing userinfo.
    :param body: optional value accepted by ``urlencode`` (presumably a mapping
        of form fields; confirm against callers).
    :return: the configured ``Request`` object.
    """
    url = urlparse(full_url)
    if url.username:
        # base64 encoding of username:password
        auth = base64.b64encode(six.b('{}:{}'.format(url.username, url.password or '')))
        if six.PY3:
            auth = auth.decode('utf-8')
        # Strip the userinfo from the netloc. urlparse separates userinfo from
        # the host at the *last* '@', so do the same here. Splitting at the
        # first '@' would leak part of a password that contains a literal '@'
        # into the host (e.g. 'user:p@ss@host' -> 'ss@host').
        full_url = url._replace(netloc=url.netloc.rpartition('@')[2]).geturl()
        req = Request(full_url)
        req.add_header('Authorization', 'Basic {}'.format(auth))
    else:
        req = Request(full_url)
    # add the request body
    if body:
        req.data = urlencode(body).encode('utf-8')
    return req
def find_all_by_parameters(self, task_name, session=None, **task_params):
    """
    Find tasks with the given task_name and the same parameters as the kwargs.

    Every keyword argument adds one more join against its own aliased
    TaskParameter, constrained on both parameter name and value. Matching
    TaskRecords are yielded in TaskEvent.ts order. Because this is a generator
    that yields inside the session context, the session stays open until the
    caller finishes (or abandons) the iteration.
    """
    with self._session(session) as session:
        matches = session.query(TaskRecord).join(TaskEvent).filter(TaskRecord.name == task_name)
        for param_name, param_value in six.iteritems(task_params):
            param_alias = sqlalchemy.orm.aliased(TaskParameter)
            matches = matches.join(param_alias).filter(
                param_alias.name == param_name,
                param_alias.value == param_value,
            )

        for record in matches.order_by(TaskEvent.ts):
            # Sanity check: the joins above should already guarantee this.
            assert all(
                name in record.parameters and value == str(record.parameters[name].value)
                for name, value in six.iteritems(task_params)
            )
            yield record
def of_cls(self):
    """
    DONT USE. Will be deleted soon. Use ``self.of``!

    Returns ``self.of`` as a task class. A string value triggers a warning and
    is looked up via ``Register.get_task_cls``; any other value is returned
    unchanged.
    """
    target = self.of
    if not isinstance(target, six.string_types):
        return target
    warnings.warn('When using Range programatically, dont pass "of" param as string!')
    return Register.get_task_cls(target)
def put_string(self, contents, dest_path, mimetype=None):
    """
    Upload in-memory ``contents`` to ``dest_path`` via ``self._do_put``.

    :param contents: bytes, or text that is encoded as UTF-8 before upload.
    :param dest_path: destination path; its extension is used to guess the
        MIME type when none is given.
    :param mimetype: explicit MIME type string. Falls back to
        ``mimetypes.guess_type(dest_path)`` and then ``DEFAULT_MIMETYPE``.

    The upload is marked resumable only when there is at least one byte to send.
    """
    if not mimetype:
        guessed_type, _encoding = mimetypes.guess_type(dest_path)
        mimetype = guessed_type or DEFAULT_MIMETYPE
    assert isinstance(mimetype, six.string_types)

    payload = contents if isinstance(contents, six.binary_type) else contents.encode("utf-8")
    upload = http.MediaIoBaseUpload(six.BytesIO(payload), mimetype, resumable=bool(payload))
    self._do_put(upload, dest_path)
def _build_pig_cmd(self):
    """
    Yield the pig command line, backed by temporary parameter/property files.

    Non-empty ``pig_parameters()`` are written as ``key=value`` lines to a temp
    file passed with ``-param_file``. Non-empty ``pig_properties()`` are written
    the same way to a temp file passed with ``-propertyFile``. The resulting
    command is ``[pig_command_path()] + options + ['-f', pig_script_path()]``.
    It is logged and then yielded while the temp files still exist. They are
    deleted once the ``with`` block exits, i.e. when this generator resumes or
    is closed.
    """
    # Copy: pig_options() may hand back a list the task keeps around.
    # Appending to it directly would accumulate duplicate -param_file /
    # -propertyFile flags across invocations.
    opts = list(self.pig_options())

    def line(k, v):
        # One "key=value" entry per line, UTF-8 encoded for the binary temp file.
        return ('%s=%s%s' % (k, v, os.linesep)).encode('utf-8')

    with tempfile.NamedTemporaryFile() as param_file, tempfile.NamedTemporaryFile() as prop_file:
        parameters = self.pig_parameters()
        if parameters:
            param_file.writelines(line(k, v) for (k, v) in six.iteritems(parameters))
            # Flush so the external pig process sees the content on disk.
            param_file.flush()
            opts.append('-param_file')
            opts.append(param_file.name)
        properties = self.pig_properties()
        if properties:
            prop_file.writelines(line(k, v) for (k, v) in six.iteritems(properties))
            prop_file.flush()
            opts.append('-propertyFile')
            opts.append(prop_file.name)
        cmd = [self.pig_command_path()] + opts + ["-f", self.pig_script_path()]
        logger.info(subprocess.list2cmdline(cmd))
        yield cmd
def _run_get_new_deps(self):
    """
    Run ``self.task.run()`` and collect dynamically yielded dependencies.

    If ``run()`` does not return a generator, returns None. Otherwise the
    generator is driven step by step:

    * each yielded batch is flattened;
    * when every task in the batch is complete, ``getpaths(batch)`` is sent
      back into the generator and iteration continues;
    * at the first batch with an incomplete task, returns a list of
      ``(task_module, task_family, to_str_params())`` tuples for every task
      in that batch.

    Returns None when the generator finishes with all batches complete.
    """
    task_gen = self.task.run()
    if not isinstance(task_gen, types.GeneratorType):
        return None

    next_send = None
    while True:
        try:
            requires = six.next(task_gen) if next_send is None else task_gen.send(next_send)
        except StopIteration:
            return None

        new_req = flatten(requires)
        if not all(t.complete() for t in new_req):
            return [(t.task_module, t.task_family, t.to_str_params()) for t in new_req]
        next_send = getpaths(requires)
def _purge_children(self):
    """
    Find dead children and put a response on the result queue.

    For each entry in ``self._running_tasks``:

    * a process that is no longer alive with a non-zero exit code triggers
      ``Event.PROCESS_FAILURE`` on its task;
    * a process that is still alive past its ``timeout_time`` is terminated
      and triggers ``Event.TIMEOUT`` on its task.

    In both cases the message is logged and a ``(task_id, FAILED, msg, [], [])``
    tuple is put on ``self._task_result_queue``. Processes that exited with
    code 0, or that are alive and within their deadline, are left alone.
    """
    for task_id, proc in six.iteritems(self._running_tasks):
        dead_with_error = not proc.is_alive() and proc.exitcode
        if dead_with_error:
            message = 'Task {} died unexpectedly with exit code {}'.format(task_id, proc.exitcode)
            proc.task.trigger_event(Event.PROCESS_FAILURE, proc.task, message)
        else:
            deadline = proc.timeout_time
            timed_out = (deadline is not None
                         and time.time() > float(deadline)
                         and proc.is_alive())
            if not timed_out:
                continue
            proc.terminate()
            message = 'Task {} timed out after {} seconds and was terminated.'.format(task_id, proc.worker_timeout)
            proc.task.trigger_event(Event.TIMEOUT, proc.task, message)
        logger.info(message)
        self._task_result_queue.put((task_id, FAILED, message, [], []))