Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def emit_cluster_change_event(self, cluster_dns_name, status_code, success, error_message):
    """Build a cluster-change event record and pass it to the handler.

    The record is an ordered list of ``(key, value)`` tuples: the event
    name, a timestamp obtained from ``self.get_utc_date_time()``, then the
    four arguments in the order cluster DNS name, status code, success
    flag and error message. The list is handed to ``self.send_to_handler``.
    """
    now = self.get_utc_date_time()
    payload = [
        (EVENT_NAME, constants.CLUSTER_CHANGE_EVENT),
        (TIMESTAMP, now),
        (constants.CLUSTER_DNS_NAME, cluster_dns_name),
        (constants.STATUS_CODE, status_code),
        (constants.SUCCESS, success),
        (constants.ERROR_MESSAGE, error_message),
    ]
    self.send_to_handler(payload)
def _pyspark_command(self, sql_context_variable_name, encode_result=True):
    """Assemble the PySpark source that runs ``self.query`` and prints each row.

    The generated code calls ``.sql(...)`` on the variable named by
    ``sql_context_variable_name``, converts the result with ``.toJSON()``,
    optionally samples it, limits or collects it, and finally prints every
    element in a ``for`` loop.

    :param sql_context_variable_name: name of the SQL context variable to
        call ``.sql`` on in the generated code.
    :param encode_result: when true, each printed row is wrapped in
        ``.encode(<encoding>)`` using ``conf.pyspark_dataframe_encoding()``;
        when false, the row is printed as-is.
    :return: a ``Command`` wrapping the generated source text.
    """
    expression = u'{}.sql(u"""{} """).toJSON()'.format(sql_context_variable_name, self.query)

    # Sampling is only added for the u'sample' method; any other value
    # leaves the expression unsampled.
    if self.samplemethod == u'sample':
        expression = u'{}.sample(False, {})'.format(expression, self.samplefraction)

    # A negative maxrows means "no limit": fetch everything.
    if self.maxrows >= 0:
        expression = u'{}.take({})'.format(expression, self.maxrows)
    else:
        expression = u'{}.collect()'.format(expression)

    loop_variable = constants.LONG_RANDOM_VARIABLE_NAME
    # Unicode support has improved in Python 3 so we don't need to encode.
    if encode_result:
        printed_value = '{}.encode("{}")'.format(loop_variable,
                                                 conf.pyspark_dataframe_encoding())
    else:
        printed_value = loop_variable

    statement = u'for {} in {}: print({})'.format(loop_variable,
                                                  expression,
                                                  printed_value)
    return Command(statement)
def emit_magic_execution_end_event(self, magic_name, language, magic_guid,
                                   success, exception_type, exception_message):
    """Build a magic-execution-end event record and pass it to the handler.

    ``self._verify_language_ok(language)`` is called before anything else
    (presumably it rejects unsupported languages -- confirm against its
    definition). The record is an ordered list of ``(key, value)`` tuples
    carrying the event name, a timestamp from ``self.get_utc_date_time()``,
    and the arguments, and is handed to ``self.send_to_handler``.
    """
    self._verify_language_ok(language)
    now = self.get_utc_date_time()
    payload = [
        (EVENT_NAME, constants.MAGIC_EXECUTION_END_EVENT),
        (TIMESTAMP, now),
        (constants.MAGIC_NAME, magic_name),
        (constants.LIVY_KIND, language),
        (constants.MAGIC_GUID, magic_guid),
        (constants.SUCCESS, success),
        (constants.EXCEPTION_TYPE, exception_type),
        (constants.EXCEPTION_MESSAGE, exception_message),
    ]
    self.send_to_handler(payload)
def to_command(self, kind, sql_context_variable_name):
    """Return the command for this query in the language of session ``kind``.

    :param kind: session kind; compared against the ``SESSION_KIND_*``
        constants for PySpark, PySpark3, Spark (Scala) and SparkR.
    :param sql_context_variable_name: forwarded unchanged to the
        language-specific command builder.
    :raises BadUserDataException: if ``kind`` matches none of the four
        supported session kinds.
    """
    if kind == constants.SESSION_KIND_PYSPARK:
        return self._pyspark_command(sql_context_variable_name)
    if kind == constants.SESSION_KIND_PYSPARK3:
        # Second positional argument False: PySpark3 output is not encoded.
        return self._pyspark_command(sql_context_variable_name, False)
    if kind == constants.SESSION_KIND_SPARK:
        return self._scala_command(sql_context_variable_name)
    if kind == constants.SESSION_KIND_SPARKR:
        return self._r_command(sql_context_variable_name)
    raise BadUserDataException(u"Kind '{}' is not supported.".format(kind))
def emit_magic_execution_end_event(self, magic_name, language, magic_guid,
                                   success, exception_type, exception_message):
    """Emit the event that marks the end of a magic's execution.

    The language is checked first via ``self._verify_language_ok`` (its
    exact failure behavior is not visible here -- confirm). An ordered list
    of ``(key, value)`` tuples is then built from the event name, the
    timestamp returned by ``self.get_utc_date_time()`` and the arguments,
    and passed to ``self.send_to_handler``.
    """
    self._verify_language_ok(language)
    timestamp = self.get_utc_date_time()
    record = [
        (EVENT_NAME, constants.MAGIC_EXECUTION_END_EVENT),
        (TIMESTAMP, timestamp),
        (constants.MAGIC_NAME, magic_name),
        (constants.LIVY_KIND, language),
        (constants.MAGIC_GUID, magic_guid),
        (constants.SUCCESS, success),
        (constants.EXCEPTION_TYPE, exception_type),
        (constants.EXCEPTION_MESSAGE, exception_message),
    ]
    self.send_to_handler(record)
def emit_magic_execution_end_event(self, magic_name, language, magic_guid,
                                   success, exception_type, exception_message):
    """Report that a magic finished executing.

    Calls ``self._verify_language_ok(language)`` first (presumably a
    validation step -- verify against its definition), then sends an
    ordered list of ``(key, value)`` tuples to ``self.send_to_handler``:
    event name, timestamp from ``self.get_utc_date_time()``, magic name,
    language, magic GUID, success flag, exception type and exception
    message.
    """
    self._verify_language_ok(language)
    emitted_at = self.get_utc_date_time()
    event_fields = [
        (EVENT_NAME, constants.MAGIC_EXECUTION_END_EVENT),
        (TIMESTAMP, emitted_at),
        (constants.MAGIC_NAME, magic_name),
        (constants.LIVY_KIND, language),
        (constants.MAGIC_GUID, magic_guid),
        (constants.SUCCESS, success),
        (constants.EXCEPTION_TYPE, exception_type),
        (constants.EXCEPTION_MESSAGE, exception_message),
    ]
    self.send_to_handler(event_fields)
def emit_sql_execution_end_event(self, session_guid, language, session_id, sql_guid, statement_guid,
                                 success, exception_type, exception_message):
    """Build a SQL-execution-end event record and pass it to the handler.

    ``self._verify_language_ok(language)`` is called before anything else
    (presumably it rejects unsupported languages -- confirm against its
    definition). The record is an ordered list of ``(key, value)`` tuples
    carrying the event name, a timestamp from ``self.get_utc_date_time()``,
    the session, SQL and statement identifiers, and the outcome fields; it
    is handed to ``self.send_to_handler``.
    """
    self._verify_language_ok(language)
    now = self.get_utc_date_time()
    payload = [
        (EVENT_NAME, constants.SQL_EXECUTION_END_EVENT),
        (TIMESTAMP, now),
        (constants.SESSION_GUID, session_guid),
        (constants.LIVY_KIND, language),
        (constants.SESSION_ID, session_id),
        (constants.SQL_GUID, sql_guid),
        (constants.STATEMENT_GUID, statement_guid),
        (constants.SUCCESS, success),
        (constants.EXCEPTION_TYPE, exception_type),
        (constants.EXCEPTION_MESSAGE, exception_message),
    ]
    self.send_to_handler(payload)
def emit_sql_execution_end_event(self, session_guid, language, session_id, sql_guid, statement_guid,
                                 success, exception_type, exception_message):
    """Emit the event that marks the end of a SQL query's execution.

    The language is checked first via ``self._verify_language_ok`` (its
    exact failure behavior is not visible here -- confirm). An ordered list
    of ``(key, value)`` tuples is then built from the event name, the
    timestamp returned by ``self.get_utc_date_time()`` and the arguments,
    and passed to ``self.send_to_handler``.
    """
    self._verify_language_ok(language)
    timestamp = self.get_utc_date_time()
    record = [
        (EVENT_NAME, constants.SQL_EXECUTION_END_EVENT),
        (TIMESTAMP, timestamp),
        (constants.SESSION_GUID, session_guid),
        (constants.LIVY_KIND, language),
        (constants.SESSION_ID, session_id),
        (constants.SQL_GUID, sql_guid),
        (constants.STATEMENT_GUID, statement_guid),
        (constants.SUCCESS, success),
        (constants.EXCEPTION_TYPE, exception_type),
        (constants.EXCEPTION_MESSAGE, exception_message),
    ]
    self.send_to_handler(record)
def emit_sql_execution_start_event(self, session_guid, language, session_id, sql_guid,
                                   samplemethod, maxrows, samplefraction):
    """Build a SQL-execution-start event record and pass it to the handler.

    ``self._verify_language_ok(language)`` is called before anything else
    (presumably it rejects unsupported languages -- confirm against its
    definition). The record is an ordered list of ``(key, value)`` tuples
    carrying the event name, a timestamp from ``self.get_utc_date_time()``,
    the session and SQL identifiers, and the sampling settings (method,
    max rows, fraction); it is handed to ``self.send_to_handler``.
    """
    self._verify_language_ok(language)
    now = self.get_utc_date_time()
    payload = [
        (EVENT_NAME, constants.SQL_EXECUTION_START_EVENT),
        (TIMESTAMP, now),
        (constants.SESSION_GUID, session_guid),
        (constants.LIVY_KIND, language),
        (constants.SESSION_ID, session_id),
        (constants.SQL_GUID, sql_guid),
        (constants.SAMPLE_METHOD, samplemethod),
        (constants.MAX_ROWS, maxrows),
        (constants.SAMPLE_FRACTION, samplefraction),
    ]
    self.send_to_handler(payload)
def _get_retry_policy():
    """Return the retry policy object selected by ``conf.retry_policy()``.

    :return: a ``LinearRetryPolicy`` (5 seconds to sleep, 5 retries) for
        ``LINEAR_RETRY``, or a ``ConfigurableRetryPolicy`` built from the
        configured sleep list and max-retries value for
        ``CONFIGURABLE_RETRY``.
    :raises BadUserConfigurationException: if the configured policy is
        neither of the two supported values.
    """
    policy_name = conf.retry_policy()

    if policy_name == LINEAR_RETRY:
        return LinearRetryPolicy(seconds_to_sleep=5, max_retries=5)

    if policy_name == CONFIGURABLE_RETRY:
        return ConfigurableRetryPolicy(
            retry_seconds_to_sleep_list=conf.retry_seconds_to_sleep_list(),
            max_retries=conf.configurable_retry_policy_max_retries(),
        )

    raise BadUserConfigurationException(u"Retry policy '{}' not supported".format(policy_name))