Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def teardown_class(cls):
    """
    Call SelectableStorage.finalize() and then delete the 'pmxbot.sqlite'
    file located in the same directory as this module.
    """
    pmxbot.storage.SelectableStorage.finalize()
    module_dir = os.path.dirname(os.path.abspath(__file__))
    db_file = os.path.join(module_dir, 'pmxbot.sqlite')
    os.remove(db_file)
import random
import operator
from . import storage
from .core import command
class Quotes(storage.SelectableStorage):
    """
    Storage class built on storage.SelectableStorage; the concrete store
    is obtained from from_URI() in initialize().
    """

    lib = 'pmx'

    @classmethod
    def initialize(cls):
        """Create the class-level store and append finalize to _finalizers."""
        cls.store = cls.from_URI()
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Remove the class-level store attribute set by initialize()."""
        del cls.store

    @staticmethod
    def split_num(lookup):
        """
        Split a trailing number off a lookup phrase.

        Return ``(prefix, number)`` when *lookup* ends with a
        space-separated run of digits, otherwise ``(lookup, 0)``.

        >>> Quotes.split_num('hello world 3')
        ('hello world', 3)
        >>> Quotes.split_num('hello world')
        ('hello world', 0)
        >>> Quotes.split_num('3')
        ('3', 0)
        """
        prefix, _, num = lookup.rpartition(' ')
        # No space at all (empty prefix) or a non-numeric last word means
        # the whole string is the lookup text.
        if not prefix or not num.isdigit():
            return lookup, 0
        # Always return a 2-tuple so callers can unpack the result.
        return prefix, int(num)
import random
import itertools
import abc
import contextlib
from more_itertools.recipes import pairwise, consume
import jaraco.collections
from jaraco.mongodb import fields
import pmxbot.core
import pmxbot.storage
class Chains(pmxbot.storage.SelectableStorage, metaclass=abc.ABCMeta):
    """
    Abstract storage class for word sequences; each fed message is saved
    as a list of words followed by the terminator ``term``.
    """

    # Terminator appended to the word list of every fed message.
    term = '\n'

    @classmethod
    def initialize(cls):
        """Create the class-level store and append finalize to _finalizers."""
        cls.store = cls.from_URI()
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Remove the class-level store attribute set by initialize()."""
        del cls.store

    def feed(self, message):
        """
        Strip trailing whitespace from *message*, split it on single
        spaces, append the terminator and pass the word list to save().
        """
        stripped = message.rstrip()
        words = [*stripped.split(' '), self.term]
        self.save(words)
def __init__(self, db_uri=None):
    """
    Open the feed store and preload this object with the feeds the store
    reports as already seen.

    db_uri: storage URI; when falsy, pmxbot.config.database is used.
    """
    super().__init__()
    if not db_uri:
        db_uri = pmxbot.config.database
    self.store = FeedparserDB.from_URI(db_uri)
    # Time the history load so its duration can be logged.
    stopwatch = timing.Stopwatch()
    seen = self.store.get_seen_feeds()
    self.update(seen)
    log.info("Loaded feed history in %s", stopwatch.split())
    # Append this instance's private finalizer to the shared finalizer list.
    storage.SelectableStorage._finalizers.append(self.__finalize)
import time
from . import storage
from .core import command, on_join
class Notify(storage.SelectableStorage):
    """
    Storage class built on storage.SelectableStorage; the concrete store
    is obtained from from_URI() in init().
    """

    @classmethod
    def init(cls):
        """Create the class-level store and append finalize to _finalizers."""
        cls.store = cls.from_URI()
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Remove the class-level store attribute set by init()."""
        del cls.store
class SQLiteNotify(Notify, storage.SQLiteStorage):
def init_tables(self):
CREATE_NOTIFY_TABLE = '''
CREATE TABLE
IF NOT EXISTS notify
(
txt = 'News from %s %s : %s' % (feed['name'], feed['linkurl'], ' || '.join(outputs[:10]))
yield core.NoLog
yield txt
@staticmethod
def format_entry(entry):
    """
    Render a feed entry as text.

    The result is the entry's title, followed by ' by <author>' when the
    entry has an 'author' key and the title does not already contain
    ' by '.
    """
    if ' by ' in entry['title'] or 'author' not in entry:
        return '{title}'.format(**entry)
    return '{title} by {author}'.format(**entry)
class FeedparserDB(storage.SelectableStorage):
    """Storage base class for feedparser data; defines no members itself."""
class SQLiteFeedparserDB(FeedparserDB, storage.SQLiteStorage):
    """SQLite store of feed keys, kept in the ``feed_seen`` table."""

    def init_tables(self):
        """Create the feed_seen table and its key index if they are missing."""
        statements = (
            "CREATE TABLE IF NOT EXISTS feed_seen (key varchar)",
            'CREATE INDEX IF NOT EXISTS ix_feed_seen_key ON feed_seen (key)',
        )
        for sql in statements:
            self.db.execute(sql)
        self.db.commit()

    def get_seen_feeds(self):
        """Return a list with the key of every row in feed_seen."""
        cursor = self.db.execute('select key from feed_seen')
        return [record[0] for record in cursor]

    def add_entries(self, entries):
        """Insert one feed_seen row per item of *entries*, then commit."""
        params = [(key,) for key in entries]
        self.db.executemany('INSERT INTO feed_seen (key) values (?)', params)
        self.db.commit()
import abc
import json
import pmxbot
from pmxbot import storage
class ChannelList(storage.SelectableStorage, list):
"""
Channel list, persisted to a storage layer.
"""
# Class attribute that starts as an empty list.
# NOTE(review): presumably collects cleanup callables, as the _finalizers
# lists of other storage classes do; confirm.
_finalizers = []
@classmethod
def initialize(cls):
# Run the per-aspect setup once for 'member' and once for 'log'.
for aspect in 'member', 'log':
cls._initialize_aspect(aspect)
@classmethod
def _initialize_aspect(cls, aspect):
# NOTE(review): `key` is computed but not used in the lines visible here,
# and the loaded store is not kept; this method appears to continue
# beyond this excerpt. Confirm against the full file.
key = aspect + ' channels'
# Build a store from the configured database URI, tag it with the aspect
# name and load its contents.
store = cls.from_URI(pmxbot.config.database)
store.aspect = aspect
store.load()
from jaraco.context import ExceptionTrap
from more_itertools import chunked
import pmxbot
from . import storage
from . import core
from pmxbot.core import command, NoLog
# Key function that picks the item at index 0 of its argument.
first = operator.itemgetter(0)
# Logger named after this module.
_log = logging.getLogger(__name__)
class Logger(storage.SelectableStorage):
    """Base Logger class."""

    @classmethod
    def initialize(cls):
        """
        Create the class-level store, log the class name of that store,
        and append finalize to _finalizers.
        """
        cls.store = cls.from_URI()
        backend_msg = "Logging with {cls.store.__class__.__name__}"
        _log.info(backend_msg.format(cls=cls))
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Remove the class-level store attribute set by initialize()."""
        del cls.store

    def message(self, channel, nick, msg):
        """
        Normalize *channel* (drop every '#', lowercase) and pass the
        message on to _message().
        """
        normalized = channel.replace('#', '').lower()
        self._message(normalized, nick, msg)
import re
import random
from . import storage
from .core import command
class SameName(ValueError):
    """ValueError subclass; adds no behavior of its own."""
class AlreadyLinked(ValueError):
    """ValueError subclass; adds no behavior of its own."""
class Karma(storage.SelectableStorage):
# Storage class built on storage.SelectableStorage; the store itself is
# created in initialize() via from_URI().
@classmethod
def initialize(cls):
# Create the class-level store, then append finalize to cls._finalizers.
cls.store = cls.from_URI()
cls._finalizers.append(cls.finalize)
@classmethod
def finalize(cls):
# Remove the class-level store attribute set by initialize().
del cls.store
def link(self, thing1, thing2):
"""
Link thing1 and thing2, adding the karma of each into
a single entry.
If any thing does not exist, it is created.
"""
# Normalize the first name: trim surrounding whitespace and lowercase it.
# NOTE(review): the rest of this method is not visible in this excerpt;
# confirm the remaining steps against the full file.
thing1 = thing1.strip().lower()
"""
Support for rolls, quit and join events (i.e. Roll Call).
"""
import datetime
import pmxbot
from . import storage
from . import logging
from pmxbot.core import on_join, on_leave
class ParticipantLogger(storage.SelectableStorage):
    """Base class for logging participants."""

    @classmethod
    def initialize(cls):
        """Create the class-level store and append finalize to _finalizers."""
        cls.store = cls.from_URI()
        cls._finalizers.append(cls.finalize)

    @classmethod
    def finalize(cls):
        """Remove the class-level store attribute set by initialize()."""
        del cls.store

    def list_channels(self):
        """Return the result of the backend's _list_channels()."""
        channels = self._list_channels()
        return channels

    def log_join(self, nick, channel):
        """Record a 'join' event for *nick* in *channel* through log()."""
        event = 'join'
        self.log(nick, channel, event)