Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
    """Serve ``MonitorService`` over SOAP 1.1 on ``self.port``.

    Prints a readiness message, builds the spyne application (lxml-validated
    SOAP 1.1 input, SOAP 1.1 output), wraps it for WSGI, stores the resulting
    server on ``self.server`` and then blocks in ``serve_forever()``.
    """
    print("monitor ready :%s" % (self.port))

    monitor_app = Application(
        [MonitorService],
        'monitor',
        in_protocol=Soap11(validator='lxml'),
        out_protocol=Soap11(),
    )
    wsgi_monitor_app = WsgiApplication(monitor_app)

    # The server is kept on the instance rather than in a local variable.
    self.server = make_server("", int(self.port), wsgi_monitor_app)
    self.server.serve_forever()
def initialize(services, tns='spyne.examples.twisted.resource'):
    """Configure logging and build an HttpRpc spyne application.

    Sets the root logger and the ``spyne.protocol.xml`` logger to DEBUG,
    starts Twisted logging with a ``PythonLoggingObserver`` named
    ``'twisted'`` (without capturing stdout), and returns the application.

    :param services: service classes handed to ``Application``.
    :param tns: target namespace for the application.
    :returns: the constructed ``Application`` using ``HttpRpc`` for both the
        input and the output protocol.
    """
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

    observer = log.PythonLoggingObserver('twisted')
    log.startLoggingWithObserver(observer.emit, setStdout=False)

    # Honour the caller-supplied namespace. It used to be hard-coded to
    # 'spyne.examples.twisted.hello', which silently ignored ``tns``.
    return Application(services, tns,
                       in_protocol=HttpRpc(), out_protocol=HttpRpc())
if __name__ == '__main__':
    # Build the spyne application and expose it through Twisted's WSGI
    # resource, with the reactor passed for both reactor arguments.
    application = initialize([SomeService])
    wsgi_application = WsgiApplication(application)
    site = Site(WSGIResource(reactor, reactor, wsgi_application))

    reactor.listenTCP(PORT, site, interface=HOST)

    endpoint = (HOST, PORT)
    logging.info('listening on: %s:%d' % endpoint)
    logging.info('wsdl is at: http://%s:%d/?wsdl' % endpoint)

    # The value returned by reactor.run() is handed to sys.exit().
    sys.exit(reactor.run())
speed=100,
owner="Simba"
),
Bike(
size=58,
owner="Nala"
),
]
)
# SOAP 1.1 application: lxml-validated input, polymorphic output.
application = Application(
    [SomeService],
    'tns',
    in_protocol=Soap11(validator='lxml'),
    out_protocol=Soap11(polymorphic=True),
)

# Wrap for WSGI, start serving, and exit with whatever the starter returns.
wsgi_application = WsgiApplication(application)
sys.exit(cherry_graft_and_start(wsgi_application))
def get_app(engine):
    """Return a ``MySession`` wrapping the ``ESoap`` SOAP 1.1 WSGI application.

    :param engine: passed through unchanged as the last ``MySession`` argument.
    :returns: ``MySession(wsgi, app.pm, engine)``, where ``wsgi`` is the WSGI
        wrapper around the ``EsoapApp`` and ``app.pm`` is read from that app.
    """
    soap_app = EsoapApp(
        [ESoap],
        'soap',
        in_protocol=Soap11(validator='lxml'),
        out_protocol=Soap11(),
    )
    return MySession(WsgiApplication(soap_app), soap_app.pm, engine)
def get_olapy_server(xmla_discover_request_handler, xmla_execute_request_handler):
    """Return a ``WSGIServer`` hosting the spyne app built from the two handlers.

    Both handlers are forwarded, in order, to ``get_spyne_app``; the result is
    wrapped in a ``WsgiApplication`` and served on ``HOST``/``PORT``, which
    are resolved from outside this function.
    """
    spyne_app = get_spyne_app(
        xmla_discover_request_handler,
        xmla_execute_request_handler,
    )
    return WSGIServer(
        application=WsgiApplication(spyne_app),
        host=HOST,
        port=PORT,
    )
except Exception as e:
logging.exception(e)
raise InternalError(e)
if __name__=='__main__':
    from wsgiref.simple_server import make_server

    # SOAP 1.1 application for the user manager service.
    application = MyApplication(
        [UserManagerService],
        'spyne.examples.user_manager',
        in_protocol=Soap11(validator='lxml'),
        out_protocol=Soap11(),
    )
    server = make_server('127.0.0.1', 8000, WsgiApplication(application))

    # Create the tables registered on the shared metadata before serving.
    TableModel.Attributes.sqla_metadata.create_all()

    for message in (
        "listening to http://127.0.0.1:8000",
        "wsdl is at: http://localhost:8000/?wsdl",
    ):
        logging.info(message)

    server.serve_forever()
def main():
    """Run the file manager SOAP service under '/filemgr' on localhost.

    Configures DEBUG logging, builds the ``FileServices`` SOAP 1.1 WSGI
    application, makes a best-effort attempt to create ``./files``, and serves
    through ``run_simple`` with static files mapped from ``'static'``.
    Requests outside ``/filemgr`` fall through to ``NotFound()``.

    ``port`` is not defined in this function; it is resolved from the
    enclosing scope.

    :returns: whatever ``run_simple`` returns.
    """
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

    soap_app = Application(
        [FileServices],
        tns='spyne.examples.file_manager',
        in_protocol=Soap11(validator='lxml'),
        out_protocol=Soap11(),
    )
    filemgr_app = WsgiApplication(soap_app)

    try:
        os.makedirs('./files')
    except OSError:
        # Best effort: any OSError raised while creating the directory
        # is ignored.
        pass

    fallback = NotFound()
    mounts = {'/filemgr': filemgr_app}
    wsgi_app = DispatcherMiddleware(fallback, mounts)

    return run_simple(
        'localhost',
        port,
        wsgi_app,
        static_files={'/': 'static'},
        threaded=True,
    )
def wsgi_soap11_application(services, tns='spyne.simple.soap', validator=None,
                            name=None):
    """Build a ``WsgiApplication`` that speaks Soap 1.1 in both directions.

    :param services: service classes to expose.
    :param tns: target namespace passed to ``Application``.
    :param validator: validator passed to the input ``Soap11`` protocol only;
        the output protocol is created with no arguments.
    :param name: application name passed to ``Application``.
    :returns: the ``WsgiApplication`` wrapping the new ``Application``.
    """
    # Imported at call time, as in the original.
    from spyne.protocol.soap import Soap11
    from spyne.server.wsgi import WsgiApplication

    soap_app = Application(
        services,
        tns,
        name=name,
        in_protocol=Soap11(validator=validator),
        out_protocol=Soap11(),
    )
    return WsgiApplication(soap_app)
# The input protocol is set as HttpRpc to make our service easy to
# call. Input validation via the 'soft' engine is enabled (which is
# actually the only validation method for HttpRpc).
in_protocol=HttpRpc(validator='soft'),
# The ignore_wrappers parameter to JsonDocument simplifies the response
# dict by skipping outer response structures that are redundant when
# the client knows what object to expect.
out_protocol=JsonDocument(ignore_wrappers=True),
)
# The application needs a transport around it. Here that is Spyne's
# standard Wsgi wrapper; Spyne also supports popular Http wrappers like
# Twisted, Django, Pyramid, etc., as well as a ZeroMQ (REQ/REP) wrapper.
wsgi_application = WsgiApplication(application)

# More daemon boilerplate: bind the server, log the endpoints, then serve.
server = make_server('127.0.0.1', 8000, wsgi_application)
for message in (
    "listening to http://127.0.0.1:8000",
    "wsdl is at: http://localhost:8000/?wsdl",
):
    logging.info(message)
server.serve_forever()
# do email sending here
return repr((to, from_, message, 'sent!'))
# Module-level SOAP 1.1 application exposing EmailManager.
application = Application(
    [EmailManager],
    'spyne.examples.events',
    in_protocol=Soap11(),
    out_protocol=Soap11(),
)

if __name__ == '__main__':
    import logging
    from wsgiref.simple_server import make_server

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('spyne.protocol.xml').setLevel(logging.DEBUG)

    wsgi_application = WsgiApplication(application)
    server = make_server('127.0.0.1', 8000, wsgi_application)

    for message in (
        "listening to http://127.0.0.1:8000",
        "wsdl is at: http://localhost:8000/?wsdl",
    ):
        logging.info(message)

    server.serve_forever()