Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
raw_payload = b64decode(ensure_bytes(payload))
first_sep = raw_payload.find(sep)
signer = raw_payload[:first_sep]
signer_cert = self._cert_store[signer]
sig_len = signer_cert._cert.get_pubkey().bits() >> 3
signature = raw_payload[
first_sep + len(sep):first_sep + len(sep) + sig_len
]
end_of_sig = first_sep + len(sep) + sig_len+len(sep)
v = raw_payload[end_of_sig:].split(sep)
values = [bytes_to_str(signer), bytes_to_str(signature),
bytes_to_str(v[0]), bytes_to_str(v[1]), bytes_to_str(v[2])]
return {
'signer': values[0],
'signature': values[1],
'content_type': values[2],
'content_encoding': values[3],
'body': values[4],
}
def P(s):
print(bytes_to_str(s), file=fh)
def _strip_prefix(self, key):
"""Takes bytes, emits string."""
key = self.key_t(key)
for prefix in self.task_keyprefix, self.group_keyprefix:
if key.startswith(prefix):
return bytes_to_str(key[len(prefix):])
return bytes_to_str(key)
pass
else:
if cached['status'] in READY_STATES:
yield bytes_to_str(task_id), cached
cached_ids.add(task_id)
ids.difference_update(cached_ids)
iterations = 0
while ids:
keys = list(ids)
r = self._mget_to_results(self.mget([self.get_key_for_task(k)
for k in keys]), keys)
cache.update(r)
ids.difference_update(set(bytes_to_str(v) for v in r))
for key, value in items(r):
yield bytes_to_str(key), value
if timeout and iterations * interval >= timeout:
raise TimeoutError('Operation timed out ({0})'.format(timeout))
time.sleep(interval) # don't busy loop.
iterations += 1
def b64encode(s):
return bytes_to_str(base64.b64encode(str_to_bytes(s)))
def _restore(self, message, leftmost=False):
if not self.ack_emulation:
return super(Channel, self)._restore(message)
tag = message.delivery_tag
with self.conn_or_acquire() as client:
P, _ = client.pipeline() \
.hget(self.unacked_key, tag) \
.hdel(self.unacked_key, tag) \
.execute()
if P:
M, EX, RK = loads(bytes_to_str(P)) # json is unicode
self._do_restore_message(M, EX, RK, client, leftmost)
def _mget_to_results(self, values, keys):
if hasattr(values, 'items'):
# client returns dict so mapping preserved.
return dict((self._strip_prefix(k), self.decode(v))
for k, v in items(values)
if v is not None)
else:
# client returns list so need to recreate mapping.
return dict((bytes_to_str(keys[i]), self.decode(value))
for i, value in enumerate(values)
if value is not None)
def _index(self, id, body, **kwargs):
body = {bytes_to_str(k): v for k, v in items(body)}
return self.server.index(
id=bytes_to_str(id),
index=self.index,
doc_type=self.doc_type,
body=body,
**kwargs
)
def _strip_prefix(self, key):
"""Take bytes: emit string."""
key = self.key_t(key)
for prefix in self.task_keyprefix, self.group_keyprefix:
if key.startswith(prefix):
return bytes_to_str(key[len(prefix):])
return bytes_to_str(key)
def get(self, key):
"""Read the value stored at the given key.
Args:
key: The key for which to read the value.
"""
key = bytes_to_str(key)
LOGGER.debug("Getting CosmosDB document %s/%s/%s",
self._database_name, self._collection_name, key)
try:
document = self._client.ReadDocument(
self._get_document_link(key),
self._get_partition_key(key))
except HTTPFailure as ex:
if ex.status_code != ERROR_NOT_FOUND:
raise
return None
else:
return document.get("value")