logger.error(
    'Unable to deserialize lock file. Try to reactivate instant testing. '
    'The broken content is: %s',
    serialised_state,
)

if state:
    logger.info(
        'Recovering context for the instant testing [pid=%s, gateway=%s]',
        state['session_pid'], state['gateway_port'],
    )

    os.environ['PYSPARK_GATEWAY_PORT'] = str(state['gateway_port'])
    os.environ['PYSPARK_GATEWAY_SECRET'] = str(state['gateway_secret'])

    gateway = launch_gateway()
    java_import(gateway.jvm, 'org.apache.spark.SparkContext')
    jvm_spark_context = gateway.jvm.SparkContext.getOrCreate()
    jvm_java_spark_context = gateway.jvm.JavaSparkContext(jvm_spark_context)

    SparkContext._gateway = gateway
    SparkContext._jvm = gateway.jvm

    return SparkContext(
        appName=jvm_spark_context.appName(),
        master=jvm_spark_context.master(),
        gateway=gateway,
        jsc=jvm_java_spark_context,
    )
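The snippet above reattaches to an already-running JVM gateway through environment variables, then registers org.apache.spark.SparkContext on the JVM view. For reference, a minimal sketch of what java_import itself does on a plain py4j gateway (this assumes a GatewayServer is already listening on the default port):

from py4j.java_gateway import JavaGateway, java_import

gateway = JavaGateway()  # connect to an already-running GatewayServer

# Without the import, JVM classes need their fully qualified name:
#     gateway.jvm.java.util.Random()
java_import(gateway.jvm, 'java.util.Random')

# After the import, the short name resolves on the JVM view:
rng = gateway.jvm.Random()
print(rng.nextInt(10))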
def init(self, arglist, _sc=None, _sqlContext=None):
    sc = SparkContext() if _sc is None else _sc
    sqlContext = HiveContext(sc) if _sqlContext is None else _sqlContext
    sc.setLogLevel("ERROR")

    self.sqlContext = sqlContext
    self.sc = sc
    self._jvm = sc._jvm

    from py4j.java_gateway import java_import
    java_import(self._jvm, "org.tresamigos.smv.ColumnHelper")
    java_import(self._jvm, "org.tresamigos.smv.SmvDFHelper")
    java_import(self._jvm, "org.tresamigos.smv.dqm.*")
    java_import(self._jvm, "org.tresamigos.smv.python.SmvPythonHelper")

    self.j_smvPyClient = self.create_smv_pyclient(arglist)

    # shortcut is meant for internal use only
    self.j_smvApp = self.j_smvPyClient.j_smvApp()

    # issue #429: set application name from smv config
    sc._conf.setAppName(self.appName())

    # user may choose a port for the callback server
    gw = sc._gateway
    cbsp = self.j_smvPyClient.callbackServerPort()
    cbs_port = cbsp.get() if cbsp.isDefined() else gw._python_proxy_port

    # check whether the port is in use or not; try 10 times, and if all attempts fail, error out
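The snippet is truncated at that final comment. A hedged sketch of how such a port probe could look (this is not SMV's actual implementation; cbs_port comes from the code above):

import socket

def _port_in_use(port, host='127.0.0.1'):
    # connect_ex returns 0 when something is already listening on the port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex((host, port)) == 0

for attempt in range(10):
    if not _port_in_use(cbs_port + attempt):
        cbs_port += attempt
        break
else:
    raise RuntimeError('no free callback-server port found after 10 tries')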
def _general_imports(self):
    jvm = self._get_jvm()
    java_import(jvm, "org.mrgeo.data.DataProviderFactory")
    java_import(jvm, "org.mrgeo.hdfs.utils.HadoopFileUtils")
    java_import(jvm, "org.mrgeo.core.*")
    java_import(jvm, "org.mrgeo.data.*")
    java_import(jvm, "org.mrgeo.job.*")
    java_import(jvm, "org.mrgeo.mapalgebra.*")
    java_import(jvm, "org.mrgeo.mapalgebra.raster.*")
    java_import(jvm, "org.mrgeo.mapalgebra.vector.*")
    java_import(jvm, "org.mrgeo.utils.*")
    java_import(jvm, "org.mrgeo.utils.logging.*")
def __init__(self):
    """When running a Python script from Scala, this function is called
    by the script to initialize the connection to the Java gateway and get the Spark context.
    The code is largely copied from:
    https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
    """
    from py4j.java_gateway import java_import, JavaGateway, GatewayClient

    if os.environ.get("SPARK_EXECUTOR_URI"):
        SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

    gateway = JavaGateway(GatewayClient(port=int(os.environ.get("PYSPARK_GATEWAY_PORT"))), auto_convert=True)
    java_import(gateway.jvm, "org.apache.spark.SparkEnv")
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")

    intp = gateway.entry_point
    jSparkSession = intp.pyGetSparkSession()
    jsc = intp.pyGetJSparkContext(jSparkSession)
    jconf = intp.pyGetSparkConf(jsc)
    conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
    self.sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)

    # Spark 2
    self.sparkSession = SparkSession(self.sc, jSparkSession)
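The JavaGateway(GatewayClient(port=...), auto_convert=True) form used above is the older py4j constructor. Newer py4j releases express the same connection with GatewayParameters; a sketch of the equivalent, assuming the port sits in the same environment variable:

import os
from py4j.java_gateway import JavaGateway, GatewayParameters

gateway = JavaGateway(
    gateway_parameters=GatewayParameters(
        port=int(os.environ.get('PYSPARK_GATEWAY_PORT', '25333')),
        auto_convert=True,  # auto-convert Python collections to Java ones
    ))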
# Make sure the objects have the proper code in them, in case a new mrgeo object has been created somewhere
for method_name, code in _mapop_code.items():
    if not hasattr(mrgeo, method_name):
        setattr(mrgeo, method_name, code.compile(method_name).get(method_name))

for method_name, code in _rastermapop_code.items():
    if not hasattr(RasterMapOp, method_name):
        setattr(RasterMapOp, method_name, code.compile(method_name).get(method_name))

for method_name, code in _vectormapop_code.items():
    if not hasattr(VectorMapOp, method_name):
        setattr(VectorMapOp, method_name, code.compile(method_name).get(method_name))

return
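The loops above lazily attach generated methods onto the map-op classes. A toy illustration of the same compile-and-bind pattern in plain Python (the names here are hypothetical stand-ins, not MrGeo's actual code.compile API):

class RasterMapOpDemo:
    pass

src = "def slope(self):\n    return 'slope called'\n"
namespace = {}
exec(compile(src, '<generated>', 'exec'), namespace)

if not hasattr(RasterMapOpDemo, 'slope'):
    setattr(RasterMapOpDemo, 'slope', namespace['slope'])

print(RasterMapOpDemo().slope())  # -> slope called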
jvm = gateway.jvm
client = gateway_client
java_import(jvm, "org.mrgeo.job.*")
java_import(jvm, "org.mrgeo.mapalgebra.MapOpFactory")
java_import(jvm, "org.mrgeo.mapalgebra.raster.RasterMapOp")
java_import(jvm, "org.mrgeo.mapalgebra.vector.VectorMapOp")
java_import(jvm, "org.mrgeo.mapalgebra.raster.MrsPyramidMapOp")
java_import(jvm, "org.mrgeo.mapalgebra.IngestImageMapOp")
java_import(jvm, "org.mrgeo.mapalgebra.ExportMapOp")
java_import(jvm, "org.mrgeo.mapalgebra.PointsMapOp")
java_import(jvm, "org.mrgeo.mapalgebra.MapOp")
java_import(jvm, "org.mrgeo.utils.SparkUtils")
java_import(jvm, "org.mrgeo.hdfs.utils.HadoopFileUtils")
java_import(jvm, "org.mrgeo.data.*")
mapops = jvm.MapOpFactory.getMapOpClasses()
for rawmapop in mapops:
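The truncated loop iterates over a Java collection returned by MapOpFactory.getMapOpClasses(); py4j makes that possible because its Java list and set wrappers implement the Python iteration protocol. A minimal demonstration against a running gateway:

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()
jlist = gateway.jvm.java.util.ArrayList()
jlist.add('IngestImageMapOp')
jlist.add('ExportMapOp')

for item in jlist:  # a py4j JavaList supports Python iteration directly
    print(item)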
def import_flink_view(gateway):
    """
    Import the classes used by PyFlink.

    :param gateway: gateway connected to the JavaGatewayServer
    """
    # Import the classes used by PyFlink
    java_import(gateway.jvm, "org.apache.flink.table.api.*")
    java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
    java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
    java_import(gateway.jvm, "org.apache.flink.table.catalog.*")
    java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
    java_import(gateway.jvm, "org.apache.flink.table.descriptors.python.*")
    java_import(gateway.jvm, "org.apache.flink.table.sources.*")
    java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
    java_import(gateway.jvm, "org.apache.flink.table.types.*")
    java_import(gateway.jvm, "org.apache.flink.table.types.logical.*")
    java_import(gateway.jvm, "org.apache.flink.table.util.python.*")
    java_import(gateway.jvm, "org.apache.flink.api.common.python.*")
    java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation")
    java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.Types")
    java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment")
    java_import(gateway.jvm,
                "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
java_import(gateway.jvm, "org.apache.flink.table.catalog.*")
java_import(gateway.jvm, "org.apache.flink.table.descriptors.*")
java_import(gateway.jvm, "org.apache.flink.table.descriptors.python.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.types.*")
java_import(gateway.jvm, "org.apache.flink.table.types.logical.*")
java_import(gateway.jvm, "org.apache.flink.table.util.python.*")
java_import(gateway.jvm, "org.apache.flink.api.common.python.*")
java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation")
java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.Types")
java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment")
java_import(gateway.jvm,
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
java_import(gateway.jvm, "org.apache.flink.api.common.restartstrategy.RestartStrategies")
@classmethod
def _ensure_initialized(cls):
    SparkContext._ensure_initialized()
    gw = SparkContext._gateway

    java_import(gw.jvm, "org.apache.spark.streaming.*")
    java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
    java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")

    # start callback server
    # getattr will fall back to the JVM, so we cannot test by hasattr()
    if "_callback_server" not in gw.__dict__:
        _daemonize_callback_server()
        # use a random port
        gw._start_callback_server(0)
        # gateway with the real port
        gw._python_proxy_port = gw._callback_server.port
        # get the GatewayServer object in the JVM by ID
        jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
        # update the port of the CallbackClient with the real port
        gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)

    # register serializer for TransformFunction
    # it happens before creating SparkContext when loading from checkpointing
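The snippet starts the Python callback server on a random free port and then pushes the real port back to the JVM via PythonDStream.updatePythonGatewayPort. With the public py4j API, the same start-on-ephemeral-port dance looks roughly like this (a sketch, not PySpark's exact code):

from py4j.java_gateway import JavaGateway, CallbackServerParameters

gateway = JavaGateway(
    callback_server_parameters=CallbackServerParameters(port=0))  # 0 = random port

# Read back the port the callback server actually bound to...
real_port = gateway.get_callback_server().get_listening_port()

# ...and tell the JVM-side CallbackClient to use it.
gateway.java_gateway_server.resetCallbackClient(
    gateway.java_gateway_server.getCallbackClient().getAddress(), real_port)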