PySpark: How to Calculate the Difference Between Rows


You can use the following syntax to calculate the difference between rows in a PySpark DataFrame:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales') - F.lag(F.col('sales'), 1).over(w))

This particular example calculates the difference in values between consecutive rows in the sales column, grouped by values in the employee column.
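The second argument to F.lag specifies the offset, i.e. how many rows back to look. As a quick sketch, you could pass 2 instead of 1 to compare each row to the sales value from two periods earlier (the column name sales_diff2 here is just illustrative):

#sketch: compare each row to the sales value from two periods earlier
df_lag2 = df.withColumn('sales_diff2', F.col('sales') - F.lag(F.col('sales'), 2).over(w))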

The following example shows how to use this syntax in practice.

Example: Calculate Difference Between Rows in PySpark

Suppose we have the following PySpark DataFrame that contains information about sales made by various employees at some company during different sales periods:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#define data
data = [['A', 1, 18], 
        ['A', 2, 20], 
        ['A', 3, 25], 
        ['A', 4, 40], 
        ['B', 1, 34], 
        ['B', 2, 32],
        ['B', 3, 19]] 
  
#define column names
columns = ['employee', 'period', 'sales'] 
  
#create dataframe using data and column names
df = spark.createDataFrame(data, columns) 
  
#view dataframe
df.show()

+--------+------+-----+
|employee|period|sales|
+--------+------+-----+
|       A|     1|   18|
|       A|     2|   20|
|       A|     3|   25|
|       A|     4|   40|
|       B|     1|   34|
|       B|     2|   32|
|       B|     3|   19|
+--------+------+-----+

We can use the following syntax to calculate the difference in values between consecutive rows in the sales column, grouped by values in the employee column:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

#define window
w = Window.partitionBy('employee').orderBy('period')

#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales') - F.lag(F.col('sales'), 1).over(w))

#view new DataFrame
df_new.show()

+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|      null|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|      null|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

The new column named sales_diff shows the difference in values between consecutive rows in the sales column. For example, employee A sold 18 units in period 1 and 20 units in period 2, so the sales_diff value for period 2 is 20 - 18 = 2.

Note that the first value in the sales_diff column for each employee is null since there is no previous row to subtract from.

If you’d like, you can use the fillna function to replace these null values with zero:

#replace null values with 0 in sales_diff column
df_new.fillna(0, 'sales_diff').show()

+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|         0|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|         0|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

Each of the null values in the sales_diff column has now been replaced with zero.
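Alternatively, you could skip the separate fillna step by substituting zero for the null differences inline. Here is a minimal sketch using the coalesce function, which returns the first non-null value among its arguments:

#sketch: substitute 0 for null differences inline using coalesce
df_new = df.withColumn('sales_diff', F.coalesce(F.col('sales') - F.lag(F.col('sales'), 1).over(w), F.lit(0)))

This produces the same result as calling fillna(0, 'sales_diff') afterwards.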

Note: You can find the complete documentation for the PySpark lag function here.
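The lag function also accepts an optional third argument, default, which is returned when no previous row exists. Be aware that this default feeds into the subtraction, so passing 0 there is not equivalent to the fillna approach above; each employee's first sales_diff would equal their first sales value rather than zero:

#note: the default of 0 feeds into the subtraction, so employee A's first
#sales_diff would be 18 - 0 = 18 rather than null or 0
df_default = df.withColumn('sales_diff', F.col('sales') - F.lag(F.col('sales'), 1, 0).over(w))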

Additional Resources

The following tutorials explain how to perform other common tasks in PySpark:

PySpark: How to Check if Column Contains String
PySpark: How to Replace String in Column
PySpark: How to Convert String to Integer
