# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def word_count_inline(text=parameter.csv[spark.DataFrame], counters=output.txt.data):
    # type: (spark.DataFrame, Target) -> spark.DataFrame
    """Count word occurrences in the first column of ``text``.

    The first field of every row is split on single spaces, and each token
    is counted with ``reduceByKey``. The resulting ``(word, count)`` pairs
    are written as text to ``counters``, printed to stdout, and returned as
    a DataFrame.

    :param text: input DataFrame; only column 0 of each row is read.
    :param counters: output target; its ``str()`` form is passed as the path
        to ``RDD.saveAsTextFile``.
    :return: DataFrame created from the ``(word, count)`` pair RDD.
    """
    from operator import add

    from dbnd_spark.spark import get_spark_session

    lines = text.rdd.map(lambda r: r[0])
    # split(" ") rather than split(): consecutive spaces yield empty tokens,
    # which are counted like any other word.
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    # NOTE(review): `counts` is not cached, so the save, the collect and the
    # DataFrame creation may each re-evaluate the lineage; consider
    # counts.cache() for large inputs.
    counts.saveAsTextFile(str(counters))
    for (word, count) in counts.collect():
        print("%s: %i" % (word, count))
    return get_spark_session().createDataFrame(counts)
class WordCountSparkInline(PySparkInlineTask):
    """Inline PySpark word count over the first column of ``text``.

    Writes ``(word, count)`` pairs as text to ``counters`` and also publishes
    them as a DataFrame through the ``counters_auto_save`` output.
    """

    text = parameter.csv[spark.DataFrame]
    counters = output.txt.data
    # DataFrame output; assigned at the end of run().
    counters_auto_save = output[spark.DataFrame]

    def run(self):
        """Count words, save/print them, and populate ``counters_auto_save``."""
        from operator import add

        from dbnd_spark.spark import get_spark_session

        lines = self.text.rdd.map(lambda r: r[0])
        # split(" ") keeps empty tokens produced by consecutive spaces.
        counts = (
            lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
        )
        counts.saveAsTextFile(str(self.counters))
        # Named `results` so it does not shadow the module-level `output`
        # used in the attribute declarations above.
        results = counts.collect()
        for (word, count) in results:
            print("%s: %i" % (word, count))
        # Populate the declared DataFrame output, mirroring the DataFrame that
        # word_count_inline returns.
        self.counters_auto_save = get_spark_session().createDataFrame(counts)

    def application_args(self):
        """Return ``[text, counters]``; presumably the application's arguments."""
        return [self.text, self.counters]
class WordCountPySparkTask(PySparkTask):
    """PySpark task backed by the ``word_count.py`` script (via ``spark_script``).

    Reads ``text`` and writes its result to the ``counters`` output.
    """

    text = parameter.data
    counters = output
    python_script = spark_script("word_count.py")

    def application_args(self):
        """Return a fresh ``[text, counters]`` list; presumably the script's args."""
        args = [self.text]
        args.append(self.counters)
        return args
class WordCountPipeline(PipelineTask):
    """Pipeline that feeds one ``text`` input into two word-count tasks.

    ``with_spark`` receives the ``WordCountTask`` (defined elsewhere) and
    ``with_pyspark`` receives the ``WordCountPySparkTask``.
    """

    text = parameter.data
    with_spark = output
    with_pyspark = output

    def band(self):
        # Same input for both tasks; construct and assign them in the
        # original order (WordCountTask first).
        source = self.text
        self.with_spark = WordCountTask(text=source)
        self.with_pyspark = WordCountPySparkTask(text=source)
@pipeline
def word_count_new_cluster():
wc = WordCountTask()
from dbnd_gcp.dataproc.dataproc import DataProcCtrl
create = DataProcCtrl(wc).create_engine()