Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Test module for NodeHistory.py
Created on 28 June 2016
@author: dgrossman, lanhamt
"""
import logging
import falcon
import pytest
from poseidon.poseidonMonitor.NodeHistory.NodeHistory import Handle_Default
from poseidon.poseidonMonitor.NodeHistory.NodeHistory import NodeHistory
from poseidon.poseidonMonitor.NodeHistory.NodeHistory import nodehistory_interface
# Logger named after this test module.
module_logger = logging.getLogger(__name__)
# Falcon application used by this module: the endpoint registered as
# 'Handle_Default' on nodehistory_interface is mounted on the templated
# history route.
application = falcon.API()
application.add_route('/v1/history/{resource}',
nodehistory_interface.get_endpoint('Handle_Default'))
def test_node_hist_class():
    ''' Instantiate NodeHistory, register an endpoint and configure it. '''
    history = NodeHistory()
    history.add_endpoint('Handle_Default', Handle_Default)
    # Configure the object itself first, then its registered endpoints.
    history.configure()
    history.configure_endpoints()
def test_handle_default_class():
    ''' Instantiate Handle_Default and give it another instance as owner. '''
    handler = Handle_Default()
    owner = Handle_Default()
    handler.owner = owner
def create_app():
    ''' Create a WSGI compatible App object.

    Validates the configuration, overrides falcon's method-not-allowed
    responder factory, installs coloredlogs, sets logger levels according
    to the 'debug' and 'show_sql' config flags, then builds a falcon.API
    with the middleware stack and registers the error handlers.

    NOTE(review): the visible body ends after the error handlers are
    registered and never returns ``app`` -- confirm whether the function
    continues beyond this point.
    '''
    validate_config()
    falcon.responders.create_method_not_allowed = create_method_not_allowed
    coloredlogs.install(fmt='[%(asctime)-15s] %(name)s %(message)s')

    # Each enabled config flag switches the paired logger to INFO.
    for flag, logger_name in (
            ('debug', 'szurubooru'),
            ('show_sql', 'sqlalchemy.engine'),
    ):
        if config.config[flag]:
            logging.getLogger(logger_name).setLevel(logging.INFO)

    app = falcon.API(
        request_type=api.Request,
        middleware=[
            middleware.RequireJson(),
            middleware.CachePurger(),
            middleware.ContextAdapter(),
            middleware.DbSession(),
            middleware.Authenticator(),
            middleware.RequestLogger(),
        ])

    # Registration order is the same as the order of this table.
    error_handlers = (
        (errors.AuthError, _on_auth_error),
        (errors.IntegrityError, _on_integrity_error),
        (errors.ValidationError, _on_validation_error),
        (errors.SearchError, _on_search_error),
        (errors.NotFoundError, _on_not_found_error),
        (errors.ProcessingError, _on_processing_error),
    )
    for error_type, handler in error_handlers:
        app.add_error_handler(error_type, handler)
class EmptyResponse:
    """Resource whose GET and POST responders both send an empty body."""

    @staticmethod
    def _send_empty(resp):
        """Store zero bytes as the response payload."""
        payload = bytes("", "utf8")
        resp.data = payload

    def on_get(self, req, resp):
        """Answer GET with an empty byte string."""
        self._send_empty(resp)

    def on_post(self, req, resp):
        """Answer POST with an empty byte string."""
        self._send_empty(resp)
class StringResponse:
    """Resource that echoes the ``id`` argument back as UTF-8 bytes."""

    def on_get(self, req, resp, id):
        """Encode *id* as UTF-8 and store it as the response payload."""
        # The parameter keeps the name ``id`` because callers may pass it
        # by keyword.
        encoded = bytes(id, "utf8")
        resp.data = encoded
# Falcon app: "/" and "/user" are served by EmptyResponse instances, and
# "/user/{id}" by StringResponse, whose on_get takes the `id` field.
app = falcon.API()
app.add_route("/", EmptyResponse())
app.add_route("/user", EmptyResponse())
app.add_route("/user/{id}", StringResponse())
def api_versions(conf=None):
    """API version negotiation app

    Builds a Falcon app with the RequireJSON and JSONTranslator middleware
    (in that order) and a single ``Resource`` mounted at '/'.

    :param conf: accepted but not read by this function.
    :return: Falcon WSGI app
    """
    app = falcon.API(middleware=[
        middleware.RequireJSON(),
        middleware.JSONTranslator(),
    ])
    app.add_route('/', Resource())
    return app
catalog.extend([
('/v1', v1_0.private_endpoints(self, self._conf)),
('/v1.1', v1_1.private_endpoints(self, self._conf)),
('/v2', v2_0.private_endpoints(self, self._conf)),
])
# NOTE(wanghao): The hook feature was removed after falcon 1.0.0, so
# middleware is used instead. For compatibility with older versions both
# ways are supported for now. The hook path can be removed once the
# minimum falcon version in requirements is 1.0.0 or later.
if (d_version.LooseVersion(falcon.__version__) >=
d_version.LooseVersion("1.0.0")):
middleware = [FuncMiddleware(hook) for hook in self.before_hooks]
self.app = falcon.API(middleware=middleware)
else:
self.app = falcon.API(before=self.before_hooks)
# Set options to keep behavior compatible to pre-2.0.0 falcon
self.app.req_options.auto_parse_qs_csv = True
self.app.req_options.keep_blank_qs_values = False
self.app.add_error_handler(Exception, self._error_handler)
for version_path, endpoints in catalog:
if endpoints:
for route, resource in endpoints:
self.app.add_route(version_path + route, resource)
def get_app():
    """Wire up the people stack and return the Falcon app that serves it.

    A SessionProvider feeds a PostgresPeopleRepository, which backs a
    PeopleApplication shared by both resources. The list resource is
    mounted at '/people' and the single-person resource at
    '/people/{identifier}'.
    """
    sessions = SessionProvider()
    repository = PostgresPeopleRepository(session_provider=sessions)
    people = PeopleApplication(people_repository=repository)

    # Both resources are built before the Falcon app is created.
    collection_resource = PersonListResource(people_application=people)
    item_resource = PersonResource(people_application=people)

    application = falcon.API()
    application.add_route('/people', collection_resource)
    application.add_route("/people/{identifier}", item_resource)
    return application
import models
import middleware
import route_stories
import route_people
import route_settings
# Call the models module's connect() before any resource is instantiated.
models.connect()
# Initialize settings
# NOTE(review): `settings` is not among the imports visible here --
# confirm it is imported elsewhere in the file.
settings.AppSettings()
# Falcon API object using the `components` exposed by the middleware module.
api = falcon.API(middleware=middleware.components)
# Story routes: collection, single story, and the people of a story.
api.add_route('/stories', route_stories.StoriesResource())
api.add_route('/stories/{story_id}', route_stories.StoryResource())
api.add_route('/stories/{story_id}/people', route_stories.StoryPeopleResource())
# People routes: collection and single person.
api.add_route('/people', route_people.PeopleResource())
api.add_route('/people/{person_id}', route_people.PersonResource())
# Settings route.
api.add_route('/settings', route_settings.SettingsResource())
:param authentication_module_name: Full name of the authentication module.
:type authentication_module_name: str
:param authentication_kwargs: Keyword arguments to pass to the auth mod.
:type authentication_kwargs: dict
:returns: The commissaire application.
:rtype: falcon.API
"""
try:
module = importlib.import_module(authentication_module_name)
authentication_class = getattr(module, 'AuthenticationPlugin')
authentication = authentication_class(**authentication_kwargs)
except ImportError:
raise Exception('Can not import {0} for authentication'.format(
authentication_module_name))
app = falcon.API(middleware=[authentication, JSONify()])
app.add_route('/api/v0/status', StatusResource())
app.add_route('/api/v0/cluster/{name}', ClusterResource())
app.add_route(
'/api/v0/cluster/{name}/hosts',
ClusterHostsResource())
app.add_route(
'/api/v0/cluster/{name}/hosts/{address}',
ClusterSingleHostResource())
app.add_route(
'/api/v0/cluster/{name}/deploy',
ClusterDeployResource())
app.add_route(
'/api/v0/cluster/{name}/restart',
ClusterRestartResource())
app.add_route(
from sentinel.server import GenerateOVPN
from sentinel.server import Token
from sentinel.utils import JSONTranslator
class Up(object):
    """Status resource that reports 'UP' for both POST and GET."""

    @staticmethod
    def _report_up(resp):
        """Set HTTP 200 and the JSON status document on *resp*."""
        resp.status = falcon.HTTP_200
        resp.body = json.dumps({'status': 'UP'})

    def on_post(self, req, resp):
        """Answer POST with the 'UP' status payload."""
        self._report_up(resp)

    def on_get(self, req, resp):
        """Answer GET with the 'UP' status payload."""
        self._report_up(resp)
# Falcon app with JSONTranslator as its only middleware.
server = falcon.API(middleware=[JSONTranslator()])
# "/" serves the Up status resource; the other paths map to the Token,
# GenerateOVPN and Disconnect resources.
# NOTE(review): `Disconnect` is not among the imports visible here --
# confirm it is imported elsewhere in the file.
server.add_route('/', Up())
server.add_route('/token', Token())
server.add_route('/ovpn', GenerateOVPN())
server.add_route('/disconnect', Disconnect())
def create_api():
    """Return a new Falcon app with an ``ApiRoot`` resource mounted at '/'."""
    application = falcon.API()
    root_resource = ApiRoot()
    application.add_route('/', root_resource)
    return application