Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@srpc(Unicode, Integer, _returns=Iterable(Unicode))
def say_hello(name, times):
'''
Docstrings for service methods appear as documentation in the wsdl
<b>what fun</b>
@param name the name to say hello to
@param the number of times to say hello
@return the completed array
'''
for i in range(times):
yield u'Hello, %s' % name
@rpc(Unicode, Integer, _returns=Iterable(Unicode))
def ask_question(ctx, question, answer):
"""Ask Stan a question!
<b>Ask Stan questions as a Service</b>
@param name the name to say hello to
@param times the number of times to say hello
@return the completed array
"""
yield u'To an artificial mind, all reality is virtual. How do they know that the real world isn\'t just another simulation? How do you?'
from spyne.model.complex import Iterable
from spyne.model.primitive import Integer
from spyne.model.primitive import Unicode
class HelloWorldService(Service):
@rpc(Unicode, Integer, _returns=Iterable(Unicode))
def say_hello(ctx, name, times):
for i in range(times):
yield u'Hello, %s' % name
if __name__=='__main__':
application = Application([HelloWorldService], 'spyne.examples.hello.soap',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11(pretty_print=True),
)
# disables context markers. set logging level to logging.INFO to enable
# them.
logging.getLogger('spyne.server.null').setLevel(logging.CRITICAL)
print "With serialization"
print "=================="
print
null = NullServer(application, ostr=True)
ret_stream = null.service.say_hello('Dave', 5)
ret_string = ''.join(ret_stream)
print ret_string
print
def run(self):
print("monitor ready :%s" % (self.port))
application = Application([MonitorService], 'monitor',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11())
wsgi_application = WsgiApplication(application)
self.server = make_server("", int(self.port), wsgi_application)
self.server.serve_forever()
@rpc(String)
def msg(self, m):
print(m)
@memoize
def get_version(package):
if isinstance(package, (six.text_type, six.binary_type)):
package = __import__(package)
verstr = getattr(package, '__version__')
retval = []
for f in verstr.split("."):
try:
retval.append(int(f))
except ValueError:
retval.append(f)
return tuple(retval)
if pc is not None:
mn = pc.__module__
on = pc.__name__
dn = self.name
# prevent duplicate class name. this happens when the class is a
# direct subclass of ComplexModel
if dn.split('.', 1)[0] != on:
return "{%s}%s.%s" % (mn, on, dn)
return "{%s}%s" % (mn, dn)
sc = self.service_class
if sc is not None:
return '{%s}%s%s' % (sc.get_internal_key(),
six.get_function_name(self.function),
self.internal_key_suffix)
@rpc(NumQueriesType, _returns=Array(World))
def updates(ctx, queries):
"""Test 5: Database Updates"""
if queries is None:
queries = 1
retval = []
q = ctx.udc.session.query(World)
for id in (randint(1, 10000) for _ in xrange(queries)):
world = q.get(id)
world.randomNumber = randint(1, 10000)
retval.append(world)
ctx.udc.session.commit()
return retval
if not (ctx.in_header.user_name, ctx.in_header.session_id) in session_db:
raise AuthenticationError(ctx.in_object.user_name)
UserService.event_manager.add_listener('method_call', _on_method_call)
if __name__ == '__main__':
from spyne.util.wsgi_wrapper import run_twisted
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)
logging.getLogger('twisted').setLevel(logging.DEBUG)
application = Application([AuthenticationService, UserService],
tns='spyne.examples.authentication',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11()
)
twisted_apps = [
(WsgiApplication(application), 'app'),
]
sys.exit(run_twisted(twisted_apps, 8000))
ctx.udc = session_id[0] # user name
UserService.event_manager.add_listener('method_call', _on_method_call)
if __name__=='__main__':
from spyne.util.wsgi_wrapper import run_twisted
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)
logging.getLogger('twisted').setLevel(logging.DEBUG)
application = Application([UserService],
tns='spyne.examples.authentication',
in_protocol=Soap11(validator='lxml'),
out_protocol=Soap11()
)
twisted_apps = [
(WsgiApplication(application), 'app'),
]
sys.exit(run_twisted(twisted_apps, 8000))