Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def expire_all_unassigned_hits(self):
"""Move through the whole hit_id list and attempt to expire the
HITs, though this only immediately expires those that aren't assigned.
"""
# TODO note and mark assigned hits as ones to be expired later.
# this will improve the shutdown experience
shared_utils.print_and_log(
logging.INFO,
'Expiring all unassigned HITs...',
should_print=not self.is_test,
)
completed_ids = self.worker_manager.get_complete_hits()
for hit_id in self.hit_id_list:
if hit_id not in completed_ids:
# TODO get confirmation that the HIT is acutally expired
mturk_utils.expire_hit(self.is_sandbox, hit_id)
def reject_work(self, assignment_id, reason):
"""reject work for a given assignment through the mturk client"""
client = mturk_utils.get_mturk_client(self.is_sandbox)
client.reject_assignment(AssignmentId=assignment_id, RequesterFeedback=reason)
if self.db_logger is not None:
self.db_logger.log_reject_assignment(assignment_id)
shared_utils.print_and_log(
logging.INFO,
'Assignment {} rejected for reason {}.' ''.format(assignment_id, reason),
)
if self.hit_is_returned or self.disconnected:
self.m_free_workers([self])
return False
if timeout:
current_time = time.time()
if (current_time - start_time) > timeout:
shared_utils.print_and_log(
logging.INFO,
"Timeout waiting for ({})_({}) to complete {}.".format(
self.worker_id, self.assignment_id, self.conversation_id
),
)
self.set_hit_is_abandoned()
self.m_free_workers([self])
return False
shared_utils.print_and_log(
logging.DEBUG,
'Waiting for ({})_({}) to complete {}...'.format(
self.worker_id, self.assignment_id, self.conversation_id
),
)
self.wait_completion_timeout(wait_periods)
shared_utils.print_and_log(
logging.INFO,
'Conversation ID: {}, Agent ID: {} - HIT is done.'.format(
self.conversation_id, self.id
),
)
self.m_free_workers([self])
return True
def _print_not_available_for(self, item):
shared_utils.print_and_log(
logging.WARN,
'Conversation ID: {}, Agent ID: {} - HIT '
'is abandoned and thus not available for '
'{}.'.format(self.conversation_id, self.id, item),
should_print=True,
)
def _add_agent_to_pool(self, agent):
"""Add a single agent to the pool"""
if agent not in self.agent_pool:
# Add the agent to pool
with self.agent_pool_change_condition:
if agent not in self.agent_pool:
shared_utils.print_and_log(
logging.DEBUG,
"Adding worker {} to pool.".format(agent.worker_id),
)
self.agent_pool.append(agent)
def _upload_worker_data(self):
"""Uploads worker data acceptance and completion rates to the parlai
server
"""
worker_data = self.worker_manager.get_worker_data_package()
data = {'worker_data': worker_data}
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
try:
requests.post(PARLAI_MTURK_UPLOAD_URL, json=data, headers=headers)
except Exception:
shared_utils.print_and_log(
logging.WARNING,
'Unable to log worker statistics to parl.ai',
should_print=True,
)
def close_channel(self, connection_id):
"""
Closes a channel by connection_id.
"""
shared_utils.print_and_log(
logging.DEBUG, 'Closing channel {}'.format(connection_id)
)
if connection_id in self.open_channels:
self.open_channels.remove(connection_id)
with self.packet_map_lock:
packet_ids = list(self.packet_map.keys())
# Clear packets associated with this sender
for packet_id in packet_ids:
packet = self.packet_map[packet_id]
packet_conn_id = packet.get_receiver_connection_id()
if connection_id == packet_conn_id:
del self.packet_map[packet_id]
def _reaper_thread(*args):
start_time = time.time()
wait_time = self.DEF_MISSED_PONGS * self.PING_RATE
while time.time() - start_time < wait_time:
if self.is_shutdown:
return
if self.alive:
return
time.sleep(0.3)
if self.server_death_callback is not None:
shared_utils.print_and_log(
logging.WARN,
'Server has disconnected and could not reconnect. '
'Assuming the worst and calling the death callback. '
'(Usually shutdown)',
should_print=True,
)
self.server_death_callback()
def give_worker_qualification(self, worker_id, qual_name, qual_value=None):
"""Give a worker a particular qualification"""
qual_id = mturk_utils.find_qualification(qual_name, self.is_sandbox)
if qual_id is False or qual_id is None:
shared_utils.print_and_log(
logging.WARN,
'Could not give worker {} qualification {}, as the '
'qualification could not be found to exist.'
''.format(worker_id, qual_name),
should_print=True,
)
return
mturk_utils.give_worker_qualification(
worker_id, qual_id, qual_value, self.is_sandbox
)
shared_utils.print_and_log(
logging.INFO,
'gave {} qualification {}'.format(worker_id, qual_name),
should_print=True,
)
def close_all_channels(self):
"""
Closes all channels by clearing the list of channels.
"""
shared_utils.print_and_log(logging.DEBUG, 'Closing all channels')
connection_ids = list(self.open_channels)
for connection_id in connection_ids:
self.close_channel(connection_id)