Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def filter(self, record):
    """Decide whether a log record should be handled.

    A record passes when it was emitted by the ``discord.gateway`` logger,
    or when its message contains ``'Shard ID'`` or ``'Websocket closed '``.

    ``record.msg`` may be an arbitrary object rather than a string, so it is
    converted with ``str()`` before the substring checks. A non-string
    message therefore no longer raises ``TypeError`` from the ``in`` test.
    """
    if record.name == 'discord.gateway':
        return True
    message = str(record.msg)
    return 'Shard ID' in message or 'Websocket closed ' in message
def emit(self, record):
    """Hand the log record to the owning cog through its ``add_record`` method."""
    owner = self.cog
    owner.add_record(record)
class Commands(db.Table):
    """Table definition with one column per recorded detail of a command use.

    NOTE(review): the purpose is inferred from the column names; confirm
    against the code that inserts into this table.
    """

    id = db.PrimaryKeyColumn()

    # Big-integer identifiers; guild and author are indexed, channel is not.
    guild_id = db.Column(
        db.Integer(big=True),
        index=True,
    )
    channel_id = db.Column(
        db.Integer(big=True),
    )
    author_id = db.Column(
        db.Integer(big=True),
        index=True,
    )

    # Indexed datetime column.
    used = db.Column(
        db.Datetime,
        index=True,
    )

    # String columns; only ``command`` is indexed.
    prefix = db.Column(
        db.String,
    )
    command = db.Column(
        db.String,
        index=True,
    )

    # Indexed boolean flag.
    failed = db.Column(
        db.Boolean,
        index=True,
    )
# Pattern for Discord invite links that should be censored.
# The ``.com/invite`` alternative is tried before the bare ``.com`` one so
# that for ``discord.com/invite/<code>`` the match extends over the invite
# code itself instead of ending right after ``/invite``.
_INVITE_REGEX = re.compile(r'(?:https?:\/\/)?discord(?:\.gg|(?:app)?\.com\/invite|\.com)?\/[A-Za-z0-9]+')


def censor_invite(obj, *, _regex=_INVITE_REGEX):
    """Return ``str(obj)`` with every invite-link match replaced.

    Parameters
    ----------
    obj
        Any object; it is converted with ``str()`` before substitution.
    _regex
        Compiled pattern used for the substitution. Bound at definition
        time to ``_INVITE_REGEX``.

    Returns
    -------
    str
        The text with each match replaced by ``'[censored-invite]'``.
    """
    return _regex.sub('[censored-invite]', str(obj))
def hex_value(arg):
    """Parse ``arg`` as a base-16 integer literal and return the integer."""
    return int(arg, 16)
def object_at(addr):
    """Return the object reported by ``gc.get_objects()`` whose ``id()`` is ``addr``.

    Returns ``None`` when no such object is found.
    """
    return next(
        (candidate for candidate in gc.get_objects() if id(candidate) == addr),
        None,
    )
class Stats(commands.Cog):
class GatewayHandler(logging.Handler):
    """Logging handler that forwards selected records to a cog.

    Records that pass :meth:`filter` are handed to ``cog.add_record``.
    The handler level is ``logging.INFO``.
    """

    def __init__(self, cog):
        """Store ``cog`` and initialise the handler at ``logging.INFO``."""
        self.cog = cog
        super().__init__(logging.INFO)

    def filter(self, record):
        """Decide whether a log record should be handled.

        A record passes when it was emitted by the ``discord.gateway``
        logger, or when its message contains ``'Shard ID'`` or
        ``'Websocket closed '``.

        ``record.msg`` may be an arbitrary object rather than a string, so
        it is converted with ``str()`` before the substring checks. A
        non-string message therefore no longer raises ``TypeError``.
        """
        if record.name == 'discord.gateway':
            return True
        message = str(record.msg)
        return 'Shard ID' in message or 'Websocket closed ' in message

    def emit(self, record):
        """Hand the record to the cog through ``add_record``."""
        self.cog.add_record(record)
class Commands(db.Table):
    """Table definition with one column per recorded detail of a command use.

    NOTE(review): the purpose is inferred from the column names; confirm
    against the code that inserts into this table.
    """

    id = db.PrimaryKeyColumn()

    # Big-integer identifiers; guild and author are indexed, channel is not.
    guild_id = db.Column(
        db.Integer(big=True),
        index=True,
    )
    channel_id = db.Column(
        db.Integer(big=True),
    )
    author_id = db.Column(
        db.Integer(big=True),
        index=True,
    )

    # Indexed datetime column.
    used = db.Column(
        db.Datetime,
        index=True,
    )

    # String columns; only ``command`` is indexed.
    prefix = db.Column(
        db.String,
    )
    command = db.Column(
        db.String,
        index=True,
    )

    # Indexed boolean flag.
    failed = db.Column(
        db.Boolean,
        index=True,
    )
# Pattern for Discord invite links that should be censored.
# The ``.com/invite`` alternative is tried before the bare ``.com`` one so
# that for ``discord.com/invite/<code>`` the match extends over the invite
# code itself instead of ending right after ``/invite``.
_INVITE_REGEX = re.compile(r'(?:https?:\/\/)?discord(?:\.gg|(?:app)?\.com\/invite|\.com)?\/[A-Za-z0-9]+')


def censor_invite(obj, *, _regex=_INVITE_REGEX):
    """Return ``str(obj)`` with every invite-link match replaced.

    Parameters
    ----------
    obj
        Any object; it is converted with ``str()`` before substitution.
    _regex
        Compiled pattern used for the substitution. Bound at definition
        time to ``_INVITE_REGEX``.

    Returns
    -------
    str
        The text with each match replaced by ``'[censored-invite]'``.
    """
    return _regex.sub('[censored-invite]', str(obj))
def hex_value(arg):
    """Parse ``arg`` as a base-16 integer literal and return the integer."""
    return int(arg, 16)
def object_at(addr):
for o in gc.get_objects():
locked = db.Column(db.Boolean, default=False)
max_age = db.Column(db.Interval, default="'7 days'::interval", nullable=False)
class StarboardEntry(db.Table, table_name='starboard_entries'):
    """Table definition declared with ``table_name='starboard_entries'``."""

    id = db.PrimaryKeyColumn()

    # Indexed big-integer message identifiers; ``message_id`` is additionally
    # unique and non-nullable.
    bot_message_id = db.Column(
        db.Integer(big=True),
        index=True,
    )
    message_id = db.Column(
        db.Integer(big=True),
        index=True,
        unique=True,
        nullable=False,
    )

    # Plain big-integer columns.
    channel_id = db.Column(
        db.Integer(big=True),
    )
    author_id = db.Column(
        db.Integer(big=True),
    )

    # Foreign key built from ('starboard', 'id') with a big-integer SQL type.
    guild_id = db.Column(
        db.ForeignKey('starboard', 'id', sql_type=db.Integer(big=True)),
        index=True,
        nullable=False,
    )
class Starrers(db.Table):
    """Table definition pairing an ``author_id`` with an ``entry_id``.

    ``create_table`` appends a unique index over that pair.
    """

    id = db.PrimaryKeyColumn()
    author_id = db.Column(
        db.Integer(big=True),
        nullable=False,
    )
    # Foreign key built from ('starboard_entries', 'id').
    entry_id = db.Column(
        db.ForeignKey('starboard_entries', 'id'),
        index=True,
        nullable=False,
    )

    @classmethod
    def create_table(cls, *, exists_ok=True):
        """Return the parent's statement plus the unique-index statement.

        The parent result and the extra ``CREATE UNIQUE INDEX`` statement are
        joined by a single newline.
        """
        unique_index_sql = "CREATE UNIQUE INDEX IF NOT EXISTS starrers_uniq_idx ON starrers (author_id, entry_id);"
        base_statement = super().create_table(exists_ok=exists_ok)
        return base_statement + '\n' + unique_index_sql
class StarboardConfig:
__slots__ = ('bot', 'id', 'channel_id', 'threshold', 'locked', 'needs_migration', 'max_age')
def __init__(self, *, guild_id, bot, record=None):
self.id = guild_id
self.bot = bot
if record:
self.channel_id = record['channel_id']
from urllib.parse import urlparse
import itertools
import traceback
import datetime
import discord
import asyncio
import asyncpg
import enum
import re
from .utils import db, config, time
class Players(db.Table):
    """Table definition with one Discord ID and two string handles per row."""

    id = db.PrimaryKeyColumn()

    # Unique, indexed big-integer column.
    discord_id = db.Column(
        db.Integer(big=True),
        unique=True,
        index=True,
    )

    # String columns; only ``switch`` is declared unique.
    challonge = db.Column(
        db.String,
    )
    switch = db.Column(
        db.String,
        unique=True,
    )
class Teams(db.Table):
    """Table definition with an active flag and two string columns."""

    id = db.PrimaryKeyColumn()

    # Boolean flag whose declared default is True.
    active = db.Column(
        db.Boolean,
        default=True,
    )

    # String columns; only ``challonge`` is indexed.
    challonge = db.Column(
        db.String,
        index=True,
    )
    logo = db.Column(
        db.String,
    )
class TeamMembers(db.Table, table_name='team_members'):
    """Table definition declared with ``table_name='team_members'``."""

    id = db.PrimaryKeyColumn()

    # Indexed foreign keys built from ('teams', 'id') and ('players', 'id').
    team_id = db.Column(
        db.ForeignKey('teams', 'id'),
        index=True,
    )
    player_id = db.Column(
        db.ForeignKey('players', 'id'),
        index=True,
    )

    # Boolean flag whose declared default is False.
    captain = db.Column(
        db.Boolean,
        default=False,
    )
class ChallongeError(commands.CommandError):
raise StarError('\N{WARNING SIGN} Starboard channel not found.')
return True
return commands.check(predicate)
def MessageID(argument):
    """Convert ``argument`` to an integer message ID using base 10.

    Raises
    ------
    StarError
        When ``int()`` rejects the value with ``ValueError``.
    """
    try:
        message_id = int(argument, base=10)
    except ValueError:
        raise StarError(f'"{argument}" is not a valid message ID. Use Developer Mode to get the Copy ID option.')
    else:
        return message_id
class Starboard(db.Table):
    """Table definition with a channel, a threshold, a lock flag and a max age."""

    # Big-integer primary key, declared through ``db.Column``.
    id = db.Column(
        db.Integer(big=True),
        primary_key=True,
    )
    channel_id = db.Column(
        db.Integer(big=True),
    )

    # Non-nullable integer whose declared default is 1.
    threshold = db.Column(
        db.Integer,
        default=1,
        nullable=False,
    )

    # Boolean flag whose declared default is False.
    locked = db.Column(
        db.Boolean,
        default=False,
    )

    # Non-nullable interval; the default is given as an SQL expression string.
    max_age = db.Column(
        db.Interval,
        default="'7 days'::interval",
        nullable=False,
    )
class StarboardEntry(db.Table, table_name='starboard_entries'):
    """Table definition declared with ``table_name='starboard_entries'``."""

    id = db.PrimaryKeyColumn()

    # Indexed big-integer message identifiers; ``message_id`` is additionally
    # unique and non-nullable.
    bot_message_id = db.Column(
        db.Integer(big=True),
        index=True,
    )
    message_id = db.Column(
        db.Integer(big=True),
        index=True,
        unique=True,
        nullable=False,
    )

    # Plain big-integer columns.
    channel_id = db.Column(
        db.Integer(big=True),
    )
    author_id = db.Column(
        db.Integer(big=True),
    )

    # Foreign key built from ('starboard', 'id') with a big-integer SQL type.
    guild_id = db.Column(
        db.ForeignKey('starboard', 'id', sql_type=db.Integer(big=True)),
        index=True,
        nullable=False,
    )
class Starrers(db.Table):
    """Table definition pairing an ``author_id`` with an ``entry_id``."""

    id = db.PrimaryKeyColumn()
    author_id = db.Column(
        db.Integer(big=True),
        nullable=False,
    )
    # Foreign key built from ('starboard_entries', 'id').
    entry_id = db.Column(
        db.ForeignKey('starboard_entries', 'id'),
        index=True,
        nullable=False,
    )
class Starboard(db.Table):
    """Table definition with a channel, a threshold, a lock flag and a max age."""

    # Big-integer primary key, declared through ``db.Column``.
    id = db.Column(
        db.Integer(big=True),
        primary_key=True,
    )
    channel_id = db.Column(
        db.Integer(big=True),
    )

    # Non-nullable integer whose declared default is 1.
    threshold = db.Column(
        db.Integer,
        default=1,
        nullable=False,
    )

    # Boolean flag whose declared default is False.
    locked = db.Column(
        db.Boolean,
        default=False,
    )

    # Non-nullable interval; the default is given as an SQL expression string.
    max_age = db.Column(
        db.Interval,
        default="'7 days'::interval",
        nullable=False,
    )
class StarboardEntry(db.Table, table_name='starboard_entries'):
    """Table definition declared with ``table_name='starboard_entries'``."""

    id = db.PrimaryKeyColumn()

    # Indexed big-integer message identifiers; ``message_id`` is additionally
    # unique and non-nullable.
    bot_message_id = db.Column(
        db.Integer(big=True),
        index=True,
    )
    message_id = db.Column(
        db.Integer(big=True),
        index=True,
        unique=True,
        nullable=False,
    )

    # Plain big-integer columns.
    channel_id = db.Column(
        db.Integer(big=True),
    )
    author_id = db.Column(
        db.Integer(big=True),
    )

    # Foreign key built from ('starboard', 'id') with a big-integer SQL type.
    guild_id = db.Column(
        db.ForeignKey('starboard', 'id', sql_type=db.Integer(big=True)),
        index=True,
        nullable=False,
    )
class Starrers(db.Table):
    """Table definition pairing an ``author_id`` with an ``entry_id``.

    ``create_table`` appends a unique index over that pair.
    """

    id = db.PrimaryKeyColumn()
    author_id = db.Column(
        db.Integer(big=True),
        nullable=False,
    )
    # Foreign key built from ('starboard_entries', 'id').
    entry_id = db.Column(
        db.ForeignKey('starboard_entries', 'id'),
        index=True,
        nullable=False,
    )

    @classmethod
    def create_table(cls, *, exists_ok=True):
        """Return the parent's statement plus the unique-index statement.

        The parent result and the extra ``CREATE UNIQUE INDEX`` statement are
        joined by a single newline.
        """
        unique_index_sql = "CREATE UNIQUE INDEX IF NOT EXISTS starrers_uniq_idx ON starrers (author_id, entry_id);"
        base_statement = super().create_table(exists_ok=exists_ok)
        return base_statement + '\n' + unique_index_sql
class StarboardConfig:
__slots__ = ('bot', 'id', 'channel_id', 'threshold', 'locked', 'needs_migration', 'max_age')
def __init__(self, *, guild_id, bot, record=None):
from discord.ext import commands
from .utils import db
from .utils.formats import plural
from collections import defaultdict
import discord
import re
class Profiles(db.Table):
    """Table definition for per-user profile fields."""

    # The primary key holds the user_id.
    id = db.Column(
        db.Integer(big=True),
        primary_key=True,
    )

    nnid = db.Column(
        db.String,
    )
    squad = db.Column(
        db.String,
    )

    # Merged in from the ?fc stuff.
    fc_3ds = db.Column(
        db.String,
    )
    fc_switch = db.Column(
        db.String,
    )

    # Extra Splatoon data is stored here; non-nullable JSON whose default is
    # given as an SQL expression string.
    extra = db.Column(
        db.JSON,
        default="'{}'::jsonb",
        nullable=False,
    )
class DisambiguateMember(commands.IDConverter):
async def convert(self, ctx, argument):
# check if it's a user ID or mention
match = self._get_id_match(argument) or re.match(r'<@!?([0-9]+)>$', argument)
if match is not None:
# exact matches, like user ID + mention should search
# for every member we can see rather than just this guild.
user_id = int(match.group(1))
result = ctx.bot.get_user(user_id)
if result is None:
def create_table(cls, *, exists_ok=True):
    """Return the parent's statement followed by extra index statements for ``tags``.

    The parent result and the three ``CREATE ... INDEX`` statements are
    joined by a single newline.

    NOTE(review): the enclosing class and any decorator on this method lie
    outside the visible excerpt.
    """
    # create the indexes
    index_sql = (
        "CREATE INDEX IF NOT EXISTS tags_name_trgm_idx ON tags USING GIN (name gin_trgm_ops);\n"
        "CREATE INDEX IF NOT EXISTS tags_name_lower_idx ON tags (LOWER(name));\n"
        "CREATE UNIQUE INDEX IF NOT EXISTS tags_uniq_idx ON tags (LOWER(name), location_id);"
    )
    base_statement = super().create_table(exists_ok=exists_ok)
    return base_statement + '\n' + index_sql
class TagLookup(db.Table, table_name='tag_lookup'):
    """Table definition declared with ``table_name='tag_lookup'``.

    ``create_table`` appends three additional index statements.
    """

    id = db.PrimaryKeyColumn()

    # we will create more indexes manually
    name = db.Column(
        db.String,
        index=True,
    )
    location_id = db.Column(
        db.Integer(big=True),
        index=True,
    )

    owner_id = db.Column(
        db.Integer(big=True),
    )
    # The default is given as an SQL expression string.
    created_at = db.Column(
        db.Datetime,
        default="now() at time zone 'utc'",
    )
    # Foreign key built from ('tags', 'id').
    tag_id = db.Column(
        db.ForeignKey('tags', 'id'),
    )

    @classmethod
    def create_table(cls, *, exists_ok=True):
        """Return the parent's statement followed by the extra index statements.

        The parent result and the index statements are joined by a single
        newline.
        """
        # create the indexes
        index_sql = (
            "CREATE INDEX IF NOT EXISTS tag_lookup_name_trgm_idx ON tag_lookup USING GIN (name gin_trgm_ops);\n"
            "CREATE INDEX IF NOT EXISTS tag_lookup_name_lower_idx ON tag_lookup (LOWER(name));\n"
            "CREATE UNIQUE INDEX IF NOT EXISTS tag_lookup_uniq_idx ON tag_lookup (LOWER(name), location_id);"
        )
        base_statement = super().create_table(exists_ok=exists_ok)
        return base_statement + '\n' + index_sql
class Players(db.Table):
    """Table definition with one Discord ID and two string handles per row."""

    id = db.PrimaryKeyColumn()

    # Unique, indexed big-integer column.
    discord_id = db.Column(
        db.Integer(big=True),
        unique=True,
        index=True,
    )

    # String columns; only ``switch`` is declared unique.
    challonge = db.Column(
        db.String,
    )
    switch = db.Column(
        db.String,
        unique=True,
    )
class Teams(db.Table):
    """Table definition with an active flag and two string columns."""

    id = db.PrimaryKeyColumn()

    # Boolean flag whose declared default is True.
    active = db.Column(
        db.Boolean,
        default=True,
    )

    # String columns; only ``challonge`` is indexed.
    challonge = db.Column(
        db.String,
        index=True,
    )
    logo = db.Column(
        db.String,
    )
class TeamMembers(db.Table, table_name='team_members'):
    """Table definition declared with ``table_name='team_members'``."""

    id = db.PrimaryKeyColumn()

    # Indexed foreign keys built from ('teams', 'id') and ('players', 'id').
    team_id = db.Column(
        db.ForeignKey('teams', 'id'),
        index=True,
    )
    player_id = db.Column(
        db.ForeignKey('players', 'id'),
        index=True,
    )

    # Boolean flag whose declared default is False.
    captain = db.Column(
        db.Boolean,
        default=False,
    )
class ChallongeError(commands.CommandError):
    """Command error subtype that adds no behaviour of its own.

    NOTE(review): presumably raised for Challonge-related failures; the
    raising code is not visible in this excerpt.
    """
class TournamentState(enum.IntEnum):
    """Integer-valued enumeration of tournament states, numbered 0 to 5."""

    # 0 is the ``invalid`` member.
    invalid = 0

    # Remaining members, in ascending value order.
    pending = 1
    checking_in = 2
    checked_in = 3
    underway = 4
    complete = 5
class PromptResult(enum.Enum):
timeout = 0