class DateConverter(object):
    def can_convert(self, obj):
        return isinstance(obj, datetime.date)

    def convert(self, obj, gateway_client):
        Date = JavaClass("java.sql.Date", gateway_client)
        return Date.valueOf(obj.strftime("%Y-%m-%d"))


class DatetimeConverter(object):
    def can_convert(self, obj):
        return isinstance(obj, datetime.datetime)

    def convert(self, obj, gateway_client):
        Timestamp = JavaClass("java.sql.Timestamp", gateway_client)
        return Timestamp(int(time.mktime(obj.timetuple())) * 1000 + obj.microsecond // 1000)


# datetime is a subclass of date, so DatetimeConverter must be registered first
register_input_converter(DatetimeConverter())
register_input_converter(DateConverter())
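# Illustrative sketch (not part of the original module): the registration order
# above matters because datetime.datetime is a subclass of datetime.date, so a
# DateConverter asked first would also match datetime values and drop the
# time-of-day. Assuming py4j probes input converters in registration order:
import datetime

value = datetime.datetime(2020, 1, 2, 3, 4, 5)
assert isinstance(value, datetime.date)            # a datetime is also a date
assert DatetimeConverter().can_convert(value)      # registered first, so it wins
assert DateConverter().can_convert(value)          # would also match if asked first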
def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import SQLContext
    globs = globals()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
logger.error(
    'Unable to deserialize lock file. Try to reactivate instant testing. '
    'The broken content is: %s',
    serialised_state,
)

if state:
    logger.info(
        'Recovering context for the instant testing [pid=%s, gateway=%s]',
        state['session_pid'], state['gateway_port'],
    )
    os.environ['PYSPARK_GATEWAY_PORT'] = str(state['gateway_port'])
    os.environ['PYSPARK_GATEWAY_SECRET'] = str(state['gateway_secret'])

    gateway = launch_gateway()
    java_import(gateway.jvm, 'org.apache.spark.SparkContext')
    jvm_spark_context = gateway.jvm.SparkContext.getOrCreate()
    jvm_java_spark_context = gateway.jvm.JavaSparkContext(jvm_spark_context)

    SparkContext._gateway = gateway
    SparkContext._jvm = gateway.jvm

    return SparkContext(
        appName=jvm_spark_context.appName(),
        master=jvm_spark_context.master(),
        gateway=gateway,
        jsc=jvm_java_spark_context,
    )
@classmethod
def setUpClass(cls):
    ReusedPySparkTestCase.setUpClass()
    cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
    try:
        cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
    except py4j.protocol.Py4JError:
        cls.tearDownClass()
        raise unittest.SkipTest("Hive is not available")
    except TypeError:
        cls.tearDownClass()
        raise unittest.SkipTest("Hive is not available")
    os.unlink(cls.tempdir.name)
    cls.spark = HiveContext._createForTesting(cls.sc)
    cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
    cls.df = cls.sc.parallelize(cls.testData).toDF()
def run_spark_action(action):
    import py4j.protocol
    try:
        results = action()
    except py4j.protocol.Py4JJavaError:
        logging.error("Spark job failed to run! Jenkins should probably restart this build.")
        raise
    return results
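# A minimal usage sketch (not from the original source): run_spark_action expects a
# zero-argument callable, so an existing Spark job is typically wrapped in a lambda.
# The `sc` and `input_path` names below are hypothetical placeholders.
word_count = run_spark_action(
    lambda: sc.textFile(input_path).flatMap(lambda line: line.split()).count()
)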
def get(self):
    if failure_reason:
        return "failure-reason"
    else:
        raise Py4JJavaError("msg", JavaException())
self.assertRaises(ValueError, Statistics.chiSqTest, observed3, expected3)
# Negative counts in observed
neg_obs = Vectors.dense([1.0, 2.0, 3.0, -4.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, neg_obs, expected1)
# Count = 0.0 in expected but not observed
zero_expected = Vectors.dense([1.0, 0.0, 3.0])
pearson_inf = Statistics.chiSqTest(observed, zero_expected)
self.assertEqual(pearson_inf.statistic, inf)
self.assertEqual(pearson_inf.degreesOfFreedom, 2)
self.assertEqual(pearson_inf.pValue, 0.0)
# 0.0 in expected and observed simultaneously
zero_observed = Vectors.dense([2.0, 0.0, 1.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, zero_observed, zero_expected)
def test_pipe_functions(self):
    data = ['1', '2', '3']
    rdd = self.sc.parallelize(data)
    with QuietTest(self.sc):
        self.assertEqual([], rdd.pipe('cc').collect())
        self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect)
    result = rdd.pipe('cat').collect()
    result.sort()
    for x, y in zip(data, result):
        self.assertEqual(x, y)
    self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
    self.assertEqual([], rdd.pipe('grep 4').collect())
def _create_connection(self):
    while True:
        parameters = GatewayParameters(address=self.address,
                                       port=self.port,
                                       auto_close=self.auto_close,
                                       eager_load=True)
        connection = MuffledGatewayConnection(parameters, self.gateway_property)
        connection_success = False
        try:
            connection.start()
            connection_success = True
        except Py4JNetworkError:
            pass
        except (KeyboardInterrupt, SystemExit):
            break
        if connection_success:
            break
        time.sleep(0.1)
    return connection
def init(self, arglist, _sc=None, _sqlContext=None):
    sc = SparkContext() if _sc is None else _sc
    sqlContext = HiveContext(sc) if _sqlContext is None else _sqlContext
    sc.setLogLevel("ERROR")

    self.sqlContext = sqlContext
    self.sc = sc
    self._jvm = sc._jvm

    from py4j.java_gateway import java_import
    java_import(self._jvm, "org.tresamigos.smv.ColumnHelper")
    java_import(self._jvm, "org.tresamigos.smv.SmvDFHelper")
    java_import(self._jvm, "org.tresamigos.smv.dqm.*")
    java_import(self._jvm, "org.tresamigos.smv.python.SmvPythonHelper")

    self.j_smvPyClient = self.create_smv_pyclient(arglist)

    # shortcut is meant for internal use only
    self.j_smvApp = self.j_smvPyClient.j_smvApp()

    # issue #429: set application name from smv config
    sc._conf.setAppName(self.appName())

    # the user may choose a port for the callback server
    gw = sc._gateway
    cbsp = self.j_smvPyClient.callbackServerPort()
    cbs_port = cbsp.get() if cbsp.isDefined() else gw._python_proxy_port

    # check whether the port is in use; try 10 times, and if all attempts fail, error out
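    # A minimal sketch of what such a check might look like (the actual implementation
    # is not shown in this excerpt). It assumes a plain localhost socket bind test and
    # a hypothetical helper name _callback_port_is_free.
    import socket
    import time

    def _callback_port_is_free(port):
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.bind(('127.0.0.1', port))
            return True
        except OSError:
            return False
        finally:
            probe.close()

    for _ in range(10):
        if _callback_port_is_free(cbs_port):
            break
        time.sleep(1)
    else:
        raise RuntimeError("Callback server port %s is in use" % cbs_port)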