Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def __init__(self, query, samplemethod=None, maxrows=None, samplefraction=None, spark_events=None, coerce=None):
    """Store a SQL query together with its validated sampling options.

    :param query: the SQL text to run; stored as-is on ``self.query``.
    :param samplemethod: u'take' or u'sample'; ``None`` falls back to
        ``conf.default_samplemethod()``.
    :param maxrows: integer row limit; ``None`` falls back to
        ``conf.default_maxrows()``.
    :param samplefraction: number in the closed range [0.0, 1.0]; ``None``
        falls back to ``conf.default_samplefraction()``.
    :param spark_events: event emitter; a new ``SparkEvents()`` is created
        when ``None``.
    :param coerce: stored unchanged on ``self._coerce``.
    :raises BadUserDataException: if any sampling option is invalid.
    """
    super(SQLQuery, self).__init__()
    if samplemethod is None:
        samplemethod = conf.default_samplemethod()
    if maxrows is None:
        maxrows = conf.default_maxrows()
    if samplefraction is None:
        samplefraction = conf.default_samplefraction()

    # A tuple (equality scan) rather than a set, so an unhashable value is
    # reported as bad user data instead of raising TypeError.
    if samplemethod not in (u'take', u'sample'):
        raise BadUserDataException(u'samplemethod (-m) must be one of (take, sample)')
    # bool is a subclass of int; maxrows=True/False is a caller bug, not a limit.
    if isinstance(maxrows, bool) or not isinstance(maxrows, int):
        raise BadUserDataException(u'maxrows (-n) must be an integer')
    # Ordering a non-numeric value (e.g. a str) against a float raises
    # TypeError on Python 3; report it with the same user-facing error.
    try:
        fraction_in_range = 0.0 <= samplefraction <= 1.0
    except TypeError:
        fraction_in_range = False
    if not fraction_in_range:
        raise BadUserDataException(u'samplefraction (-r) must be a float between 0.0 and 1.0')

    self.query = query
    self.samplemethod = samplemethod
    self.maxrows = maxrows
    self.samplefraction = samplefraction
    if spark_events is None:
        spark_events = SparkEvents()
    self._spark_events = spark_events
    self._coerce = coerce
def to_command(self, kind, sql_context_variable_name):
    """Build the session-specific command that runs this query.

    :param kind: the session kind; must equal one of
        ``constants.SESSION_KIND_PYSPARK``, ``SESSION_KIND_PYSPARK3``,
        ``SESSION_KIND_SPARK`` or ``SESSION_KIND_SPARKR``.
    :param sql_context_variable_name: name of the SQL context variable
        handed to the matching command builder.
    :returns: the result of ``_pyspark_command`` (PySpark; PySpark3 with a
        second argument of ``False``), ``_scala_command`` (Spark) or
        ``_r_command`` (SparkR).
    :raises BadUserDataException: if ``kind`` matches none of the above.
    """
    # Ordered (kind, builder) pairs, checked with == in the same order as
    # before; lambdas defer the builder lookup until a kind actually matches.
    builders = (
        (constants.SESSION_KIND_PYSPARK, lambda name: self._pyspark_command(name)),
        (constants.SESSION_KIND_PYSPARK3, lambda name: self._pyspark_command(name, False)),
        (constants.SESSION_KIND_SPARK, lambda name: self._scala_command(name)),
        (constants.SESSION_KIND_SPARKR, lambda name: self._r_command(name)),
    )
    for supported_kind, build in builders:
        if kind == supported_kind:
            return build(sql_context_variable_name)
    raise BadUserDataException(u"Kind '{}' is not supported.".format(kind))
def to_command(self, kind, sql_context_variable_name):
    """Return the command for ``kind`` that executes this query.

    PySpark and PySpark3 both go through ``_pyspark_command`` (the PySpark3
    call passes ``False`` as a second argument); Spark uses
    ``_scala_command`` and SparkR uses ``_r_command``.

    :raises BadUserDataException: when ``kind`` is none of the four
        supported session kinds.
    """
    context_name = sql_context_variable_name
    if kind == constants.SESSION_KIND_PYSPARK:
        return self._pyspark_command(context_name)
    if kind == constants.SESSION_KIND_PYSPARK3:
        return self._pyspark_command(context_name, False)
    if kind == constants.SESSION_KIND_SPARK:
        return self._scala_command(context_name)
    if kind == constants.SESSION_KIND_SPARKR:
        return self._r_command(context_name)
    raise BadUserDataException(u"Kind '{}' is not supported.".format(kind))
def execute(self, session):
    """Run this query on ``session`` and return the parsed records.

    Emits a start event first, then a single end event: on success it
    carries ``True`` and empty error fields; on failure it carries ``False``
    plus the exception's class name and message, and the exception is
    re-raised.

    :raises BadUserDataException: if the command reports failure; the text
        it returned is used as the error message.
    """
    self._spark_events.emit_sql_execution_start_event(
        session.guid, session.kind, session.id, self.guid,
        self.samplemethod, self.maxrows, self.samplefraction)

    def _emit_end(guid_of_command, succeeded, error_type, error_message):
        # Session/query identifiers are read at emit time, as before.
        self._spark_events.emit_sql_execution_end_event(
            session.guid, session.kind, session.id, self.guid,
            guid_of_command, succeeded, error_type, error_message)

    cmd_guid = ''
    try:
        cmd = self.to_command(session.kind, session.sql_context_variable_name)
        cmd_guid = cmd.guid
        ok, records_text = cmd.execute(session)
        if not ok:
            raise BadUserDataException(records_text)
        frame = self._records_to_dataframe(records_text)
    except Exception as err:
        _emit_end(cmd_guid, False, err.__class__.__name__, str(err))
        raise
    # Only reached when the try body completed (the except always re-raises).
    _emit_end(cmd_guid, True, "", "")
    return frame
class BadUserDataException(LivyClientLibException):
    """Raised when data supplied by the user is invalid in some way."""
class SqlContextNotFoundException(LivyClientLibException):
    """Raised when no SQL context can be found."""
class SparkStatementException(LivyClientLibException):
    """Raised when a Spark statement fails to parse or to execute."""
# == DECORATORS FOR EXCEPTION HANDLING ==

# Exception types considered "expected". Kept as a list; the
# handle_expected_exceptions decorator builds a tuple from it when applied.
EXPECTED_EXCEPTIONS = [
    BadUserConfigurationException,
    BadUserDataException,
    LivyUnexpectedStatusException,
    SqlContextNotFoundException,
    HttpClientException,
    LivyClientTimeoutException,
    SessionManagementException,
]
def handle_expected_exceptions(f):
    """A decorator that handles expected exceptions. Self can be any object with
    an "ipython_display" attribute.

    Usage:
        @handle_expected_exceptions
        def fn(self, ...):
            etc...

    The exception types treated as "expected" come from EXPECTED_EXCEPTIONS,
    which is copied into a tuple at decoration time (i.e. when this function
    is called with ``f``).
    """
    exceptions_to_handle = tuple(EXPECTED_EXCEPTIONS)
    # Notice that we're NOT handling e.DataFrameParseException here. That's because DataFrameParseException
    # is an internal error that suggests something is wrong with LivyClientLib's implementation.
    def wrapped(self, *args, **kwargs):
        # NOTE(review): this excerpt is truncated here -- the body of the `try`
        # below and the rest of the decorator (its except clause and return
        # value) are not visible, so how exceptions_to_handle is used, and
        # whether self.ipython_display is involved, cannot be confirmed from this view.
        try:
def __init__(self, query, samplemethod=None, maxrows=None, samplefraction=None, spark_events=None):
    """Store a SQL query together with its validated sampling options.

    :param query: the SQL text to run; stored as-is on ``self.query``.
    :param samplemethod: u'take' or u'sample'; ``None`` falls back to
        ``conf.default_samplemethod()``.
    :param maxrows: integer row limit; ``None`` falls back to
        ``conf.default_maxrows()``.
    :param samplefraction: number in the closed range [0.0, 1.0]; ``None``
        falls back to ``conf.default_samplefraction()``.
    :param spark_events: event emitter; a new ``SparkEvents()`` is created
        when ``None``.
    :raises BadUserDataException: if any sampling option is invalid.
    """
    super(SQLQuery, self).__init__()
    if samplemethod is None:
        samplemethod = conf.default_samplemethod()
    if maxrows is None:
        maxrows = conf.default_maxrows()
    if samplefraction is None:
        samplefraction = conf.default_samplefraction()

    # A tuple (equality scan) rather than a set, so an unhashable value is
    # reported as bad user data instead of raising TypeError.
    if samplemethod not in (u'take', u'sample'):
        raise BadUserDataException(u'samplemethod (-m) must be one of (take, sample)')
    # bool is a subclass of int; maxrows=True/False is a caller bug, not a limit.
    if isinstance(maxrows, bool) or not isinstance(maxrows, int):
        raise BadUserDataException(u'maxrows (-n) must be an integer')
    # Ordering a non-numeric value (e.g. a str) against a float raises
    # TypeError on Python 3; report it with the same user-facing error.
    try:
        fraction_in_range = 0.0 <= samplefraction <= 1.0
    except TypeError:
        fraction_in_range = False
    if not fraction_in_range:
        raise BadUserDataException(u'samplefraction (-r) must be a float between 0.0 and 1.0')

    self.query = query
    self.samplemethod = samplemethod
    self.maxrows = maxrows
    self.samplefraction = samplefraction
    if spark_events is None:
        spark_events = SparkEvents()
    self._spark_events = spark_events
def __init__(self, query, samplemethod=None, maxrows=None, samplefraction=None, spark_events=None, coerce=None):
    """Store a SQL query together with its validated sampling options.

    :param query: the SQL text to run; stored as-is on ``self.query``.
    :param samplemethod: u'take' or u'sample'; ``None`` falls back to
        ``conf.default_samplemethod()``.
    :param maxrows: integer row limit; ``None`` falls back to
        ``conf.default_maxrows()``.
    :param samplefraction: number in the closed range [0.0, 1.0]; ``None``
        falls back to ``conf.default_samplefraction()``.
    :param spark_events: event emitter; a new ``SparkEvents()`` is created
        when ``None``.
    :param coerce: stored unchanged on ``self._coerce``.
    :raises BadUserDataException: if any sampling option is invalid.
    """
    super(SQLQuery, self).__init__()
    if samplemethod is None:
        samplemethod = conf.default_samplemethod()
    if maxrows is None:
        maxrows = conf.default_maxrows()
    if samplefraction is None:
        samplefraction = conf.default_samplefraction()

    # A tuple (equality scan) rather than a set, so an unhashable value is
    # reported as bad user data instead of raising TypeError.
    if samplemethod not in (u'take', u'sample'):
        raise BadUserDataException(u'samplemethod (-m) must be one of (take, sample)')
    # bool is a subclass of int; maxrows=True/False is a caller bug, not a limit.
    if isinstance(maxrows, bool) or not isinstance(maxrows, int):
        raise BadUserDataException(u'maxrows (-n) must be an integer')
    # Ordering a non-numeric value (e.g. a str) against a float raises
    # TypeError on Python 3; report it with the same user-facing error.
    try:
        fraction_in_range = 0.0 <= samplefraction <= 1.0
    except TypeError:
        fraction_in_range = False
    if not fraction_in_range:
        raise BadUserDataException(u'samplefraction (-r) must be a float between 0.0 and 1.0')

    self.query = query
    self.samplemethod = samplemethod
    self.maxrows = maxrows
    self.samplefraction = samplefraction
    if spark_events is None:
        spark_events = SparkEvents()
    self._spark_events = spark_events
    self._coerce = coerce
def __init__(self, url, auth, username="", password="", implicitly_added=False):
    """Store the connection details of an endpoint.

    :param url: base URL of the endpoint; trailing slashes are removed
        before it is stored on ``self.url``.
    :param auth: stored as-is on ``self.auth``.
    :param username: stored as-is on ``self.username``.
    :param password: stored as-is on ``self.password``.
    :param implicitly_added: True only if the endpoint wasn't configured
        manually by the user through a widget, but was instead implicitly
        defined as an endpoint to a wrapper kernel in the configuration
        JSON file.
    :raises BadUserDataException: if ``url`` is empty, or would be empty
        (or whitespace-only) once its trailing slashes are removed.
    """
    # Validate the normalised value: u"///" is truthy but would otherwise
    # leave an empty self.url after the trailing slashes are stripped.
    normalized_url = url.rstrip(u"/") if url else url
    if not normalized_url or not normalized_url.strip():
        raise BadUserDataException(u"URL must not be empty")
    self.url = normalized_url
    self.username = username
    self.password = password
    self.auth = auth
    # implicitly_added is set to True only if the endpoint wasn't configured manually by the user through
    # a widget, but was instead implicitly defined as an endpoint to a wrapper kernel in the configuration
    # JSON file.
    self.implicitly_added = implicitly_added