cmd += [
    f"https://{self.host}:{self.port}/{request.method}",
    "-H",
    "Content-Type: application/json",
    "--data-binary",
    f"@{nf.name}",
    "-w \\n%{http_code}",
]
if self.ca:
    cmd.extend(["--cacert", self.ca])
if self.key:
    cmd.extend(["--key", self.key])
if self.cert:
    cmd.extend(["--cert", self.cert])
LOG.debug(f"Running: {' '.join(cmd)}")
rc = subprocess.run(cmd, capture_output=True)
if rc.returncode != 0:
    if rc.returncode == 60:
        raise CurlClientSSLException
    LOG.error(rc.stderr)
    raise RuntimeError(f"Curl failed with return code {rc.returncode}")
# The response status code is displayed on the last line of
# the output (via -w option)
rep, status_code = rc.stdout.decode().rsplit("\n", 1)
if int(status_code) != 200:
    LOG.error(rep)
    raise RuntimeError(f"Curl failed with status code {status_code}")
self.stream.update(rep.encode())
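# The `-w \n%{http_code}` write-out above makes curl append the HTTP status code
# on its own line after the response body. A minimal sketch of that parsing step,
# using an illustrative captured output rather than a real curl run:
sample_stdout = b'{"result": "ok"}\n200'
body, status_code = sample_stdout.decode().rsplit("\n", 1)
if int(status_code) != 200:
    raise RuntimeError(f"Curl failed with status code {status_code}")
print(body)  # -> {"result": "ok"}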
def test_colorize_stream_windows(patch_colorama, monkeypatch, colorize, tty):
    monkeypatch.setattr(os, "name", "nt")
    stream = Stream(tty)
    logger.add(stream, format="{message}", colorize=colorize)
    logger.debug("Message")
    winapi_test = patch_colorama.win32.winapi_test
    stream_write = patch_colorama.AnsiToWin32().stream.write
    if colorize or (colorize is None and tty):
        assert winapi_test.called
        assert stream_write.called
    else:
        assert not winapi_test.called
        assert not stream_write.called
def test_progressive_format(writer):
    def formatter(record):
        fmt = "[{level.name}] {message}"
        if "noend" not in record["extra"]:
            fmt += "\n"
        return fmt

    logger.add(writer, format=formatter)
    logger.bind(noend=True).debug("Start: ")
    for _ in range(5):
        logger.opt(raw=True).debug(".")
    logger.opt(raw=True).debug("\n")
    logger.debug("End")
    assert writer.read() == "[DEBUG] Start: .....\n[DEBUG] End\n"
def test_serialize(with_exception):
    record_dict = record_json = None

    def sink(message):
        nonlocal record_dict, record_json
        record_dict = message.record
        record_json = json.loads(message)["record"]

    logger.configure(extra=dict(not_serializable=object()))
    logger.add(sink, format="{message}", catch=False, serialize=True)
    if not with_exception:
        logger.debug("Test")
    else:
        try:
            1 / 0
        except ZeroDivisionError:
            logger.exception("Test")

    assert set(record_dict.keys()) == set(record_json.keys())
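# For reference, loguru's `serialize=True` wraps each message handed to the sink
# in a JSON string whose "record" entry mirrors `message.record`, which is what
# the assertion above compares. A minimal standalone sketch:
import json
from loguru import logger

def print_record_keys(message):
    payload = json.loads(message)
    # The serialized payload carries the rendered text plus the structured record.
    print(sorted(payload["record"].keys()))

logger.remove()
logger.add(print_record_keys, format="{message}", serialize=True)
logger.debug("Test")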
"LOG_record",
{
"id": current_term,
"msg": "This log is committed in term {}".format(current_term),
},
readonly_hint=None,
expected_result=True,
)
commit_index = res.commit
LOG.debug("Waiting for transaction to be committed by all nodes")
wait_for_index_globally_committed(
commit_index, current_term, network.get_joined_nodes()
)
LOG.debug("Stopping primary")
primary.stop()
LOG.debug("Waiting for a new primary to be elected...")
time.sleep(max_election_duration)
# More than F nodes have been stopped, trying to commit any message
LOG.debug(
"No progress can be made as more than {} nodes have stopped".format(
nodes_to_stop
)
)
try:
primary, current_term = network.find_primary()
assert False, "Primary should not be found"
except TypeError:
assert args.consensus == "pbft", "Unexpected error"
    self.minibatched_y_train[partner_index][minibatch_index],
)
history = self.collaborative_round_fit(
    sequentially_trained_model, train_data_for_fit_iteration, self.val_data, partner.batch_size
)
# Log results of the round
self.log_collaborative_round_partner_result(
    partner, for_loop_idx, history.history["val_accuracy"][0]
)
# On the final collaborative round, save the partner's model in the models list
if is_last_round:
    self.models_weights_list[partner_index] = sequentially_trained_model.get_weights()
# Update iterative results
self.update_iterative_results(partner_index, history)
logger.debug("End of sequential collaborative round.")
        val = data[int(key)]
    else:
        if ignorecase:
            for datakey in data.keys():
                if datakey.lower() == key.lower():
                    key = datakey
                    break
        val = data[key]
    data = val
logger.debug('3')
if search:
    search_ret = []
    logger.debug('2')
    if isinstance(data, (list, tuple)):
        logger.debug('list')
        for d in data:
            for key in d.keys():
                logger.warning(d.keys())
                if key == search:
                    try:
                        search_ret.append(d[key])
                    except (KeyError, ValueError, IndexError, TypeError, AttributeError):
                        pass
    else:
        logger.debug(6)
        for key in data.keys():
            logger.debug(data.keys())
            if key == search:
                try:
                    search_ret.append(data[key])
                except (KeyError, ValueError, IndexError, TypeError, AttributeError):
                    pass
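# The branches above collect every value stored under the searched key. As a
# standalone illustration of that pattern (the sample data below is hypothetical,
# not taken from the original function):
data = [{"name": "r1", "ip": "10.0.0.1"}, {"name": "r2", "ip": "10.0.0.2"}]
search = "name"
search_ret = []
for d in data:
    for key in d.keys():
        if key == search:
            search_ret.append(d[key])
assert search_ret == ["r1", "r2"]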
def sort_dictionary(dictionary: dict) -> dict:
    """Sort a dictionary by its keys.

    Uses OrderedDict to sort a dictionary.

    Args:
        dictionary: The dictionary to sort.

    Returns:
        The same dictionary, sorted by key.
    """
    ret = dict(OrderedDict(sorted(dictionary.items())))
    logger.debug(
        f"The old dictionary was {dictionary} and I am sorting it to {ret}"
    )
    return ret
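# Example usage of sort_dictionary. On Python 3.7+ a plain dict already preserves
# insertion order, so the result can be checked by key order:
unsorted = {"banana": 2, "apple": 1, "cherry": 3}
assert list(sort_dictionary(unsorted)) == ["apple", "banana", "cherry"]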
if is_classifier:
    L.debug('└> Yes, the estimator inherits from `ClassifierMixin`.')
    return est
L.debug('└> No, the estimator does not inherit from `ClassifierMixin`.')

# Check RegressorMixin:
L.debug('Check whether the estimator inherits from `RegressorMixin`.')
is_regressor = isinstance(est, RegressorMixin)
if is_regressor:
    L.debug('└> Yes, the estimator inherits from `RegressorMixin`.')
    return est
L.debug('└> No, the estimator does not inherit from `RegressorMixin`.')

if not (is_classifier or is_regressor):
    msg = (
        'The passed object of type `{}` is neither '
        'a classifier nor a regressor.'
    ).format(qualname)
    L.error(msg)
    raise ValueError(msg)
return None
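# The same mixin checks can be reproduced in isolation with scikit-learn's base
# classes; the estimators below are arbitrary examples:
from sklearn.base import ClassifierMixin, RegressorMixin
from sklearn.linear_model import LinearRegression, LogisticRegression

assert isinstance(LogisticRegression(), ClassifierMixin)
assert not isinstance(LogisticRegression(), RegressorMixin)
assert isinstance(LinearRegression(), RegressorMixin)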
def decrypt(self, text):
    logger.debug("Attempting to decrypt binary")
    try:
        result = self.decode(text)
    except ValueError as e:
        return {
            "lc": self.lc,
            "IsPlaintext?": False,
            "Plaintext": None,
            "Cipher": None,
            "Extra Information": None,
        }
    except TypeError as e:
        return {
            "lc": self.lc,
            "IsPlaintext?": False,
            "Plaintext": None,
            "Cipher": None,