Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
port: The port you wish to bind to (ie: 44565)
prot: The protocol you wish to operate over, defined by a
:py:class:`py2p.base.Protocol` object
out_addr: Your outward facing address. Only needed if you're
connecting over the internet. If you use '0.0.0.0'
for the addr argument, this will automatically be
set to your LAN address.
debug_level: The verbosity you want this socket to use when
printing event data
Raises:
socket.error: The address you wanted could not be bound, or is
otherwise used
"""
object.__init__(self)
EventEmitter.__init__(self)
self.protocol = prot
self.debug_level = debug_level
self.routing_table = {} # type: Dict[bytes, BaseConnection]
# In format {ID: handler}
self.awaiting_ids = [] # type: List[BaseConnection]
# Connected, but not handshook yet
if out_addr: # Outward facing address, if you're port forwarding
self.out_addr = out_addr
elif addr == '0.0.0.0':
self.out_addr = get_lan_ip(), port
else:
self.out_addr = addr, port
info = (str(self.out_addr).encode(), prot.id.encode(), user_salt)
h = sha384(b''.join(info))
self.id = b58encode_int(int(h.hexdigest(), 16)).encode() # type: bytes
self._logger = getLogger('{}.{}.{}'.format(
def __init__(self, url, debug=False):
    """Initialise the instance as a websocket client and an event emitter.

    :param url: forwarded unchanged to ``WebSocketClient.__init__``
    :param debug: stored on the instance as ``self.debug``
    """
    # The flag is recorded before either base-class initialiser runs.
    self.debug = debug

    # Both bases are initialised explicitly rather than through super().
    WebSocketClient.__init__(self, url)
    EventEmitter.__init__(self)
def __init__(self):
    """Initialise the emitter base and the instance's starting state."""
    EventEmitter.__init__(self)

    # Start with an empty channel list and a reconnect counter of zero.
    self.channels = []
    self.reconnect_count = 0

    # Remaining setup is delegated to the double-underscore reset helper,
    # which is defined outside this snippet.
    self.__reset()
def __init__(self, url, auto_reconnect=True, auto_reconnect_timeout=0.5, debug=False):
    """Create the underlying DDP client and subscribe to its events.

    :param url: forwarded to ``DDPClient``
    :param auto_reconnect: forwarded to ``DDPClient``
    :param auto_reconnect_timeout: forwarded to ``DDPClient``
    :param debug: forwarded to ``DDPClient``
    """
    EventEmitter.__init__(self)
    self.collection_data = CollectionData()
    self.ddp_client = DDPClient(
        url,
        auto_reconnect=auto_reconnect,
        auto_reconnect_timeout=auto_reconnect_timeout,
        debug=debug,
    )

    # Event name -> handler on this instance, registered in this order.
    event_handlers = (
        ('connected', self.connected),
        ('socket_closed', self.closed),
        ('failed', self.failed),
        ('added', self.added),
        ('changed', self.changed),
        ('removed', self.removed),
        ('reconnected', self._reconnected),
    )
    for event_name, handler in event_handlers:
        self.ddp_client.on(event_name, handler)

    # NOTE: this rebinds the instance attribute ``connected``, which was
    # handed to the DDP client as the 'connected' handler just above. The
    # handler was captured before the rebinding, so this assignment must
    # stay after the registration.
    self.connected = False
    self.subscriptions = {}
    self._login_data = None
    self._login_token = None
def __init__(self, id, id_property):
    """Initialise the change-tracker and emitter bases for one record.

    :param id: value stored in the tracked data under the ``id_property`` key
    :param id_property: name of the key that holds ``id``; it is also passed
                        as the sole entry of ``intrinsic_properties``
    """
    # Initial tracked data: the identifier plus a fresh, separate
    # ChangeTracker under the 'meta' key.
    initial_data = {id_property: id, 'meta': ChangeTracker()}
    ChangeTracker.__init__(
        self,
        intrinsic_properties=[id_property],
        data=initial_data,
    )
    EventEmitter.__init__(self)

    self._id_property = id_property
    # No connection is attached at construction time.
    self._connection = None
def __init__(self, steamid, key=None, language: str='en', identity_secret: str='', poll_delay: int=30,
             login_delay_time: int=0):
    """
    Validate the Steam ID, then create the HTTP session and initial state.

    :param steamid: steamid64; must parse to a valid individual (user) Steam ID
    :param key: steam api key
    :param language: stored as ``self.language``
    :param identity_secret: forwarded to ``ConfManager.__init__``
    :param poll_delay: how often trades should be polled (too often can cause errors, too infrequent can make your
                       bot too slow to respond)
    :param login_delay_time: how long to wait after our session died to retry
    :raises ValueError: if ``steamid`` is not valid or is not an individual user ID
    """
    EventEmitter.__init__(self)

    # Validate before opening the HTTP session: raising after the session
    # exists would leave an unclosed aiohttp.ClientSession behind.
    steam_id = SteamID(steamid)
    if not steam_id.isValid() or steam_id.type != SteamID.Type['INDIVIDUAL']:
        raise ValueError(f"Steam ID {steam_id} is not valid, or is not a user ID")

    self.session = aiohttp.ClientSession()
    # ConfManager receives the raw steamid argument and shares this session.
    ConfManager.__init__(self, identity_secret, steamid, self.session)
    # Assigned after ConfManager.__init__ so that, as before, the parsed
    # SteamID object is the final value of ``self.steamid``.
    self.steamid = steam_id

    self.key = key
    self.language = language
    self.poll_delay = poll_delay
    self.last_poll = 0
    self.logged_in = False
    self._trade_cache = {}
    self._conf_cache = {}
    self.first_run = True
    self.login_delay_time = login_delay_time
def __init__(self, url, auto_reconnect=True, auto_reconnect_timeout=0.5, debug=False):
    """Store the connection settings, reset internal state and set up the socket.

    :param url: stored as ``self.url``
    :param auto_reconnect: stored as ``self.auto_reconnect``
    :param auto_reconnect_timeout: stored as ``self.auto_reconnect_timeout``
    :param debug: stored as ``self.debug``
    """
    EventEmitter.__init__(self)

    # Caller-supplied configuration.
    self.url = url
    self.auto_reconnect = auto_reconnect
    self.auto_reconnect_timeout = auto_reconnect_timeout
    self.debug = debug

    # Internal state: no socket or session yet, and neither closing nor
    # reconnecting.
    self.ddpsocket = None
    self._session = None
    self._is_closing = False
    self._is_reconnecting = False

    # Id counter starts at zero with an empty callback table.
    self._uniq_id = 0
    self._callbacks = {}

    # Every attribute above is assigned before the socket is initialised.
    self._init_socket()