Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def do_request(name, method, params_string):
params = ovs.json.from_string(params_string)
msg = ovs.jsonrpc.Message.create_request(method, params)
s = msg.is_valid()
if s:
sys.stderr.write("not a valid JSON-RPC request: %s\n" % s)
sys.exit(1)
error, stream = ovs.stream.Stream.open_block(ovs.stream.Stream.open(name))
if error:
sys.stderr.write("could not open \"%s\": %s\n"
% (name, os.strerror(error)))
sys.exit(1)
rpc = ovs.jsonrpc.Connection(stream)
error = rpc.send(msg)
if error:
sys.stderr.write("could not send request: %s\n" % os.strerror(error))
_where_uuid_equals(self._inc_row.uuid)),
"columns": [self._inc_column]})
# Add comment.
if self._comments:
operations.append({"op": "comment",
"comment": "\n".join(self._comments)})
# Dry run?
if self.dry_run:
operations.append({"op": "abort"})
if not any_updates:
self._status = Transaction.UNCHANGED
else:
msg = ovs.jsonrpc.Message.create_request("transact", operations)
self._request_id = msg.id
if not self.idl._session.send(msg):
self.idl._outstanding_txns[self._request_id] = self
self._status = Transaction.INCOMPLETE
else:
self._status = Transaction.TRY_AGAIN
self.__disassemble()
return self._status
def __do_send_lock_request(self, method):
self.__update_has_lock(False)
self._lock_request_id = None
if self._session.is_connected():
msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
msg_id = msg.id
self._session.send(msg)
else:
msg_id = None
return msg_id