import logging

def run_spark_action(action):
    import py4j
    try:
        results = action()
    except py4j.protocol.Py4JJavaError:
        logging.error("Spark job failed to run! Jenkins should probably restart this build.")
        raise
    return results
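# Hedged usage sketch (not from the original source): wrap a Spark action so a
# JVM-side failure is logged before the Py4JJavaError propagates. The SparkSession
# named `spark` and the input path are assumptions for illustration only.
row_count = run_spark_action(lambda: spark.read.csv("/data/input.csv").count())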
def get(self):
    if failure_reason:
        return "failure-reason"
    else:
        raise Py4JJavaError("msg", JavaException())
self.assertRaises(ValueError, Statistics.chiSqTest, observed3, expected3)
# Negative counts in observed
neg_obs = Vectors.dense([1.0, 2.0, 3.0, -4.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, neg_obs, expected1)
# Count = 0.0 in expected but not observed
zero_expected = Vectors.dense([1.0, 0.0, 3.0])
pearson_inf = Statistics.chiSqTest(observed, zero_expected)
self.assertEqual(pearson_inf.statistic, inf)
self.assertEqual(pearson_inf.degreesOfFreedom, 2)
self.assertEqual(pearson_inf.pValue, 0.0)
# 0.0 in expected and observed simultaneously
zero_observed = Vectors.dense([2.0, 0.0, 1.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, zero_observed, zero_expected)
def test_pipe_functions(self):
    data = ['1', '2', '3']
    rdd = self.sc.parallelize(data)
    with QuietTest(self.sc):
        self.assertEqual([], rdd.pipe('cc').collect())
        self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect)
    result = rdd.pipe('cat').collect()
    result.sort()
    for x, y in zip(data, result):
        self.assertEqual(x, y)
    self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
    self.assertEqual([], rdd.pipe('grep 4').collect())
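# Hedged sketch (assumes an active SparkContext named `sc`): with checkCode=True,
# a non-zero exit status from the piped command surfaces as a Py4JJavaError when
# the RDD is materialized.
from py4j.protocol import Py4JJavaError

try:
    sc.parallelize(['1', '2', '3']).pipe('grep 4', checkCode=True).collect()
except Py4JJavaError:
    print("piped command exited with a non-zero status")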
def deco(*a, **kw):
    try:
        return f(*a, **kw)
    except py4j.protocol.Py4JJavaError as e:
        s = e.java_exception.toString()
        stackTrace = '\n\t at '.join(map(lambda x: x.toString(),
                                         e.java_exception.getStackTrace()))
        if s.startswith('org.apache.spark.sql.AnalysisException: '):
            raise AnalysisException(s.split(': ', 1)[1], stackTrace)
        if s.startswith('org.apache.spark.sql.catalyst.analysis'):
            raise AnalysisException(s.split(': ', 1)[1], stackTrace)
        if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
            raise ParseException(s.split(': ', 1)[1], stackTrace)
        if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
            raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
        if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
            raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
        if s.startswith('java.lang.IllegalArgumentException: '):
            raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
        raise
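# Hedged sketch of the surrounding wrapper (modelled on pyspark.sql.utils'
# capture_sql_exception; the name and exact structure here are assumptions):
# deco closes over the py4j-backed callable `f` and re-raises JVM exceptions
# as the Python-side classes handled above.
def capture_sql_exception(f):
    def deco(*a, **kw):
        ...  # body as shown above: translate Py4JJavaError into Python exceptions
    return deco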
try:
    mlhd_df.registerTempTable('mlhd')
except AnalysisException as e:
    logging.critical("Error while registering dataframe mlhd: %s", str(e))
    raise
for _ in range(5):
    try:
        print("Running SQL...")
        artist_popularity_df = listenbrainz_spark.sql_context.sql("""
            SELECT artist_mbid, COUNT(artist_mbid) as cnt
              FROM mlhd
          GROUP BY artist_mbid
          ORDER BY cnt DESC
        """)
        break
    except Py4JJavaError as e:
        logging.error("error while running the query: %s", str(e))
else:
    logging.critical("Could not run query. Exiting...")
    sys.exit(-1)
print("number of rows: ", artist_popularity_df.count())
artist_popularity_df.show()
print("Saving...")
file_name = 'mlhd-artist-popularity-%s.csv' % datetime.now().strftime('%Y%m%d-%H%M%S')
csv_path = config.HDFS_CLUSTER_URI + os.path.join(MLHD_DATA_PATH, 'csv', file_name)
for _ in range(10):
    try:
        artist_popularity_df.write.csv(csv_path)
        break
    except Exception as e:
        logging.error("Couldn't write result to CSV, trying again, error: %s", str(e))
def init_dir(rm, recursive, create_dir):
    """ Create directories in HDFS to run the recommendation engine.
    """
    try:
        listenbrainz_spark.init_spark_session('Manage Directories')
    except Py4JJavaError as err:
        logging.error('{}\n{}\nAborting...'.format(str(err), err.java_exception))
        sys.exit(-1)

    hdfs_connection.init_hdfs(config.HDFS_HTTP_URI)
    if rm:
        try:
            utils.delete_dir(path.RECOMMENDATION_PARENT_DIR)
            utils.delete_dir(path.CHECKPOINT_DIR)
            logging.info('Successfully deleted directories.')
        except HdfsError as err:
            logging.error('{}: Some/all directories are non-empty. Try "--recursive" to delete recursively.'.format(
                type(err).__name__))
            logging.warning('Deleting directory recursively will delete all the recommendation data.')
            sys.exit(-1)
    if recursive:
# An action must be called for persist to evaluate.
num_training = training_data.count()
num_validation = validation_data.count()
num_test = test_data.count()
current_app.logger.info('Training models...')
t0 = time()
model, model_metadata, best_model_metadata = train(training_data, validation_data, num_validation,
                                                   config.RANKS, config.LAMBDAS, config.ITERATIONS)
models_training_time = '{:.2f}'.format((time() - t0) / 3600)
try:
    best_model_test_rmse = compute_rmse(model.model, test_data, num_test)
except Py4JJavaError as err:
    current_app.logger.error('Root mean squared error for best model for test data not computed\n{}\nAborting...'.format(
        str(err.java_exception)), exc_info=True)
    sys.exit(-1)
# Cached data must be cleared to avoid OOM.
training_data.unpersist()
validation_data.unpersist()
current_app.logger.info('Saving model...')
t0 = time()
metadata_file_path = os.path.join(path.DATA_DIR, best_model_metadata['model_id'])
try:
    model.model.save(listenbrainz_spark.context, config.HDFS_CLUSTER_URI + metadata_file_path)
except Py4JJavaError as err:
    current_app.logger.error('Unable to save best model "{}"\n{}. Aborting...'.format(best_model_metadata['model_id'],
                                                                                      str(err.java_exception)), exc_info=True)
@classmethod
def fromFile(cls, name):
    """Load a model from a PMML file with the given pathname."""
    pc = PMMLContext.getOrCreate()
    try:
        java_model = pc._jvm.org.pmml4s.model.Model.fromFile(name)
        return cls(java_model)
    except Py4JJavaError as e:
        je = e.java_exception
        raise PmmlError(je.getClass().getSimpleName(), je.getMessage())
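# Hedged usage sketch (assumes the enclosing class is Model, as in pypmml, and
# the file path is illustrative): JVM-side load failures surface as PmmlError
# rather than a raw Py4JJavaError.
try:
    model = Model.fromFile("models/iris_tree.pmml")
except PmmlError as e:
    print("failed to load PMML model:", e)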
def transform(self, word):
    """
    Transforms a word to its vector representation

    .. note:: Local use only

    :param word: a word
    :return: vector representation of word(s)
    """
    try:
        return self.call("transform", word)
    except Py4JJavaError:
        raise ValueError("%s not found" % word)
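# Hedged usage sketch (assumes a fitted Word2VecModel named `model`, as in
# pyspark.mllib.feature): an out-of-vocabulary word raises ValueError instead
# of a raw Py4JJavaError.
try:
    vec = model.transform("spark")
except ValueError as e:
    print(e)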