This content is a comprehensive tutorial demonstrating an end-to-end data engineering project using PySpark and DBT, focusing on building a Medallion architecture (Bronze, Silver, Gold layers) with dynamic and modular code practices.
This project can help you crack multiple offers in 2025, because you will master Spark Structured Streaming, PySpark transformations, dynamic data ingestion, integrating Python control flow and conditionals with PySpark, and building modular code with Python classes and utilities. You will handle incremental loads and upserts, and you will build the gold layer of the Medallion architecture using DBT, which covers DBT dynamic sources, Jinja functions, incremental DBT models, the DBT CLI, DBT properties and configs, ephemeral materialization, slowly changing dimensions, DBT snapshots and much more. PySpark and DBT are among the hottest topics in data engineering right now, and by the end of this video you will have a solid grip on both, which can help you crack your dream role this year. If you are excited about this end-to-end data engineering project, let me know in the comment section, and let's get started with this amazing project.
What's up, my fam, and happy Sunday! This Sunday is very special because we have another project, and since the video is already recorded, I know the knowledge you will get from it is next level. Let me give you a quick overview of what we will build today.

First of all, this project is purely open source: we will use only open-source frameworks, namely PySpark, DBT and Python. That helps in interviews, because whether the role is on Azure, GCP or AWS, PySpark, DBT and Python are used everywhere. I'm not sure you can judge the depth from the architecture diagram alone, but since the video is already recorded, I can tell you that each layer is full of real-world scenarios.

Let's walk through the architecture, starting with the bronze layer. Our source files are stored in the data lake, and we will use Spark Structured Streaming to load that data incrementally. We will build dynamic notebooks for incremental loading in pure Python and PySpark, using loops, arrays and some classes as well, so it will be a lot of fun. This lands our data in the bronze layer. We'll run our Spark code on Databricks, but the code itself is pure PySpark, so you can run it anywhere.
Once the bronze layer is built, following the Medallion architecture, the data moves to the silver layer. (Let me add the missing arrow to the diagram; it's a live drawing, so that's fine.) In the silver layer we will create Python classes to make our code modular, so that we don't have to rewrite the same transformations again and again: pure Python classes wrapping PySpark code. These scenarios are a hot interview topic right now because they test both your conceptual knowledge and your implementation skills. We will also work with custom Python modules, or utilities.

Then comes DBT, the heart of this video. Honestly, every layer is the heart: bronze, silver and gold are all intense in this video. I didn't realize how much we would build, and when the video was complete I was like, "Wow, this is amazing." So now,
obviously, we will build our star schema in the gold layer using DBT. This is not a high-level overview of DBT integration with PySpark; it goes in depth. You will learn about models, sources and Jinja functions, including Jinja templates with loops and if conditions. We will also use snapshots, YAML files and personalized configurations, all in pure DBT, and we will build our dimension and fact tables with DBT. We will also learn how to work with incremental data and upserts in DBT, which is next-level stuff. Everything is discussed in this end-to-end data engineering project video. I know you are really excited, and I'm doubly excited because I know the quality of the content that's already recorded. Just one request: drop a comment on this video, because it helps me a lot and lets me keep creating more of these types of
videos. Plus, I have great news to celebrate with you all. Let's congratulate these people who have recently cracked interviews at MNCs and at whatever companies they were aiming for. Many, many congratulations! I also want to see your name here, and you can get there by following the things we discuss in these videos and projects and staying focused. That's it. I'm waiting for your comment to appear here, because I would love to feature it. I know I can't feature every comment, but when I feature some, it means we are celebrating the wins of our data fam, and I would love to celebrate your win as well. So congratulations once again! Now let's talk
about our data. We have selected an amazing dataset for this project. This is the repo (analy YouTube); scroll down and click on the PySpark DBT project folder, since this is a PySpark DBT project and the folder has the same name. These are all the CSV files; download them to your local system, because you'll be using them. This is a kind of Uber dataset, and we picked it because it gives you a sense of a dimensional data model, meaning dimension and fact tables. Dimensional data modeling is one of the most popular and in-demand data modeling techniques that data engineers work with, and you should know it. Fact tables, dimension tables, incremental data and slowly changing dimensions will all be covered in this project video, so just stay with me. Pure PySpark and DBT.
So this is the data you can download. Now let's get started with the second thing you should know about: the environment. Which environment should we use for PySpark? We could develop everything locally on our own system, but not everyone has a laptop with enough RAM, memory and processing power. So we will use Databricks Free Edition, which is totally free; you do not even need to add a credit or debit card. Databricks Free Edition lets you run your PySpark code, and that's all we need: a platform to run PySpark. It is one of the best platforms for running PySpark code and big data workloads in general.

To create your own Databricks Free Edition account if you haven't already, open your browser, search for "Databricks Free Edition", hit Enter and select "Try Databricks for free". I have already created my account, so let me open an incognito window. After clicking "Try Databricks for free", you will see "Get Free Edition instead". Do not click the other option: that is the express edition, which is only available for 14 days, and you want something available for a longer period. So click "Get Free Edition instead", then click "Sign up" for the Free Edition, create your account, and that's it. Once your account is created, you will land on this page; click here, and this is your Databricks homepage loading.
Perfect, this is your Databricks homepage. I assume you know a little about Databricks, but we are focusing on PySpark, so that's fine. If you want to learn and explore Databricks further, you already know where to learn it. Databricks is in demand and the platform is growing a lot. So just go with the flow: get the right knowledge at the right time, and this is the right time to learn Databricks.
This is our Databricks environment. First, we need to create a catalog. I'd expect you to have some Databricks knowledge, but it's fine if you don't; just follow the steps. We are focusing on PySpark, but you still need to manage your metadata, and in Databricks we manage metadata with something called a catalog. Don't overcomplicate it; it's very simple. Click on Catalog and you will see all the available catalogs. These are some catalogs I created for my previous videos; you can ignore them. We'll create a new one: click "+ Add", then "Create a catalog", and name it PySpark DBT. Click "Create", then "Configure catalog", scroll down, click "Next" and then "Save".
Our catalog is created, and this is its name: PySpark DBT. What is a catalog? A catalog is a data management container; if you come from a SQL background, it is roughly equivalent to a database. Inside the catalog we have schemas, and by default there are two: default and information_schema. We will also create our own schemas for the different layers. As you know, we are going to follow the Medallion architecture, which uses three schemas: bronze, silver and gold. But before creating those, we will create one more schema for the source. Our source can be anything: a SQL database, APIs, literally anything. In our case, it will be CSV files stored in the data lake.

The good news is that Databricks Free Edition is automatically attached to a data lake, and the data lake provider is AWS. So we can leverage something called volumes. Don't worry too much about what volumes are; this is just our source, and as a data engineer you usually don't need to worry about how the source is created, only about how to use it. In this section we will create volumes, and it's fine whether or not you already know them, because we are simply setting up the source. So let's create a schema: click the three dots, or click "Create schema", name it source, and click Create.
This is our schema, source, and inside it we will create a volume. Click on the schema, click Create, then Volume. Name the volume source data, keep it as a managed volume, and click Create. This is our source data volume, where we'll upload all the CSV files.
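If you prefer code to clicks, the same catalog, schema and managed volume can also be created with Unity Catalog SQL from a notebook. The sketch below assumes the names pyspark_dbt, source and source_data (hypothetical spellings of the names typed in the UI) and assumes your workspace lets you create catalogs, as the UI flow suggests; it only builds the statements and runs them through spark.sql.

```python
# Assumed names: pyspark_dbt / source / source_data are hypothetical spellings
# of the catalog, schema and volume created through the UI.

def setup_statements(catalog: str, schema: str, volume: str) -> list:
    """Return the Unity Catalog DDL that mirrors the UI steps, in dependency order."""
    return [
        f"CREATE CATALOG IF NOT EXISTS {catalog}",
        f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}",
        # No LOCATION clause, so this creates a managed volume.
        f"CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume}",
    ]


def run_setup(spark, statements: list) -> None:
    """Execute each statement with the notebook's SparkSession (`spark` in Databricks)."""
    for statement in statements:
        spark.sql(statement)


# In a Databricks notebook:
# run_setup(spark, setup_statements("pyspark_dbt", "source", "source_data"))
# The bronze schema and checkpoint volume created later follow the same pattern:
# run_setup(spark, setup_statements("pyspark_dbt", "bronze", "checkpoint"))
```

The IF NOT EXISTS clauses keep the cell safe to rerun.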
There are two ways to do this: you can upload all the CSV files directly into this volume, or you can create a dedicated folder for each file. The choice is yours. To create a folder, click on the volume and choose "Create directory", which creates a folder inside your volume. What's the advantage? Very simple: if you end up with multiple files per entity, they are stored in dedicated folders. So it is good practice to create a folder for each file. For example, I have a trips file, so I'll create a directory called trips. Now the trips directory exists, and if you refresh this panel you'll see the folder here as well.
Click here and this is the trips folder, our first directory. Inside it we can upload data using "Upload to this volume". Before you click Browse, make sure the destination shows trips; otherwise the file will land in the root of the volume, which we don't want. Click Browse and add the trips file, trips.csv. There is an option to overwrite files with the same name: if a file with that name already exists it will be replaced, but we don't have one, so that's fine. Click Upload, and trips.csv is uploaded.

We repeat the same steps for the remaining files. Next is vehicles: click on source data (you can see the trips folder there), click "Create directory", create vehicles, and upload the vehicles file into it. Then create a customers directory and upload the customers file. Then do the same for drivers, locations and payments. Now all the folders in our volume are done.

Why is this helpful? As I mentioned, we will keep adding more files to these folders, because we also want to see how to process and load data incrementally. All of that is covered in this video, so sit back, relax and code along.
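Clicking "Create directory" six times works, but the folders can also be created from a notebook. This is a minimal sketch, assuming the volume is reachable as a local path under /Volumes (Databricks exposes Unity Catalog volumes to Python file APIs that way) and assuming the hypothetical volume path /Volumes/pyspark_dbt/source/source_data.

```python
from pathlib import Path

# The six source entities from the dataset, one folder each.
ENTITIES = ["trips", "vehicles", "customers", "drivers", "locations", "payments"]


def create_entity_dirs(volume_root: str, entities: list) -> list:
    """Create one sub-folder per entity under the volume root; existing folders are kept."""
    created = []
    for name in entities:
        folder = Path(volume_root) / name
        folder.mkdir(parents=True, exist_ok=True)
        created.append(folder)
    return created


# In a Databricks notebook (hypothetical path):
# create_entity_dirs("/Volumes/pyspark_dbt/source/source_data", ENTITIES)
```

Because of exist_ok=True, rerunning the cell after new entities are added only creates the missing folders.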
So this is the data source we created in about 5 minutes. If you understood this part, great; if not, that's still fine, because it's very simple and not a big deal at all. This is the volume we created. Click on the customers folder and then on the CSV file, and you'll see a preview of it. Click "Copy path" and paste it here: you'll see it's a path under /Volumes. This is the same path you use in Spark code, and I'll show you how, so don't worry.
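The copied path always follows the same shape, /Volumes/<catalog>/<schema>/<volume>/<folder>, so a tiny helper avoids typos when the same pattern is needed for every entity. This is a sketch; the catalog, schema and volume names in the example are the hypothetical spellings used above.

```python
def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build a Unity Catalog volume path: /Volumes/<catalog>/<schema>/<volume>/<parts...>."""
    for name in (catalog, schema, volume, *parts):
        # Each argument must be a single, non-empty path segment.
        if not name or "/" in name:
            raise ValueError(f"invalid path segment: {name!r}")
    return "/".join(["/Volumes", catalog, schema, volume, *parts])


# volume_path("pyspark_dbt", "source", "source_data", "customers")
# -> "/Volumes/pyspark_dbt/source/source_data/customers"
```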
Under the Files tab you can see all the files and their details, including the managed metastore that was created for us, and the Permissions tab is here too. Now we can create our first folder. Click Home, then Create, then Folder, and name it PySpark DBT project. Inside this folder we'll create our notebooks. Why notebooks? Notebooks are the building blocks for your Spark code. You can also create Python files, but notebooks are easier to read, let you run your code in chunks, and are the recommended approach. Let's create our first notebook; for now I'll simply call it notebook.

If you don't see the same UI as mine, click on your name, then Settings, then Developer, scroll to the bottom and enable "Tabs for notebooks and files". This is a new Databricks UI feature that opens notebooks and files as tabs, so a second notebook opens in another tab; this feature wasn't there before. Now let's rename our notebook to bronze ingestion, because we want to ingest our data.
We start with the bronze layer. And we are going to work with dynamic notebooks: we don't want to run our code for just one file or one source; we want to do it dynamically. I'll show you how, and everything will be done with pure Spark code, using classes, loops and conditionals, so don't worry at all.

First, let me show you what our data looks like. Open Catalog (the third icon): here is our PySpark DBT catalog, our source schema and the source data volume. We have many folders, so how do we look at this data? Simply attach your notebook to a cluster. What is a cluster? Apache Spark runs on compute; compute is the backbone of Spark, which is a distributed computing engine, so you need clusters. Here we'll use serverless compute, which automatically adds nodes when required and removes them when you're working with less data, so it autoscales and you don't need to worry about anything. Serverless also means you are not managing the virtual machines; Databricks manages them on its side. If you create all-purpose compute instead, you are responsible for configuring and managing that cluster yourself, which we don't want here. So just go with serverless, and that's fine. Now let's see how you can
look at the data. It's very simple: write df = spark.read.format("csv") (let me increase the font size), because our files are CSV. Then add .option("header", True). For CSV files in PySpark we set header to true because a CSV file carries a header row but no schema, so we tell Spark to use the first row as the column names. Then add .option("inferSchema", True). Without inferSchema, Spark treats every column as a string column, because a CSV file stores everything as text. That's why we need inferSchema.
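To see why inferSchema matters, here is a plain-Python analogy using the standard csv module: every value comes back as a string, and types only appear if something inspects the values. The infer_type function is a toy stand-in for illustration, not Spark's actual inference logic (which supports more types), and the sample rows are made up.

```python
import csv
import io

# Made-up sample rows, shaped like a small customers file.
SAMPLE = "customer_id,first_name,signup_date\n1,Asha,2024-01-05\n2,Ben,2024-02-11\n"


def infer_type(values: list) -> str:
    """Toy inference: 'int' if all values parse as int, else 'double' if all parse as float, else 'string'."""
    for caster, type_name in ((int, "int"), (float, "double")):
        try:
            for value in values:
                caster(value)
            return type_name
        except ValueError:
            continue  # at least one value failed; try the next, looser type
    return "string"


rows = list(csv.DictReader(io.StringIO(SAMPLE)))
# Without inference, every column is just text.
raw_types = {col: type(rows[0][col]).__name__ for col in rows[0]}
# With (toy) inference, each column gets the narrowest type that fits all its values.
inferred = {col: infer_type([row[col] for row in rows]) for col in rows[0]}
```

Spark's inferSchema does this kind of scan for you, at the cost of an extra pass over the input, which is one more reason to capture the schema once and reuse it.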
Then we call .load(). In load() we always pass the location of our files. So what is the location? We can work with databases or with data lakes, and in the modern world we mostly work with data lakes. But you don't actually need the data lake location here; you only need the location of the volume, because the volume is our source, and volumes are built on top of the data lake. How do you get the volume location? Click on source data and then on the folder you want to read, say customers, and click the arrows to insert the location. (Quick fix: a dot was missing after spark in my code, so let me add it.) There is a specific format for the location: first /Volumes, then the catalog name (our PySpark DBT catalog), then the schema name, source, then the volume name, source data, and finally the folder, customers. That's it. Simply run this.
This is how you specify a location in the modern world, where we use volumes built on top of the data lake. If you don't remember the format, take advantage of the UI: click the two arrows and you'll see it's exactly the same path. Let me rerun it. To see the data, use the display command: display(df). To run a cell, either click the run button or press Shift+Enter.
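Putting the batch-read steps together, a minimal sketch of the preview cell might look like this. It wraps the read in a function so it can be reused for any entity folder; the path in the usage comment is a hypothetical spelling, and display() is the Databricks notebook helper, available only inside a notebook.

```python
def read_csv_batch(spark, path: str):
    """Static (batch) read, used to preview the data and capture its inferred schema."""
    return (
        spark.read.format("csv")
        .option("header", True)       # first row holds the column names
        .option("inferSchema", True)  # extra pass over the data to guess column types
        .load(path)
    )


# In a Databricks notebook:
# df = read_csv_batch(spark, "/Volumes/pyspark_dbt/source/source_data/customers")
# display(df)       # render the rows as a table
# df.printSchema()  # or df.schema, to see the inferred types
```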
This is our data; everything is here. Don't worry, these are fake phone numbers, so don't try calling them. This is the customers data, and it's a great dataset: it even has a last-updated timestamp. It is a synthetic Uber dataset, but it mirrors data used in the real world, which is why you see fields like the last-updated timestamp and signup date.

I try to design projects so that you can highlight them smartly on your resume and say, "I have built this project." Of course, it depends on how you present it, but if you can showcase a project like this confidently and smartly, the interviewer will feel you have worked with real data on a real project and may give your profile more consideration. There is a myth that you can only showcase projects built in industry. That's not true: hobby projects or university projects count too, as long as they are at that level. This is not a very simple entry-level project; we are building a production-level, real-world project. When I say real world, I mean it: there will be challenges, and the environment will feel exactly like the real world. If you don't trust me, talk to any working data engineer (a real one, not someone who just wants to criticize me) and ask, "I'm building this project; is it relevant to the industry or to your organization?" They will definitely say yes. So, this is our data.
And this is our DataFrame. If you have some fundamental knowledge of the Medallion architecture, you know that in the bronze layer we write the data as is, without applying any transformation. Why? Because our pipelines need a source of truth, and bronze data is an exact replica of the source. That's why we never apply transformations there; we simply append the data as is.

We also want to load the data incrementally. Today the customers folder has just one file, customers.csv; tomorrow there will be a new file, because there will be new customers, then another, and another. So there can be many files, and we don't want to reprocess all the data every time, because in the world of big data that directly translates into cost. In the real world, we load data incrementally. How? We'll use PySpark streaming, that is, Spark Structured Streaming. Don't feel that streaming will be difficult; it will be very easy, and you can highlight in your project that you also used real-time data processing, which is a big deal, trust me. Now let's see how to actually ingest the data incrementally.
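Before looking at the Spark API, the "only process new files" idea can be sketched in plain Python. This is a toy model for intuition only: it records processed file names in a JSON file, which is far simpler than the checkpoint Spark actually writes, and the folder and file names are made up.

```python
import json
from pathlib import Path


def ingest_new_files(source_dir: Path, checkpoint_file: Path, bronze_rows: list) -> list:
    """Append rows from CSV files not seen before; return the names of the files processed."""
    processed = set(json.loads(checkpoint_file.read_text())) if checkpoint_file.exists() else set()
    new_files = sorted(p.name for p in source_dir.glob("*.csv") if p.name not in processed)
    for name in new_files:
        lines = (source_dir / name).read_text().splitlines()
        bronze_rows.extend(lines[1:])  # skip the header; append raw rows as-is (bronze rule)
    # Record progress only after the new rows have been written.
    checkpoint_file.write_text(json.dumps(sorted(processed | set(new_files))))
    return new_files
```

Running it twice in a row processes nothing the second time; dropping a second file into the folder makes the next run pick up only that file. That is the behaviour Spark Structured Streaming gives you through its checkpoint location.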
We are all set to ingest the data incrementally, so let's first create our schema. Go to Catalog, create a new schema in our catalog named bronze, and click Create. In this bronze schema we don't need to worry about the data lake location for the data itself, because we'll create Delta tables for our bronze layer. But we do need somewhere to store the streaming checkpoints, which track which files have already been processed and which still need processing. This is a fundamental concept of Spark streaming, and if you don't know it, don't worry; I'll show you step by step. For this, create a volume named checkpoint and click Create. This is our checkpoint volume. Now let's go back to our bronze ingestion notebook. First, let me switch to dark mode (Settings, then Developer). This is our notebook.
So what's next? This display(df) shows the static DataFrame we read just to look at the data. What else is this batch read useful for, given that we are working with Spark Structured Streaming? Whenever you read files with Spark Structured Streaming, you should always define the schema of your DataFrame up front, because streaming file sources require a user-specified schema by default instead of inferring one; that's how Spark knows which data type each column has. There are two ways to define the schema. You can write it out manually, column by column. The smarter way is to use the batch read you already wrote: run df.schema and you'll see the complete schema, for example the customer ID column as an integer type and first name as a string type. This is the schema Spark inferred automatically from the batch read. If you're satisfied with it, store it in a variable, say schema_customers, and pass that schema wherever you need it.
If you're not happy with the schema, say you want to keep customer ID as a string type, copy and paste the printed schema and change that type manually. This saves a lot of time: you don't need to define every column by hand; just take the inferred schema and change the columns you need. We'll do the same here, so this is schema_customers, just for customers.

Now let's start our streaming data processing. Let me add a text cell, make it an H3 heading in bold, and call it Spark Streaming. In this section we define our DataFrame and read it with the stream method instead of the batch method. The code is very similar: spark.readStream instead of spark.read, then .format("csv"), then .option("header", True). Then, instead of inferSchema, we pass .schema(schema_customers), followed by .load() with the same customers folder path. We don't want to display this one. This is our DataFrame, read using the streaming method.
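Here is a hedged sketch of the two steps just described: reuse the inferred schema, override one column type, and pass it to readStream. It edits the schema through its JSON form (StructType.jsonValue() and StructType.fromJson() are existing PySpark methods), so the override helper itself is plain Python; the column name customer_id and the path are assumptions about this dataset.

```python
import copy


def override_field_types(schema_json: dict, overrides: dict) -> dict:
    """Return a copy of a StructType JSON dict with the listed fields' types replaced."""
    new_schema = copy.deepcopy(schema_json)  # leave the inferred schema untouched
    for field in new_schema["fields"]:
        if field["name"] in overrides:
            field["type"] = overrides[field["name"]]
    return new_schema


def read_csv_stream(spark, path: str, schema):
    """Streaming read: same shape as the batch read, but with an explicit schema."""
    return (
        spark.readStream.format("csv")
        .option("header", True)
        .schema(schema)  # file streams need a user-specified schema by default
        .load(path)
    )


# In a Databricks notebook (hypothetical path and column name):
# from pyspark.sql.types import StructType
# schema_customers = StructType.fromJson(
#     override_field_types(df.schema.jsonValue(), {"customer_id": "string"})
# )
# df_stream = read_csv_stream(
#     spark, "/Volumes/pyspark_dbt/source/source_data/customers", schema_customers
# )
```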
Let's run it. What happens? Nothing, because of lazy evaluation: we have only defined what needs to be done. Now we want to write this DataFrame into the bronze layer and create a table on top of it: df.writeStream, then .format("delta"), since Delta is the best format here. Then we set the output mode. What should it be? Append. Why append? Because every time this stream runs, we want the new files to be appended. So we write .outputMode("append"), and then .option() to define the checkpoint location. What is the checkpoint location? The checkpoint location is how Spark knows which files have already been processed.
So let me just tell you. So let's say
this is your spark right this is your
spark and not spark basically this is
your let's say source
okay and here is your spark
so this park engine will read the data
from the source and write the data to
the destination.
Make sense? Let's say this is one file
and one file is here. Now next day you
have another file. Let's say this is the
new file. This is a new file. Now it
needs to just process this file instead
of both the files. Right? So how spark
will know that it has already processed
this file. So that is why it creates
something called as checkpoint location.
As the name suggest it is a checkpoint.
So it will take care of that particular
spark streaming query that hey I have
already processed this green file
already process this file. So this the
file name the metadata of the file
everything will be stored here. So
before processing the next write stream
it will read the checkpoint and will say
hey this thing is already processed. So
I just need to process this particular
thing and just push it into the
destination. This is the advantage of
spark streaming checkpoint location and
we need to manage this particular
location that's why I created that
particular volume to store that
information right perfect so I will
simply say checkpoint location and what
will be the location as you know if you
go here and this is our schema bronze
and this is my location okay so far we
do not have any kind of folder in this
but we will create one folder like this
let's say
this one and within the within that I
want to create a folder called
customers. So what will be the advantage
of it? The advantage is very simple. So
within the checkpoint um volume we want
to store all the checkpoints for
customers, trips, locations, everything.
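To make the checkpoint idea concrete, here is a toy model in plain Python. It is not Spark code and not how Spark stores checkpoints internally; the function name run_once and the dict-of-files layout are hypothetical. Each call stands for one run of a stream with a run-once trigger: process whatever the checkpoint has not seen, append it to the target, record it, and stop. In real PySpark this bookkeeping happens for you when you pass .option("checkpointLocation", ...) to writeStream.

```python
from typing import Dict, List, Set


def run_once(
    source: Dict[str, List[dict]], checkpoint: Set[str], table: List[dict]
) -> List[str]:
    """One run of a run-once stream: process only files the checkpoint has not seen.

    source      maps file name -> rows in that file (stands in for the source folder)
    checkpoint  set of file names already processed (stands in for checkpointLocation)
    table       target rows (stands in for the Delta table written in append mode)
    """
    new_files = sorted(name for name in source if name not in checkpoint)
    for name in new_files:
        table.extend(source[name])  # append mode: only the new rows are added
        checkpoint.add(name)        # record the file so later runs skip it
    return new_files


# Day 1: one file with 50 driver rows lands in the source folder.
source = {"drivers_day1.csv": [{"driver_id": i} for i in range(50)]}
checkpoint: Set[str] = set()
table: List[dict] = []
print(run_once(source, checkpoint, table), len(table))

# Rerun with no new files: nothing is reprocessed, the row count is unchanged.
print(run_once(source, checkpoint, table), len(table))

# Day 2: a new file arrives and only that file is processed.
source["drivers_day2.csv"] = [{"driver_id": i} for i in range(50, 60)]
print(run_once(source, checkpoint, table), len(table))

# Clearing the checkpoint makes every file look new again.
checkpoint.clear()
print(run_once(source, checkpoint, table), len(table))
```

The last step mirrors deleting a checkpoint folder: every file is treated as new. On an empty target that is harmless, but on a populated append-only table the same step appends the old rows a second time.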
Make sense? Very good. Now I will add .trigger(...), and this is very important. Whenever you work with streaming, basically real-time data, you want your pipelines to run at an interval: 1, 2 or 3 seconds, or 5, 10 or 15 seconds. It's up to you how you want to process your data. You can write, for example, processingTime="10 seconds". But obviously we do not want to run our notebooks every 10 seconds, so I will simply say once=True.

What does once=True do? Once it has processed your files, it stops immediately. The next time you trigger it, it reads only the incremental files and stops again, and so on. This way we save compute. And since we are on the Free Edition, we do not want compute running all the time: you are learning, and you do not want to run up a long bill. That is why we use once=True in environments where you do not want to spend a lot of compute. Make sense? In the real world you can use processingTime with 1, 2, 3 or 10 seconds, but here we will use once=True: load the data, then stop. Simple.

Perfect. Finally, we call .toTable(...), because we want to create a table for this in our catalog: the PySpark DBT catalog, then bronze, then customers. Make sense? So this is the code we write with Spark Structured Streaming to load the data incrementally.

But how can we make it dynamic? That is the question, because if I run this code it will load the data only for customers. If I want to do it for locations, I would copy and paste it into the next cell, then the next, then the next. You might say, "Yeah, that's what we do in the real world, right?" Yes, but I want you to become an efficient, pro data engineer. How do you become one? By doing the things that not everyone else is doing. So the way to process this data that will make your resume, or your project, stand out is dynamic ingestion.
What do I mean by dynamic ingestion? First of all, let me show you what is static here: the source location, the checkpoint location, the target table name, and the schema. Let's talk about the schema at the end, because it needs a little pre-work. How can we make everything else dynamic? It's very simple: I will create a list of variables. I could show you a lot of ways, but I want to show you the PySpark way, so that you are not tied to any particular tool; you can run this PySpark code anywhere. Okay.

So I will create a code cell and define entities as a list: customers, trips, locations, payments, vehicles, and drivers. I always prefer dynamic things, because dynamic data solutions are one of the hottest topics right now. Your organization or your hiring manager will be happy to see that you have built a dynamic solution, because static code is not very reliable in today's world. So this is the list of entities that I want to process. Make sense?
So this is my list. Perfect. Now I will run a loop: for entity in entities. Then I will replace all these hard-coded values with a variable, using an f-string (Python's formatted string literal) with {entity} in the source path. I do the same in the checkpoint location, so that a separate checkpoint folder is created for every entity, and in the table name, because I want a new table for each entity. So this is done. Very simple. Make sense? Very good.

Now the only thing left is the customers schema. How do we handle that? One option is to define a schema for each entity, the way we defined the customers schema here. There are many other ways, for example taking df.schema from a DataFrame, and that is the one I will use, because I want to show you how to do everything dynamically. So I will copy the batch-read code from above and paste it into the loop. I will also show you the manual way afterwards, because the choice is yours: I want everything dynamic, but you can follow a hybrid approach, manual plus automatic. So where it says customers, it becomes entity; the DataFrame becomes df_batch; and the schema becomes schema_entity, taken from df_batch. I hope you are absorbing what I'm trying to do.

So what are we doing? We are running a loop. For each entity, it first reads the data as a batch. Reading the data like this does not consume a lot of compute, even though it triggers a job, because it only reads a few files. Then it grabs the schema from that batch DataFrame, starts the stream processing, and passes that schema to the stream, so the variable name here becomes schema_entity. Make sense? This way you can do this processing easily. This is a full-fledged dynamic solution for ingestion into the bronze layer: you didn't write any schema, you didn't define any data types, you didn't hard-code any location or table, and you didn't copy any code. Just a pure dynamic solution. Make sense? That's how you build real-world solutions.
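The loop described above can be sketched in plain Python so the moving parts are visible without a cluster. This is an illustration under assumptions, not the notebook's code: SOURCE_ROOT, CHECKPOINT_ROOT and CATALOG are hypothetical placeholders, and infer_schema is a deliberately tiny stand-in for the schema a batch read with inferSchema gives you. In PySpark you would instead take df_batch.schema and pass it to spark.readStream ... .schema(...). One pass over the entity list derives a source path, a checkpoint folder, a table name and a schema for every entity.

```python
import csv
import io
from typing import Dict, List

# Hypothetical placeholders: substitute your own volume paths and catalog name.
SOURCE_ROOT = "/Volumes/my_catalog/bronze/source_data"
CHECKPOINT_ROOT = "/Volumes/my_catalog/bronze/checkpoint"
CATALOG = "my_catalog"


def infer_type(values: List[str]) -> str:
    """Pick the narrowest of 'int', 'double', 'string' that fits every value."""
    for type_name, cast in (("int", int), ("double", float)):
        try:
            for v in values:
                cast(v)
            return type_name
        except (TypeError, ValueError):
            continue
    return "string"


def infer_schema(sample_csv: str) -> Dict[str, str]:
    """Toy stand-in for a small batch read with inferSchema: column -> type name."""
    reader = csv.DictReader(io.StringIO(sample_csv))
    rows = list(reader)
    return {col: infer_type([row[col] for row in rows]) for col in reader.fieldnames or []}


def build_ingestion_plan(entities: List[str], samples: Dict[str, str]) -> List[Dict[str, object]]:
    """Derive every per-entity setting in one loop instead of copy-pasting cells."""
    plan = []
    for entity in entities:
        plan.append(
            {
                "entity": entity,
                "source_path": f"{SOURCE_ROOT}/{entity}",
                "checkpoint_location": f"{CHECKPOINT_ROOT}/{entity}",
                "table_name": f"{CATALOG}.bronze.{entity}",
                "schema": infer_schema(samples[entity]),
            }
        )
    return plan


# Tiny hypothetical samples standing in for the first file of each entity.
samples = {
    "customers": "customer_id,email\n1,a@example.com\n2,b@example.org\n",
    "drivers": "driver_id,rating\n7,4.5\n8,4.0\n",
}
for step in build_ingestion_plan(["customers", "drivers"], samples):
    print(step["table_name"], step["checkpoint_location"], step["schema"])
```

The f-strings are what keep each table name and checkpoint folder distinct per entity. If the entity is left out of the table name, every entity writes into one shared table, which is the situation behind the Delta "schema mismatch" error that appears when this loop is first run.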
Okay. What was the second way? Say I do not want to include this batch-read code in my processing. Then you go back and keep adding each schema to a list of dictionaries: the customers schema, then this schema, then that schema, and so on. Each element is a dictionary with a key called name, for example customers, and a key called schema, whose value is the schema itself. So this is your first entity, then the second, the third, the fourth; you keep going, and that becomes your list. Instead of running the loop over a plain list of names, you run it over this list and use the keys of each dictionary.

Let me show you. I actually deleted it, so let me create another one: name is customers, and then schema. For now we have just one entity, so let me copy and paste it, just for your understanding, because I want you to know everything. So you run something like for i in this list. If you want to use the schema, you might write print(i.schema), but that does not work: each element of the loop is a dictionary with two key-value pairs, name and schema, not an object with attributes (and not a tuple). So you write i["schema"] instead. See, that's how you pick the schema. The choice is yours; both approaches are fine, but I wanted to show you the more dynamic way, and now you know both. Perfect. I can also remove these two cells; they are not required anymore.
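Here is the list-of-dictionaries variant as a small plain-Python sketch. The entity names come from the video; the schema values are hypothetical DDL-style strings used only for illustration. PySpark's DataStreamReader.schema() accepts either a StructType or a DDL-formatted string, so strings like these could be passed to .schema(...) directly, or you could store StructType objects instead. The sketch also reproduces the attribute-access mistake from the video.

```python
from typing import Dict, List

# Hypothetical entries: the schema strings are illustrative, not the project's real schemas.
entity_configs: List[Dict[str, str]] = [
    {"name": "customers", "schema": "customer_id INT, email STRING"},
    {"name": "drivers", "schema": "driver_id INT, rating DOUBLE"},
]

for item in entity_configs:
    # Each element is a dict, so look values up by key.
    print(item["name"], "->", item["schema"])

# Attribute access is the mistake from the video: a dict has no .schema attribute.
try:
    entity_configs[0].schema  # type: ignore[attr-defined]
except AttributeError as err:
    print("attribute access fails:", err)

# Optional: index the schemas by entity name for direct lookup inside a loop.
schemas_by_name = {item["name"]: item["schema"] for item in entity_configs}
print(schemas_by_name["customers"])
```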
Perfect. So this is the code we have written. Now I can run it and check for errors; if there are any, we can fix them, but I don't think there should be. Let's run it and wait for it to process. "Path does not exist ... entity." Okay, that makes sense, because I didn't wrap this in an f-string. Let's run it again. "Path does not exist ... entity." What do you mean? DBFS, volumes... source data. Okay, we forgot to wrap this one as well, because this was the batch code. So what was the issue? We didn't add the f prefix. That's it. Minor things. Let's wait: this one cell will process all of our tables. Just imagine the power of this. But there's an error. Let's see what the error is. Is it an error or a warning?
"Some streams terminated before this command could finish." Query... "A schema mismatch detected when writing to the Delta table. To enable schema migration using DataFrameWriter or DataStreamWriter, please set: .option("mergeSchema", "true")." Okay.

So first let's check the current state. Let's open bronze: we have just one table. Oh, I got it. I was sure something was wrong: we forgot to add entity to the table name. So what actually happened? It created a table literally named entity, and when the next entity tried to write to that same table, it said, "Hey, there is a schema mismatch." That's fine; we can remove that table now or delete it later. So let's run this code again. A silly mistake, just ignore it. As I was saying, imagine the power of this dynamic solution: if you want to process, or create, hundreds of tables, you do not need to copy and paste that cell 100 times. One cell, 100 tables done. This is the power. And it's done. See, all the tables are processed. I can also show you the graph: it is just a representation of the real-time data, and since this job has finished, we do not see anything live. Let me refresh the page. Our bronze ingestion is done.
Okay. If I open the pyspark dbt catalog, then bronze... Oh man. Yes. All six tables are there. You will say seven, bro, but you need to ignore this one, if you remember. So all six Delta tables are there, we have enabled incremental loading with a dynamic notebook, and you saw how we do it in the real world. That is my point in creating this project, and all the projects I create. I know there are a few people who love to criticize me. Thank you so much for doing that; it's fine. Even if someone says, "You are just building a project on a Kaggle dataset, this dataset, that dataset": bro, data engineering is not just about datasets, data engineering is about solutions. So do not think data engineering is just about dealing with messy datasets; it's about solutions. I would say thank you so much for saying all those things. I know it's only a few people, and I love those people as well. I just want you to be aware that you can build these things with normal datasets too. Make sense? Very good. Just be positive. If someone says something bad about you, just say, "Thank you so much."

So this was all about our bronze layer, and our bronze layer is ready. If we go and check our catalog, it will have all the tables. Let's remove this entity table, because we do not want to keep it: click the three dots and delete. Perfect. Our six datasets are there. And don't worry, you will also get all the notebooks in my GitHub repo so that you can refer to them for future use cases, for your interviews, everything. But I would recommend creating your own notebooks. Please, please, please. I know that sometimes you may run into errors, and then you can refer to those notebooks. They are just for reference, but please try to create your own, because it will boost your knowledge and your confidence. Simple, sorted. Very good. Now let's create our silver layer.
Now let's discuss what we actually need to do in the silver layer. We all know that the silver layer means data transformation and data processing. But I will show you how to create Python classes to automate the data transformation step. Another dynamic solution? Yes, another dynamic solution, because this can really make a difference on your resume. So how do we do that, and what will we be doing? Let's see.

So now let's talk about our silver layer. This silver layer will be special, because you will learn a lot in it, trust me. You already know that PySpark is the foundation of data transformation, right? So you will learn a lot of things, including upserts. If you know about upserts, you should feel excited. If you don't, hold on, you will get to know them. No worries at all. So let's first go to our workspace; this is our project. Let's create our silver notebook and call it silver transformation. Perfect. First, attach this notebook to this cluster.
Perfect. Now we want to transform our data. In short, the silver layer is the transformation layer, where we apply transformations, do cleaning, and a lot of other things that I will cover in this section of the video. This notebook will handle all the data transformations. Let me plug in the mic and check that it is working. Yes, it is working fine. As I told you, this notebook will process all the data, and it will handle dynamic transformations as well. Obviously there will be many data objects within this notebook, but we will handle some transformations dynamically too.

So let's say we want to transform our data. I think we have six tables, right? If I open bronze, we have six tables. Let's do them one by one, starting with customers. Let's add an H4 heading in bold: Customers. To process the customers data, we first look at the data. So I will say df = spark.read.table(...), because we have a Delta table for it, and provide the table name, which is the catalog name, schema name, and table name: the pyspark dbt catalog, then bronze, then customers. Simple, sorted. Let's display the data in the next cell: display(df). As we know, each table is unique in terms of transformations. Make sense? But there are some transformations that will be... oh, why is it empty, by the way? No rows returned. Wow. Why don't we have any data? We have all the tables. Let's check this one. Okay, this one has data. So why doesn't customers have any data? Is something wrong with the customers data? No rows returned. Hm.
Let's go to our bronze ingestion notebook and see what went wrong. If I open this catalog, then bronze, this is our volume and this is our checkpoint folder. We have everything for customers, yes. So let's do one thing: let's drop this directory called customers and refresh. What will happen after this? The customers table is still there, but there is no checkpoint for it anymore. Make sense? So if I run the notebook one more time, it should give me the data. Let me also make sure the spelling of the source folder is the same, because it doesn't make sense that the data was not added. Okay: volumes, this is the checkpoint, and if I go to source, everything is fine. Let's process the data one more time. This will also be a quick test of our incremental data load, which is good. So let me show you the data for, let's say, drivers.
Okay. So let's test the incremental load. If I run df.count() to see how many records I have, I get 50 records, right? Now if I run the ingestion notebook one more time, how many records should I see in the drivers DataFrame, 100 or just 50? If our logic and implementation are right, we should see only 50, because there is no new file for drivers; our source has no new data. That is the power of the idempotency built into our notebook. Make sense? Okay. Very good. For customers, on the other hand, we should see data now, because we deleted the checkpoint for customers. Spark no longer knows whether the data is new or not, so it will treat all of it as new data. Make sense? If everything is fine, let's look at customers one more time. If it still shows zero, then obviously something is wrong. Oh, now we have 200. Perfect. Maybe something was wrong before, maybe a typo or something. display(df). Okay, now I have data, finally. See how you can test your ingestion as well? Now you know everything is fine in our ingestion logic, because we are not seeing duplicates; we are seeing just the relevant data that we should see: 50 records for drivers. Let me show you: if you see 100 in drivers, you have done something wrong; if you see 50, everything is fine. Drivers or driver? What is the table name? bronze.drivers. Okay.
Now let's see: 50. Perfect. Our logic is fine. If you see 100, you have done something wrong. Okay. So this is our DataFrame.

As I was saying, some transformations will be very customized and specific to each entity, but some transformations apply to all DataFrames. Tell me: which transformations apply to every DataFrame, no matter which one you are processing? The answer is deduplication and applying upserts. These transformations are not specific to one DataFrame; they apply to all of them. Make sense? So we will see how to apply these generic transformations without rewriting the code, because I don't like doing static things; I love dynamic things. We will create Python classes for that, so don't worry. Let's say I want to start with customers, and you will learn a lot of PySpark functions as well. Let's display df.

Okay, perfect. We have 200 records. That's fine. Now we want to enrich this customers table and transform it. Make sense? First of all, the best thing here is that we already have the date-time column in the desired format, which is great. But every time, we add one more column to our DataFrame, which is another generic transformation that should apply to all DataFrames. What is that timestamp column? It is called the processing timestamp, and it records when that record was updated, because in the real world we need to see when a record was updated or upserted. We use that timestamp to tell which records are old and which are new. Make sense? That is the power of this column. So it will also be added to our generic transformations. Make sense?
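The plan is to put entity-agnostic steps into a Python class. The class below is a hypothetical plain-Python sketch over lists of dicts (the class, method, and column names such as last_updated and process_timestamp are assumptions, not the project's code). It shows two such steps: keeping only the newest row per key, which is the usual prerequisite for an upsert, and stamping a processing timestamp. In PySpark the same ideas are typically expressed with a window (Window.partitionBy(...).orderBy(desc(...)) plus row_number()) and current_timestamp().

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional


class GenericTransforms:
    """Hypothetical helper for transformations that apply to every entity."""

    def __init__(self, key_cols: List[str], order_col: str):
        self.key_cols = key_cols    # columns that identify a record
        self.order_col = order_col  # column that decides which copy is newest

    def dedup_latest(self, rows: Iterable[Dict]) -> List[Dict]:
        """Keep one row per key: the one with the greatest order_col value."""
        latest: Dict[tuple, Dict] = {}
        for row in rows:
            key = tuple(row[c] for c in self.key_cols)
            if key not in latest or row[self.order_col] > latest[key][self.order_col]:
                latest[key] = row
        return list(latest.values())

    @staticmethod
    def add_processing_timestamp(
        rows: Iterable[Dict], now: Optional[datetime] = None
    ) -> List[Dict]:
        """Return copies of the rows stamped with when this pipeline processed them."""
        ts = now or datetime.now(timezone.utc)
        return [{**row, "process_timestamp": ts} for row in rows]


# Hypothetical sample: customer 1 appears twice, the February copy is newer.
rows = [
    {"customer_id": 1, "last_updated": "2024-01-01", "city": "A"},
    {"customer_id": 1, "last_updated": "2024-02-01", "city": "B"},
    {"customer_id": 2, "last_updated": "2024-01-15", "city": "C"},
]
gt = GenericTransforms(key_cols=["customer_id"], order_col="last_updated")
for r in GenericTransforms.add_processing_timestamp(gt.dedup_latest(rows)):
    print(r)
```

ISO-formatted date strings compare correctly as plain strings, which is why the sketch can order on last_updated without parsing it.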
But before the generic transformations, we want to clean up a lot of things here. Here you can see this email column. Let's say we want to clean it first. It is already clean by default, but we want to understand the domains of our customers, for example so that we can run our ads; it totally depends on the requirement. So we need to fetch the domains of our customers' email IDs. Make sense? How can we do that? First, let me call the DataFrame df_cust, just to make it more readable. Okay, perfect. So I will write df_cust = df_cust.withColumn(...), because this is a new column: whenever we want to create a new column, we use withColumn. I will call it domain, and I will apply a function called split. What does split do? It splits the column values into a list based on a delimiter. So I will split the column called email, and the delimiter is the at sign (@). Make sense? Perfect. Do you know what will happen? It will create a list of values, and each list will have two values, because there is only one split: index 0 and index 1. Now we apply indexing, because we do not want the first element. Imagine this is the list: this is the first element, and this is the second element. We do not want the username for now (we may need it in the future); for now we just need the domain. So I will take index 1, because index 0 is the username and index 1 is the domain. Make sense? So this is our transformation, and you can see it if you display df_cust. We can also import all the libraries: from pyspark.sql.functions import * and from pyspark.sql.types import *. Make sense? Good. Now if I show you this, you will see the domain column. See: these are all the domains that we have extracted using PySpark.
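To check the indexing logic outside Databricks, here is a plain-Python sketch of what the PySpark expression split(col("email"), "@")[1] computes for each row (the function name email_domain is hypothetical). In the notebook, the column would be added roughly as df_cust.withColumn("domain", split(col("email"), "@")[1]), with split and col coming from pyspark.sql.functions. This sketch returns None when a value has no "@"; it does not claim to mirror Spark's exact handling of that case.

```python
from typing import Optional


def email_domain(email: Optional[str]) -> Optional[str]:
    """Return the part after '@' (index 1 of the split), or None if there is none."""
    if email is None:
        return None
    parts = email.split("@")
    # Index 0 is the username, index 1 is the domain.
    return parts[1] if len(parts) > 1 else None


for address in ["jane.doe@example.com", "raj@example.org", "not-an-email"]:
    print(address, "->", email_domain(address))
```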
extracted using pispar make sense so now if you just want to do
So what is the next cleanup that we want to do? This is our phone number column, and you can see that there are so many things in it that are not relevant here. For example, there are hyphens, dots and brackets that we can remove, plus the +1 prefix, and the x as well. We want to clean this phone number so that we can actually use this column, say, to store the information in the right format. Make sense? Makes sense.
So how can we do that? First of all, you have a lot of options. One thing you can do is use a function called regexp_replace to replace the values one at a time: replace the hyphen with nothing, replace the dot with nothing, and so on. But there's a more efficient way that I want to show you.
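To compare the two options outside Spark, here is a plain-Python sketch with re.sub. The sample phone strings are made up, and the list of characters in the chained version is an assumption about which symbols appear. The negated class [^0-9] needs no such list; Spark's Java-based regex engine treats that pattern the same way.

```python
import re

# Made-up sample values with the kinds of noise described above.
raw_numbers = ["+1-202-555-0143", "(202) 555.0198", "202.555.0111 x1234"]


def clean_chained(value: str) -> str:
    # Option 1: strip each unwanted character one at a time.
    for unwanted in ["-", ".", "(", ")", "+", " ", "x"]:
        value = value.replace(unwanted, "")
    return value


def clean_negated_class(value: str) -> str:
    # Option 2: a single pass that removes anything that is not a digit 0-9.
    return re.sub(r"[^0-9]", "", value)


for raw in raw_numbers:
    print(raw, "->", clean_negated_class(raw))
```

Both give the same result on these samples, but the chained version only removes characters someone thought to list, so an unlisted symbol such as a slash survives it.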
I will simply write df_cust = df_cust.withColumn(...). This time we do not want to create a new column; we want to modify the existing one. So it is very simple: I pass phone number, the same name as the existing column, because I don't want to create a new column. Now, which function are we going to use? It is called regexp_replace. So what do we want to replace, and with what? We want to replace everything that is not a digit. That's it. So I write the list 0 to 9 in square brackets, [0-9]; these are my digits. And I do not want to keep anything that is not in this list, so I need that little caret. Where is that reverse V? I'm just finding the key. Oh man, where is that key? Oh yeah, it's on the 6 key. So [^0-9] means anything other than the digits 0 to 9, and I'll simply say: replace all that unnecessary stuff with nothing. Do not even add a space. Nothing. Okay. Now let's display df_cust.
Uh, what is it saying? "missing 1 required positional argument: 'replacement'". Uh oh, we forgot to pass the column name, I guess. So I simply add phone number as the first argument. withColumn only says "create this column called phone number"; regexp_replace still needs to know which column to apply to, and that is obviously phone number. Now it's fine.
See, this is my new phone number column, which doesn't have any special characters, brackets, dots, letters, nothing. But yes, some numbers will be longer, because different countries have different dialing codes that we have to follow and cannot cut down. You have to keep everything that is in number format, because you do not know the rules of every country. Some have three or four digits of code, some have zero to one. In our country we have +1. So it depends. Make sense?
So now I also want to show you one more thing that applies only to this particular DataFrame. Instead of having first name and last name as two separate columns, let's combine them, because I do not like having unnecessary columns. So how do you apply concatenation in PySpark? It is very simple. You write df_cust = df_cust.withColumn(...) with a new column, let's say full_name. Here you can use the concat function, but I personally like using concat_ws. With concat_ws we first define a separator: not exactly a delimiter, but the character that is automatically added between the two or more values being concatenated. So I'll just add a space, that's it, and then the columns that I want to concatenate: first name and last name. Perfect. Now let's display df_cust.
And I can also do one more thing: I can drop first name and last name if I do not want to keep them, because why do I need them now? I already have full_name, right? So this is my full name, and I have dropped both columns. That's how you drop unnecessary columns if you do not want to keep them. Make sense? Very good. So we have this DataFrame so far.
So now we can talk about the generic transformations that I mentioned at the beginning. For that, we'll create a Python class with multiple functions that are generic. The first one is deduplication, for sure, which we're going to do right now. The second will be upsert. And the third one will be... um, what should the third generic function be? What was the function name that we talked about in the beginning? Okay, we will see. If we want to add more things, we can just add them to that generic class. Makes sense.
Okay, so now let's create the class. But this class is special, and I want to keep it as a utility. As a what? As a utility. So I can create a Python file here, or I can even create a folder if I want to. So I'll create a folder and name it, let's say, utils. Simple. This is my utils folder, and within that folder I will create my Python file and name it, let's say, custom_utils.py. So this is our Python file.
But in order to access this file, you need to do one thing at the beginning: add the source path to the system path. What do I mean? You write something like this: import os, import sys. First of all, import these two libraries. Then you need to add this path, the project path, to the system path. How do you do that? Let me show you. First, you write current_dir = os.getcwd(), which gets the current working directory. That is one way of doing it. In a regular Python script you can also use os.path.abspath(__file__) (usually wrapped in os.path.dirname to get the folder rather than the file), but this is a notebook, so __file__ won't work here. So we will use getcwd(). It simply gives you the current working directory path, see, up to the PySpark DBT project folder. In some cases this is not mandatory, but sometimes you will see errors while importing the module, and this is the workaround: add this whole path to your system path. So I write sys.path.append(current_dir). So now this path is added in your notebook. Make sense?
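The path setup can be reproduced in plain Python. This sketch builds a throwaway project folder with utils/custom_utils.py (mirroring the names above; variable1 is a placeholder) and then makes it importable. Two deliberate deviations, both assumptions rather than the video's code: it uses sys.path.insert(0, ...) instead of append, because sys.path is searched in order and an installed package that also happens to be called utils could otherwise win; and it adds an empty __init__.py, because a folder without one only imports as a namespace package, which loses to a regular package of the same name anywhere on the path.

```python
import os
import sys
import tempfile

# Stand-in for the project root that os.getcwd() returns in the notebook.
project_root = tempfile.mkdtemp()

# Build utils/custom_utils.py; the empty __init__.py makes utils a regular package.
utils_dir = os.path.join(project_root, "utils")
os.makedirs(utils_dir)
open(os.path.join(utils_dir, "__init__.py"), "w").close()
with open(os.path.join(utils_dir, "custom_utils.py"), "w") as fh:
    fh.write('variable1 = "blah blah blah"\n')

# Put the project root first on the module search path.
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from utils.custom_utils import variable1

print(variable1)
```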
So now you can literally write anything in that file. Let's say I write variable1 = "blah blah blah". And if I want to use this variable now, I can do that like this: from utils.custom_utils import variable1. Uh, it says No module named .... Very good. This is the error that I was just talking about, and I'm glad this error came up. It is telling us that it cannot find the utility module, even though we have already run this append. So simply restart your cluster; it will take some time to start, maybe a few seconds. Done. Then simply add the path again like this and run it. Now if you try to import from this file, it should work. See, now it worked. You can simply print variable1, and this is the value. Make sense?
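Restarting the cluster clears the interpreter's state. A lighter option in plain Python, when a module file is created after the interpreter has already looked in that folder, is importlib.invalidate_caches(), which tells the path finders to re-read directory listings. Whether this is what caused the error on the video's cluster is an assumption; the sketch only shows the mechanism, using a hypothetical module name.

```python
import importlib
import os
import sys
import tempfile

# A hypothetical folder that is already on sys.path before the module exists.
folder = tempfile.mkdtemp()
sys.path.insert(0, folder)

try:
    import late_utils_demo  # noqa: F401  (the file does not exist yet)
except ModuleNotFoundError as exc:
    print("first attempt:", exc)

# Create the module only after the failed import attempt.
with open(os.path.join(folder, "late_utils_demo.py"), "w") as fh:
    fh.write("value = 42\n")

# Drop cached directory listings so the new file is found without a restart.
importlib.invalidate_caches()

import late_utils_demo

print(late_utils_demo.value)
```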
So one thing is that you have to add this utilities path to your system path, and then you can use it. If you are seeing this error and you are not able to fix it, no need to worry. This was just a best practice. If you want to go with the flow and you don't want to keep seeing the errors, simply create the class that I'll be creating in this Python file directly at the top of your notebook instead, because once the class is in your notebook, you do not need to import it from anywhere else. I just wanted to show you the best practice as well. But yes, I can feel your pain. If you are not able to get rid of the error, fine: simply copy the class, paste it into your notebook, and that's it. Your aim is to learn how to create classes, not to figure out how to import them. That's just a best practice.
Okay, makes sense. Now let's create the class. The class name will be, um, Transformations. Transformations, makes sense. If you have some fundamental understanding of classes, within a class we usually first create some class variables, called attributes. But I don't want to create any attributes, because each function will be self-contained. So here I simply write def and create a function called dedup.
Make sense? Within a class, one thing we always define is the self parameter. And this dedup function will obviously accept a DataFrame, plus the list of columns on which we need to apply the deduplication. We can apply deduplication on one column, two columns, three columns, four columns, and so on, right? So I'll call it dedup_cols. Perfect. This will be a list, so we import List: from typing import List. And for the DataFrame type we write from pyspark.sql import DataFrame, so df will be a DataFrame. And self is just self, so it's fine.
So whenever you want to perform deduplication, what do we do? We use something called a window function. And dedup is a very generic transformation that you will apply to almost all your DataFrames. So for that I will simply write df = df.withColumn(...).
df dot width column. Okay. And we will be creating a column
Okay. And we will be creating a column called ddoop. Ddoop basically let's say
called ddoop. Ddoop basically let's say um ddup key. Yes. Let's create ddup key.
um ddup key. Yes. Let's create ddup key. And what this column will do? This
And what this column will do? This column will first of all create a hash
column will first of all create a hash column based on the all the columns that
column based on the all the columns that you have provided here in the D2
you have provided here in the D2 columns. Make sense? So this will be the
columns. Make sense? So this will be the hashing of all the columns that you have
hashing of all the columns that you have provided in the list. Very good. So how
provided in the list. Very good. So how you can just get the list? Very easy.
you can just get the list? Very easy. You can simply say ddup list. Okay. And
You can simply say ddup list. Okay. And you know what? You can even get this
you know what? You can even get this particular list in the form of string
particular list in the form of string really. Yes. And you can just convert it
really. Yes. And you can just convert it into a list using um eval method. So it
into a list using um eval method. So it is very easy. But let's keep it a list.
is very easy. But let's keep it a list. Uh or let's let's let's keep it a string
Uh or let's let's let's keep it a string or it's fine. It's fine. It's fine.
or it's fine. It's fine. It's fine. Okay. You should learn new new things.
Okay. You should learn new new things. Okay. So now this will be a list. Okay.
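If the dedup columns do arrive as a string, for example from a config value, keep in mind that eval will execute any expression it is given. A safer swap, named plainly here as a substitute for eval, is ast.literal_eval, which only accepts Python literals. The helper name parse_cols is hypothetical.

```python
import ast
from typing import List


def parse_cols(cols_text: str) -> List[str]:
    """Turn a string like "['customer_id', 'email']" into a list of column names."""
    # literal_eval raises ValueError on anything that is not a plain literal.
    parsed = ast.literal_eval(cols_text)
    if not isinstance(parsed, list) or not all(isinstance(c, str) for c in parsed):
        raise ValueError(f"expected a list of column names, got {cols_text!r}")
    return parsed


print(parse_cols("['customer_id', 'email']"))
```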
And how do we define it? You already know: we'll be using concat, and the column names will be the list of dedup_cols. Now, what is this asterisk (*) that I have used here? It comes up whenever we work with PySpark transformations and want to use some Python capability alongside them. This is a Python list, but concat expects the column names passed as separate strings. So this list can be unpacked using the asterisk. We can unpack any list like this. So now it will create a dedup_key that is the combination of all the columns.
Perfect. That is fine. Once that is done, we will simply write df = df.withColumn