Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@rpc
def spam(self, ham):
self.publish("message")
self.foo_session.add(FooModel(data=ham))
self.foo_session.commit()
self.foo_session.flush()
return ham + ' & eggs'
@rpc
def echo(self, arg):
return arg # pragma: no cover
@rpc
def proxy(self, method, *args):
""" Proxies RPC calls to ``method`` on itself, so we can test handling
of errors in remote services.
"""
getattr(self.rpcproxy, method)(*args)
@rpc
def spam(self, ham):
return ham
@rpc(sensitive_arguments=sensitive_arguments)
def method(self, a, b):
pass # pragma: no cover
@rpc
def send(self, type, body, address):
logging.info('Get message: %s,%s,%s' % (type, body, address))
output = self.y.sendTextMessage(address, body)
return True
#pprint(self)
@rpc
def status_message(self, owner, repo):
status = self.webservice.repo_status(owner, repo)
outcome = "passing" if status['last_build_result'] else "failing"
return "Project {repo} {outcome} since {timestamp}.".format(
repo=status['slug'],
outcome=outcome,
timestamp=status['last_build_finished_at']
)
@rpc
@statsd.timer('request_reservation')
def __request_reservation(self, payload):
self.logger.info("requesting reservation to accounts service", extra={
"uuid": payload})
res = u"{}".format(payload)
return self.accounts.request_reservation(res)
@rpc
def set_conf(self, target, data):
try:
payload = data
if target == "all":
self.dispatch("all_set_conf", payload)
else:
self.dispatch("{}_set_conf".format(target), payload)
msg = "Set_conf event dispatched to {}".format(target)
self.log.info(msg)
return msg
except Exception as e:
self.log.exception(str(e))
return '500', "Exception: {}".format(e.message)
@rpc
def stop(self, target):
try:
if target == "all":
self.dispatch("all_stop", self.PAYLOAD)
else:
self.dispatch("{}_stop".format(target), self.PAYLOAD)
msg = "Stop event dispatched to {}".format(target)
self.log.info(msg)
return msg
except Exception as e:
self.log.exception(str(e))
return '500', "Exception: {}".format(e.message)