spark

Spark Journal : Return Multiple dataframes from a Scala method

Until now, I have kept the posts limited to Spark, but since Scala is one of the main languages used with the Spark framework, I will start using both the Spark API and the Scala language to showcase some interesting use cases.

This time, the task at hand was to return multiple dataframes from a Scala method. I had returned values before, whether an Int, a String or a DataFrame, but always as a single value in the return part of the method.
My colleague and architect helped me out here by showing different options for how this can be done very easily.

Note : Before reading further, I would recommend going through this post on StackOverflow; it will help you understand the conceptual difference between a List and a Tuple in Scala.
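
As a quick illustration of that difference, here is a minimal standalone Scala sketch:

// A List holds any number of elements, all of the same declared type.
val names: List[String] = List("abc", "def", "ghi")

// A Tuple has a fixed number of elements, each with its own static type.
val pair: (Int, String) = (1, "abc")
val triple: (Int, String, Boolean) = (2, "def", true)

// Tuple elements are accessed by position and keep their types.
val firstId: Int = pair._1
val firstName: String = pair._2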

Approach 1
Using List as the return value

import org.apache.spark.sql.DataFrame

def returnMultipleDf: List[DataFrame] = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")
    
    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")
    
    List(df1, df2)

}

val dfList = returnMultipleDf
val dataFrame1 = dfList(0)
val dataFrame2 = dfList(1)

dataFrame2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

Approach 2
Using Tuple as the return value

import org.apache.spark.sql.DataFrame

def returnMultipleDf: (DataFrame, DataFrame) = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")
    
    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")
    
    (df1, df2)

}

val (df1, df2) = returnMultipleDf


df2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

I personally prefer Approach 2: a tuple keeps the static type and fixed number of the returned dataframes, and it can also carry values of different types in a single return value, which a List cannot (see the sketch below).
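
For instance, a small sketch with a hypothetical helper dfWithCount (not part of the post's code) that returns a dataframe together with its row count, something a List[DataFrame] cannot express:

import org.apache.spark.sql.DataFrame

// Hypothetical helper: return a dataframe together with its row count.
def dfWithCount(df: DataFrame): (DataFrame, Long) = (df, df.count)

val (frame, rows) = dfWithCount(df1)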

spark

Spark Journal : Using UNION with SELECT API on dataframes

You will easily come across this use case, where you need to merge 2 separate dataframes in one go. It is a very common situation when working with data, and there are multiple ways of doing it.


Some crucial points to remember when using Spark UNION
1. The DataFrame API has no separate UNION ALL; it only exposes the union method
2. Spark's union does not deduplicate the data, so it behaves like SQL UNION ALL (see the sketch after this list)
3. Spark does not check the data types of columns when merging
4. Spark does not match columns by name when merging; it combines them purely by position
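
A minimal sketch of point 2, chaining distinct after union to get deduplicated (SQL UNION) behaviour; the sample data here is made up for illustration:

val left = List((1,"abc"),(2,"def")).toDF("id","Name")
val right = List((2,"def"),(3,"ghi")).toDF("id","Name")

// union keeps the duplicate (2, def) row, i.e. UNION ALL behaviour
left.union(right).show
// chaining distinct removes it, i.e. SQL UNION behaviour
left.union(right).distinct.show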

The easiest way of doing this, if you come from a SQL background, would be to use the UNION SQL command, which merges the data of both dataframes through a SQL SELECT statement.
However, here we are focusing on getting the same task done with the union API on dataframes.

So, you are trying to merge 2 dataframes.
1. When the schema is exactly same

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"efg"),(4,"hij"))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3| efg|
|  4| hij|
+---+----+

2. When the number of columns is the same but the data types differ
Here the second column is a String in the first dataframe but an Int in the second; Spark still merges them by position.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,10),(4,11))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3|  10|
|  4|  11|
+---+----+

3. When the number of columns is not the same
Here the union fails, as expected.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
val df2 = dataList2.toDF("id","Name","Type")

val df3 = df1.union(df2)

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.

4. When you want to perform a UNION driven by the values in a list
This is very useful when you do not know upfront how many unions are needed and they have to be derived dynamically.
Here we iterate over the list and union a dataframe built from each element (an alternative without the mutable variable is sketched after the output).

case class emptyDfSchema (NameType:String)
val idTypeList  = List(List("Name"),List("Middle Name"),List("SurName"))
var df3 = Seq.empty[emptyDfSchema].toDF


idTypeList.foreach {
    element => {
        df3 = df3.union(element.toDF)
    }
}

df3.show

+-----------+
|   NameType|
+-----------+
|       Name|
|Middle Name|
|    SurName|
+-----------+
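
An equivalent sketch of the same dynamic union that avoids the mutable variable and the empty seed dataframe, by building one dataframe per list element and reducing over them (same output as above):

// Build a single-column dataframe from each element, then fold them together.
val dfs = idTypeList.map(element => element.toDF("NameType"))
val combined = dfs.reduce((a, b) => a.union(b))

combined.show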

So, these are the different ways you can use the union API on dataframes directly; you can also use it in conjunction with the SELECT API, as sketched below.
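
Here is a small sketch of that combination: selecting a common set of columns on each side before the union, so differently shaped dataframes line up by position (the sample data is made up for illustration):

val narrow = List((1,"abc"),(2,"def")).toDF("id","Name")
val wide = List((3,"ghi","home"),(4,"jkl","ctrl")).toDF("id","Name","Type")

// Project both sides down to the shared columns, then union them.
narrow.select("id","Name").union(wide.select("id","Name")).show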

Vertica

Vertica Journal : Parquet and External tables

This is a continuation of the Vertica external table topic mentioned in the previous post here.

This post will show how Vertica interprets Parquet file partitions and how we can best use them for our use cases.

For more details on how to load a Parquet file into a Vertica table, please refer to this blog post.

Before digging in, you might want to have a look at the partitioning concept of Parquet files here. It covers exactly what you need to know before going ahead.

Before researching this, I was under the assumption that Parquet partitions are beneficial for Vertica external tables. The results below proved that understanding wrong.
I tried the few scenarios mentioned below and was astonished to see the difference myself.

Setting up Env

Created 2 sample Parquet datasets with the exact same data (2.7 million records), one partitioned and the other non-partitioned.

Later, created 2 external tables in Vertica mapping to the locations of these Parquet files.
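
A rough sketch of how the two layouts could be produced from Spark, assuming a dataframe named ticketsDf that holds the sample dataset (the name and the output paths are placeholders, not the actual setup):

// Hypothetical sketch: write the same dataframe twice, once partitioned
// by the columns used in the external table, once as a flat directory.
ticketsDf.write
  .partitionBy("TICKET_DATE", "CITY_NAME")
  .parquet("/path_to_parquet_partitioned/")

ticketsDf.write
  .parquet("/path_to_parquet_not_partitioned/")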


DROP TABLE IF EXISTS  TEST_PERFORMANCE_PARQUET_PARTITIONED;
CREATE EXTERNAL TABLE TEST_PERFORMANCE_PARQUET_PARTITIONED
(
    
    THREATRE_ID varchar(512),
    MOVIE_NAME  varchar(512),
    SHOW_SLOT varchar(512),
    TICKETS_SOLD Integer, 
    GROSS_INCOME double precision,
    TICKET_DATE varchar(20),
    CITY_NAME varchar(512)
)
 AS 
COPY FROM '/path_to_parquet/*/*/*.parquet' PARQUET (hive_partition_cols='TICKET_DATE, CITY_NAME');

DROP TABLE IF EXISTS TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED;
CREATE EXTERNAL TABLE TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED
( 
    TICKET_DATE varchar(20),
    CITY_NAME varchar(512) ,
    THREATRE_ID varchar(512),
    MOVIE_NAME  varchar(512),
    SHOW_SLOT varchar(512),
    TICKETS_SOLD Integer, 
    GROSS_INCOME  double precision
    
)
 AS 
COPY FROM '/path_to_parquet/*.parquet' PARQUET ;

Once the above was done, I ran different queries against both tables, as shown below.
Count Aggregation

select count(*) from TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED
-- Time Taken 0.82 seconds
select count(*) from TEST_PERFORMANCE_PARQUET_PARTITIONED
-- Time Taken 1.34 seconds

Aggregation on partitioned columns

select TICKET_DATE, CITY_NAME, sum(GROSS_INCOME) from TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED
where TICKET_DATE = '7-Sep-2019'
group by 1,2
--Time Taken 0.6 seconds
select TICKET_DATE, CITY_NAME, sum(GROSS_INCOME) from TEST_PERFORMANCE_PARQUET_PARTITIONED
where TICKET_DATE = '7-Sep-2019'
group by 1, 2
-- Time taken 1.6 seconds

Explain Plans
The Vertica explain plan was not very verbose here, and the cost and the complete plan were exactly identical for both tables. This means the Vertica engine does not try to interpret the partitions of the Parquet files (or at least does not show that in the plan); the partition pruning feature of Vertica does not seem to take advantage of the Parquet partitions.
Somehow, the Vertica engine even takes more time to traverse the partitioned Parquet files.

So, I am assuming that when I query the partitioned Parquet files, the engine traverses every partition folder and tries to match the WHERE predicate against each one. This hierarchical layout of the Parquet files might be confusing the Vertica engine, and it may be the reason why partitioned Parquet files are not really great for Vertica.
Below are the scenarios I tested along the same lines, and the observed performance for each.

Performance of Vertica with Parquet files in different scenarios

| Scenario | Partitioned and huge data | Partitioned and small data | Not partitioned and huge data | Not partitioned and small data |
| --- | --- | --- | --- | --- |
| Queries with WHERE predicate on partitioned column | performs faster | performs slower | performs lot faster (10x-20x) | performs lot faster |
| Queries without WHERE predicate and without aggregation | performs slower | performs faster | performs lot faster (10x-20x) | performs lot faster |
| Queries with DISTINCT on partition columns | performs faster | performs slower | performs lot faster (10x-20x) | performs lot faster |
| Queries with aggregation on partitioned columns | performs slower | performs faster | performs lot faster (10x-20x) | performs lot faster |

Conclusion: the "Not partitioned and huge data" column is the clear winner in every scenario.
This was an eye-opener for me; hopefully the Vertica team looks into this further and confirms these observations.

spark

Spark Journal : Converting a dataframe to List

This one is going to be a very short article.
We will cover how to use the Spark API to convert a dataframe to a List.
The converted list is of type List[Row]. We can iterate over it normally and perform any of the usual List operations on it.

Suppose you have a use case where a dataframe needs to be converted to a list. You can do it easily with the approach below, which chains the collect and toList methods.
collect : returns all elements of the dataframe as an array, so every row becomes one element of the array.
Quick Tip : Make sure the data passed to collect is not huge; collect brings everything back to the driver program and consumes resources on the driver node, so on large data it may throw an OOM error. Ideally, collect is used after a filter on the dataframe.
toList : converts the array to a List.

// converting a dataframe to list
val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

val dList = df.collect().toList

dList.foreach{e=> {println(e)}}

[1,abc]
[2,def]
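
Each element of the list is an org.apache.spark.sql.Row, so if you need plain Scala values you can read the fields back by position; a minimal sketch for the two-column dataframe above:

// Read the Int and String fields back out of each Row by position.
val pairs = dList.map(row => (row.getInt(0), row.getString(1)))
// pairs: List[(Int, String)] = List((1,abc), (2,def))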

Go ahead and use this method; I am sure it will come in handy in your daily tasks.

Uncategorized

Spark Journal : Adding Alias for columns in bulk with SELECT API

This is more of a continuation of my previous blog post, which shows how to use aliases for column names with the SELECT API on dataframes in Spark.
Exploring the same topic, I found a good way to handle another scenario: when you are dealing with a large number of columns, it is not feasible to write a SELECT command listing each column manually.

Instead, I prefer a programmatic way to do it, so that it is easier, keeps the code clean and stays readable.
In this approach,
1. First, we use a predefined Scala Map that stores the column names as keys and the column aliases as values. It is a plain immutable Scala Map.
2. Second, the Map defined above is used as a lookup: we traverse each column name of the dataframe and match it against the keys of the Map.
3. Third, we pass the resulting columns as a sequence to the SELECT API, expanding it with Scala's type ascription for varargs (: _*).

import org.apache.spark.sql.functions.col

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

val colalias : Map[String, String] = Map("id" -> "unique id", "Name" -> "Actual Name")


val aliasedCols = df.columns.map(name => colalias.get(name) match { 
  case Some(newname) => col(name).as(newname) 
  case None => col(name) 
})

df.select(aliasedCols: _*).show

+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+
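
For completeness, an alternative sketch of the same idea using foldLeft with withColumnRenamed instead of building aliased columns for select; withColumnRenamed simply ignores names that are not present in the dataframe:

// Fold over the alias map and rename matching columns one by one.
val renamed = colalias.foldLeft(df) { case (current, (oldName, newName)) =>
  current.withColumnRenamed(oldName, newName)
}

renamed.show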

Next time you have such a task at hand and don't want to do it the traditional way, use this approach to apply column aliases dynamically.

spark

Spark Journal : Using alias for column names on dataframes

If you have already referred to my previous article on using the SELECT API on dataframes in the Spark framework, this is more of a continuation of the same.
Many times we come across scenarios where we need aliases for a proper representation of the columns in a dataframe. I know that, given a choice, you would opt for writing a SELECT SQL statement over the dataframes and alias the columns the conventional way; this is easily possible with Spark dataframes.

However, I am coming out of my comfort zone and writing the complete statement with the SELECT API on dataframes (a sketch of the SQL route is shown below for comparison). So how do you add column aliases to dataframes this way?
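
For comparison, a quick sketch of that conventional SQL route: register the dataframe as a temporary view and alias the columns in a plain SELECT statement (the view name "people" is just an example):

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

// Register the dataframe as a temp view and alias columns in plain SQL.
df.createOrReplaceTempView("people")
spark.sql("SELECT id AS `unique id`, Name AS `Actual Name` FROM people").show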

Approach 1 : Using withColumnRenamed

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select("*").withColumnRenamed("id","unique id").show

+---------+----+
|unique id|Name|
+---------+----+
|        1| abc|
|        2| def|
+---------+----+

Approach 2 : Using the alias method

import org.apache.spark.sql.functions.col

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").alias("unique id")).show

+---------+
|unique id|
+---------+
|        1|
|        2|
+---------+

df.select(col("id").alias("unique id"), col("Name").alias("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 3 : Using the as method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

df.select($"id".as("unique id"), $"Name".as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 4 : Using the name method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").name("Actual Name")).show

+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

There are a few more ways of doing this efficiently.
In the next article, I will cover how to add column aliases dynamically when there are many columns that need to be aliased.