How to use the pyspark.sql.functions.udf function in pyspark

To help you get started, we’ve selected a few pyspark.sql.functions.udf examples, based on popular ways it is used in public projects.

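All of the snippets below share the same basic pattern: wrap an ordinary Python function (or lambda) with udf, declare its Spark return type, and apply the result as a column expression in withColumn or select. The minimal sketch below is not taken from any of the projects listed here; the DataFrame and column names are purely illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("alice",), ("bob",)], ["name"])

# Declare the return type explicitly; udf defaults to StringType otherwise.
name_length = udf(lambda s: len(s) if s is not None else None, IntegerType())

df.withColumn("name_length", name_length(col("name"))).show()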

github intel-analytics / analytics-zoo / pyzoo / zoo / examples / pytorch / train / Lenet_mnist.py View on Github
def lossFunc(input, target):
        return nn.CrossEntropyLoss().forward(input, target.flatten().long())

    torch_model = LeNet()
    model = TorchNet.from_pytorch(torch_model, [1, 1, 28, 28])
    criterion = TorchCriterion.from_pytorch(lossFunc, [1, 10], torch.LongTensor([5]))
    classifier = NNClassifier(model, criterion, SeqToTensor([1, 28, 28])) \
        .setBatchSize(256) \
        .setOptimMethod(Adam()) \
        .setLearningRate(0.001)\
        .setMaxEpoch(2)

    nnClassifierModel = classifier.fit(trainingDF)

    print("After training: ")
    shift = udf(lambda p: p - 1, DoubleType())
    res = nnClassifierModel.transform(validationDF) \
        .withColumn("prediction", shift(col('prediction')))
    res.show(100)

    correct = res.filter("label=prediction").count()
    overall = res.count()
    accuracy = correct * 1.0 / overall
    print("Validation accuracy = %g " % accuracy)
github mozilla / telemetry-airflow / jobs / update_orphaning_dashboard_etl.py View on Github
def merge_keyed_count_histograms(histograms, histogram_name):
        res = {}
        n_hist = len(histograms)

        for i, histogram_struct in enumerate(histograms):
            histogram_array = histogram_struct[histogram_name]
            if histogram_array:
                for key, count_histogram_string in histogram_array:
                    if key not in res:
                        res[key] = [0]*n_hist
                    res[key][i] = json.loads(count_histogram_string)['values'].get('0', 0)

        return res

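    # register the merge function as a UDF returning map<string, array<int>>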
    merge_keyed_count_histograms_udf = F.udf(merge_keyed_count_histograms, MapType(StringType(), ArrayType(IntegerType())))

    def merge_keyed_count_histogram_col(df, col_name):
        return df.withColumn(col_name + "_merged",
                             merge_keyed_count_histograms_udf(col_name, F.lit(col_name))) \
            .drop(col_name) \
            .withColumnRenamed(col_name + "_merged", col_name)

    def merge_enumerated_histograms(histograms, histogram_name, n_values):
        res = []
        all_null = True # needed to maintain compatibility with Longitudinal
        for histogram_string_struct in histograms:
            compacted_histogram = [0]*(int(n_values)+1)
            histogram_string = histogram_string_struct[histogram_name]
            if histogram_string:
                all_null = False
                values = json.loads(histogram_string).get('values', {})
                for key, value in values.items():
                    compacted_histogram[int(key)] = value
            res.append(compacted_histogram)
github scanner-research / esper-tv / app / esper / spark_util.py View on Github
def _annotate_hour(df):
    assert ('video_id' in df.columns)
    assert ('min_frame' in df.columns)

    video_id_to_hour_fps = defaultdict(lambda: (0, 29.97))
    for v in get_videos().select('id', 'fps', 'hour').collect():
        video_id_to_hour_fps[v.id] = (v.hour, v.fps)
    def hour_helper(video_id, min_frame):
        start_hour, fps = video_id_to_hour_fps[video_id]
        hour_offset = (min_frame / fps) / 3600.
        return int(start_hour + hour_offset) % 24
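    # hour_helper closes over the video metadata dict; wrap it as an integer-returning UDF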
    my_udf = func.udf(hour_helper, IntegerType())
    df = df.withColumn(
        'hour', my_udf('video_id', 'min_frame')
    )
    return df
github scanner-research / esper-tv / app / esper / spark_util.py View on Github
video_id_to_commercials = defaultdict(list)
    for c in get_commercials().select('video_id', 'min_frame', 'max_frame').collect():
        video_id_to_commercials[c.video_id].append(
            (c.min_frame, c.max_frame)
        )
    def in_commercial_helper(video_id, min_frame, max_frame):
        if video_id in video_id_to_commercials:
            for c_min, c_max in video_id_to_commercials[video_id]:
                if min_frame >= c_min and min_frame < c_max:
                    return True
                if max_frame <= c_max and max_frame > c_min:
                    return True
        return False

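    # wrap the interval lookup as a boolean UDF over (video_id, min_frame, max_frame)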
    my_udf = func.udf(in_commercial_helper, BooleanType())
    df = df.withColumn(
        'in_commercial', my_udf('video_id', 'min_frame', 'max_frame')
    )
    return df
github Azure-Samples / MachineLearningSamples-BigData / Code / scoring_webservice.py View on Github
featureeddf = featureeddf.withColumn('hourofday', hour(featureeddf['StartHour']))
 
    featureeddf = featureeddf.withColumn('weekofyear', weekofyear(featureeddf['StartHour']))
    dayofweek = F.date_format(featureeddf['StartHour'], 'EEEE')    
    featureeddf = featureeddf.withColumn('dayofweek', dayofweek )
    featureeddf = featureeddf.withColumn('dayofmonth', F.dayofmonth(featureeddf['StartHour']))
    
    import datetime
    trainBeginTimestamp = int(datetime.datetime.strftime(datetime.datetime.strptime(trainBegin, "%Y-%m-%d %H:%M:%S"), "%s"))
    def linearTrend(x):
        if x is None:
            return 0
        # return the number of years elapsed since the beginning of training
        return (x - trainBeginTimestamp) / 3600 / 24 / 365.25
    # linearTrend returns a fraction, so the UDF must declare a DoubleType return type
    linearTrendUdf = udf(linearTrend, DoubleType())
    featureeddf = featureeddf.withColumn('linearTrend', linearTrendUdf(F.unix_timestamp('StartHour')))
    import pandas
    from pandas.tseries.holiday import USFederalHolidayCalendar
    cal = USFederalHolidayCalendar()
    holidays_datetime = cal.holidays(start=holidayBegin, end=holidayEnd).to_pydatetime()
    holidays = [t.strftime("%Y-%m-%d") for t in holidays_datetime]
    
    
    def isHoliday(x):
        if x is None:
            return 0
        if x in holidays:
            return 1
        else:
            return 0
    isHolidayUdf = udf(isHoliday, IntegerType())
github NYUBigDataProject / SparkClean / sparkclean / df_outliers.py View on Github
def normalize(self,col):
        max_column = self._df.agg({col:'max'}).collect()[0][0]
        min_column = self._df.agg({col:'min'}).collect()[0][0]
        def change(value):
            # return a float so the value matches the DoubleType declared below
            if max_column == min_column:
                return 0.0
            return (value - min_column) / (max_column - min_column)
        df = self.transformer.df
        udfValue = udf(change, DoubleType())
        df = df.withColumn('new' + col, udfValue(col))
        df = df.drop(col).withColumnRenamed('new'+col,col)
        self.transformer = DataFrameTransformer(df)
        return
github Azure-Samples / MachineLearningSamples-BigData / Code / scoring_webservice.py View on Github
    def isBusinessHour(x):
        if x is None:
            return 0
        if x >= 8 and x <= 18:
            return 1
        else:
            return 0
    isBusinessHourUdf = udf(isBusinessHour, IntegerType())
    featureeddf = featureeddf.withColumn("BusinessHour", isBusinessHourUdf('hourofday'))
    
    def isMorning(x):
        if x is None:
            return 0
        if x >= 6 and x <= 9:
            return 1
        else:
            return 0
    isMorningUdf = udf(isMorning, IntegerType())
    featureeddf = featureeddf.withColumn("Morning", isMorningUdf('hourofday'))
    
    featureeddf.persist()
    
    return featureeddf
github Azure-Samples / MachineLearningSamples-AerialImageClassification / Code / 02_Modeling / run_mmlspark.py View on Github
self.output_uri = 'wasb://{}@{}.blob.core.windows.net/{}/model'.format(
			self.container_trained_models, self.storage_account_name,
			output_model_name)
		self.predictions_filename = '{}_predictions_test_set.csv'.format(
			output_model_name)

		# Load the pretrained model
		self.last_layer_name = 'z.x' if (pretrained_model_type == 'resnet18') \
			else 'h2_d'
		self.cntk_model = mmlspark.CNTKModel().setInputCol('unrolled') \
			.setOutputCol('features') \
			.setModelLocation(self.spark, self.model_uri) \
			.setOutputNodeName(self.last_layer_name)

		# Initialize other Spark pipeline components
		self.extract_label_udf = udf(lambda row: os.path.basename(
										os.path.dirname(row.path)),
									 StringType())
		self.extract_path_udf = udf(lambda row: row.path, StringType())
		if mmlspark_model_type == 'randomforest':
			self.mmlspark_model_type = RandomForestClassifier(numTrees=20,
															  maxDepth=5)
		elif mmlspark_model_type == 'logisticregression':
			self.mmlspark_model_type = LogisticRegression(regParam=0.01,
														  maxIter=10)
		self.unroller = mmlspark.UnrollImage().setInputCol('image') \
			.setOutputCol('unrolled')

		return
github alwaysprep / PySparkMLPipelineHelpers / transformers / OneHotFeatureHashing.py View on Github
        def f(numerical, categorical):
            dic = defaultdict(float)
            numCatFeatures = (numFeatures - len(numerical))
            for categoricalFeature in categorical:
                index = hash(categoricalFeature) % numCatFeatures
                dic[index] = 1.0
            for index, numericalFeature in enumerate(numerical):
                dic[numCatFeatures + index] = numericalFeature
            sorted_dic = OrderedDict(sorted(dic.items()))
            return SparseVector(numFeatures, sorted_dic.keys(), sorted_dic.values())

        t = VectorUDT()

        out_col = self.getOutputCol()
        num_col = dataset[self.getNumericalCol()]
        cat_col = dataset[self.getCategoricalCol()]
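        # apply f as a UDF producing a SparseVector (VectorUDT) feature column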
        return dataset.withColumn(out_col, udf(f, t)(num_col, cat_col))
github NYUBigDataProject / SparkClean / sparkclean / df_transformer.py View on Github
'columns' argument is expected to be a string or a list of column names.
        It is a requirement for this method that the provided data_type matches the data type of the columns.
        On the other hand, if the user passes columns == '*', the method applies func only to the columns
        whose data type matches the data_type argument.

        :return transformer object
        """
        dict_types = {'string': StringType(), 'str': StringType(), 'integer': IntegerType(),
                      'int': IntegerType(), 'float': FloatType(), 'double': DoubleType(), 'Double': DoubleType()}

        types = {'string': 'string', 'str': 'string', 'String': 'string', 'integer': 'int',
                 'int': 'int', 'float': 'float', 'double': 'double', 'Double': 'double'}

        try:
            function = udf(func, dict_types[data_type])
        except KeyError:
            assert False, "Error, data_type not recognized"

        def col_set(columns, function):
            exprs = [function(col(c)).alias(c) if c in columns else c for (c, t) in self._df.dtypes]
            try:
                self._df = self._df.select(*exprs)
            except Exception as e:
                print(e)
                assert False, "Error: Make sure if operation is compatible with row datatype."

        # Check if columns argument must be a string or list datatype:
        self._assert_type_str_or_list(columns, "columns")

        # Filters all string columns in dataFrame
        valid_cols = [c for (c, t) in filter(lambda t: t[1] == types[data_type], self._df.dtypes)]