Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import rexviewer as r
import circuits
import naali
import time
import sys
import traceback
user, pwd, server = "Test Bot", "test", "world.realxtend.org:9000"
#user, pwd, server = "d d", "d", "world.evocativi.com:8002"
class TestRunner(circuits.BaseComponent):
def __init__(self, cfsection=None):
self.config = cfsection or dict()
circuits.BaseComponent.__init__(self)
self.testgen = self.run()
@circuits.handler("on_sceneadded")
def sceneadded(self, name):
r.logInfo("base class sceneadded callback")
@circuits.handler("update")
def update(self, deltatime):
prev = None
try:
status = self.testgen.next()
except StopIteration:
class terminated(Event):
"""terminated Event
This Event is sent when a process is completed
:param args: (process)
:type tuple: tuple
"""
def __init__(self, *args):
super(terminated, self).__init__(*args)
class Process(BaseComponent):
channel = "process"
def init(self, args, cwd=None, shell=False):
self.args = args
self.cwd = cwd
self.shell = shell
self.p = None
self.stderr = BytesIO()
self.stdout = BytesIO()
self._status = None
self._terminated = False
self._stdout_closed = False
from stompest.error import StompConnectionError, StompError
from stompest.sync.client import LOG_CATEGORY
except ImportError:
raise ImportError("No stomp support available. Is stompest installed?")
StompSpec.DEFAULT_VERSION = '1.2'
ACK_CLIENT_INDIVIDUAL = StompSpec.ACK_CLIENT_INDIVIDUAL
ACK_AUTO = StompSpec.ACK_AUTO
ACK_CLIENT = StompSpec.ACK_CLIENT
ACK_MODES = (ACK_CLIENT_INDIVIDUAL, ACK_AUTO, ACK_CLIENT)
LOG = logging.getLogger(__name__)
class StompClient(BaseComponent):
""" Send and Receive messages from a STOMP queue """
channel = "stomp"
def init(self, host, port, username=None, password=None,
connect_timeout=3, connected_timeout=3,
version=StompSpec.VERSION_1_2, accept_versions=["1.0", "1.1", "1.2"],
heartbeats=(0, 0), ssl_context=None,
use_ssl=True,
key_file=None,
cert_file=None,
ca_certs=None,
ssl_version=ssl.PROTOCOL_SSLv23,
key_file_password=None,
proxy_host=None,
proxy_port=None,
proxy_user=None,
from creoleparser import create_dialect, creole11_base, Parser
import macros
import sahriswiki
from utils import page_mime
from auth import Permissions
from search import WikiSearch
from dbm import DatabaseManager
from storage import WikiSubdirectoryIndexesStorage as DefaultStorage
from pagetypes import WikiPageText, WikiPageHTML
from pagetypes import WikiPageWiki, WikiPageFile, WikiPageImage
from pagetypes import WikiPageColorText, WikiPageCSV, WikiPageRST
class Environment(BaseComponent):
filename_map = {
"README": (WikiPageText, "text/plain"),
"COPYING": (WikiPageText, "text/plain"),
"CHANGES": (WikiPageText, "text/plain"),
"MANIFEST": (WikiPageText, "text/plain"),
"LICENSE": (WikiPageText, "text/plain"),
"ChangeLog": (WikiPageText, "text/plain"),
"favicon.ico": (WikiPageImage, "image/x-icon"),
}
mime_map = {
"text": WikiPageText,
"application/javascript": WikiPageColorText,
"text/x-python": WikiPageColorText,
"text/css": WikiPageFile,
#!/usr/bin/env python
from circuits import BaseComponent, Debugger, handler
class MyComponent(BaseComponent):
def __init__(self):
super(MyComponent, self).__init__()
Debugger().register(self)
@handler("started", channel="*")
def system_started(self, component):
print "Start event detected"
MyComponent().run()
import importlib
from circuits import handler, BaseComponent, Event
from .sources import new_message
# TODO: make a circuits component context with a worker that relays to the AIwIBoardContext (or other)
_CONTEXTS = ["RPiNWR.Si4707.mock.MockContext"]
try:
from RPiNWR.AIWIBoardContext import AIWIBoardContext
_CONTEXTS.append("RPiNWR.AIWIBoardContext.AIWIBoardContext")
except ImportError:
pass # It's not a valid choice in this environment
_DEFAULT_CONTEXT = _CONTEXTS[-1]
class Radio_Component(BaseComponent):
"""
This component's job is to own the Si4707 object and its context to provide these Circuits framework events:
* new_message for SAME messages
Coming soon:
* radio_status (up,down, frequency, rssi/snr)
This class becomes obsolete with refactoring for:
* All Si4707 functions executed by event (mute, unmute, volume, RSQ check...)
* Context functions by event (write bytes, read bytes, set GPIO (relays))
"""
def __init__(self, args=None):
super().__init__()
self.radio = None
self.context = None
self.ready = False
self.name = name
def __repr__(self):
"x.__repr__() <==> repr(x)"
if len(self.channels) > 1:
channels = repr(self.channels)
elif len(self.channels) == 1:
channels = str(self.channels[0])
else:
channels = ""
return "<%s[%s] (%s)>" % (self.name, channels, self.request_id)
class CustomThreatServiceHelper(BaseComponent):
"""
A helper component, used to dispatch lookup events 'off-thread'.
Normally, when an event handler fires an event, the original
event is notified 'complete' only when all children are done,
this escapes from that process so that lookup queries
are queued while the initial request can return immediately.
"""
channels = (HELPER_CHANNEL,)
def __init__(self, maincomponent):
super(CustomThreatServiceHelper, self).__init__()
self.maincomponent = maincomponent
@handler(channel=HELPER_CHANNEL)
def _lookup(self, event, *args, **kwargs):
"""
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from circuits import handler, BaseComponent, Event
import schema
metadata = MetaData()
Base = declarative_base(metadata=metadata)
class DatabaseLoaded(Event):
"""Database Loaded Event"""
class DatabaseManager(BaseComponent):
def __init__(self, dburi, echo=False, convert_unicode=True):
super(DatabaseManager, self).__init__()
self.engine = create_engine(dburi,
echo=echo,
convert_unicode=convert_unicode,
)
self.session = scoped_session(
sessionmaker(
bind=self.engine,
autoflush=True,
autocommit=False,
)
)
import importlib
from circuits import handler, BaseComponent, Event
from ...sources import new_message
# TODO: make a circuits component context with a worker that relays to the AIwIBoardContext (or other)
_CONTEXTS = ["RPiNWR.Si4707.mock.MockContext"]
try:
from RPiNWR.AIWIBoardContext import AIWIBoardContext
_CONTEXTS.append("RPiNWR.AIWIBoardContext.AIWIBoardContext")
except ImportError:
pass # It's not a valid choice in this environment
_DEFAULT_CONTEXT = _CONTEXTS[-1]
class Radio_Component(BaseComponent):
"""
This component's job is to own the Si4707 object and its context to provide these Circuits framework events:
* new_message for SAME messages
Coming soon:
* radio_status (up,down, frequency, rssi/snr)
This class becomes obsolete with refactoring for:
* All Si4707 functions executed by event (mute, unmute, volume, RSQ check...)
* Context functions by event (write bytes, read bytes, set GPIO (relays))
"""
def __init__(self, args=None, clock=time.time):
super().__init__()
self.radio = None
self.context = None
self.ready = False
"""VirtualHost
This module implements a virtual host dispatcher that sends requests
for configured virtual hosts to different dispatchers.
"""
from circuits import BaseComponent, handler
from circuits.six.moves.urllib_parse import urljoin
class VirtualHosts(BaseComponent):
"""Forward to anotehr Dispatcher based on the Host header.
This can be useful when running multiple sites within one server.
It allows several domains to point to different parts of a single
website structure. For example:
- http://www.domain.example -> /
- http://www.domain2.example -> /domain2
- http://www.domain2.example:443 -> /secure
:param domains: a dict of {host header value: virtual prefix} pairs.
:type domains: dict
The incoming "Host" request header is looked up in this dict,
and, if a match is found, the corresponding "virtual prefix"
value will be prepended to the URL path before passing the