Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, uri, decoder_pipeline, post_processor, full_post_processor=None):
    """Initialise the socket client and register callbacks on the decoder pipeline.

    Args:
        uri: WebSocket URL; stored on the instance and forwarded to
            ``WebSocketClient.__init__`` as ``url``.
        decoder_pipeline: Object on which this instance's ``_on_*`` methods
            are registered through its ``set_*_handler`` methods.
        post_processor: Stored on the instance unchanged.
        full_post_processor: Stored on the instance unchanged; ``None`` by
            default.
    """
    # Plain attributes are assigned before the base-class initialiser runs.
    self.coachtranscript = empty_response
    self.uri = uri
    self.decoder_pipeline = decoder_pipeline
    self.post_processor = post_processor
    self.full_post_processor = full_post_processor

    WebSocketClient.__init__(self, url=uri, heartbeat_freq=10)

    self.pipeline_initialized = False
    self.partial_transcript = ""

    # Which result callbacks get registered depends on the USE_NNET2 flag.
    pipeline = self.decoder_pipeline
    if USE_NNET2:
        pipeline.set_result_handler(self._on_result)
        pipeline.set_full_result_handler(self._on_full_result)
        pipeline.set_error_handler(self._on_error)
    else:
        pipeline.set_word_handler(self._on_word)
        pipeline.set_error_handler(self._on_error)
    # NOTE(review): indentation was lost in this copy of the code; the
    # end-of-stream handler is assumed to be registered in both modes
    # (i.e. after the if/else) -- confirm against the upstream file.
    pipeline.set_eos_handler(self._on_eos)

    # Bookkeeping for the session.
    self.state = self.STATE_CREATED
    self.last_decoder_message = time.time()
    self.request_id = ""
    self.timeout_decoder = 5
    self.num_segments = 0
    self.last_partial_result = ""
def __init__(self, url, on_open=None, on_close=None, on_message=None, on_data=None,
             on_error=None, **kwargs):
    """Store the optional event callbacks, then initialise the ws4py client.

    Args:
        url: Passed positionally to ``WS4PyWebSocketClient.__init__``.
        on_open, on_close, on_message, on_data, on_error: Optional
            callbacks; each is kept in ``self.callbacks`` under its own
            parameter name (``None`` when not supplied).
        **kwargs: Forwarded untouched to ``WS4PyWebSocketClient.__init__``.
    """
    self.callbacks = {
        'on_open': on_open,
        'on_close': on_close,
        'on_message': on_message,
        'on_data': on_data,
        'on_error': on_error,
    }
    WS4PyWebSocketClient.__init__(self, url, **kwargs)
----------
host : str
Firefly host.
channel : str
WebSocket channel id.
"""
if host.startswith('http://'):
host = host[7:]
self.this_host = host
url = 'ws://%s/firefly/sticky/firefly/events' % host # web socket url
if channel:
url += '?channelID=%s' % channel
WebSocketClient.__init__(self, url)
self.url_root = 'http://' + host + self._fftools_cmd
self.url_bw = 'http://' + self.this_host + '/firefly/firefly.html;wsch='
self.listeners = {}
self.channel = channel
self.session = requests.Session()
# print 'websocket url:%s' % url
self.connect()
def __init__(self, url, debug=False):
    """Record the debug flag and initialise both base classes.

    Args:
        url: Passed to ``WebSocketClient.__init__``.
        debug: Stored on the instance as ``self.debug``; ``False`` by default.
    """
    self.debug = debug

    # Both bases are initialised explicitly, socket client first.
    WebSocketClient.__init__(self, url)
    EventEmitter.__init__(self)
def __init__(self, scheme, protocols, token):
    """Initialise the socket client and keep the supplied token.

    Args:
        scheme: First positional argument handed to
            ``WebSocketClient.__init__``.
        protocols: Second positional argument handed to
            ``WebSocketClient.__init__``.
        token: Stored privately as ``self._token``.
    """
    WebSocketClient.__init__(self, scheme, protocols)

    self._token = token
def __init__(self, url, **kwargs):
ws_url = self.get_connection_url(url)
WebSocketClient.__init__(self, ws_url)
self.channel = kwargs.get('channel')
self.main_class = kwargs.get('main_class')
self.exited = False
self.ws_queue = Queue.Queue()
self.message_handler = HitboxMessageHandler(
queue=self.ws_queue,
message_queue=self.main_class.queue,
channel=self.channel,
main_class=self.main_class,
smiles=kwargs.get('smiles')
)
self.message_handler.start()
self._viewers_th = HitboxViewersWS(
def __init__(self, ip, port=9090):
    """Create the client, connect, and wait until the connection flag is set.

    The constructor connects immediately, starts a thread running
    ``run_forever``, and then polls ``self._connected`` every 0.1 seconds
    until it becomes truthy.

    Args:
        ip (str): The robot IP address.
        port (int, optional): The WebSocket port number for rosbridge.
            Defaults to 9090.
    """
    endpoint = 'ws://{}:{}'.format(ip, port)
    WebSocketClient.__init__(self, endpoint)

    # Connection flag; presumably flipped by a callback defined elsewhere
    # in the class -- verify against the rest of the file.
    self._connected = False
    self._id_counter = 0

    # Registries, all starting out empty.
    self._publishers = {}
    self._subscribers = {}
    self._service_clients = {}
    self._service_servers = {}
    self._action_clients = {}

    self.connect()

    # Run the client loop on its own thread so this constructor can wait.
    loop_thread = threading.Thread(target=self.run_forever)
    loop_thread.start()

    # Block the caller until the connection flag has been raised.
    while not self._connected:
        time.sleep(0.1)
def __init__(self, uri, decoder_pipeline, post_processor, full_post_processor=None):
    """Initialise the socket client and register callbacks on the decoder pipeline.

    Args:
        uri: WebSocket URL; stored on the instance and forwarded to
            ``WebSocketClient.__init__`` as ``url``.
        decoder_pipeline: Object on which this instance's ``_on_*`` methods
            are registered through its ``set_*_handler`` methods.
        post_processor: Stored on the instance unchanged.
        full_post_processor: Stored on the instance unchanged; ``None`` by
            default.
    """
    # Plain attributes are assigned before the base-class initialiser runs.
    self.uri = uri
    self.decoder_pipeline = decoder_pipeline
    self.post_processor = post_processor
    self.full_post_processor = full_post_processor

    WebSocketClient.__init__(self, url=uri, heartbeat_freq=10)

    self.pipeline_initialized = False
    self.partial_transcript = ""

    # Which result callbacks get registered depends on the USE_NNET2 flag.
    pipeline = self.decoder_pipeline
    if USE_NNET2:
        pipeline.set_result_handler(self._on_result)
        pipeline.set_full_result_handler(self._on_full_result)
        pipeline.set_error_handler(self._on_error)
    else:
        pipeline.set_word_handler(self._on_word)
        pipeline.set_error_handler(self._on_error)
    # NOTE(review): indentation was lost in this copy of the code; the
    # end-of-stream handler is assumed to be registered in both modes
    # (i.e. after the if/else) -- confirm against the upstream file.
    pipeline.set_eos_handler(self._on_eos)

    # Bookkeeping for the session.
    self.state = self.STATE_CREATED
    self.last_decoder_message = time.time()
    self.request_id = ""
    self.timeout_decoder = 5
    self.num_segments = 0
    self.last_partial_result = ""