Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import tomodachi
from tomodachi.transport.aws_sns_sqs import aws_sns_sqs
@tomodachi.service
class MockDecoratorService(tomodachi.Service):
name = 'mock_decorator'
log_level = 'INFO'
function_tested = False
@aws_sns_sqs('test-topic')
async def test(self, default_value: bool = True) -> None:
self.function_tested = default_value
@classmethod
async def build_message(cls, service: Any, topic: str, data: Any) -> str:
message = {
'protocol': 'custom',
'data': data
}
return json.dumps(message)
@classmethod
async def parse_message(cls, payload: str) -> Union[Dict, Tuple]:
message = json.loads(payload)
return message, None, None
@tomodachi.service
class AWSSNSSQSService(tomodachi.Service):
name = 'test_amqp'
log_level = 'INFO'
options = {
'amqp': {
'login': 'guest',
'password': 'guest'
}
}
closer = asyncio.Future() # type: Any
test_topic_data_received = False
test_topic_data = None
data_uuid = data_uuid
def check_closer(self):
if self.test_topic_data_received:
if not self.closer.done():
import asyncio
import os
import signal
from typing import Any # noqa
from aiohttp import web
import tomodachi
from tomodachi.transport.http import http
@tomodachi.service
class HttpService(tomodachi.Service):
name = 'test_http'
options = {
'http': {
'port': 53251,
'access_log': True,
'real_ip_from': '127.0.0.1'
}
}
uuid = None
closer = asyncio.Future() # type: Any
function_order = []
@http('GET', r'/?')
async def index(self, request: web.Request) -> str:
self.function_order.append('index')
return 'response'
import asyncio
import os
import signal
from typing import Any # noqa
import tomodachi
from tomodachi.transport.schedule import heartbeat, schedule
@tomodachi.service
class SchedulerService(tomodachi.Service):
name = 'test_schedule'
uuid = None
closer = asyncio.Future() # type: Any
seconds_triggered = 0
third_seconds_triggered = 0
@heartbeat
async def every_second(self) -> None:
self.seconds_triggered += 1
@schedule(interval='3 seconds')
async def every_third_second(self) -> None:
self.third_seconds_triggered += 1
@schedule(interval='*/2 * * * *')
async def every_second_minute(self) -> None:
import tomodachi
@tomodachi.service
class ExceptionService(tomodachi.Service):
name = 'test_exception'
log_level = 'DEBUG'
def __init__(self) -> None:
raise Exception("fail in __init__()")
async def _start_service(self) -> None:
raise Exception("fail in _start_service()")
import asyncio
import os
import signal
import tomodachi
from tomodachi.discovery.dummy_registry import DummyRegistry
from tomodachi.protocol.json_base import JsonBase
@tomodachi.service
class AutoClosingService(tomodachi.Service):
name = 'test_auto_closing'
discovery = [DummyRegistry]
message_protocol = JsonBase
start = False
started = False
stop = False
async def _start_service(self) -> None:
self.start = True
async def _started_service(self) -> None:
self.started = True
await asyncio.sleep(0.1)
os.kill(os.getpid(), signal.SIGTERM)
self.invocation_count += 1
@tomodachi.decorator
def count_invocations_4(self: Any, *args: Any, **kwargs: Any) -> None:
self.invocation_count += 1
@tomodachi.decorator
def count_invocations_0(self: Any, *args: Any, **kwargs: Any) -> str:
self.invocation_count += 1
return '0'
@tomodachi.service
class HttpService(tomodachi.Service):
name = 'test_http'
options = {
'http': {
'port': None,
'access_log': True,
'real_ip_from': '127.0.0.1'
}
}
invocation_count = 0
uuid = None
closer = asyncio.Future() # type: Any
@http('GET', r'/count/1/?')
@count_invocations_1
async def count_1(self, request: web.Request) -> str:
return str(self.invocation_count)
import asyncio
import os
import pathlib
import uuid
from typing import Callable, Tuple, Union
from aiohttp import web
from aiohttp.web_fileresponse import FileResponse
import tomodachi
from tomodachi import http, http_error, http_static, websocket
@tomodachi.service
class ExampleWebsocketService(tomodachi.Service):
name = 'example_websocket_service'
log_level = 'DEBUG'
uuid = os.environ.get('SERVICE_UUID')
# Some options can be specified to define credentials, used ports, hostnames, access log, etc.
options = {
'http': {
'port': 4711,
'content_type': 'text/plain',
'charset': 'utf-8',
'access_log': True
}
}
@http('GET', r'/(?:|index.html)')
async def index(self, request: web.Request) -> web.Response:
import os
from typing import Any, Dict
import tomodachi
from tomodachi import amqp, amqp_publish
from tomodachi.discovery import DummyRegistry
from tomodachi.protocol import JsonBase
@tomodachi.service
class ExampleAmqpService(tomodachi.Service):
name = 'example_amqp_service'
log_level = 'INFO'
uuid = os.environ.get('SERVICE_UUID')
# Build own "discovery" functions, to be run on start and stop
# See tomodachi/discovery/dummy_registry.py for example
discovery = [DummyRegistry]
# The message protocol class defines how a message should be processed when sent and received
# See tomodachi/protocol/json_base.py for a basic example using JSON and transferring some metadata
message_protocol = JsonBase
# Some options can be specified to define credentials, used ports, hostnames, access log, etc.
options = {
'amqp': {
'queue_ttl': 60
# Functionality before function is called
service.log('middleware before')
return_value = await func(*args, **kwargs)
# There's also the possibility to pass in extra arguments or keywords arguments, for example:
# return_value = await func(*args, id='overridden', **kwargs)
# Functinoality after function is called
service.log('middleware after')
return return_value
@tomodachi.service
class ExampleAmqpService(tomodachi.Service):
name = 'example_amqp_service'
log_level = 'INFO'
uuid = os.environ.get('SERVICE_UUID')
# Build own "discovery" functions, to be run on start and stop
# See tomodachi/discovery/dummy_registry.py for example
discovery = [DummyRegistry]
# The message protocol class defines how a message should be processed when sent and received
# See tomodachi/protocol/json_base.py for a basic example using JSON and transferring some metadata
message_protocol = JsonBase
# Adds a middleware function that is run on every incoming message.
# Several middlewares can be chained.
message_middleware = [middleware_function]