Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def base_api():
    """Build a ``BaseAPI`` client from the ``FCM_TEST_API_KEY`` environment variable.

    Returns:
        A ``BaseAPI`` instance constructed with ``api_key`` set to the
        value of ``FCM_TEST_API_KEY``.

    Raises:
        AssertionError: If ``FCM_TEST_API_KEY`` is unset or empty.
    """
    # os.getenv already yields None for a missing variable, so no explicit
    # default is needed.
    api_key = os.getenv("FCM_TEST_API_KEY")
    # Raised explicitly (instead of a bare ``assert``) so the check still
    # runs when Python is started with -O, which strips assert statements.
    if not api_key:
        raise AssertionError(
            "Please set the environment variables for testing according to CONTRIBUTING.rst"
        )
    return BaseAPI(api_key=api_key)
def test_init_baseapi():
    """Check that ``BaseAPI()`` called with no arguments raises ``errors.AuthenticationError``."""
    try:
        BaseAPI()
    except errors.AuthenticationError:
        # Expected outcome: constructing the client without arguments fails.
        return
    # Reaching this point means no AuthenticationError was raised. The failure
    # is raised explicitly rather than via ``assert False`` because assert
    # statements are removed under ``python -O``, which would let the test
    # pass silently.
    raise AssertionError("Should raise AuthenticationError")
from .baseapi import BaseAPI
from .errors import InvalidDataError
class FCMNotification(BaseAPI):
def notify_single_device(self,
registration_id=None,
message_body=None,
message_title=None,
message_icon=None,
sound=None,
condition=None,
collapse_key=None,
delay_while_idle=False,
time_to_live=None,
restricted_package_name=None,
low_priority=False,
dry_run=False,
data_message=None,
click_action=None,
badge=None,
from tornado import escape
from tornado import gen
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from ..baseapi import BaseAPI
from ..errors import AuthenticationError, InternalPackageError, FCMServerError
from ..fcm import FCMNotification
class TornadoBaseAPI(BaseAPI):
    """``BaseAPI`` subclass that sends its requests through Tornado's ``AsyncHTTPClient``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class, then create the asynchronous HTTP client.

        All positional and keyword arguments are forwarded unchanged to
        ``BaseAPI.__init__``.
        """
        super(TornadoBaseAPI, self).__init__(*args, **kwargs)
        self.http_client = AsyncHTTPClient()

    @gen.coroutine
    def send_request(self, payloads=None):
        """POST each payload to ``self.FCM_END_POINT``, one request at a time.

        Args:
            payloads: Iterable of request bodies. ``None`` (the default) is
                treated as "nothing to send".
        """
        # The declared default is None, which is not iterable; treat it as an
        # empty batch instead of failing with TypeError.
        if payloads is None:
            payloads = ()

        for payload in payloads:
            request = HTTPRequest(
                url=self.FCM_END_POINT,
                method='POST',
                headers=self.request_headers(),
                body=payload
            )
            # Each fetch is yielded before the next request is built, so the
            # payloads are sent sequentially.
            # NOTE(review): the response is not inspected in the code visible
            # here -- confirm whether response handling is expected to follow.
            response = yield self.http_client.fetch(request)