Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def insert_prebattle_player(id, name):
if not _prebattle_players.where(id=id):
player = DataObject(id=id, name=name)
_prebattle_players.insert(player)
messages.publish(PrebattlePlayerMessage("added", player))
def set_players_speaking(player_ids, speaking):
for player_id in player_ids:
old_player = _.head(_speaking_players.where(id=player_id))
if speaking and not old_player:
new_player = DataObject(id=player_id)
_speaking_players.insert(new_player)
messages.publish(PlayerSpeakingMessage("added", new_player))
elif not speaking and old_player:
_speaking_players.remove(old_player)
messages.publish(PlayerSpeakingMessage("removed", old_player))
pairings.create_index('player_id')
pairings.create_index('user_unique_id')
# Load old 0.6.x cache file
parser = ConfigParser.ConfigParser()
with open(source_filepath, "rb") as file:
parser.readfp(file)
# Build new cache structures
users.insert_many(DataObject(unique_id=id, name=name) for name, id in parser.items("TeamSpeakUsers"))
players.insert_many(DataObject(id=int(id), name=name) for name, id in parser.items("GamePlayers"))
for user_name, player_names in parser.items("UserPlayerPairings"):
userid = _.head(users.where(name=user_name)).unique_id
for player_name in list(csv.reader([player_names]))[0]:
playerid = _.head(players.where(name=player_name)).id
pairings.insert(DataObject(player_id=int(playerid), user_unique_id=userid))
# Remove users & players which do not exist in pairings
for user in users.clone():
if not pairings.where(user_unique_id=user.unique_id):
users.remove(user)
for player in players.clone():
if not pairings.where(player_id=player.id):
players.remove(player)
# create destination directory if it doesn't exist yet
if not os.path.isdir(dest_dirpath):
os.makedirs(dest_dirpath)
# write out the new cache files
users.json_export(users_filepath)
players.json_export(players_filepath)
def insert_battle_player(id, name):
if not _battle_players.where(id=id):
player = DataObject(id=id, name=name)
_battle_players.insert(player)
messages.publish(BattlePlayerMessage("added", player))
def insert_user(id, name, game_name, unique_id, speaking, is_me, my_channel):
if _users.where(id=id):
return
new_user = DataObject(id=id, name=name, game_name=game_name,
unique_id=unique_id, speaking=speaking, is_me=is_me,
my_channel=my_channel)
_users.insert(new_user)
messages.publish(UserMessage("added", new_user))
users = Table()
users.create_index('unique_id', unique=True)
players = Table()
players.create_index('id', unique=True)
pairings = Table()
pairings.create_index('player_id')
pairings.create_index('user_unique_id')
# Load old 0.6.x cache file
parser = ConfigParser.ConfigParser()
with open(source_filepath, "rb") as file:
parser.readfp(file)
# Build new cache structures
users.insert_many(DataObject(unique_id=id, name=name) for name, id in parser.items("TeamSpeakUsers"))
players.insert_many(DataObject(id=int(id), name=name) for name, id in parser.items("GamePlayers"))
for user_name, player_names in parser.items("UserPlayerPairings"):
userid = _.head(users.where(name=user_name)).unique_id
for player_name in list(csv.reader([player_names]))[0]:
playerid = _.head(players.where(name=player_name)).id
pairings.insert(DataObject(player_id=int(playerid), user_unique_id=userid))
# Remove users & players which do not exist in pairings
for user in users.clone():
if not pairings.where(user_unique_id=user.unique_id):
users.remove(user)
for player in players.clone():
if not pairings.where(player_id=player.id):
players.remove(player)
# create destination directory if it doesn't exist yet
if not os.path.isdir(dest_dirpath):
def update_vehicle(id, player_id, is_alive):
if _vehicles.delete(id=id):
vehicle = DataObject(id=id, player_id=player_id, is_alive=is_alive)
_vehicles.insert(vehicle)
messages.publish(VehicleMessage("modified", vehicle))
def insert_pairing(user_unique_id, player_id):
if not _pairings.where(user_unique_id=user_unique_id, player_id=player_id):
# Insert pairing
pairing = DataObject(user_unique_id=user_unique_id, player_id=player_id)
_pairings.insert(pairing)
# Insert pair's user to cache
user = _.head(_users.where(unique_id=user_unique_id))
if user:
_cached_users.delete(unique_id=user.unique_id)
_cached_users.insert(DataObject(unique_id=user.unique_id, name=user.name))
# Insert pair's player to cache
result = _battle_players.where(id=player_id)
if not result:
result = _prebattle_players.where(id=player_id)
if not result:
result = _me_player.where(id=player_id)
player = _.head(result)
if player:
_cached_players.delete(id=player.id)
_cached_players.insert(DataObject(id=player.id, name=player.name))
# Notify listeners
messages.publish(PairingMessage("added", pairing))
def set_me_player(id, name):
_me_player.remove_many(_me_player)
player = DataObject(id=id, name=name)
_me_player.insert(player)
messages.publish(PlayerMeMessage("added", player))
from collections import Mapping
from littletable import Table, DataObject
import pydash as _
import os
messages = None
logger = logutils.logger.getChild('database')
_me_player = Table('me_player')
_me_player.create_index('id', unique=True)
_battle_players = Table('battle_players')
_battle_players.create_index('id', unique=True)
_vehicles = Table('vehicles')
_vehicles.create_index('id', unique=True)
_vehicles.create_index('player_id', unique=True)
_prebattle_players = Table('prebattle_players')
_prebattle_players.create_index('id', unique=True)
_speaking_players = Table('speaking_players')
_speaking_players.create_index('id', unique=True)
_users = Table('users')
_users.create_index('id', unique=True)
_users.create_index('unique_id')
_cached_users = Table('cached_users')
_cached_users.create_index('unique_id', unique=True)
_cached_players = Table('cached_players')
_cached_players.create_index('id', unique=True)
_pairings = Table('pairings')
_pairings.create_index('player_id')
_pairings.create_index('user_unique_id')