spark

Spark Journal : Converting a dataframe to List

This one is going to be a very short article.
We will cover how to use the Spark API to convert a dataframe to a List.
The converted list is of type List[Row]. We can iterate over it normally and perform any of the usual List operations, just as on regular lists.

Suppose you have a use case where a dataframe needs to be converted to a list. You can do it easily using the approach below.
Here we use the collect and toList methods in sequence.
collect : returns all elements of the dataframe as an array, so every row of the dataframe becomes one element of the array.
Quick Tip : Make sure the data passed to collect is not huge, as collect brings all the data back to the driver program and consumes resources on the driver node. If the data is huge, collect may throw an OutOfMemory (OOM) error. Ideally, collect is used after a filter on the dataframe (see the sketch after the example output below).
toList : converts the array to type List.

// converting a DataFrame to a List[Row]
import spark.implicits._   // needed for toDF; available by default in the spark-shell

val dataList = List((1, "abc"), (2, "def"))
val df = dataList.toDF("id", "Name")

// collect brings all rows to the driver as an Array[Row]; toList converts the array
val dList = df.collect().toList

dList.foreach(e => println(e))

[1,abc]
[2,def]
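As a small, hedged sketch of the tip above: filtering before collect keeps the driver-side result small, and once you have a List[Row] you can read fields with the usual Row accessors (the column positions below assume the id/Name schema of the example):

// filter first so only a small result reaches the driver
val smallList = df.filter($"id" > 1).collect().toList

// Row fields can be read positionally; indices assume the (id, Name) schema above
smallList.foreach { row =>
  println(s"id = ${row.getInt(0)}, name = ${row.getString(1)}")
}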

Go ahead and use this method; I am sure it will come in handy in your daily tasks.

Uncategorized

Spark Journal : Adding Alias for columns in bulk with SELECT API

This is more of a continuation of my previous blog, which shows how to use aliases for column names when using the SELECT API on dataframes in Spark.
Exploring the same, I found a good way to handle another scenario: when you are dealing with multiple columns (a good number of them), it is not feasible to write a SELECT command listing each column manually.

Instead, I would prefer a programmatic way to do it, so that it is easier, keeps the code clean and stays readable.
In this approach,
1. Firstly, we use a predefined Scala Map that stores column names as keys and column aliases as values. It is a plain immutable Scala Map.
2. Secondly, the Map defined above is used as a lookup: we traverse each column name of the dataframe and match it against the keys in the Map.
3. Thirdly, we pass the columns produced by this comparison to the SELECT API as a list, expanded as varargs using Scala type ascription (: _*).

import org.apache.spark.sql.functions.col   // needed for col() outside the spark-shell

val dataList = List((1, "abc"), (2, "def"))
val df = dataList.toDF("id", "Name")

// lookup Map: column name -> column alias
val colalias: Map[String, String] = Map("id" -> "unique id", "Name" -> "Actual Name")

// alias every column that has an entry in the Map, keep the rest as-is
val aliasedCols = df.columns.map(name => colalias.get(name) match {
  case Some(newname) => col(name).as(newname)
  case None          => col(name)
})

df.select(aliasedCols: _*).show

+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+
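Columns that do not appear in the Map simply fall through the case None branch and keep their original names. Here is a quick sketch with a hypothetical extra column, age, that is not part of the original example:

// hypothetical third column "age" has no entry in colalias, so it keeps its name
val dfWithAge = List((1, "abc", 30), (2, "def", 40)).toDF("id", "Name", "age")

val aliased = dfWithAge.columns.map(name => colalias.get(name) match {
  case Some(newname) => col(name).as(newname)
  case None          => col(name)
})

dfWithAge.select(aliased: _*).show
// expected columns: unique id, Actual Name, age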

Next time you have such a task at hand and don't want to use the traditional way, use this approach to alias columns dynamically.

spark

Spark Journal : Using alias for column names on dataframes

If you have already referred to my previous article on using the SELECT API on dataframes in the Spark framework, this is more of a continuation of the same.
Many times, we come across scenarios where we need to use aliases for a proper representation of the columns in a dataframe. I know, if given a choice, you would opt for writing a SELECT SQL statement over the dataframes and use column aliases the same conventional way. Yes, this is easily possible with Spark dataframes.

However, I am stepping out of my comfort zone and writing the complete SELECT statement using the SELECT API on dataframes. So how do you add column aliases to dataframes while using this API?

Approach 1 : Using withColumnRenamed

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select("*").withColumnRenamed("id","unique id").show

+---------+----+
|unique id|Name|
+---------+----+
|        1| abc|
|        2| def|
+---------+----+

Approach 2 : Using the alias method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").alias("unique id")).show

+---------+
|unique id|
+---------+
|        1|
|        2|
+---------+

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 3 : Using the as method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

df.select($"id".as("unique id"), $"Name".as("Actual Name")).show
+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

Approach 4 : Using the name method

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")

df.select(col("id").as("unique id"), col("Name").name("Actual Name")).show

+---------+-----------+
|unique id|Actual Name|
+---------+-----------+
|        1|        abc|
|        2|        def|
+---------+-----------+

There are a few more ways of doing this efficiently.
In the next article, I will try to cover how to add column aliases dynamically when there are many columns that need to be aliased.

spark

Spark Journal : Using select api on dataframe

When working with Spark dataframes, you will find many instances wherein you have to use SELECT statements over dataframes. Beware, this is not a SQL SELECT statement run over the dataframe, but the Spark API used on the dataframe object directly.
I know many people will prefer using SELECT statements (SQL) directly over a dataframe, which Spark also supports, but I started doing the same using the Spark API on dataframe objects.
If you want to know more about all the APIs supported on dataframe objects, please refer to the official documentation.

So, after spending almost a day trying out different combinations, I found that there are multiple ways of selecting columns from a dataframe using the SELECT API on the dataframe object.
Somehow, I don't feel it is documented really well with examples, but maybe it is just me facing this issue as a beginner.

Let's say we build a dataframe like the one below.

val dataList = List((1,"abc"),(2,"def"))
val df = dataList.toDF("id","Name")
df.show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+

Approach 1 : Using quoted column names

df.select("id", "Name").show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+

Approach 2 : Using $ with quoted column names
This approach can be used to drive many more column transformations, which I will try to cover later (a small sketch follows the output below).

df.select($"id", $"Name").show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+
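As a small illustration of the transformations the $ syntax opens up (a sketch, not part of the original example), column expressions can be applied directly inside select:

// a simple column expression built with the $ syntax
df.select(($"id" + 1).as("id_plus_one"), $"Name").show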

Approach 3 : Using the col function along with quoted column names.
Again, using the col function (from org.apache.spark.sql.functions) allows you to apply many more transformations with ease.

df.select(col("id"), col("Name")).show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+

Approach 4 : Using a List[String] which holds all the column names
This approach is very popular when you are dealing with a standard set of columns or a huge number of columns and don't want to keep writing the SELECT with all the column names repeatedly.
We can define a List[String] with the column names in the order we want them in the result, and then map over that list: the map call converts each column name (String) into a Column, and the resulting sequence is expanded into the select statement (a reordering sketch follows the output below).

val colList = List("id", "Name")
df.select(colList.map(col): _*).show

+---+----+
| id|Name|
+---+----+
|  1| abc|
|  2| def|
+---+----+
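Since the order of the list drives the order of the output columns, the same approach also makes it easy to reorder or select a subset of columns. A small sketch:

// reorder (or subset) the columns just by changing the list
val reordered = List("Name", "id")
df.select(reordered.map(col): _*).show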

There are a few more approaches; I will try to detail them as I learn them in depth.
The next article will be more of an extension to this one and will cover more APIs that get used very frequently.

spark

Spark Journal : Building Empty DataFrame

I recently started learning the Scala language, along with the Spark framework, while working on our big data stack. Having not much experience with Java, it is a challenge to learn the fundamentals, but I am still learning and it's a long way to go.

Publishing small bits of useful information for everyone, so that beginners like me can find it useful.

So the task in Spark was to create empty dataframes (I won't go into dataframe details for now, as even I am still learning the stuff). If you have worked with the Pandas framework in Python, you should be acquainted with the term dataframe.
If you still don't understand it, for a quick understanding think of it as a two-dimensional table that stores data in memory (and can be extended to disk).

Creating a dataframe is usually done by reading data files in whatever format they come in. But we had to create an empty dataframe, and for this we used the approach below.

Using a case class
A case class is a very frequently used construct in the Scala language; we will use a case class to define the schema of the dataframe.
Here we use Seq.empty to create an empty sequence typed with the case class, and then convert it to a DataFrame using the toDF method, which is the Spark API for creating dataframes from Sequences and Lists.

case class model(id: Int, Name: String, marks: Double)
val emptyDf = Seq.empty[model].toDF
emptyDf.show

+---+----+-----+
| id|Name|marks|
+---+----+-----+
+---+----+-----+

So you can define the schema you need and then create an empty dataframe as shown above.
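To confirm that the schema was picked up from the case class, printSchema can be called on the empty dataframe (a minimal sketch; the exact nullability flags depend on the field types):

// inspect the schema derived from the case class
emptyDf.printSchema
// prints the id, Name and marks columns along with their data types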

Powershell

Powershell Journal – For Loop in a single statement

Recently I was given the task of writing a PowerShell script that does some DELETION of blob objects, but the challenge was to keep the script minimal and mostly limited to a one-liner.

I know, you won’t call a one liner as a script, but instead a command.
So, I took this challenge and started dewlling into “How we can iterate over a list of objects in a single line with Powershell

Yes, I know its possible by just writing a the for loop syntax in a single line like this, but we are going to do it using pipe operator. So we cannot use a regular For Loop syntax in this case.

We can do this in two ways.

Let's suppose we are trying to list the files in a directory.

D:\Navin\test> Get-ChildItem .

    Directory: D:\Navin\test

Mode                LastWriteTime         Length Name
----                -------------         ------ ----
-a----        8/27/2019   9:47 PM              0 test1 - Copy.txt
-a----        8/27/2019   9:47 PM              0 test1.txt
-a----        8/27/2019   9:47 PM              0 test2.txt
-a----        8/27/2019   9:47 PM              0 test3.txt

Based on the output, we can use ForEach-Object along with the pipeline operator to pass the result of the first command to the second command as input. Within the loop, $_ refers to the current element of the list being iterated over.

D:\Navin\test> Get-ChildItem  | ForEach-Object {write-host $_.Name}
test1 - Copy.txt
test1.txt
test2.txt
test3.txt

The other way of doing this is by using the % symbol instead of ForEach-Object; it does exactly the same thing but is a shorter alias.

D:\Navin\test> Get-ChildItem  | % {write-host $_.Name}
test1 - Copy.txt
test1.txt
test2.txt
test3.txt

That is how we can iterate over a result (list) in a single line of PowerShell.
Go ahead and explore this by yourself; you will be amazed how easy it is.

Troubleshooting

How to delete Azure BLOB Snapshots using powershell

Recently, we came across a scenario where an HDInsight cluster was connected to an Azure blob and the Spark jobs were failing when deleting parquet files for which a snapshot was already present.

Azure snapshots are point-in-time backups created from blob files.
We were unable to track how these snapshots were created, but the important task at hand was to delete them so that Spark could continue processing the regular parquet files.

Identifying and deleting the snapshots manually would have taken ages, as it is a very cumbersome process when done through Azure Storage Explorer.
I did some research on automating this effort, as there are already client libraries available for Azure Blob Storage in Java, PowerShell and Python.
As I am proficient with PowerShell and Azure PowerShell is more native to the Microsoft cloud stack, I decided to use PowerShell to automate this effort.

If you are new to this, I would recommend reading this post first.

Pre-Requisites
1. Powershell 5.0 and above
2. Azure Powershell Module
3. Azure Storage account name and key for access through powershell

We start by creating a context; this is more like establishing a connection with the Azure Storage account.

$StorageAccountName = "dummyaccount"
$StorageAccountKey = "xxxx"
$ContainerName = "containername"
$BlobName = "fact"
# the context object is reused below when connecting to the container
$Ctx = New-AzureStorageContext -StorageAccountName $StorageAccountName -StorageAccountKey $StorageAccountKey

The next cmdlet, Get-AzureStorageContainer, gets a connection object for the container. You can have many containers inside a storage account; we connect to a single container using the command below.

$blobObject = Get-AzureStorageContainer -Context $Ctx -Name $ContainerName

Now that you are connected to the container, we can query it to list the blob files as below. The ListBlobs method takes the blob name (this can be a directory, a path to a directory, or the prefix of the blob name), a boolean parameter that requests a flat listing rather than a hierarchical one (so all blobs under the given prefix, including those inside virtual sub-directories, are returned as one flat list), and a listing-details flag; passing "Snapshots" makes the listing include snapshots.

For more information, refer to this page

$ListOfBlobs = $blobObject.CloudBlobContainer.ListBlobs($BlobName, $true, "Snapshots")

To delete the snapshots found by the command above, we loop over the result set and call the Delete method on each blob that is actually a snapshot.

foreach ($CloudBlockBlob in $ListOfBlobs)
{
  if ($CloudBlockBlob.IsSnapshot) {
    Write-Host "Deleting $($CloudBlockBlob.Name), Snapshot was created on $($CloudBlockBlob.SnapshotTime)"
    $CloudBlockBlob.Delete() 
  }
}

This is how you delete all the snapshots under a specific blob folder, or for a specific blob file directly.

Troubleshooting

How to resolve GPG keys issue for Azure CLI ?

Today's blog is part of the troubleshooting series.

We use the Azure CLI to transfer files from our Linux VMs to an Azure File Share storage account. This is a very typical use case, where we want to transfer files from our VMs to Azure Storage, typically an Azure File Share account.

More about the Azure CLI can be found here.

Recently we found our apps failing with the error below.

While initializing the apps, we had the Azure CLI installation steps (mentioned here) in our Dockerfiles.

Downloading packages:
Delta RPMs disabled because /usr/bin/applydeltarpm not installed.
warning: /var/cache/yum/x86_64/7/azure-cli/packages/azure-cli-2.0.66-1.el7.x86_64.rpm: Header V4 RSA/SHA256 Signature, key ID xxxxxxxx: NOKEY
Public key for azure-cli-2.0.66-1.el7.x86_64.rpm is not installed
--------------------------------------------------------------------------------
Total                                               33 MB/s |  38 MB  00:01     
Retrieving key from https://packages.microsoft.com/keys/microsoft.asc

The GPG keys listed for the "Azure CLI" repository are already installed but they are not correct for this package.
Check that the correct key URLs are configured for this repository.


 Failing package is: azure-cli-2.0.66-1.el7.x86_64
 GPG Keys are configured as: https://packages.microsoft.com/keys/microsoft.asc

Reason:

It seems that while following the installation steps for Azure CLI, we had used the steps below.

Step 1 : Download the GPG key from Microsoft, as this RPM needs to come from the Microsoft repository.

sudo rpm --import https://packages.microsoft.com/keys/microsoft.asc

Step 2 : Create local azure-cli repository information.

sudo sh -c 'echo -e "[azure-cli]\nname=Azure CLI\nbaseurl=https://packages.microsoft.com/yumrepos/azure-cli\nenabled=1\ngpgcheck=1\ngpgkey=https://packages.microsoft.com/keys/microsoft.asc" > /etc/yum.repos.d/azure-cli.repo'

Step 3 : Install with the yum install command.

sudo yum install azure-cli

These steps were followed daily in an automated way, but suddenly we found that the Step 3 command started failing yesterday.
When we dug further, we found that the GPG key downloaded from Microsoft had changed and no longer matched the signature of the azure-cli RPM downloaded as part of Step 3.

Resolution
Our architect helped us find a resolution to this issue by disabling the GPG check in Step 2.
As part of this resolution, everything in the Azure CLI installation remains the same, except for Step 2.

Step 2: Create local azure-cli repository information.

sudo sh -c 'echo -e "[azure-cli]\nname=Azure CLI\nbaseurl=https://packages.microsoft.com/yumrepos/azure-cli\nenabled=1\ngpgcheck=0\ngpgkey=https://packages.microsoft.com/keys/microsoft.asc" > /etc/yum.repos.d/azure-cli.repo' 

After this, when you run Step 3, the installation goes through fine.

Maybe this is not the most elegant solution, but it helped us work around the problem.

Troubleshooting

Troubleshooting series

This series is going to help you troubleshoot issues related to the big data stack.

This series will include a good number of troubleshooting instances faced by me during daily implementation cycles, with respect to technologies like Spark, Azure, Kubernetes (K8s), Docker, PowerShell, microservices, Python, Vertica and much more…

The blog posts will be named tbit{#}, which denotes a troubleshooting bit and its associated sequence number.

Stay tuned to this series for meaningful insights and faster resolution of your issues.

Vertica

How to build a calendar using Vertica SQL (bit 5)

This post is a part of Bit Series blog.

Every data-warehouse project needs a calendar table as a dimension, which hosts all the dates for the range of interest. This table can be used as a dimension or, in many cases, to derive dates.

With this post, we will see how we can generate a calendar table directly using Vertica SQL. The SQL uses a special time series clause (TIMESERIES) built into Vertica.

Vertica's time series functionality enables us to perform time-series analytics and gap filling. You can get more information about it here.

For now we will stick to this use of the feature: generating the calendar as below.

CREATE TABLE
    calendar AS
SELECT
    slice_time                       AS actual_timestamp ,
    CAST(slice_time AS DATE)         AS DATE ,
    date_part('WEEK', slice_time)    AS week,
    date_part('YEAR', slice_time)    AS YEAR,
    date_part('MONTH', slice_time)   AS MONTH,
    date_part('DAY', slice_time)     AS DAY,
    date_part('QUARTER', slice_time) AS quarter
FROM
    (
        SELECT
            '2019-01-01 00:00:00' AS sample_date
        UNION
        SELECT
            '2019-12-31 00:00:00' AS sample_date ) a TIMESERIES slice_time AS '1 DAY' OVER(ORDER BY
    sample_date::VARCHAR::TIMESTAMP)

So, what are we doing here?
1. We take 2 sample dates between which we need to fill the gap; if you want the calendar for the year 2019, take the dates as shown in the subquery above.
2. We use one Vertica construct here, TIMESERIES; slice_time is just an alias. TIMESERIES generates the timestamps based on the interval specified, in this case '1 DAY'.
3. The TIMESERIES clause always requires an ORDER BY on a timestamp column; here we order by the sample dates cast to timestamps.
4. Once we have a daily timestamp, we use standard date functions to derive the various date/time attributes of that timestamp, like day, week, quarter, year, month, etc.
5. Finally, we store the result of the SELECT statement in the CALENDAR table.

You can check the results in this table; it can easily be used as the dimension table for the calendar use case.

Go ahead and try generating this data at your end; there are many possibilities where you can use it.