You can use the following syntax to calculate the difference between rows in a PySpark DataFrame:

from pyspark.sql.window import Window
import pyspark.sql.functions as F
#define window
w = Window.partitionBy('employee').orderBy('period')
#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales')-F.lag(F.col('sales'), 1).over(w))

This particular example calculates the difference in values between consecutive rows in the **sales** column, grouped by values in the **employee** column and ordered by the **period** column within each group.

The following example shows how to use this syntax in practice.

**Example: Calculate Difference Between Rows in PySpark**

Suppose we have the following PySpark DataFrame that contains information about sales made by various employees at some company during different sales periods:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#define data
data = [['A', 1, 18],
        ['A', 2, 20],
        ['A', 3, 25],
        ['A', 4, 40],
        ['B', 1, 34],
        ['B', 2, 32],
        ['B', 3, 19]]
#define column names
columns = ['employee', 'period', 'sales']
#create dataframe using data and column names
df = spark.createDataFrame(data, columns)
#view dataframe
df.show()
+--------+------+-----+
|employee|period|sales|
+--------+------+-----+
|       A|     1|   18|
|       A|     2|   20|
|       A|     3|   25|
|       A|     4|   40|
|       B|     1|   34|
|       B|     2|   32|
|       B|     3|   19|
+--------+------+-----+

We can use the following syntax to calculate the difference in values between consecutive rows in the **sales** column, grouped by values in the **employee** column:

from pyspark.sql.window import Window
import pyspark.sql.functions as F
#define window
w = Window.partitionBy('employee').orderBy('period')
#calculate difference between rows of sales values, grouped by employee
df_new = df.withColumn('sales_diff', F.col('sales')-F.lag(F.col('sales'), 1).over(w))
#view new DataFrame
df_new.show()
+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|      null|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|      null|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

The new column named **sales_diff** shows the difference in values between consecutive rows in the **sales** column.

Note that the first value in the **sales_diff** column for each employee is null because there is no previous row within that employee's group to subtract from it.
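To make the window logic concrete, here is a minimal plain-Python sketch (no Spark required) of the same calculation on the example data. The helper name `lag_diff` is hypothetical and exists only for illustration; it is not part of PySpark, and the sketch assumes each (employee, period) pair appears once.

```python
from collections import defaultdict

# Same rows as the example DataFrame: (employee, period, sales)
rows = [('A', 1, 18), ('A', 2, 20), ('A', 3, 25), ('A', 4, 40),
        ('B', 1, 34), ('B', 2, 32), ('B', 3, 19)]

def lag_diff(rows):
    """Hypothetical helper: per-employee difference from the previous period."""
    # Mirrors partitionBy('employee'): bucket the rows by employee
    groups = defaultdict(list)
    for employee, period, sales in rows:
        groups[employee].append((period, sales))

    result = []
    for employee in sorted(groups):
        # Mirrors orderBy('period'): sort the rows inside each bucket
        ordered = sorted(groups[employee])
        previous = None  # the first row of a group has no previous value
        for period, sales in ordered:
            diff = None if previous is None else sales - previous
            result.append((employee, period, sales, diff))
            previous = sales
    return result

for row in lag_diff(rows):
    print(row)
```

The `None` entries play the role of the null values in **sales_diff**: the previous value is reset at the start of every employee group, so the difference never crosses from one employee to the next.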

If you’d like, you can use the **fillna** function to replace these null values with zero:

#replace null values with 0 in sales_diff column
df_new.fillna(0, 'sales_diff').show()
+--------+------+-----+----------+
|employee|period|sales|sales_diff|
+--------+------+-----+----------+
|       A|     1|   18|         0|
|       A|     2|   20|         2|
|       A|     3|   25|         5|
|       A|     4|   40|        15|
|       B|     1|   34|         0|
|       B|     2|   32|        -2|
|       B|     3|   19|       -13|
+--------+------+-----+----------+

Each of the null values in the **sales_diff** column has now been replaced with zero.

**Note**: You can find the complete documentation for the PySpark **lag** function in the official PySpark API reference.

