Uncategorized

Spark Journal : String cell values should not be trimmed

A very short article focusing precisely on the problem of trimmed string values in cells on the Zeppelin tool.

For those who don't know what Zeppelin is: it is an interactive web tool that lets you run Scala / SQL on a Spark cluster directly.

Recently, the team reported a simple issue: long string values were getting trimmed while querying in Zeppelin.
After a bit of R&D, we found it is just a simple config on the Livy interpreter that enables / disables this behavior.

The problem
If we run a simple query on Zeppelin where the string value is long, the string gets trimmed in the result as shown below.

The solution
After setting the property "zeppelin.livy.spark.sql.field.truncate" to false through the interpreter (Livy2) settings, this worked like a charm.

To set this:
1. Go to the Interpreter page
2. Click on EDIT for the Livy2 interpreter
3. Add this property and save; it will ask for a restart while saving.

The next time you open your notebook and run the same query, it will work as shown below.

Hope you learned something from this small article!

spark

Spark Journal : Change the data type of columns in dataframe.

Hey there. Recently, we found a functional issue in our parquet files, where the data type of some columns was not accurate. The data type was supposed to be DECIMAL with a given precision and scale, but it turned out to be a mix of String and Double.
The challenge at hand was to make sure the columns have the accurate data type as needed.

How to make sure we assign the right data type to a column in a dataframe
We will take the example of converting an integer column to DECIMAL here.

Approach 1
If going with the Spark SQL option, use CAST to convert to the required data type.
The resulting dataframe reflects the correct data type.

val df = spark.sql("""SELECT  CAST(1 as DECIMAL(14,4)) AS id union SELECT  CAST(2 as DECIMAL(14,4)) AS id""")
df.printSchema
df.show

df: org.apache.spark.sql.DataFrame = [id: decimal(14,4)]
root
 |-- id: decimal(14,4) (nullable = false)

+------+
|    id|
+------+
|1.0000|
|2.0000|
+------+

Approach 2

If going with the Spark DataFrame API, we can use the cast method directly as below.

import org.apache.spark.sql.types.{DecimalType}
val dataList1 = List((1,"abc",99),(2,"def",99),(2,"def",99),(2,"def",99),(2,"def",99))
val df1 = dataList1.toDF("id","Name","Marks").select(col("Marks").cast(DecimalType(14,4)).alias("Marks_changed"), col("Marks"))
df1.printSchema
df1.show

import org.apache.spark.sql.types.DecimalType
dataList1: List[(Int, String, Int)] = List((1,abc,99), (2,def,99), (2,def,99), (2,def,99), (2,def,99))
df1: org.apache.spark.sql.DataFrame = [Marks_changed: decimal(14,4), Marks: int]
root
 |-- Marks_changed: decimal(14,4) (nullable = false)
 |-- Marks: integer (nullable = false)

+-------------+-----+
|Marks_changed|Marks|
+-------------+-----+
|      99.0000|   99|
|      99.0000|   99|
|      99.0000|   99|
|      99.0000|   99|
|      99.0000|   99|
+-------------+-----+
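
As a side note, here is a minimal sketch of another common variant, assuming the same dataList1 as above: withColumn with cast overwrites a column in place and leaves the rest of the schema untouched.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// Overwrite the Marks column with its DECIMAL(14,4) equivalent,
// keeping all other columns as they are.
val df2 = dataList1.toDF("id","Name","Marks")
  .withColumn("Marks", col("Marks").cast(DecimalType(14,4)))

df2.printSchema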

For other data types available in the Spark API, refer to https://spark.apache.org/docs/latest/sql-reference.html.

scala

Scala Journal : Using tuples effectively

Hey folks, back after a good amount of time, and no, I was not on vacation, but working on some very critical deliverables.

So what did I do over the last one and a half months? Learned more Scala and Spark. It is getting more interesting day by day, and I have already learnt a lot that can be shared through my posts here, so stay tuned, there is good information coming along.

spark

Spark Journal : Return Multiple dataframes from a Scala method

Until now, I have been focusing on keeping the posts limited to Spark, but as you know, Scala is one of the main languages used with the Spark framework, so I will start using both the Spark API and the Scala language to showcase some interesting use cases.

This time, the task at hand was to return multiple dataframes from a Scala method. I have returned values such as Int, String, or DataFrame before, but always as a single value in the return part of a method.
My colleague and architect helped me here by showing different options for how this can be done very easily.

Note: Before reading further, I would recommend going through this post on StackOverflow; it will help clarify the conceptual difference between List and Tuple in Scala. A quick illustration of that difference follows.
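
As a hedged, plain-Scala sketch (no Spark needed), the core difference looks like this:

// A List is a variable-length collection where every element has the same static type.
val names: List[String] = List("abc", "def", "ghi")

// A Tuple has a fixed arity known at compile time, and each position can have its own type.
val record: (Int, String, Boolean) = (1, "abc", true)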

Approach 1
Using List as the return value

import org.apache.spark.sql.DataFrame

def returnMultipleDf : List[DataFrame] = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")
    
    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")
    
    List(df1, df2)

}

val dfList = returnMultipleDf
val dataFrame1 = dfList(0)
val dataFrame2 = dfList(1)

dataFrame2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

Approach 2
Using Tuple as the return value

import org.apache.spark.sql.DataFrame

def returnMultipleDf : (DataFrame, DataFrame) = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")
    
    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")
    
    (df1, df2)

}

val (df1, df2) = returnMultipleDf


df2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

I personally prefer Approach 2: a tuple has a fixed number of elements known at compile time and destructures cleanly into named values with val (df1, df2) = ..., whereas with a List you have to index into it and nothing prevents you from asking for an element that is not there.

spark

Spark Journal : Using UNION with SELECT API on dataframes

You will easily come across this use case where you need to merge 2 separate dataframes in one go. It is a very common need when working with data, and there are multiple ways of doing so.


Some crucial points to remember when using Spark UNION
1. The DataFrame API's union method behaves like SQL UNION ALL; there is no deduplicating UNION variant in the API
2. Spark union does not deduplicate the data (chain distinct() afterwards if you need set semantics; see the sketch right after this list)
3. Spark does not insist that the column data types match when merging; compatible types still get merged, as case 2 below shows
4. Spark matches columns purely by position, not by name, so the column order of both dataframes must line up
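
To make point 2 concrete, here is a small sketch using the same style of toDF data as the cases below: union keeps duplicate rows, and a distinct() call after the union gives SQL UNION-style deduplication.

val left = List((1,"abc"),(2,"def")).toDF("id","Name")
val right = List((2,"def"),(3,"ghi")).toDF("id","Name")

// union keeps the duplicate (2, def) row, like SQL UNION ALL
left.union(right).show

// adding distinct removes duplicates, like SQL UNION
left.union(right).distinct.show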

The easiest way of doing this, if you come from a SQL background, is to use the SQL UNION command, which merges the data of both dataframes through a SQL SELECT itself (a quick sketch of that follows).
However, here we are focusing on getting the same task done with the union API on dataframes.
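
For completeness, a hedged sketch of the SQL route, assuming df1 and df2 are the two dataframes defined in case 1 below and are first registered as temporary views:

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

// UNION ALL keeps duplicates; plain UNION would deduplicate
val merged = spark.sql("SELECT id, Name FROM t1 UNION ALL SELECT id, Name FROM t2")
merged.show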

So, you are trying to merge 2 dataframes.
1. When the schemas are exactly the same

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"efg"),(4,"hij"))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3| efg|
|  4| hij|
+---+----+

2. When the number of columns is the same but the data types differ
Here the second column of the first dataframe is a String, while in the second dataframe it is an Int. The union still goes through, as shown below.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,10),(4,11))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2)
df3.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
|  3|  10|
|  4|  11|
+---+----+

3. When the number of columns is not the same
Here the code fails as expected. Note that in this snippet the exception is actually thrown by toDF, since only two column names are supplied for three-element tuples; even with matching names, the union itself would reject dataframes with different column counts.

val dataList1 = List((1,"abc"),(2,"def"))
val df1 = dataList1.toDF("id","Name")

val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
val df2 = dataList2.toDF("id","Name")

val df3 = df1.union(df2).show

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.

4. When you want to perform a UNION based on values in a list
This is very useful when you don't know in advance how many unions you need and the number has to be derived dynamically.
Here we iterate over the list and union a dataframe built from each element; a more functional variant is sketched after the output.

case class emptyDfSchema (NameType:String)
val idTypeList  = List(List("Name"),List("Middle Name"),List("SurName"))
var df3 = Seq.empty[emptyDfSchema].toDF


idTypeList.foreach {
    element => {
        df3 = df3.union(element.toDF)
    }
}

df3.show

+-----------+
|   NameType|
+-----------+
|       Name|
|Middle Name|
|    SurName|
+-----------+
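
A hedged, more functional variant of the same idea, assuming the same idTypeList as above: build all the single-column dataframes first and fold them together with reduce, which avoids the mutable var.

// One dataframe per element, with the column named up front,
// then pairwise union via reduce.
val dfs = idTypeList.map(_.toDF("NameType"))
val combined = dfs.reduce(_ union _)

combined.show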

So, these are the different ways you can use the union API on dataframes directly; you can also use it in conjunction with the select API.

Vertica

Vertica Journal : Parquet and External tables

This is a continuation of the Vertica external table topic mentioned in the previous post here.

This post will show you how Vertica interprets parquet file partitions and how we can best use them for our use cases.

For more details on how to load a parquet file into a Vertica table, please refer to this blog post.

Before digging in, you might want to have a look at the partitioning concept of Parquet files here. That is exactly the background you want before going ahead.

Before researching this, I was under the assumption that parquet partitions are beneficial for Vertica external tables. My understanding turned out to be incorrect when I saw the results below.
I tried the few scenarios mentioned below and was astonished to see the difference myself.

Setting up the Env

Created 2 sample parquet datasets with the exact same data (2.7 million records), one partitioned and the other non-partitioned; a hedged sketch of how such files can be written from Spark follows.

Later, created 2 external tables on Vertica mapping to the locations of these parquet files.
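
A minimal sketch, assuming a dataframe ticketsDf already holds the sample data with the columns used in the DDL below; the output paths are placeholders.

// Partitioned layout: hive-style folders TICKET_DATE=.../CITY_NAME=...
ticketsDf.write
  .partitionBy("TICKET_DATE", "CITY_NAME")
  .parquet("/path_to_parquet_partitioned")

// Non-partitioned layout: plain parquet files in a single folder
ticketsDf.write
  .parquet("/path_to_parquet_not_partitioned")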


DROP TABLE IF EXISTS  TEST_PERFORMANCE_PARQUET_PARTITIONED;
CREATE EXTERNAL TABLE TEST_PERFORMANCE_PARQUET_PARTITIONED
(
    
    THREATRE_ID varchar(512),
    MOVIE_NAME  varchar(512),
    SHOW_SLOT varchar(512),
    TICKETS_SOLD Integer, 
    GROSS_INCOME double precision,
    TICKET_DATE varchar(20),
    CITY_NAME varchar(512)
)
 AS 
COPY FROM '/path_to_parquet/*/*/*.parquet' PARQUET (hive_partition_cols='TICKET_DATE, CITY_NAME');

DROP TABLE IF EXISTS TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED;
CREATE EXTERNAL TABLE TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED
( 
    TICKET_DATE varchar(20),
    CITY_NAME varchar(512) ,
    THREATRE_ID varchar(512),
    MOVIE_NAME  varchar(512),
    SHOW_SLOT varchar(512),
    TICKETS_SOLD Integer, 
    GROSS_INCOME  double precision
    
)
 AS 
COPY FROM '/path_to_parquet/*.parquet' PARQUET ;

Once the above was done, I tried different queries on both tables, as below.
Count Aggregation

select count(*) from TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED
-- Time Taken 0.82 seconds
select count(*) from TEST_PERFORMANCE_PARQUET_PARTITIONED
-- Time Taken 1.34 seconds

Aggregation on partitioned columns

select TICKET_DATE, CITY_NAME, sum(GROSS_INCOME) from TEST_PERFORMANCE_PARQUET_NOT_PARTITIONED
where TICKET_DATE = '7-Sep-2019'
group by 1,2
--Time Taken 0.6 seconds
select TICKET_DATE, CITY_NAME, sum(GROSS_INCOME) from TEST_PERFORMANCE_PARQUET_PARTITIONED
where TICKET_DATE = '7-Sep-2019'
group by 1, 2
-- Time taken 1.6 seconds

Explain Plans
The Vertica explain plan was not very verbose here; the cost and the complete plan were exactly identical for both tables. This suggests the Vertica engine does not try to interpret the partitions of the Parquet files (or at least does not show that in the plan), and that Vertica's partition pruning does not take advantage of the Parquet partitions. Somehow, the Vertica engine even takes more time to traverse the partitioned Parquet layout.

So I am assuming that when I run queries against partitioned parquet files, the engine traverses every partition folder and tries to match the WHERE predicate there; the hierarchical partition layout of Parquet files might be confusing the Vertica engine, and that may be a reason why partitioned parquet files are not really great for Vertica.
Below are the scenarios I tested along the same lines, and the performance observed for each.

Performance of Vertica with Parquet files in different scenarios

Scenario | Partitioned, huge data | Partitioned, small data | Not partitioned, huge data | Not partitioned, small data
Queries with a WHERE predicate on partitioned columns | performs faster | performs slower | performs lot faster (10x-20x) | performs lot faster
Queries without a WHERE predicate and without aggregation | performs slower | performs faster | performs lot faster (10x-20x) | performs lot faster
Queries with DISTINCT on partition columns | performs faster | performs slower | performs lot faster (10x-20x) | performs lot faster
Queries with aggregation on partitioned columns | performs slower | performs faster | performs lot faster (10x-20x) | performs lot faster

Conclusion: the "Not partitioned, huge data" column is the clear winner in every scenario.
This was an eye opener for me; hopefully the Vertica team looks into this further and confirms these observations.