spark

Spark Journal : Change the data type of columns in dataframe.

Hey there! Recently, we found a functional issue in our parquet files, where the data type of some columns was not accurate. The data type was supposed to be DECIMAL with a given precision and scale, but it turned out to be a mix of String and Double.
The challenge at hand was to make sure the columns carried the exact data type we needed.

How to make sure we assign the right data type to a column in a dataframe
We will take the example of converting an integer column to DECIMAL here.

Approach 1
If going with the Spark SQL option, use CAST to convert to the required data type.
The resulting dataframe reflects the correct data type.

val df = spark.sql("""SELECT  CAST(1 as DECIMAL(14,4)) AS id union SELECT  CAST(2 as DECIMAL(14,4)) AS id""")
df.printSchema
df.show

df: org.apache.spark.sql.DataFrame = [id: decimal(14,4)]
root
 |-- id: decimal(14,4) (nullable = false)

+------+
|    id|
+------+
|1.0000|
|2.0000|
+------+

Approach 2

If going with the Spark DataFrame API, we can use the cast method directly, as below.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType
val dataList1 = List((1,"abc",99),(2,"def",99),(2,"def",99),(2,"def",99),(2,"def",99))
val df1 = dataList1.toDF("id","Name","Marks").select(col("Marks").cast(DecimalType(14,4)).alias("Marks_changed"), col("Marks"))
df1.printSchema
df1.show

import org.apache.spark.sql.types.DecimalType
dataList1: List[(Int, String, Int)] = List((1,abc,99), (2,def,99), (2,def,99), (2,def,99), (2,def,99))
df1: org.apache.spark.sql.DataFrame = [Marks_changed: decimal(14,4), Marks: int]
root
 |-- Marks_changed: decimal(14,4) (nullable = false)
 |-- Marks: integer (nullable = false)

+-------------+-----+
|Marks_changed|Marks|
+-------------+-----+
|      99.0000|   99|
|      99.0000|   99|
|      99.0000|   99|
|      99.0000|   99|
|      99.0000|   99|
+-------------+-----+
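
The same cast also works in place with withColumn, which is handy when a column has already been read with the wrong type (for example as String, like in our parquet issue) and you just want to overwrite it. A minimal sketch, with made-up column names:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// "amount" arrives as a String here; withColumn with the same name replaces the column
val raw = List(("1", "10.5"), ("2", "20.25")).toDF("id", "amount")
val fixed = raw.withColumn("amount", col("amount").cast(DecimalType(14, 4)))
fixed.printSchema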

For other data types available with the Spark API, refer to this link: https://spark.apache.org/docs/latest/sql-reference.html.

spark

Spark Journal : Return Multiple dataframes from a Scala method

Until now, I have been focusing on keeping the posts limited to Spark, but as you know, Scala is one of the main languages used with the Spark framework, so I will start using both the Spark API and the Scala language to showcase some interesting use cases.

This time, the task at hand was to return multiple dataframes from a Scala method. I have been returning values, which may be Int, String or DataFrame, but I had always returned a single value from a method.
My colleague and architect helped me here and showed different options on how this can be done very easily.

Note : Before reading further, I would recommend going through this post on StackOverflow; it will help clarify the conceptual difference between a List and a Tuple in Scala.

Approach 1
Using List as the return value

import org.apache.spark.sql.DataFrame

def returnMultipleDf : List[DataFrame] = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")
    
    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")
    
    List(df1, df2)

}

val dfList = returnMultipleDf
val dataFrame1 = dfList(0)
val dataFrame2 = dfList(1)

dataFrame2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

Approach 2
Using Tuple as the return value

import org.apache.spark.sql.DataFrame

def returnMultipleDf : (DataFrame, DataFrame) = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")
    
    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")
    
    (df1, df2)

}

val (df1, df2) = returnMultipleDf


df2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

I personally prefer Approach 2, as a Tuple has its own advantages and is more flexible when compared to a List.
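
As a small illustration of that flexibility (a quick sketch with a made-up method returnDfWithCount, assuming the spark-shell implicits are in scope): a tuple can carry values of different types alongside the dataframe, while a List[DataFrame] forces every element to be a DataFrame.

import org.apache.spark.sql.DataFrame

def returnDfWithCount : (DataFrame, Long) = {
    val df = List((1,"abc"),(2,"def")).toDF("id","Name")
    // the dataframe and its row count come back together, with their own types
    (df, df.count)
}

val (df, rowCount) = returnDfWithCount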

spark

Spark Journal : Using UNION with SELECT API on dataframes

You will easily come across this use case, where you need to merge 2 separate dataframes in one go. This is a very common requirement when working with data, and there are multiple ways of doing so.


Some crucial points to remember when using Spark UNION
1. The DataFrame API has only the union method; it behaves like SQL UNION ALL (the older unionAll is deprecated)
2. Spark union does not deduplicate the data; add distinct after the union if you need that (see the sketch after this list)
3. Spark union does not enforce matching column data types; it resolves them to a common type where possible
4. Spark union does not match columns by name; it merges them purely by position
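
To see point 2 in action, here is a quick sketch: union keeps duplicate rows, and a distinct after the union removes them.

val a = List((1,"abc")).toDF("id","Name")
val b = List((1,"abc"),(2,"def")).toDF("id","Name")

a.union(b).show            // 3 rows, the duplicate (1, abc) is kept
a.union(b).distinct.show   // 2 rows, the duplicate is removed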

The easiest way of doing this, if you come from a SQL background, is the UNION SQL command, which merges the data of both dataframes through a SQL SELECT statement.
However, here we are focusing on getting the same task done with the union API on dataframes.

So, you are trying to merge 2 dataframes.
1. When the schema is exactly the same

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"efg"),(4,"hij"))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3| efg|
|  4| hij|
+---+----+

2. When the number of columns is the same but the data types are different
Here the second column of the first dataframe is a String, while in the second dataframe it is an Int.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,10),(4,11))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3|  10|
|  4|  11|
+---+----+
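
Point 4 above is worth a second look: because union matches columns purely by position, reordered columns get silently mixed up. If you are on Spark 2.3 or later, unionByName matches columns by name instead. A quick sketch:

val x = List((1,"abc")).toDF("id","Name")
val y = List(("def",2)).toDF("Name","id")

x.union(y).show         // positional match: id and Name end up swapped for y's row
x.unionByName(y).show   // columns are matched by name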

3. When the number of columns is not the same
Here the union fails, as expected.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
val df2 = dataList2.toDF("id","Name","Type")

val df3 = df1.union(df2)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns ...

4. When you want to perform UNION based on values in a list
This is very useful when you don't know in advance how many unions you need, and the number has to be derived dynamically.
Here we iterate over a list and union each element's dataframe into an accumulator.

case class emptyDfSchema (NameType:String)
val idTypeList  = List(List("Name"),List("Middle Name"),List("SurName"))
var df3 = Seq.empty[emptyDfSchema].toDF


idTypeList.foreach {
    element => {
        df3 = df3.union(element.toDF)
    }
}

df3.show

+-----------+
|   NameType|
+-----------+
|       Name|
|Middle Name|
|    SurName|
+-----------+
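
If the dataframes can be built first, the same dynamic union can also be written without the mutable var by folding over the list. A sketch, reusing the idTypeList from above:

// build one single-column dataframe per element, then fold them together
val dfs = idTypeList.map(_.toDF("NameType"))
val combined = dfs.reduce(_ union _)
combined.show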

So, these are the different ways you can use the union API on dataframes directly; you can also use it in conjunction with the SELECT API.

spark

Spark Journal : Converting a dataframe to List

This one is going to be a very short article.
We will cover how to use the Spark API to convert a dataframe to a List.
The converted list is of type List[Row]. We can iterate over it normally and do any kind of List operation, just as on regular lists.

Suppose you have a use case where a dataframe needs to be converted to a list. You can do it easily using the approach below.
Here we use the collect and toList methods in sequence.
collect : returns all rows of the dataframe as an array, so every row becomes one element of the array.
Quick Tip : Make sure the data passed to collect is not huge; collect brings everything back to the driver program and consumes resources on the driver node. If the data is huge, collect may fail with an OutOfMemoryError. Ideally, collect is used after a filter on the dataframe.
toList : converts the array to a List.

// converting a dataframe to list
val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

val dList = df.collect().toList

dList.foreach{e=> {println(e)}}

[1,abc]
[2,def]
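
Each element of the list is an org.apache.spark.sql.Row, so individual values come out with the Row accessors, or you can skip Row entirely and collect to a typed list. A small sketch, assuming the columns map to (Int, String):

import org.apache.spark.sql.Row

dList.foreach { case Row(id: Int, name: String) => println(s"$id -> $name") }

// or collect straight into a typed list
val typedList: List[(Int, String)] = df.as[(Int, String)].collect().toList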

Go ahead and use this method; I am sure it will come in handy in your daily tasks.

spark

Spark Journal : Using alias for column names on dataframes

If you have already referred to my previous article on using the SELECT API on dataframes in the Spark framework, this is more of a continuation of the same.
Many times, we come across scenarios where we need to use aliases for a proper representation of columns in a dataframe. I know, if given a choice, you would opt for writing a SELECT SQL statement over the dataframes and use column aliases the conventional way. Yes, this is easily possible with Spark dataframes.

However, I am coming out of my comfort zone and trying to write the complete SELECT statement using the SELECT API on dataframes. So how will you add column aliases to a dataframe this way?

Approach 1 : Using withColumnRenamed

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select("*").withColumnRenamed("id","unique id").show

+---------+----+
|unique id|Name|
+---------+----+
|        1| abc|
|        2| def|
+---------+----+
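
withColumnRenamed can simply be chained when more than one column needs a new name. A small sketch:

df.withColumnRenamed("id", "unique id")
  .withColumnRenamed("Name", "Actual Name")
  .show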

Approach 2 : Using the alias method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").alias("unique id")).show

+---------+
|unique id|
+---------+
|        1|
|        2|
+---------+

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 3 : Using the as method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

df.select($"id".as("unique id"), $"Name".as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 4 : Using the name method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").name("Actual Name")).show

+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

There are some more ways of doing this efficiently.
In the next article, I will try to cover how to add column aliases dynamically, when there are many columns that need to be aliased.

spark

Spark Journal : Using select api on dataframe

When working with Spark dataframes, you will find many instances where you have to run SELECT statements over dataframes. Beware, this is not about SQL SELECT statements over the dataframe, but about using the Spark API on the dataframe object directly.
I know many people will prefer using SELECT statements (SQL) directly over a dataframe, which is also supported by Spark, but I started doing the same using the Spark API on dataframe objects.
If you want to know more about all the APIs supported on dataframe objects, please refer to the official documentation.

So, after spending almost a day and trying out different combinations, I found that there are multiple ways of selecting columns from a dataframe using the SELECT API on the dataframe object.
Somehow, I don't feel it is documented really well with examples, but maybe it's just me facing this issue as a beginner.

Let's say we build a dataframe like the one below for usage.

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")
df.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+

Approach 1 : Using quoted column names

df.select("id", "Name").show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+

Approach 2 : Using $ with quoted column names
This approach can be taken further to drive many more column transformations; I will try to cover that ahead.

df.select($"id", $"Name").show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+
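
As a quick taste of what "more transformations" means: $"col" returns a Column object, so arithmetic and built-in functions can be applied inline. A small sketch (upper comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.upper

df.select(($"id" + 100).as("shifted_id"), upper($"Name").as("upper_name")).show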

Approach 3 : Using the col function along with quoted column names.
Again, using the col function allows you to apply many more transformations with ease.

df.select(col("id"), col("Name")).show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+

Approach 4 : Using a List[String] which has all column names
This approach is very popular when you are dealing with a standard set of columns, or a huge number of columns, and don't want to keep writing the SELECT with all column names repeatedly.
We can define a List[String] with the column names, in the order we want in the result, and then map each name to a column; the map call iterates over the list of column names (Strings) and dynamically adds the columns to your select statement.

val colList = List("id","Name")
df.select(colList.map(col): _*).show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+
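
The map(col): _* part is plain Scala: colList.map(col) builds a Seq[Column], and : _* expands that sequence into the varargs that select expects. The same trick works for any subset or ordering of columns, for example (with a made-up subset list):

val subset = List("Name")
df.select(subset.map(col): _*).show   // equivalent to df.select(col("Name"))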

There are a few more approaches; I will try to detail them as I learn them in depth.
The next article will be more of an extension to this one and will cover more APIs that get used very frequently.

spark

Spark Journal : Building Empty DataFrame

I recently started learning the Scala language, along with the Spark framework, while working on our big data stack. Not having much experience with Java, it is a challenge to learn the fundamentals, but I am still learning and it's a long way to go.

Publishing small bits of useful information to everyone, so that beginners like me can find it useful.

So the task in Spark was to create empty dataframes (I won't go into dataframe details for now, as even I am still learning the stuff). If you have worked with the Pandas framework in Python, you should be acquainted with the term Dataframe.
If you still don't know it, for a quick understanding, think of it as a 2-dimensional table which stores the data in memory (and can be extended to disk).

Creating a dataframe is usually done by reading data files in whatever format. But we had to create an empty dataframe, and for this we used the approach below.

Using a Case class
A case class is a very frequently used construct in the Scala language; we will use one to define the schema of the dataframe.
Here we use Seq.empty, which creates an empty sequence in Scala, typed by the schema mentioned in the case class, and finally convert it to a dataframe using the toDF method, which is the Spark API for creating dataframes from Sequences and Lists.

case class model (id : Int, Name : String, marks : Double)
val emptyDf = Seq.empty[model].toDF
emptyDf.show

+---+----+-----+
| id|Name|marks|
+---+----+-----+
+---+----+-----+
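
Another way to do this (a sketch, not from the original post) is to skip the case class and hand Spark an explicit StructType schema together with an empty RDD of rows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DoubleType}

// same three columns as the case class above, defined as a StructType
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("Name", StringType),
  StructField("marks", DoubleType)
))

val emptyDf2 = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
emptyDf2.printSchema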

So, you can define the schema you need and then create an empty dataframe as above.