Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
from irc3.testing import BotTestCase
from irc3.testing import MagicMock
from irc3.testing import patch
from collections import defaultdict
from panoramisk import Message
class Event(Message):
def __init__(self, event, **kwargs):
self.name = event
self.headers = dict(event=event, **kwargs)
def __getitem__(self, name):
return self.headers[name]
def update(self, **kwargs):
self.headers.update(kwargs)
self.name = self.headers['event']
class TestAsterirc(BotTestCase):
config = dict(
def setUp(self):
self.patch_asyncio()
patcher = patch('panoramisk.Manager.send_action_via_http')
self.send_action = patcher.start()
self.response = Message(
'response', '', headers={'Response': 'Success'})
self.send_action.return_value = self.response
self.addCleanup(patcher.stop)
self.mocks = {}
for name in ('connect', 'close'):
patcher = patch('panoramisk.Manager.' + name)
self.mocks[name] = patcher.start()
self.addCleanup(patcher.stop)
def test_message_from_message():
message = Message({"Uniqueid": "1444884710.0"})
wrapper = MessageWrapper(message)
assert wrapper.uniqueid == "1444884710.0"
assert wrapper.keyid == "1444884710"
Privilege: dialplan,all
Channel: SIP/test
ChannelState: 4
ChannelStateDesc: Ring
CallerIDNum: 0055551234
CallerIDName: 0055551234
ConnectedLineNum:
ConnectedLineName:
Language: en
Context: income-call
Exten: test
Priority: 4
Uniqueid: 1444884710.0
Variable: CALL_ID
Value: 5aeb879d-753e-48f2-92da-22ae0653cee8
""".replace("\n", utils.EOL)
def test_message_from_message():
message = Message({"Uniqueid": "1444884710.0"})
wrapper = MessageWrapper(message)
assert wrapper.uniqueid == "1444884710.0"
assert wrapper.keyid == "1444884710"
def test_message_from_text():
wrapper = MessageWrapper(MESSAGE)
assert wrapper.uniqueid == "1444884710.0"
assert "language" in wrapper
assert wrapper.language == "en"
assert wrapper["language"] == "en"
def send_action(self, *args, **kwargs):
try:
res = self.manager.send_action(*args, **kwargs)
return res
except Exception as e:
self.log.error('send_action(%r, **%r)', args, kwargs)
self.log.exception(e)
return Message('response',
'Sorry an error occured. ({})'.format(repr(e)),
headers={'Response': 'Failed'})
def __init__(self, options, filters, push_configs):
self.loop = asyncio.get_event_loop()
max_queues = options.pop("max_size", DEFAULT_MAX_QUEUES)
max_queue_size = options.pop("max_queue_size", DEFAULT_MAX_QUEUE_SIZE)
self.controller = Controller(self.loop, max_queues, max_queue_size)
self.controller.load_configs(filters, push_configs)
options.pop("loop", None) # discard invalid argument
self.manager = Manager(loop=self.loop, **options)
self.manager.log.addHandler(logging.NullHandler())
self.manager.register_event("*", self.handle_events)
self.config = config = dict(
host='127.0.0.1',
port=5038,
http_port='8088',
protocol='http',
debug=True,
)
config.update(bot.config.get('asterisk', {}))
self.log = logging.getLogger('irc3.ast')
self.ilog = logging.getLogger('irc.asterirc')
self.ilog.set_irc_targets(bot, self.config['channel'])
self.log.info('Channel is set to {channel}'.format(**config))
self.rooms = defaultdict(dict)
self.http = None
self.resolver = irc3.utils.maybedotted(self.config['resolver'])
self.manager = Manager(log=logging.getLogger('irc3.ast.manager'),
**config)
self.manager.register_event('Shutdown', self.handle_shutdown)
self.manager.register_event('Meet*', self.handle_meetme)
if config.get('debug'):
self.manager.register_event('*', self.handle_event)
if isinstance(self.resolver, type):
self.resolver = self.resolver(bot)
maxNumTries = int(config['attacker']['maxNumTries'])
maxNumInvitesWithoutAuth = int(config['attacker']['maxNumInvites'])
BLExpireTime = int(config['attacker']['BLExpireTime'])
WLExpireTime = int(config['attacker']['WLExpireTime'])
TLExpireTime = int(config['attacker']['TLExpireTime'])
iptablesChain = config['attacker']['iptablesChain']
else:
maxNumTries = 5
BLExpireTime = 86400
WLExpireTime = 21600
TLExpireTime = 3600
iptablesChain = "INPUT"
# We connect into a Asterisk Manager (Asterisk 11 or newer with Security permissions to read)
manager = Manager(loop=asyncio.get_event_loop(), host=managerHost, port=managerPort, username=managerUser, secret=managerPass)
# Set the logging system to dump everything we do
logging.basicConfig(filename=logFile,level=logging.DEBUG,format='%(asctime)s %(levelname)s: %(message)s')
Log = logging.getLogger()
level = logging.getLevelName(logLevel)
Log.setLevel(level)
logging.debug("Configured Blacklist expire time: "+str(BLExpireTime))
logging.debug("Configured Whitelist expire time: "+str(WLExpireTime))
logging.debug("Configured Templist expire time: "+str(TLExpireTime))
# We set the lists where we storage the addresses.
templist={} # Suspected addresses
whitelist={} # Trusted addresses
blacklist={} # Attackers addresses
invitelist={} # Invite control
def __init__(self, message):
if isinstance(message, str):
message = Message.from_line(message)
self.message = message