
Spark Journal : Cell value with String should not be trimmed

A very short article, focusing precisely on the problem of trimmed string values in cells in the Zeppelin tool.

For those who don’t know what Zeppelin is, it is an interactive web tool that lets you run Scala / SQL on a Spark cluster directly.

Recently, the team reported a simple issue: long string values were getting trimmed while querying in Zeppelin.
After a bit of R&D, we found it is just a simple config on the Livy interpreter that enables / disables this behavior.

The problem
If we run even a simple query in Zeppelin where the string values are long, the strings are trimmed in the result.

The solution
After setting the property zeppelin.livy.spark.sql.field.truncate to false through the interpreter (Livy2) settings, this worked like a charm.

To set this:
1. Go to the Interpreter page.
2. Click EDIT on the Livy2 interpreter.
3. Add this property (see the snippet below) and save; Zeppelin will ask for a restart while saving.
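For reference, this is the property and value to add in the interpreter settings (the property name is the one mentioned above, from the Zeppelin Livy interpreter configuration):

name  : zeppelin.livy.spark.sql.field.truncate
value : false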

The next time you open your notebook and run the same query, the full string value will be displayed.

Hope you learned something from this small article!


Spark Journal : Return Multiple dataframes from a Scala method

Until now, I have been keeping the posts limited to Spark, but since Scala is one of the main languages used with the Spark framework, I will start using both the Spark API and the Scala language to showcase some interesting use cases.

This time, the task at hand was to return multiple DataFrames from a Scala method. I have returned values before, be it an Int, a String or a DataFrame, but I had always returned just one value from a method.
My colleague and architect helped me here by showing different options for how this can be done very easily.

Note : Before reading further, I would recommend going through this post on StackOverflow; it will help you understand the conceptual difference between a List and a Tuple in Scala.
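In a nutshell, here is a tiny illustration of the difference the post discusses (the values are made up, purely for intuition):

// a List is homogeneous and of arbitrary length; mixing types widens the element type
val xs: List[Any] = List(1, "two", 3.0)

// a Tuple has a fixed arity and each position keeps its own static type
val t: (Int, String, Double) = (1, "two", 3.0)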

Approach 1
Using List as the return value

import org.apache.spark.sql.DataFrame
import spark.implicits._   // provides toDF on local collections (usually already in scope in spark-shell / Zeppelin)

// returns both DataFrames packed in a List
def returnMultipleDf: List[DataFrame] = {
    val dataList1 = List((1, "abc"), (2, "def"))
    val df1 = dataList1.toDF("id", "Name")

    val dataList2 = List((3, "ghi", "home"), (4, "jkl", "ctrl"))
    val df2 = dataList2.toDF("id", "Name", "Type")

    List(df1, df2)
}

val dfList = returnMultipleDf
val dataFrame1 = dfList(0)
val dataFrame2 = dfList(1)

dataFrame2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

Approach 2
Using Tuple as the return value

import org.apache.spark.sql.DataFrame
import spark.implicits._

// returns both DataFrames as a Tuple, so each keeps its own position and type
def returnMultipleDf: (DataFrame, DataFrame) = {
    val dataList1 = List((1, "abc"), (2, "def"))
    val df1 = dataList1.toDF("id", "Name")

    val dataList2 = List((3, "ghi", "home"), (4, "jkl", "ctrl"))
    val df2 = dataList2.toDF("id", "Name", "Type")

    (df1, df2)
}

val (df1, df2) = returnMultipleDf


df2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

I personally prefer Approach 2: a Tuple has a fixed arity, lets you destructure the result directly into named values, and can even mix element types, which makes it more flexible than a List here.
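For instance, here is a minimal sketch of that flexibility (the loadWithCount method and its contents are made up for illustration): a Tuple can return values of different types together, which a List[DataFrame] cannot.

import org.apache.spark.sql.DataFrame
import spark.implicits._

// hypothetical helper returning a DataFrame together with its row count
def loadWithCount(): (DataFrame, Long) = {
    val df = List((1, "abc"), (2, "def")).toDF("id", "Name")
    (df, df.count())
}

val (dfLoaded, rowCount) = loadWithCount()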


Spark Journal : Using UNION with SELECT API on dataframes

You will easily come across this use case, where you need to merge 2 separate DataFrames in one go. This is a very common requirement when working with data, and there are multiple ways of doing so.


Some crucial points to remember when using Spark UNION:
1. Spark's DataFrame union behaves like SQL UNION ALL; there is no separate deduplicating union method.
2. Spark union does not deduplicate the data (see the sketch after this list if you need distinct results).
3. Spark does not enforce matching column data types when merging; it resolves them to a common type where possible.
4. Spark does not care about column names or their order when merging; columns are matched by position.
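As a quick sketch of points 2 and 4 (using small made-up DataFrames): distinct removes duplicates after a union, and unionByName, available in newer Spark versions (2.3+), matches columns by name instead of by position.

import spark.implicits._

val dfA = List((1, "abc"), (2, "def")).toDF("id", "Name")
val dfB = List((2, "def"), (3, "ghi")).toDF("id", "Name")

// union keeps the duplicate row (2, "def"); distinct removes it
dfA.union(dfB).distinct.show

// unionByName matches columns by name, so a differing column order is safe
dfA.unionByName(dfB.select("Name", "id")).show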

The easiest way of doing this, if you come from a SQL background, is the UNION SQL command, which merges the data of both DataFrames through a SQL SELECT statement.
However, here we are focusing on getting the same task done with the union API on DataFrames.

So, you are trying to merge 2 DataFrames.
1. When the schema is exactly the same

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"efg"),(4,"hij"))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3| efg|
|  4| hij|
+---+----+

2. When the number of columns is the same but the datatype is different
Here the second column in the first DataFrame is a String, but in the second DataFrame it is an Int.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,10),(4,11))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3|  10|
|  4|  11|
+---+----+
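If you would rather not rely on Spark's implicit type resolution, you can cast the differing column explicitly before the union; a small sketch using the same df1 and df2 as above:

import org.apache.spark.sql.functions.col

// cast the Int column to string so both sides have the same declared type before the union
val df3Typed = df1.union(df2.select(col("id"), col("Name").cast("string")))
df3Typed.show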

3. When the number of columns is not the same
Here the call fails as expected. (Strictly speaking, in this snippet the exception is raised by toDF, because only two column names are supplied for a three-column tuple; if df2 were created with all three column names, the union itself would fail with an AnalysisException about the mismatched number of columns.)

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.

4. When you want to perform a UNION based on values in a list
This is very useful when you don’t know in advance how many unions you need and the number has to be derived dynamically.
Here we iterate over a list and union element by element.

case class EmptyDfSchema(NameType: String)
val idTypeList = List(List("Name"), List("Middle Name"), List("SurName"))

// empty seed DataFrame with the target schema
var df3 = Seq.empty[EmptyDfSchema].toDF

// union each single-column DataFrame onto the accumulator
idTypeList.foreach { element =>
    df3 = df3.union(element.toDF)
}

df3.show

+-----------+
|   NameType|
+-----------+
|       Name|
|Middle Name|
|    SurName|
+-----------+
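As a side note, the same dynamic union can be written without a mutable var, by mapping every element to a DataFrame and reducing with union; a small sketch using the same idTypeList as above:

// each inner list becomes a one-column DataFrame named NameType, then all are unioned pairwise
val df3Alt = idTypeList.map(_.toDF("NameType")).reduce(_ union _)
df3Alt.show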

So, these are the different ways you can use the union API on DataFrames directly; you can also use it in conjunction with the select API.


Spark Journal : Converting a dataframe to List

This one is going to be a very short article.
We will cover how to use the Spark API to convert a DataFrame to a List.
The converted list is of type List[Row]. We can iterate over it normally and perform any of the usual List operations on it.

Suppose you have a use case where a DataFrame needs to be converted to a list. You can do it easily using the approach below.
Here we use the collect and toList methods in sequence.
collect : returns all elements of the DataFrame as an array, so every row becomes one element of the array.
Quick tip : make sure the data passed to collect is not huge; collect brings all rows back to the driver program and consumes resources on the driver node, so with a large DataFrame it may throw an out-of-memory error. Ideally, collect is used after filtering the DataFrame down to a manageable size.
toList : converts the array to a List.

// converting a dataframe to list
val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

val dList = df.collect().toList

dList.foreach{e=> {println(e)}}

[1,abc]
[2,def]
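Since the elements are Row objects, you will often want to pull typed values back out of them. Here is a small sketch of two common ways (the field position corresponds to the id / Name columns created above, and the typed variant relies on spark.implicits._ being in scope, as in spark-shell / Zeppelin):

// extract a single typed field from every Row
val names = dList.map(row => row.getString(1))   // List("abc", "def")

// or bypass Row entirely by collecting through a typed Dataset
val typedList = df.as[(Int, String)].collect().toList   // List((1,"abc"), (2,"def"))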

Go ahead and use this method; I am sure it will come in handy in your daily tasks.


Spark Journal : Using alias for column names on dataframes

If you have already read my previous article on using the SELECT API on DataFrames in the Spark framework, this is more of a continuation of the same.
Many times we come across scenarios where we need to use aliases for proper representation of columns in a DataFrame. I know, if given a choice, you would opt for writing a SELECT SQL statement over the DataFrames and alias the columns the conventional way. Yes, this is easily possible with Spark DataFrames.

However, I am stepping out of my comfort zone and writing the complete SELECT statement using the select API on DataFrames. So how do you add column aliases there? Below are a few approaches.

Approach 1 : Using withColumnRenamed

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select("*").withColumnRenamed("id","unique id").show

+---------+----+
|unique id|Name|
+---------+----+
|        1| abc|
|        2| def|
+---------+----+

Approach 2 : Using the alias method

import org.apache.spark.sql.functions.col

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").alias("unique id")).show

+---------+
|unique id|
+---------+
|        1|
|        2|
+---------+

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 3 : Using the as method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

df.select($"id".as("unique id"), $"Name".as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 4 : Using the name method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").name("Actual Name")).show

+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

There are some more efficient ways of doing this.
In the next article, I will try to cover how to add column aliases dynamically, when there are many columns that need to be aliased.
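As a small teaser of that, here is one possible sketch (the aliases map and its contents are made up for illustration): build the select list programmatically and alias every column in one pass.

import org.apache.spark.sql.functions.col

// hypothetical mapping of existing column names to the aliases we want
val aliases = Map("id" -> "unique id", "Name" -> "Actual Name")

// alias each column if a mapping exists, otherwise keep its original name
df.select(df.columns.map(c => col(c).as(aliases.getOrElse(c, c))): _*).show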


Spark Journal : Building Empty DataFrame

I recently started learning the Scala language, along with the Spark framework, while working on our big data stack. Having not much experience with Java, it is a challenge to learn the fundamentals, but I am still learning and it's a long way to go.

Publishing small bits of information so that beginners like me can find them useful.

So the task in Spark was to create empty DataFrames (I won't go into DataFrame details for now, as I am still learning the stuff myself). If you have worked with the Pandas framework in Python, you should already be acquainted with the term DataFrame.
If you still don’t know it, for a quick understanding think of it as a two-dimensional table that stores data in memory (and can spill over to disk).

Creating a DataFrame is usually done by reading data files in whichever format. But we had to create an empty DataFrame, and for this we used the approach below.

Using a case class
A case class is a very frequently used construct in the Scala language; we will use one to define the schema of the DataFrame.
Here we use Seq.empty, which creates an empty sequence typed with the case class, and finally convert it to a DataFrame using the toDF method, the Spark API for creating DataFrames from Sequences and Lists.

// the case class fields become the columns of the DataFrame
case class Model(id: Int, Name: String, marks: Double)
val emptyDf = Seq.empty[Model].toDF
emptyDf.show

+---+----+-----+
| id|Name|marks|
+---+----+-----+
+---+----+-----+

So, you can define the schema you need and then create an empty DataFrame as above.
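For completeness, here is an alternative sketch (not part of the approach above) that builds the same empty DataFrame from an explicit StructType schema instead of a case class:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DoubleType}

// explicit schema equivalent to the case class above
val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("Name", StringType),
    StructField("marks", DoubleType)
))

// an empty RDD of Rows combined with the schema gives an empty DataFrame
val emptyDf2 = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
emptyDf2.show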