YouTube Transcript: How to build an automated data pipeline using Airflow, dbt, Postgres, and Superset (Windows 11 WSL)
Summary
Core Theme
This content details the process of building a real-time weather data pipeline, from data ingestion and storage to transformation and visualization, all orchestrated within Docker containers and automated using Airflow.
Hey, this may not look like much right
now. What you see here is a dashboard
reporting the temperature in almost real
time. The data comes from
weatherstack.com that provides weather
data for a particular location of your
choosing. For this setup, I have a
Python script that retrieves this data
and it runs on a schedule set by Airflow, and it inserts the records into a Postgres database. The architecture looks like this, where live data gets extracted by Python, transformed using dbt, and loaded into Postgres. This is all automated using Airflow, and the data is visualized using Superset. Now the other
feature of this project is that
everything runs inside docker containers
which means that all of these tools are
running on an external server and none
of it is installed into my computer. If
you stumbled across this video, this is
just the first version, and I do plan to
make improvements to the tutorial and
explanations. But if you're just
starting out on your data journey, I
hope I can give you at least a little
bit of exposure to all the various
things within this ecosystem. And if
you're more experienced with some of the
tools that I'll be demonstrating here,
then I am looking for and will
appreciate any feedback that you have
since everything here is still in the
development phase. And I do want to make
this production ready. So without
further ado, let's get
started. Let's search for Visual Studio
Code and I'm going to click on the first
link. Then I'm going to download for
Windows. The download has
started and let's open the .exe
file. I accept. Next, I'm going to
install it into my programs folder. I'm
going to create a program shortcut in
the start menu.
I don't need a desktop
icon. I don't need these. I'm okay with
these two. Next. And
install. And let's finish. And now we
have Visual Studio Code. I'm going to
check a few things. I'm going to go to
open a remote window on the bottom left.
Click on
WSL. And it should have installed the
extension. So I'm going to click on
extensions. And let's see what version
we have. We have 0.99, which is the
latest. Let's go to the bottom left
again, open a remote window, and connect
to WSL using Distro. It says no WSL distros found. Let's go to the bottom left again, open a remote window. Let's click on connect to WSL. And here we should see a button, Add Distro. I'm going to click on that, and I want to install Ubuntu version 24. Click on that. Let's
give it a few minutes. Now, since I'm
creating a new instance of Linux in my
computer, it's going to ask me to create
a username and password. So, my username
is going to be Calvin, and I'm going to
create a password and retype the new
password. And I am now inside my Linux
instance. Let me clear the
terminal. Now I can exit out of this
Linux instance by typing exit. And now
you could see that my location has
changed to my local system. I can log
back into my Linux instance by going to
open a remote window, clicking on
connect to WSL using Distro and then
clicking on the instance that we just
created. Now look at the bottom left. We
are now remote connected to our Linux
instance. Let's go into explorer, open
folder, and you see that we're currently
in our user folder within the Linux
instance. Click okay. Do I trust the
authors? Yes. Now, these are the files
in our user directory. You can imagine
we're starting with a blank desktop. So,
I can open a new terminal. This tilde
means that I'm currently in the user
directory right here. I can go ahead and
make a new folder. I'm going to call it
repos. And this is where I'm going to
host all of my projects. You could see
it right here. Or I could use VS Code to click and create a file or a folder and just name it right here. You can also access this directory through your File Explorer by searching \\wsl$\. And that's
going to take you to the Linux instance.
And you can see in home you have your
user file which has all the contents
that we see here. Let's exit out of this
and let's finish setting up our Linux
instance. We want to make sure all of the packages inside of this instance are up to date. So I'm going to do sudo apt update. Enter. I'm going to type my password, and then I want to upgrade some packages. So, I'm going to do sudo apt full-upgrade. Enter. I'm going to hit Y to continue. Enter. And
let's give it a minute or two to finish
upgrading. Now, the upgrade is complete.
I'm going to clear this terminal. And
I'm going to check for a few things. python3 --version. I see that Python came already installed with Ubuntu. And I'm going to check pip --version. And I see that pip does not exist. So I have to install that. pip is Python's package installer, and it's going to help me install any Python libraries that I'll need for my projects. I'm going to do sudo apt install python3-pip. Enter. Yes to continue. Let's give it a minute or two. Now I'm going to check pip --version. And I see that it's been successfully installed. So that finishes
setting up my Linux instance. This is
docker.com and I'm going to download the
Docker desktop for Windows AMD 64.
Docker is kind of like the app store for
many popular tools that are used in
software development. Let's open the
.exe file. Click yes. And I don't need a
shortcut. Okay, let's give it a minute
or two. And now I have Docker installed
on my Windows. If I open Docker, it's
going to take me here. I hit accept. I
already created an account so I can log
in with my Gmail. I'm going to skip the
survey. And you probably noticed on my
screen as it was installing, it's
created some backend folders in my Linux
instance because Docker on my Windows
can actually communicate with my Linux
instance. And you could check by going
into settings, resources, WSL
integration. And you see that it's
detected our instance. So I can enable
the integration by toggling the switch.
Apply and restart. I can X out of this
and I can X out of this. So now I have
Docker set up. This is weatherstack.com.
This is where we're going to get
realtime weather data. If we look at the
pricing, we will see that with the free
plan, we can make 100 requests a month.
Unlike the other paid plans, we do not
get full historical data and we can only
request weather data for one location.
If we go into documentation, we can see
what the API request should look like.
And if we keep scrolling down, then we
can see the structure of our response.
We have things like the name of the
location, the local time, we have the
weather description, we also have the
temperature, and the humidity. So, let's
go ahead and sign up for a free account.
We're going to select the free plan.
Fill in your details here and click on
sign up. Then, when you log in, you get
your API access key. And you can see
I've already made 59 calls this
month. Okay, I'm going to go ahead and
clear my terminal and let's set up my
project folder. I want to call this
weather data project. And inside this
folder, I'm going to create another one
called API request. Now, I'm going to make a Python file called api_request.py. And here, I'm going to
create a function that will let me fetch
data from the API URL. So, let's say def
fetch data. And I'm going to do
requests.get API URL. And the output of
this I'm going to save as response and
then I'm going to print the response.
Now I need two things here. One is I
need the request library for this line
to work. And the second is I need the
API URL. So in my terminal, let me see
if I have the request library. I'm going
to do pip show requests and it is
available. So up here, I'm going to
import requests. And the second thing is
I need my API URL. So I'm going to go
into weather stack. And I'm going to
request. I'm going to wrap this in
quotes because it's a string. And I'm
going to squish this because it's a URL.
Just like that. Going back to my
dashboard in weather stack, I'm going to
copy my API access key and paste it
right there. And then finally, let's
call the fetch data function. I'm going
to save this. I'm going to cd into the
API request folder. So let's see where I
am right now. If I do ls, then I can
list all the content of my current
directory and I can cd into repos. then
cd into weather data project and then
API request. Now I'm going to do python3 api_request.py to run this file and I
get a response of 200 which is
successful. At the end of response I'm going to do .json() so that I can parse through this JSON. Save and run
the file again. And now we could see the
contents of this response JSON. So let's
finalize this function. I want to implement a try/except block so that I can be notified of the status, whether it was successful or not. Under the response I'm going to do raise_for_status, which is a method of the requests library that raises an exception for 400 or 500 HTTP errors. Ultimately, it is the response.json() that I want as the output of this function. So, this is what I'm going to return. And then I'm going to print API response received successfully if it does pass this check. And for the except, I'm going to do requests.exceptions.RequestException (singular) as e, where I'm going to raise the exception, and I'm also going to log, as an f-string, an error occurred and the variable e. I'm
going to clean up the formatting. And I
could also clean up the API URL by
creating a variable API key, turning the
URL into an F string, cutting out the
key Ctrl X, inserting the variable API
key right there, and then pasting my API
key up here. Save. And if I run the file
again, then I get API response received
successfully. As an afterthought, maybe
right before the try block, it'd be a
good idea to say something like fetching
weather data from weather stack API just
so that we know that we've started
running this function. And I'm going to
save this. And since I know that I'll be
using the output of this function, I'm
just going to create something like mock fetch data that returns this object, so that we can simulate getting a successful response without making
additional API calls. So I'm going to
comment this out for now. Hey, it's me
from the future. You don't need to
comment this out. You could just leave
the two functions and delete this call.
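For reference, here is a minimal sketch of what api_request.py looks like at this point. The query location and the exact values in the mock response are illustrative, and the API key placeholder stands in for your real Weatherstack key:

    import requests

    api_key = "YOUR_WEATHERSTACK_API_KEY"  # placeholder; paste your Weatherstack access key
    api_url = f"http://api.weatherstack.com/current?access_key={api_key}&query=New York"

    def fetch_data():
        # Call the Weatherstack API and return the parsed JSON payload.
        print("Fetching weather data from weatherstack API...")
        try:
            response = requests.get(api_url)
            response.raise_for_status()  # raises an exception for 4xx/5xx HTTP errors
            print("API response received successfully.")
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            raise

    def mock_fetch_data():
        # Simulated response so we don't burn through the 100 free API calls per month.
        return {
            "location": {"name": "New York", "localtime": "2025-04-30 12:00", "utc_offset": "-4.0"},
            "current": {"temperature": 17, "weather_descriptions": ["Sunny"], "wind_speed": 10},
        }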
Just like that. And now I'm just going
to clear this terminal. Inside the API
request folder, I'm going to make another file and call it insert_records.py. And I want to see how I can access
this function in the API request file
from this new insert records file. I can
do from API request which is this file
import the mock fetch data function like
that and then I can call the mock fetch
data function like
this and then I can print the results of
this function. Save, and I can cd into the API request folder, which I'm in, and I can do python3 insert_records.py. Enter. And now I have data from another
Python file. And this idea of importing
functions from other Python files is
really going to help me out with
organizing this project. Now let's talk
about how to save this record in a
database. The database we're going to use is Postgres. And I'm going to create a new file under the project folder and call this docker-compose.yaml.
And this file is going to contain code
that will allow us to access and
configure the apps that we need. So I'm
going to start with services and the
first service I'm going to call is db, and I'm going to name the service container with container_name as postgres_container. Now I need the
image for this service. Let's go to Docker Desktop, and if the icon's unresponsive I can go to Task Manager and look for com.docker.backend, right-click, and end task. And I'm also going to look for Docker Desktop. I'm going to right-click and end task. Now let's click on Docker Desktop again. And now it opens up. So I'm going to search for the image postgres. And here I have the image name and here I have the tag, or the version. I see that the latest version is 14.17. So in docker compose I'm going to write postgres:14.17.
Now for the ports, I want my local Linux system to communicate with Postgres, which lives inside the Docker server. So I'm going to bind the local port 5000 to Docker's port 5432, which is what Postgres uses. The convention is usually to match the port numbers, but I'm just using 5000 to help us see patterns later on. Now I'm going to overwrite some variables that Postgres has. So I set up the environment, and those variables are POSTGRES_DB, POSTGRES_USER, and POSTGRES_PASSWORD. For POSTGRES_DB I want my database to be called db. And for user and password I can use whatever credentials I want. So db_user and db_password. Now I want Postgres to save its files and tables to my local system. So I'm going to set my volumes, and I want Docker to create this folder in my local, which is going to be inside the project folder, postgres_data, and I want Docker to mount it as its own /var/lib/postgresql/data. Now let's save this.
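As a rough sketch, the docker-compose.yaml described so far would look something like this (service, credential, and folder names follow the narration; treat the exact layout as an approximation rather than the author's exact file):

    services:
      db:
        container_name: postgres_container
        image: postgres:14.17
        ports:
          - "5000:5432"          # local port 5000 -> Postgres port 5432 inside Docker
        environment:
          POSTGRES_DB: db
          POSTGRES_USER: db_user
          POSTGRES_PASSWORD: db_password
        volumes:
          - ./postgres_data:/var/lib/postgresql/data   # persist tables on the local machine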
I'm going to open a new terminal and I'm
going to run docker compose up and you
could see that the Postgress image was
pulled and our database was created and
the initialization process is complete.
Now I'm going to open a new terminal.
I'm going to cd into the project folder
and I'm going to run docker compose exec. I'm going to exec into the service db, and within the service I can use the Postgres command psql, -U for the user and -d for the database we want to access. Now at this stage you might get docker permission denied, and this is because group memberships don't carry over to a new terminal session unless we log out. So in that case you can refresh your group membership for this new session with newgrp docker, and then you should be able to exec into Postgres. So now inside Postgres, if I do \l, then this is a list of all the databases that I have in Postgres, and we see the one that we just made, db. If I do \dn, then this is a list of all the schemas, or data sets, that exist inside of the db database. What if I wanted to check the other database? I could do \c with the name of the database I want to access. So let's do postgres. And then here if I do \dn, then now I have the schemas, or the data sets, within the database named postgres. So let me return back to the db database. And then if I do \dt and then public.*, then this should list all the tables that exist within the public schema. But right now we don't have any
tables. Okay. So let's open a new
terminal and go to insert records so
that we can start creating functions to
save data from the API request into our
database. Let's remove this line. And
the first thing we're going to do is
create a function called connect to DB.
I'm going to print connecting to the
PostgreSQL database. And I'm going to
set up my try and except block. Now how do we connect to Postgres? We're going to use a library called psycopg2. Let me see if this is available. I'm going to do pip show psycopg2, and it says package not found. So we're going to install this by doing pip install psycopg2-binary. And
we get an error that essentially says do
we want to install this package for our
whole system or do we just want to
install it for our project? And I just
want to install it for my project. So I
want to create a virtual environment
which is an isolated folder where you
can install a bunch of packages that
will be confined only within your
project. So I want to cd into the
project folder. And this is where I want
to create the virtual environment. So
I'm going to do python3 -m venv and then the name of my virtual environment folder, which I just want to call venv. It says the virtual environment was not created because I don't have this package. So I'm going to do sudo apt install python3.12-venv. I'm going to enter my password. And now let's try to create the virtual environment. Here's my virtual environment folder. And we can activate it by doing source venv. And if we open this folder, we have bin and the activate file. So source venv/bin/activate. Enter. And this means that we have it activated. So inside the virtual environment I can go ahead and pip install psycopg2-binary, and then if I do pip show psycopg2, or let's do psycopg2-binary, then we see
that it's been installed. So back to the
insert records file, I'm going to import
psycopg2. And then in the try block,
it has a method called connect. And it's
going to connect with the following
parameters. Host is localhost, port is 5000, dbname is db, user is db_user, and password is db_password. And these are values that we determined in the docker compose file. The output of this I'm going to save as con, for connection. And I'm going to print con. In the except block, I'm going to use psycopg2's Error as e and then print, as an f-string, database connection failed and pass the e variable, and then raise the error. I'm
going to call this function. Save this.
Let's cd into the API request folder and run this file with python3 insert_records.py. And here we have the
connection object. So this is what I
want to return as an output of this
function. So I'm going to change this
print to return. Now the next thing we
have to do is create a table. So let me
remove this line and I'm going to create
a function called create table. And this
is going to depend on the input con
which is our connection object from the
previous function. I'm going to print
creating table if not exists. Then I'm going to set up my try and except block.
And in the try block I'm going to use con, which has a method called cursor, which allows us to do things inside Postgres. I'm just going to initialize this as cursor. And then I'm going to use cursor to execute some SQL. Here are my triple double quotations, up here and down here. And then I'm going to do create schema if not exists dev semicolon. And inside dev, I'm going to create a table to store data from the API request. So I'm going to do create table if not exists dev.raw_weather_data. Now inside parentheses I'm going to create the column definitions. So I want id, which is going to be serial and the primary key. I want city as text, the temperature as float, the weather descriptions as text, the wind speed as float, the time as timestamp, the inserted_at as timestamp defaulting to now, and the utc_offset as text. I'm going to add a semicolon at the end of the parentheses, and then after execute, I'm going to do con.commit().
Then I want to print table was created.
In the except block, I'm going to use psycopg2's Error as e and print, as an f-string, failed to create table and pass the e variable, and raise the error.
Let's test this out by calling the
connect to db function saving the output
as con and then calling the create table
function with con as the input. Let's
save this. I'm going to run this file.
Enter. And it says table was created.
Let's go back to the Postgres terminal. Let's do \dn to see that the dev schema was created, and then \dt dev.* to see that the raw_weather_data table was created. And now I could do select star from dev.raw_weather_data semicolon to see that the table structure was created. Now obviously there's no records yet, but now we're going to insert the record.
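Putting the two functions together, insert_records.py at this stage would look roughly like this sketch, based on the narration (exact prints and column names as described above):

    import psycopg2

    def connect_to_db():
        print("Connecting to the PostgreSQL database...")
        try:
            con = psycopg2.connect(
                host="localhost",     # later changed to host="db", port=5432 when run inside the Airflow container
                port=5000,            # the local port bound to the container's 5432
                dbname="db",
                user="db_user",
                password="db_password",
            )
            return con
        except psycopg2.Error as e:
            print(f"Database connection failed: {e}")
            raise

    def create_table(con):
        print("Creating table if not exists...")
        try:
            cursor = con.cursor()
            cursor.execute("""
                CREATE SCHEMA IF NOT EXISTS dev;
                CREATE TABLE IF NOT EXISTS dev.raw_weather_data (
                    id SERIAL PRIMARY KEY,
                    city TEXT,
                    temperature FLOAT,
                    weather_descriptions TEXT,
                    wind_speed FLOAT,
                    time TIMESTAMP,
                    inserted_at TIMESTAMP DEFAULT NOW(),
                    utc_offset TEXT
                );
            """)
            con.commit()
            print("Table was created.")
        except psycopg2.Error as e:
            print(f"Failed to create table: {e}")
            raise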
Now I'm going to create a function
called insert records and the input's
going to be a connection object and data
which I get from the API request. So
first I'm going to print inserting
weather data into the database. I'm
going to set up my try and except block. In my try block I'm going to use the connection object and initialize the cursor just like we did before, so that we could do cursor.execute some SQL. Here are my triple quotations, and my SQL is going to be insert into dev.raw_weather_data, and in
parenthesis I'm going to list all the
columns. So city, temperature, weather
descriptions, wind speed, time inserted
at and UTC offset. I don't need the ID
because this is autogenerated by
Postgress. And at the end of the
parenthesis, I'm going to insert the
values which come from the source for
city, temperature, weather,
descriptions, wind speed, time. The
value for inserted_at is going to be
now. And UTC offset is going to come
from the source. Now the SQL is only the
first input to this execute function.
The second one is going to be inside
parenthesis the location of these
values. So the city is going to come
from data location name and the
temperature is going to come from data
current temperature. But looking at the
weather stack documentation, seems like
everything we need is going to be in
location and current. So to clean this
up, I'm going to set data current as
weather and I'm going to set data
location as location. That way I can
replace this with location and I could
replace this with weather. The weather
description is going to come from
weather weather descriptions and the
zero index. The wind speed is going to
come from weather wind speed. The time
is going to come from location local
time. And then the UTC offset is going
to come from location utc_offset. After the SQL, I'm going to do con.commit(). And then I'm going to print data successfully inserted. In the except block, I'm going to use psycopg2's Error as e, and I'm going to print, as an f-string, error inserting data into the
database and pass the e variable. And
then I'm going to raise the error. Now
let's test this out by bringing these
two lines down here. I want to call the
insert records function with connection
and data as the input. And then my data
is going to come from the mock fetch
data which we've imported earlier. So
paste that here with the parenthesis.
Let's save. Let's go into our third
terminal and then run the file. And we
have data successfully inserted. Let's
go to Postgress and let's select star
from raw weather data. And here we have
that record. So all of these functions
work and let's package everything
together. So I'm going to make a
function called main. Set up my try and except block. I'm going to insert these lines into the try block, and I'm going to do except Exception as e and print, as an f-string, an error occurred during execution and pass the e variable. And then after all of this, regardless of whether it succeeded or not, I'm going to do finally: if the variable connection is in locals, then I want to close the connection and print database connection closed. Let's save this. In Postgres, I want to clear this record by doing truncate table dev.raw_weather_data semicolon. Let's
select star again. The record is gone.
Let's go to our third terminal and let's
go to our insert records file. Let's
call the main function and let's run
this file. Everything ran successfully.
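For reference, the remaining two functions in insert_records.py would look roughly like this sketch (field names follow the Weatherstack response structure described earlier, and this assumes the earlier imports plus from api_request import mock_fetch_data at the top of the file):

    def insert_records(con, data):
        print("Inserting weather data into the database...")
        try:
            weather = data["current"]
            location = data["location"]
            cursor = con.cursor()
            cursor.execute("""
                INSERT INTO dev.raw_weather_data
                    (city, temperature, weather_descriptions, wind_speed, time, inserted_at, utc_offset)
                VALUES (%s, %s, %s, %s, %s, NOW(), %s)
            """, (
                location["name"],
                weather["temperature"],
                weather["weather_descriptions"][0],
                weather["wind_speed"],
                location["localtime"],
                location["utc_offset"],
            ))
            con.commit()
            print("Data successfully inserted.")
        except psycopg2.Error as e:
            print(f"Error inserting data into the database: {e}")
            raise

    def main():
        try:
            data = mock_fetch_data()     # later swapped for fetch_data() to use live data
            connection = connect_to_db()
            create_table(connection)
            insert_records(connection, data)
        except Exception as e:
            print(f"An error occurred during execution: {e}")
        finally:
            if "connection" in locals():
                connection.close()
                print("Database connection closed.")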
Let's go back to Postgress. Let's select
star. And we have our record. And so
we've completed with all the functions
here. So I'm going to delete this line
and save this file. Now how do we
automate this so we don't have to
manually run the file each time? We're
going to use a tool called Airflow.
Let's open up a new terminal. And this
service I'm going to call AF. I'm going
to call this service container as
airflow container. And what about the
image? Let's go to docker desktop. Let's
search for airflow. And here we have the
image as well as the tag. I'm going to
use version
3.0.0. So let's go to docker compose.
And I'm going to type apache/airflow:3.0.0. Now for the ports, I'm going to
bind my local port 8000 with the servers
port 8080, which is what Airflow uses.
And for the environment variables, I'm going to overwrite AIRFLOW__DATABASE__SQL_ALCHEMY_CONN with the
address of our desired database. I
actually want to create a new database
just for Airflow so that it can store
its metadata over there. I also want to
create a new user just for airflow so
that it can have ownership over the new
database. So inside the Postgres folder I'm going to create a new file called airflow_init.sql. And it looks like I
don't have permission to create that
file. So let's check out the permissions
of that folder. I'm going to do ls -l and copy the path and enter. And this 999 means the user inside the Postgres container. And all of these dashes mean that I don't have any access at all. And so let's add the group write permissions to this directory. I'm going to do sudo chmod -R g+rw and copy the path of this folder. Going to type in my password. And now I want to change the group of the directory to my user's group. So I'm going to do sudo chgrp -R $USER and the directory's path. And then I'm going to make sure the group members can read and write. So sudo chmod -R 770 and the directory's path. So let me hit retry. And now we were able to create
this file. So this file is going to contain instructions for Postgres to go ahead and create the user airflow with password airflow, and to create the database airflow_db, making the owner airflow. Let's save that.
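A minimal sketch of that airflow_init.sql, assuming the simple credentials used in the video:

    -- Create a dedicated role and metadata database for Airflow.
    CREATE USER airflow WITH PASSWORD 'airflow';
    CREATE DATABASE airflow_db OWNER airflow;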
Go back to docker compose, and that allows me to type the address of this database, which is going to be postgresql plus the psycopg2 adapter, colon slash slash, the username and password airflow:airflow, at the service name, which is db, on port 5432, slash the name of the database that we'd like to access. So airflow_db. Now,
Airflow works off of Python files which
contain instructions on what it should
automate. These instructions are called
DAGs, and we'll be creating these files.
So, inside the project folder, I'm going
to create a new folder called Airflow.
And inside of that, create another
folder called DAGs. And this is where
we're going to save our DAGs. So, I'm
going to set up volumes because I want the container to recognize this folder, and mount airflow/dags with the server's /opt/airflow/dags, and I need a dash right there. Now, I want to run the service after Postgres starts. So, I'm going to set depends_on to db. I also want Airflow and Postgres to communicate with each other internally. So what I'm going to do is, on the same hierarchy as services, I'm going to set networks, and I'm going to create a network called my network, and the driver is going to be bridge. Now I can put airflow in this network just like that, and I can also put postgres in this network as well.
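With the Airflow service added, the docker-compose.yaml would now look roughly like this sketch. It also folds in the init-script volume and the startup command that get added a few steps later; the network name my_network and some mount paths are assumptions based on the narration:

    services:
      db:
        container_name: postgres_container
        image: postgres:14.17
        ports:
          - "5000:5432"
        environment:
          POSTGRES_DB: db
          POSTGRES_USER: db_user
          POSTGRES_PASSWORD: db_password
        volumes:
          - ./postgres_data:/var/lib/postgresql/data
          - ./postgres/airflow_init.sql:/docker-entrypoint-initdb.d/airflow_init.sql
        networks:
          - my_network

      af:
        container_name: airflow_container
        image: apache/airflow:3.0.0
        ports:
          - "8000:8080"        # local 8000 -> Airflow webserver 8080
        environment:
          AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@db:5432/airflow_db
        volumes:
          - ./airflow/dags:/opt/airflow/dags
        depends_on:
          - db
        command: >
          bash -c "airflow db migrate && airflow standalone"
        networks:
          - my_network

    networks:
      my_network:
        driver: bridge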
Now let's save this. I'm going to clear
this terminal. Let's cd into the project
folder and let's run docker compose up.
And you can see we're now pulling the
airflow image. It says that the real
airflow does not exist. Let's quit this
process. And that's because I need the
Postgress container to recognize this
SQL file on my local drive. And so I
need to set up a new volume for postgres/airflow_init.sql and mount that with /docker-entrypoint-initdb.d/airflow_init.sql. Let's save this and run docker compose up again. And we're
getting the same error. So I'm going to
kill this process. And this is because
we're trying to initialize the database
with the SQL command, but the database
has already been initialized. So what
I'm going to do is check to see if any
containers are running and there are
none. Then I'm going to delete my
Postgress data folder which contains all
of the previous initialization. So I'm
going to do that with sudo rm -rf postgres_data. Okay, we could see
it's been deleted. I'm going to do
docker compose down and then run docker
compose up. And now we created the role and the database, and Airflow's come back to us with a command error. And this is because Airflow needs a command to run. So I'm going to set command: > bash -c airflow db migrate and airflow standalone. Let's save this.
Let's kill this process. Let's do docker
compose down and then docker compose up.
It says database migrating done. And if
we wait a few minutes then it's going to
start up airflow. So let's give it a
minute or two. Now we get that airflow
is running. If I search for password we
get airflow's credentials. So I'm going
to copy that and go to this link open.
And it says this site can't be reached.
Well that's because we have airflow
running on port 8000 locally. And so
let's go to localhost 8000 and that's
going to route us to port 8080 in the
server. So let's sign in with the
credentials given. And now we're inside
airflow. I want to check something else.
Let's open a new terminal so that we can
run postgress. I'm going to cd into the
project folder. And I'm going to do
docker compose exec db psql, the user db_user and the database db. Let's do \l. And we see the airflow database that we just created. So let's change into the airflow database with \c airflow_db. And then let's check out the schemas with \dn. And we have a public schema. And let's do \dt public.*. And we see that Airflow has saved 45 tables in this database. Let's hit q to close that
window. And we've successfully set up
airflow. Let's go to the DAGs and we see
no DAGs found because we need to create
them. So let's go to our DAGs folder and
create a new Python file. Hey, it's me
from the future. At this step, you might
have trouble creating files in the DAG
folder. For example, if I do orchestrator.py and I get a permission error. And so the first thing I'm going to do is check for the current permissions by doing an ls -l and copying the path of airflow. And then I'm going to do sudo chgrp -R $USER and paste the path to airflow. I'm going to type in my password. Let's inspect the permissions and see how it changes from root to calvin. Then I'm going to do sudo chmod -R 770 and paste the path and check the permissions. And now I
should be able to create the file and
create a new Python file called
orchestrator.py. Now the base structure
of a DAG is pretty straightforward. You
start with from airflow import DAG and
then you call the DAG constructor and
it's going to have DAG id and let's call
this weather API orchestrator. It also
has default args equal to default args
and it has a schedule. Now we set this
constructor equal to DAG and then we
call it down here with DAG. We have our
first task and our second task and so
on. So now where do we get the default
args? We create the dictionary up here.
We set default args equal to in curly
braces. We have things like description.
So let's say a DAG to orchestrate data.
And then we have the start date which is
essentially when we want this DAG to be
considered active. So let's say datetime 2025, 4, 30. And for that to work I need to do from datetime import datetime. And
also in the default args we have catch
up which is essentially do we want to
back fill with previous runs and I'm
going to say false. Now, back to the DAG
constructor, we have this schedule where
we can set how often we want this to
run. And so, I'm going to do time delta
minutes equals 1. And for that to work,
I need to import time delta as well. And
now, how do we define our tasks? Well, I
do task one is equal to some kind of
action. And the thing that does this
action is the Python operator. Inside
the Python operator, we have a task ID
and let's call it example task. And then
we have a python callable which calls
the function that we want to run. So for
example, if I have a function called
example task and this is going to print
this is an example task. Then what I
want to call here is example task. Now
for the PythonOperator to work I need to import from airflow.operators.python. I'm going to import PythonOperator.
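The bare-bones orchestrator.py described here would look something like this sketch, following the structure laid out in the video (default_args entries and names as narrated):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def example_task():
        print("This is an example task")

    default_args = {
        "description": "A DAG to orchestrate data",
        "start_date": datetime(2025, 4, 30),
        "catchup": False,
    }

    dag = DAG(
        dag_id="weather_api_orchestrator",
        default_args=default_args,
        schedule=timedelta(minutes=1),
    )

    with dag:
        task_1 = PythonOperator(
            task_id="example_task",
            python_callable=example_task,
        )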
Let's save this. Let's find our docker
terminal. kill this process. So my DAG
keeps disappearing on me. If I look at
the logs, it says killing the process
here for DAG file and then the process
exits. I wonder if it's because of this
line right here. Let me wrap this in a
function. Call it safe main callable and
indent this and then return the main
function and replace the Python callable
with safe main callable. Save that.
Let's kill this process and do docker
compose down. Let's do docker compose
down minus v. And then let's do docker
compose up. Let's refresh airflow and
let's pick up our new credentials and
sign in. And here's our dag again. Let's
click on that. I want to check our
latest run. Everything looks okay. Let's
go to the code. And this ran with our
new function. Let's go to airflow
refresh. And we see we have our DAG
here. Let's click on this DAG. Let me
resize the panels. I'm going to unpause
this DAG and hit refresh. And our first
task ran successfully. So if I go to the
grid and I click on the latest run and
then go to the task ID, then I could see
the logs and I could see our print
statement. This is an example task. So
now we want to call the main function
from the insert records file. I want to
do something like from insert records
import main. But we have to think we're
technically in the container because in
the docker compose file we told airflow
to use the dags folder as if it lives in
the container. And so our DAG which
lives inside the container is unable to
access any of the files that exist
outside of the container. So in the
volumes I'm going to add API request
colon opt airflow API request. That way
the Airflow container can treat our API
request folder as if it exists inside
the container. And I'm going to allow
Airflow to have access to insert records
by appending the API request folder into
the system path. So I'm going to do sys.path.append("/opt/airflow/api_request"). And
then I can delete the example task
function and I can replace example task
with the main function. Let's save this
and go back to airflow. I'm going to
click on DAGs. Now, I have an error
here. Let's check it out. It says sys is not defined. Did you forget to import sys? Yes, I did. So, in my orchestrator file, I'm going to import sys. Let's
save that. Go back to Airflow. Let's
refresh. We have another error. And this
one says no module named insert records.
Well, just a moment ago when we made
edits to our Docker Compose file, we
were supposed to stop the containers and
run them again. So I'm going to kill
this process and I'm going to do docker
compose down and then docker compose up.
Now let's give it a minute and airflow
is running now with new credentials. So
I'm going to copy this password, go back
to airflow, refresh and sign in. So it
looks like we're having some trouble
detecting our DAGs. I'm going to open a
new terminal cd into our project folder
and run docker exec -it airflow_container bash. And now we're inside the Airflow container. I'm going to do ls /opt/airflow/dags. And I get permission denied. So I'm going to need to update the permissions again. Let's open a new terminal. I'm going to do ls -l and copy the path. And I'm going to do sudo chown -R $USER:$USER airflow/dags. Enter my password. And this is because we need to cd into the project folder. And let's try the chown again. Let's see the new permissions now. So that hasn't changed. Now let's sudo chmod -R 755 airflow/dags and
check the permissions again. And now we
have execute permissions. Let's go back
to our docker terminal. Let's kill this
process. Let's do docker compose down
and then docker compose up. And let's
get our new credentials. Go back to
Airflow, sign in, and now we should see
our DAG. And now we don't have errors.
So let's go to the orchestrator and
let's take a look at the latest run. And
it's still example task. Let's see why.
That's because I didn't update my task
ID. So let me change this to ingest data
task. Save this. Let's go back to
Airflow. I'm going to refresh and I'm
going to trigger this DAG. And now we
can see we have the ingest data task
here. Let's click on the latest run and
the task ID. And we actually see that
the database connection failed. It
couldn't connect to port 5000. That's
because if we go to docker compose, port
5000 is in our local machine, but within
docker, we want airflow to communicate
with the postgress container which lives
on port 5432. So let's go to insert
records and let's change the host to the
service name DB and let's change the
port to 5432. Let's save this. Go back
to airflow. Let's refresh. Let's trigger
this DAG. Now let's click on the latest
run and the task. And now we see that
the data was inserted successfully. Now
let's go to our Postgres terminal. I'm going to run Postgres. And then I'm going to do select star from dev.raw_weather_data. And here I see new records.
And so we've successfully set up this
DAG for our API request. So I see new
records were inserted. Now I want to try
this with live data. And so up here you
notice that I brought in fetch data from
the API request and then scrolling down
I commented out mock fetch data and I
started calling the actual fetch data
from the API. So when you do that we can
go to orchestrator and we could set our
schedule to every 5 minutes and that's
because it seems that the weather is
updated every 5 minutes. So let's save
that and then once you make those
changes then you can go back to airflow
and let it continue to run on schedule.
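After those changes, the ingestion DAG ends up roughly like this sketch (paths and the wrapper function follow the narration; the DAG structure itself mirrors the earlier skeleton):

    import sys
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Make the mounted api_request folder importable inside the Airflow container.
    sys.path.append("/opt/airflow/api_request")
    from insert_records import main

    def safe_main_callable():
        # Wrapping main() keeps the DAG file importable during parsing.
        return main()

    default_args = {
        "description": "A DAG to orchestrate data",
        "start_date": datetime(2025, 4, 30),
        "catchup": False,
    }

    dag = DAG(
        dag_id="weather_api_orchestrator",
        default_args=default_args,
        schedule=timedelta(minutes=5),   # the weather data appears to refresh about every 5 minutes
    )

    with dag:
        ingest_data_task = PythonOperator(
            task_id="ingest_data_task",
            python_callable=safe_main_callable,
        )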
Here we could see we collected a few
more data points, and so I'm ready to pause this DAG. Hey, it's me from the
future. At this stage, I don't recommend
taking more than two data points because
there's going to be something we do
later on where we have to wipe our data.
Let's kill this process. Now, I want to
be able to transform the data for
analytics and reporting. For that, I'm
going to use a tool called DBT. In
Docker Compose, I'm going to search for
DBT. And let's take a look at the first
one. It seems like this image is not
compatible with Postgress. So let's
check out the next one by Fishtown
Analytics. And it looks like this one's
deprecated, but the newer images can be
found in these locations. So I'm going
to click on the link for dbt Postgress.
And here we can find the image and the
tag. So let's set up the service called
dbt. And the container name is going to
be dbt container. And the image is going
to be what we found on GitHub. And for
the volumes, I want Docker to create my
DBT folder and mount that to its user
app folder. And then for the working
directory, I'm going to use the user app
folder. And I want this service to
depend on Postgres running. And I want to put this in my network. And for command, let's start with init and call our project my_project. Let's save.
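The dbt service sketched into docker-compose.yaml might look like this. The image tag is an assumption standing in for whatever you copied from the dbt-postgres packages page on GitHub, and the sketch already includes the volume tweaks that get worked out over the next few steps:

      dbt:
        container_name: dbt_container
        image: ghcr.io/dbt-labs/dbt-postgres:1.9.latest   # assumed tag; use the one from GitHub
        volumes:
          - ./dbt/my_project:/usr/app      # later revised to point at the my_project folder
          - ./dbt:/root/.dbt               # added later so dbt can find profiles.yml
        working_dir: /usr/app
        depends_on:
          - db
        networks:
          - my_network
        command: init my_project           # later changed to debug, then to run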
And then I'm going to do docker compose down. And it says network is not allowed. That's a typo. So I'm going to
make this plural. Let's try docker
compose down. And to set up dbt, I'm
going to do docker compose run dbt. This
will allow us to interact with its
terminal. And it's asking which database
would you like to use? I'm going to
select one for postgress. The host is
going to be DB. Port is 5432. The user
is DB user. The password is DB password.
The database name is db. And the schema is dev. And these are all the things that we've determined in the Postgres container here, as well as in the insert records file here. And for threads, I'm going to do four.
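The resulting profiles.yml would look roughly like this sketch, assuming the project was initialized as my_project:

    my_project:
      target: dev
      outputs:
        dev:
          type: postgres
          host: db            # the Postgres service name inside the Docker network
          port: 5432
          user: db_user
          password: db_password
          dbname: db
          schema: dev
          threads: 4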
It says our profile was written to this directory. And now we have to run dbt debug. So I'm going
to replace this command with debug.
Let's docker compose down. And then
let's do docker compose run dbt. Now it
says dbt cannot find these two files.
The dbt project file lives inside the
dbt folder and inside my project. So
what I can do now is revise my volume so
that the my project folder is mounted
with the user app folder. Let's save
this. Let's do docker compose down and
then docker compose up. And now dbt was
able to find the dbt project file. Now
we want to locate the profiles.yaml
file. I'm going to try to overwrite the
environment variable called dbt profiles
directory. And I'm going to set this
equal to this parent directory here. Let
me save kill this process. Do docker
compose down and then docker compose up.
And that didn't work. So what I need to
do is I need to find the profiles.yaml
file in the container and bring it out
to my local. So let's kill this process.
I'm going to do docker ps minus a to see
a list of all of my containers. And the
container that has my profile.yaml file
is the one with the dbt init command. So
that's this container. So I'm going to
do docker cp which is copy from this
container ID colon
slashroot dbt
/profile.yaml. I want to copy this into
my dbt folder. So I'm going to take the
path and paste it here and enter. I made
a typo. It's supposed to be root and
then dbt. So let's enter that. And
there's another typo with profile. It's
supposed to be profiles with an s and
let's enter. And then now I get
permission denied. So let's modify the
permissions here. I'm going to open a
new terminal and similar with what we
did to the Postgress folder. I'm going
to check the permissions of the dbt
folder with ls -l and the path. Then I'm going to do sudo chgrp -R $USER and paste that path. I'm going to type in my password, and then I'm going to do sudo chmod -R 770 and paste that path. Enter.
Let's go ahead and try to copy over the
profiles.yaml file. And now we see we've
successfully copied the file over. So
now I'm going to do docker compose down
and then docker compose up. And it still
can't be found. Well, that's because I
need to add to my volumes. So, I'm going
to kill this process and I'm going to
mount my DBT folder to the servers root
and dbt. Let's save that. Let's do
docker compose down and then docker
compose up again. And now it says all
checks passed. Now I'm going to kill
this process again. And the last thing
I'm going to do is I'm going to replace
the command with run because if we look
at the models folder, we have some
example models that we can build. And so
we'll be able to build those with the
run command. So let's do docker compose
down and then docker compose up. And now
you could see our models were built
successfully. I'm going to keep this
terminal running. And I'm going to open
a new terminal so that I can access
Postgress. And so I'm going to cd into
the project folder. And I'm going to do
docker compose exec db psql, user db_user and database db. And if I do \dt dev.* then you could see this
table which came from this model. Now I
want to start creating my models. So I'm
going to delete this table with drop table dev.my_first_dbt_model. And it says I can't drop this because it depends on my_second_dbt_model. So at the end of this line I'm going to say cascade and enter. So if I do \dv, which is a view, or like a different type of table, and then I do dev.*, that confirms we've also deleted my_second_dbt_model, which was a view. And just to be sure, you could do \dt dev.* and we don't have any
more tables. Now I'm going to delete the
example folder and all the contents
inside. And then I'm going to go to dbt
project and delete example and its
configurations like that and save. Hey,
this is me from the future. All I did
was just ex out of all of my tabs so
that I could work off of a clean slate.
And if you want, you could do the same
and just keep going on. The first thing
I'm going to do is declare my sources.
So inside my models folder, I'm going to
create a new folder called sources. And
then inside that, I'm going to create a
file called sources.yml, and I'm going to do version 2, sources, the name of the schema which is dev, the database which is db, and the tables. So we have one with the name raw_weather_data, and it has columns with the following names: id, city, temperature, weather_descriptions, wind_speed, time, inserted_at, and utc_offset. Let's save this.
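A sketch of that sources.yml, following the names just listed:

    version: 2

    sources:
      - name: dev               # the schema the raw table lives in
        database: db
        tables:
          - name: raw_weather_data
            columns:
              - name: id
              - name: city
              - name: temperature
              - name: weather_descriptions
              - name: wind_speed
              - name: time
              - name: inserted_at
              - name: utc_offset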
Now I want to create my staging layer. So,
inside the models folder, I'm going to
create a new folder called staging. It's
still under sources. So, I'm going to
drag this to models move. And inside the
staging folder, I'm going to create a
file called
staging.sql. And I'm going to start with
a config block with double curly braces
config parenthesis. I want this
materialized as a table, comma, and the
unique key is going to be ID. And I'm
going to do select star from double
curly braces source parenthesis dev
schema raw weather data table. Let me
save this. I'm going to open a terminal
and I want to run dbt so that I can
create this table. So I'm going to cd
into weather data project. I'm going to
refresh the docker group. I'm going to
docker compose up dbt. I'm getting this
message because my docker desktop closed
on me. So I'm going to open that up and
I'm going to make sure it's still
integrated with WSL settings resource
WSL integration and it's enabled. So
let's go back. I'm going to run docker
compose up dbt, and it successfully created
the staging model. So I'm going to open
a new terminal cd into the weather data
project. Refresh the docker group and
let's open up Postgres with docker compose exec db and the Postgres command psql, user db_user and the database db. Let me do \dt dev.*. We have our staging table, and let me do select star dev.staging semicolon. I forgot the from
clause. Let's try that again. And we've
successfully created the staging table,
which means that we've correctly
declared our source. Let me actually
stage this. I'm going to say with source
as this query format that I want to
select ID, city, temperature, weather
descriptions, wind speed. I want to
rename time as weather time local. And
then I want inserted converted to local
time. So I'm going to do that by
inserted at plus UTC offset which is the
next column. Setting this as hours and
then colon interval as inserted at
local. And this is all from the source.
Save this. Let me go back to my dbt
terminal. I'm going to docker compose up
dbt. This ran successfully. I'm going to
go back to Postgress. Let me hit Q and
then let's do select star from staging.
And we could see that these changes have
been made. I'm going to create one more
folder in models. Let's call this mart
and create a new file called weather
report.sql. And this is the layer where
we create tables that we intend to turn
into reports. So I'm going to set up my
config block with the double curly
braces config parenthesis. I want this
materialized as a table. The unique key
is going to be ID. And I could do select
star from double curly braces. I'm going
to reference the staging model. Let's
save. Actually, let's go by naming
conventions. So, I'm going to turn
staging.sql to stg_weather_data and then change the staging reference to stg_weather_data and save. I realize I have
some duplicate data in the first three
rows. So I want to go back to the
staging model and see if we can
deduplicate here. So I'm creating another CTE. I'm going to put a comma here. I'm going to call this dedup, as select star. I'm going to give a row number to each time frame's record, over partition by time, which is what this column is originally called, and order by inserted_at, which is what this column was originally called. And I'm going to call this as rn. And this is going to be from source. Let me comment this out. And I'm going to do select star from dedup. Let's
save. Let's go to the dbt terminal. I'm
going to do docker compose up dbt. The
models got created. So I'm going to go to Postgres. Let's hit q. I'm going to select from staging. Actually, we did change the table name, right? So I'm actually just going to do select star from stg_weather_data semicolon. Of course, I need the schema dev. And now we see that for each time frame right here, we have row numbers 1, 2, and 3. And I think it would make sense to get the earliest record just for our use case. And so I'm going to do where rn equals 1. I'm actually going to uncomment this, change the source to dedup, apply my filter here, remove these lines, save, and then let's go back to Postgres, hit q. Let's take a look at the tables with \dt dev.*. I'm going to drop table dev.staging semicolon. And I'm also going to drop table dev.stg_weather_data semicolon. And
let's go to dbt and docker compose up
again. It ran successfully. Now let's
select star from stg_weather_data. And you see we got rid of the duplicate records.
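The finished staging model, stg_weather_data.sql, would look roughly like this sketch. It is restructured slightly so the dedup window runs over the original column names before the renames, which matches the intent described above:

    {{ config(
        materialized='table',
        unique_key='id'
    ) }}

    with source as (
        select *
        from {{ source('dev', 'raw_weather_data') }}
    ),

    dedup as (
        select
            *,
            row_number() over (partition by time order by inserted_at) as rn
        from source
    )

    select
        id,
        city,
        temperature,
        weather_descriptions,
        wind_speed,
        time as weather_time_local,
        (inserted_at + (utc_offset || ' hours')::interval) as inserted_at_local
    from dedup
    where rn = 1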
Let's go to weather report. Perhaps we don't need the record ID. So
I'm going to delete select star. And I'm
just going to do city, temperature,
weather description, wind speed, weather
time local. And maybe I don't need the
inserted at local as well. I'm going to
create one more table inside mart and call this daily_average.sql. Hey, it's me from the
future. Here I'm going to give a recap
because there was a portion that I did
not record. So I created this daily
average model. Notice it doesn't have a
unique ID. So I deleted that. and
sourcing from the stage weather data
model, I get the city and the weather
time local and I wrap that in date to
get the day of the record since I want
to get the daily average. Then I took
the average of the temperature and the
average of the wind speed grouping by
the city and the date and the average
gave me a bunch of trailing decimals and
so I wrapped that in round, 2, but came
across a data type issue. So I had to
convert double precision to numeric for
the round to work. And then when I get
more data points in the future, I can order by the city and the date. And I ran the dbt models. Everything ran successfully, and I went to Postgres, went into staging, saw that I took care of the duplicate records, and then did select star from the daily average and got the result that I expected. So I finished creating all of my models.
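Based on that recap, daily_average.sql would look roughly like this sketch:

    {{ config(
        materialized='table'
    ) }}

    select
        city,
        date(weather_time_local) as date,
        round(avg(temperature)::numeric, 2) as avg_temperature,   -- cast double precision to numeric so round() works
        round(avg(wind_speed)::numeric, 2) as avg_wind_speed
    from {{ ref('stg_weather_data') }}
    group by city, date(weather_time_local)
    order by city, date(weather_time_local)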
Now let's talk about creating a DAG for dbt.
I'm going to copy the orchestrator file
and call it dbt orchestrator. I can
delete these two lines and I'm going to
call this weather dbt orchestrator and
let's call this task 2. And what we want
to do is trigger the dbt container. So
instead of python operator, we're
dealing with a docker operator. So I
actually want from airflow.providers.docker.operators.docker import DockerOperator, and I'm going to set task 2 equal to DockerOperator, and its parameters are going to be task_id, the image, the command, the working directory, mounts (which are the volumes), the network mode, the docker URL, and auto remove. For task_id, let's call
this transform data task the image we're
going to get from docker compose and I'm
going to copy this and paste it here the
command We're also going to get from
docker compose it's going to be run. So
paste it here. The working directory
also from docker compose. Paste that
there. The mounts are going to be our
volumes. So let me copy this and just
paste it down here for our reference.
And we're going to use one mount
constructor for each volume. So I call
mount which is going to take in source
target and type. I'm going to put a
comma here. And then I'm going to copy
this again for the other volume. For the
source, it only takes the absolute path.
So, let's look for dbt my project, which
is right here. I'm going to rightclick
and copy the path and paste it in
source. And the target I'm going to take
from the volume mounting right here and
paste that. And the type is going to be
bind. And then for the other mount, I'm
going to look for my DBT folder, which
is right here. I'm going to rightclick,
copy path, paste it as source. And then
the target I'm going to take from the
mounting and cut and paste. And then the
type is going to be bind. Now I could
remove my notes. And then for the
network mode I'm going to take that from
the docker compose. It's my network and
paste that here. And the docker URL is unix://var/run/docker.sock. And this is the path to the docker socket. And auto_remove is going to be success. Now, going back to the Mount, this only works if I import it: from docker.types import Mount. Let's save this and let's do docker compose up.
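The dbt_orchestrator.py built here would look roughly like this sketch. The absolute host paths and the image tag are placeholders for whatever you copied from your own setup, and the network name already includes the compose-project prefix that gets discovered at the end of this section:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.providers.docker.operators.docker import DockerOperator
    from docker.types import Mount

    default_args = {
        "description": "A DAG to orchestrate data",
        "start_date": datetime(2025, 4, 30),
        "catchup": False,
    }

    dag = DAG(
        dag_id="weather_dbt_orchestrator",
        default_args=default_args,
        schedule=timedelta(minutes=5),
    )

    with dag:
        task_2 = DockerOperator(
            task_id="transform_data_task",
            image="ghcr.io/dbt-labs/dbt-postgres:1.9.latest",   # same image as in docker-compose
            command="run",
            working_dir="/usr/app",
            mounts=[
                # Host paths are placeholders; use the absolute paths copied from your project.
                Mount(source="/home/calvin/repos/weather_data_project/dbt/my_project",
                      target="/usr/app", type="bind"),
                Mount(source="/home/calvin/repos/weather_data_project/dbt/profiles.yml",
                      target="/root/.dbt/profiles.yml", type="bind"),
            ],
            network_mode="weather_data_project_my_network",     # compose prefixes the network name
            docker_url="unix://var/run/docker.sock",
            auto_remove="success",
        )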
Let's take a look at the logs. It says
configuration paths exist in your dbt project.yml file which do not apply to
project.yml file which do not apply to any resources. Still references example.
any resources. Still references example. So let me go there. DBT project. I can
So let me go there. DBT project. I can scroll down and I could actually just
scroll down and I could actually just delete these two lines and save that. It
delete these two lines and save that. It looks like our DBT models ran
looks like our DBT models ran successfully and we got a new password
successfully and we got a new password for Airflow. So, I'm going to copy this
for Airflow. So, I'm going to copy this and let's go to this link open. It says
and let's go to this link open. It says site can't be reached because well, if
site can't be reached because well, if you look at the Docker compose, we have
you look at the Docker compose, we have to go to our local port 8000, which then
to go to our local port 8000, which then gets routed into Docker's port 8080. So,
gets routed into Docker's port 8080. So, let's change this to 8000. And I'm going
let's change this to 8000. And I'm going to input my credentials and sign in.
to input my credentials and sign in. Let's go to DAGs. Looks like we have an
Let's go to DAGs. Looks like we have an error. Let's check it out. says,
error. Let's check it out. says, "Perhaps you forgot a comma in line 38."
"Perhaps you forgot a comma in line 38." So, let's go to dbt orchestrator line
So, let's go to dbt orchestrator line 38. And I did forget a bunch of commas
38. And I did forget a bunch of commas right here and here. Let's save that and
right here and here. Let's save that and go back to Airflow. Let's refresh. I get
go back to Airflow. Let's refresh. I get another error. Let's check this one out.
another error. Let's check this one out. It says the name Python operator is not
It says the name Python operator is not defined. Let's go back to dbt
defined. Let's go back to dbt orchestrator. And that's because we're
orchestrator. And that's because we're not using this anymore. So, I can delete
not using this anymore. So, I can delete this block. Let's save. Go back to
this block. Let's save. Go back to airflow. Refresh. Let's check out this
airflow. Refresh. Let's check out this new error. It says invalid arguments
new error. It says invalid arguments were passed to docker operator this task
were passed to docker operator this task ID. So let's go to the dbt orchestrator
ID. So let's go to the dbt orchestrator file. Looks like I missed in the second
file. Looks like I missed in the second mount profiles.yaml and in the target as
mount profiles.yaml and in the target as well profiles.yaml and also mount should
well profiles.yaml and also mount should be lowercase m. Let's save this. Go back
be lowercase m. Let's save this. Go back to airflow. Refresh. And we have no
to airflow. Refresh. And we have no errors. Now let's go to weather dbt
errors. Now let's go to weather dbt orchestrator. Let's unpause the dag and
orchestrator. Let's unpause the dag and refresh. And looks like we have a failed
refresh. And looks like we have a failed run. So let's click on our latest and
run. So let's click on our latest and click on task ID. And it looks like
click on task ID. And it looks like airflow is unable to access docker. So
airflow is unable to access docker. So let's go to docker compose. And in the
let's go to docker compose. And in the airflow volume I need to mount my
airflow volume I need to mount my systems var run docker sock with
systems var run docker sock with airflow's var run docker sock. Let's
airflow's var run docker sock. Let's save this. Go back to airflow. Refresh.
save this. Go back to airflow. Refresh. And let's trigger a new run trigger.
And let's trigger a new run trigger. Let's check out the latest. Click on
Let's check out the latest. Click on task ID. I still get the same error. So
task ID. I still get the same error. So let's go to VSC. Let's open a new
let's go to VSC. Let's open a new terminal. Let me just check the socket
terminal. Let me just check the socket with ls minus lv varun docker.sock. So
with ls minus lv varun docker.sock. So that's working fine. Let's go back to
that's working fine. Let's go back to the docker terminal and let's quit this
the docker terminal and let's quit this process with control c and let's try
process with control c and let's try docker compose up minus minus build.
docker compose up minus minus build. Let's give it a few minutes and it gives
Let's give it a few minutes and it gives me new credentials. So let's go back to
me new credentials. So let's go back to airflow and refresh. I'm going to sign
airflow and refresh. I'm going to sign in again. Let's click on dags and go to
in again. Let's click on dags and go to the dbt orchestrator. Let's trigger this
the dbt orchestrator. Let's trigger this again. Let's click on the latest and the
again. Let's click on the latest and the task ID. We're actually getting
task ID. We're actually getting permission denied. Let's quit this
permission denied. Let's quit this process. I want to check the group ID of
process. I want to check the group ID of the Docker socket with a stat minus C
the Docker socket with a stat minus C single quote percentage G single quote
single quote percentage G single quote /var run docker.sock and it's in group
/var run docker.sock and it's in group 101. Then let me do group add 101. Save.
101. Then let me do group add 101. Save. Let me docker compose up. Let's give it
Let me docker compose up. Let's give it a few minutes. We got new credentials.
a few minutes. We got new credentials. Let's go back to Airflow, refresh, sign
Let's go back to Airflow, refresh, sign in again. Click on DAGs and DBT
in again. Click on DAGs and DBT orchestrator and refresh and let's
orchestrator and refresh and let's trigger a new run. Let's click on the
trigger a new run. Let's click on the latest and task ID and it says failed to
latest and task ID and it says failed to set up container networking. My network
set up container networking. My network is not found. I think what's happening
is not found. I think what's happening is our network called my network is what
is our network called my network is what allows DBT, airflow and Postgress to
allows DBT, airflow and Postgress to communicate with each other. But if we
communicate with each other. But if we look at dbt orchestrator, we have docker
look at dbt orchestrator, we have docker operator spin up a new dbt image which
operator spin up a new dbt image which is separate from the dbt image in our
is separate from the dbt image in our docker compose file. So we need to
docker compose file. So we need to somehow get the new containers to
somehow get the new containers to communicate using our network. Let me
communicate using our network. Let me quit this process. Let's check out the
quit this process. Let's check out the networks that we have. If I do docker
networks that we have. If I do docker networks ls, it should actually be
networks ls, it should actually be singular. We see that even though we
singular. We see that even though we named our networks my network, docker
named our networks my network, docker compose adds this prefix to the name. So
compose adds this prefix to the name. So every time docker operator spins up a
every time docker operator spins up a new container for dbt, it's trying to
new container for dbt, it's trying to look for my network, but that's not one
look for my network, but that's not one of the networks that are available. So I
of the networks that are available. So I actually have to replace my network with
actually have to replace my network with weather data project my network. Let's
weather data project my network. Let's save that. Let's go back to airflow
save that. Let's go back to airflow refresh. Let's do docker compose up. And
refresh. Let's do docker compose up. And we got new credentials. And it's not
we got new credentials. And it's not giving me new credentials. It's it says
giving me new credentials. It's it says it has been previously generated, but
it has been previously generated, but let me try going to airflow and refresh.
let me try going to airflow and refresh. And I'm still logged in. And it looks
And I'm still logged in. And it looks like it ran the job successfully. Let's
like it ran the job successfully. Let's click on the latest task, transform data
click on the latest task, transform data task, and we have logs that say our DBT
task, and we have logs that say our DBT models were created successfully. So now
models were created successfully. So now we have dbt running on auler. Now the
we have dbt running on auler. Now the last thing I'm going to do is I'm going
last thing I'm going to do is I'm going to set up my DAG so that dbt runs after
to set up my DAG so that dbt runs after the API data has been ingested
the API data has been ingested successfully. So in this segment, I'm
successfully. So in this segment, I'm going to move the contents from dbt
going to move the contents from dbt orchestrator into the orchestrator file.
orchestrator into the orchestrator file. So I need to bring the docker operator
So I need to bring the docker operator and the mount the default args and the
and the mount the default args and the DAG is the same. And I'm going to bring
DAG is the same. And I'm going to bring task number two and bring it under task
task number two and bring it under task number one. I can remove this comment.
number one. I can remove this comment. And at the end of my width block, I'm
And at the end of my width block, I'm just going to do task one. It's greater
just going to do task one. It's greater than greater than task two. And this
than greater than task two. And this sets the dependencies. And then I'm
sets the dependencies. And then I'm going to remove my DBT orchestrator
going to remove my DBT orchestrator file. Going to go into Airflow refresh.
file. Going to go into Airflow refresh. Let's click on this orchestrator. Let's
Let's click on this orchestrator. Let's click on the graph. And here we see our
click on the graph. And here we see our two tasks. Let me go back to
two tasks. Let me go back to orchestrator. I'm going to change the
orchestrator. I'm going to change the DAG name. API DBT Orchestrator. Let's
DAG name. API DBT Orchestrator. Let's have this run every 1 minute. Save.
have this run every 1 minute. Save. Let's go back to Airflow. Refresh. Click
Let's go back to Airflow. Refresh. Click on the new DAG. Unpause. Refresh. Let's
on the new DAG. Unpause. Refresh. Let's click on the latest run. And it
click on the latest run. And it successfully completed the two tasks.
successfully completed the two tasks. Let's refresh again. And it finished
Let's refresh again. And it finished running our second task. So with that,
running our second task. So with that, I'm going to pause this DAG. Now let's
I'm going to pause this DAG. Now let's bring in the tool that's going to help
bring in the tool that's going to help us visualize the data. This tool is
us visualize the data. This tool is called Super Set. And a lot of tutorials
called Super Set. And a lot of tutorials say to clone the repository, but this is
say to clone the repository, but this is a massive repository. We just need a few
a massive repository. We just need a few files. So, first let me go to my
files. So, first let me go to my terminal and do docker compose down. And
terminal and do docker compose down. And let's also docker compose down minus v.
let's also docker compose down minus v. And let me just put these windows side
And let me just put these windows side to side. So, the first place I'm going
to side. So, the first place I'm going to go to is if we scroll down, we see
to go to is if we scroll down, we see docker compose. So, let me click on
docker compose. So, let me click on that. And here we see a bunch of
that. And here we see a bunch of services. We don't need all of them. I'm
services. We don't need all of them. I'm going to copy over superset. I'm also
going to copy over superset. I'm also going to copy over superset in it and
going to copy over superset in it and I'm going to copy over reddus. And then
I'm going to copy over reddus. And then let's go back. I'm going to go into the
let's go back. I'm going to go into the docker folder and in my own project I'm
docker folder and in my own project I'm going to create a new folder called
going to create a new folder called docker. And I want to bring in the
docker. And I want to bring in the content of some of this file over. So
content of some of this file over. So we're going to start with the docker
we're going to start with the docker init. And in my docker folder I'm going
init. And in my docker folder I'm going to create a new file called
to create a new file called dockerinit.sh. Make sure the name's
dockerinit.sh. Make sure the name's exactly the same. And I'm going to copy
exactly the same. And I'm going to copy the contents and paste it into my file
the contents and paste it into my file and save. Let's go back. The next thing
and save. Let's go back. The next thing I want is Docker Bootstrap. So I'm going
I want is Docker Bootstrap. So I'm going to create a new file called Docker
to create a new file called Docker Bootstrap. And let's open this. Copy the
Bootstrap. And let's open this. Copy the contents and paste. Let's go back. Next,
contents and paste. Let's go back. Next, I want the N file. Sov. Let's open this
I want the N file. Sov. Let's open this up. Copy the contents and paste. And
up. Copy the contents and paste. And then let's go back. And I want to go
then let's go back. And I want to go into Python
into Python path_dev. And in my Docker folder, I'm
path_dev. And in my Docker folder, I'm going to create a new file called
superset_config.py and bring in the contents of the corresponding file. Copy
contents of the corresponding file. Copy and
and paste. Now, let's go to our docker
paste. Now, let's go to our docker compose. And if we look at superset
compose. And if we look at superset init, then it's going to have us build a
init, then it's going to have us build a custom docker image, which we're not
custom docker image, which we're not going to do. So, I'm going to delete
going to do. So, I'm going to delete that and replace that with image. And
that and replace that with image. And the image I'm going to use is
the image I'm going to use is apache/superset colon
apache/superset colon 3.0.0-EN py310 because this version will
3.0.0-EN py310 because this version will contain some of the Python packages that
contain some of the Python packages that we'll need. Now looking at the end file,
we'll need. Now looking at the end file, I just need the top one. So I'm going to
I just need the top one. So I'm going to delete these two lines. And looking at
delete these two lines. And looking at the user and the volumes, we have some
the user and the volumes, we have some variables. So if we go back to
variables. So if we go back to Superset's Docker Compose file, we see
Superset's Docker Compose file, we see that they've defined their variables up
that they've defined their variables up here. So I'm going to copy over both and
here. So I'm going to copy over both and paste them on top of services. Now
paste them on top of services. Now looking at the volumes, we don't need
looking at the volumes, we don't need all of these. We don't need superset
all of these. We don't need superset front end. We don't need superset home
front end. We don't need superset home and we don't need tests. But what we do
and we don't need tests. But what we do need is to mount the superset config
need is to mount the superset config file so that superset can recognize it.
file so that superset can recognize it. So I'm going to mount docker superset
So I'm going to mount docker superset config.py and mount that to app python
config.py and mount that to app python path superset
path superset config.py. Let's scroll down for the
config.py. Let's scroll down for the environment variables. I don't need
environment variables. I don't need Cypress config. And I see that the
Cypress config. And I see that the superset load examples comes from the M
superset load examples comes from the M file. So let's go to M. Let me open this
file. So let's go to M. Let me open this in a split window. And I'm going to
in a split window. And I'm going to bring over superset config. So
bring over superset config. So ultimately we're trying to create this
ultimately we're trying to create this database URI. And it has the database
database URI. And it has the database dialect which uses this. And here I want
dialect which uses this. And here I want to add the psycho pg2 connector. The
to add the psycho pg2 connector. The database user is going to be superset.
database user is going to be superset. The database password is also going to
The database password is also going to be superset. So this tells me that in
be superset. So this tells me that in Postgress I have to create another user.
Postgress I have to create another user. The database host is going to be DB
The database host is going to be DB which matches our Postgress service. The
which matches our Postgress service. The database port is 5432 and the database
database port is 5432 and the database DB is going to be superset. Now for the
DB is going to be superset. Now for the examples URI, our database dialect is
examples URI, our database dialect is going to be the same. The examples user
going to be the same. The examples user is going to be examples. The examples
is going to be examples. The examples password is going to be examples. The
password is going to be examples. The host and the port are the same. and
host and the port are the same. and examples DB is going to use the database
examples DB is going to use the database examples. So I'm going to go into my
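For reference, the superset_config.py we copied assembles its SQLAlchemy URI from those environment variables roughly like this; it's a sketch based on the variable names discussed above, with this walkthrough's values as defaults, not a verbatim copy of the file:

```python
# Sketch of how superset_config.py assembles the metadata-database URI from the
# environment variables above. Variable names follow Superset's docker setup;
# the defaults shown are the values chosen in this walkthrough.
import os

DATABASE_DIALECT = os.getenv("DATABASE_DIALECT", "postgresql+psycopg2")
DATABASE_USER = os.getenv("DATABASE_USER", "superset")
DATABASE_PASSWORD = os.getenv("DATABASE_PASSWORD", "superset")
DATABASE_HOST = os.getenv("DATABASE_HOST", "db")
DATABASE_PORT = os.getenv("DATABASE_PORT", "5432")
DATABASE_DB = os.getenv("DATABASE_DB", "superset_db")

SQLALCHEMY_DATABASE_URI = (
    f"{DATABASE_DIALECT}://{DATABASE_USER}:{DATABASE_PASSWORD}"
    f"@{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_DB}"
)
# e.g. postgresql+psycopg2://superset:superset@db:5432/superset_db
```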
So I'm going to go into my postgres folder and create a file called supersetinit.sql. I'm going to copy the code from the airflow init file into it, and I'm going to create the user superset with the password superset. I actually want my database to be called superset_db, with superset as the owner. Let's go back to the .env file: I'd like to change the DATABASE_DB name, but it says do not modify, so I'm going to work around that and call the Postgres database superset_db instead. Then I'm going to go back to supersetinit.sql, duplicate that code, and create a user example with the password example and a database example_db owned by example.

Let's go back to docker-compose and back to superset-init. I'm going to override the environment variable DATABASE_DB, which is this value here, with the value that comes from POSTGRES_DB; that way, instead of superset it's going to be superset_db. The last thing for superset-init is to include it in my-network. Now let's look at the superset service. For the env_file it also just needs the top entry, and for the build we're going to replace it with the same image we used above for superset-init. The port mapping is fine, so I don't need the second one, and I don't need extra_hosts. For the environment variables, I don't need the Cypress config, and I'm also going to bring in DATABASE_DB, but I realize this should be an underscore, so I'm going to fix that and then copy it over. Lastly, let's copy over the networks entry so that superset joins our network. For redis, everything looks good; I see that it creates a Docker volume, which will be stored on the Docker server rather than locally, so I'm going to declare the volume redis with external: false. Let's save that.

Another thing we have to do is go to the Postgres service and mount the new file we created, so I'm going to mount postgres/supersetinit.sql to /docker-entrypoint-initdb.d/supersetinit.sql. Now, before I run Docker Compose, I have to remind myself that these SQL commands are only executed when Postgres initializes its data directory. We already initialized Postgres, and all of its metadata is stored in the data folder; we also have some API data stored in there, but we're going to make the executive decision to remove that data for the purpose of integrating Superset. So I'm going to do docker compose down, then docker compose down -v (that was a typo: -v), then sudo rm -rf postgres/data and type in my password. The data folder is deleted, and then I'll do docker compose up.

It says permission denied when trying to access the docker-init file, so let me check the permissions of the docker folder. I'm going to copy its path and run ls -l on it. What I want to do is sudo chgrp -R $USER on that path. Checking the permissions again, nothing's changed, so I'll run sudo chmod -R 770 on the path. Now the permissions show read, write, and execute. So let me delete the data folder again, run docker compose down -v, and then docker compose up. It looks like we got an error at superset-init, so let me remove the superset volume: I'm going to do docker compose down -v, remove the superset folder with sudo rm -rf superset, and remove the data folder again with sudo rm -rf postgres/data. Let's try docker compose up.

It looks like we got another error from superset-init: it says the database superset does not exist. Let's see where this comes from. In the supersetinit.sql file we create the database superset_db, and in the docker-compose file we wanted DATABASE_DB to take the value of POSTGRES_DB, which is superset_db, but it seems like Superset isn't picking that up. Let's go to superset_config.py, where I want to debug the environment variables. I'm going to print "superset environment debug", then print the database dialect, the database user, the database host, and DATABASE_DB. I've added this code block so that we can print some of these values.
some of these values. Let's save. Let's do docker compose down minus v. Let's
do docker compose down minus v. Let's remove the postgress data folder. And
remove the postgress data folder. And let's do docker compose up. So here
let's do docker compose up. So here everything else prints except database
everything else prints except database db which is just a blank string. So that
db which is just a blank string. So that means something's wrong with this
means something's wrong with this environmental variable. I'm going to go
environmental variable. I'm going to go to docker compose and simply overwrite
to docker compose and simply overwrite the database db variable as superset db
the database db variable as superset db for both the superset init and superset
for both the superset init and superset services. Let's do docker compose up.
services. Let's do docker compose up. And now it looks like we get a different
And now it looks like we get a different kind of error. It started loading the
kind of error. It started loading the examples, but it says password
examples, but it says password authentication failed for user examples.
authentication failed for user examples. But Postgress says the role examples
But Postgress says the role examples does not exist. Let's first check the
does not exist. Let's first check the superset in it SQL file. And here we
superset in it SQL file. And here we have it as singular. So I want to put an
have it as singular. So I want to put an s at the end to the user password, the
s at the end to the user password, the database, and the owner. Let's go to the
database, and the owner. Let's go to the end file and make sure that examples is
end file and make sure that examples is plural. Now let's try this again with
plural. Now let's try this again with docker compose down minus v and remove
docker compose down minus v and remove the postgress data folder and then do
the postgress data folder and then do docker compose up. Looks like we're
docker compose up. Looks like we're starting to load the example data. So
starting to load the example data. So let's give this a few minutes. Now it
let's give this a few minutes. Now it looks like superset in it loaded some
looks like superset in it loaded some example tables and then ran into an
example tables and then ran into an error for too many requests. And that's
error for too many requests. And that's okay for now because we were able to get
okay for now because we were able to get a few examples. And let's go back to
a few examples. And let's go back to docker compose. And for superset load
docker compose. And for superset load examples let's replace this with no. I'm
examples let's replace this with no. I'm going to do docker compose down to stop
going to do docker compose down to stop the containers. And this time we don't
the containers. And this time we don't need to remove the Postgress data folder
need to remove the Postgress data folder because we saw that superset was able to
because we saw that superset was able to establish a connection with Postgress
establish a connection with Postgress and save some data into it. So I'm just
and save some data into it. So I'm just going to skip right to docker compose up
going to skip right to docker compose up and now we're running the superset app.
and now we're running the superset app. Now it says that superset is running on
Now it says that superset is running on 8088. So I'm going to click on this link
8088. So I'm going to click on this link and we get an error connecting to
and we get an error connecting to Reddus. So let's go back to docker
Reddus. So let's go back to docker compose and I see that retos is missing
compose and I see that retos is missing from my network. So I'm going to add it
from my network. So I'm going to add it to my network. Let me kill this process
to my network. Let me kill this process and I'm going to do docker compose up
and I'm going to do docker compose up and let's return to this link and we're
and let's return to this link and we're now in superset on the dashboard. You
now in superset on the dashboard. You could see some of the examples. Let's
could see some of the examples. Let's click on birth names and we see it looks
click on birth names and we see it looks like that. And now I want to see my
like that. And now I want to see my data. So in settings let's go to
data. So in settings let's go to database connections. And here we're
database connections. And here we're connected to examples but I want to
connected to examples but I want to bring in the DB database. So let's click
bring in the DB database. So let's click on add database Postgress. The host is
on add database Postgress. The host is DB. Port is 5432. The database name is
DB. Port is 5432. The database name is DB. The username is DB user. The
DB. The username is DB user. The password is DB password. The display
password is DB password. The display name I'm going to call it DB and
name I'm going to call it DB and connect. And it says database connected.
connect. And it says database connected. So let's finish. Now let's click on data
So let's finish. Now let's click on data sets and add data set. And let's go to
sets and add data set. And let's go to database. Click on DB. The schema is
database. Click on DB. The schema is dev. Let's check out the dev schema. And
dev. Let's check out the dev schema. And currently we have no tables because we
currently we have no tables because we deleted the Postgress data folder as we
deleted the Postgress data folder as we were trying to connect to superset.
were trying to connect to superset. Let's go back to the terminal. And here
Let's go back to the terminal. And here we have credentials for airflow. And I
we have credentials for airflow. And I can't find my new airflow credentials in
can't find my new airflow credentials in the terminal. It says that the password
the terminal. It says that the password has been previously generated. So maybe
has been previously generated. So maybe I just have to go to localhost 8000.
I just have to go to localhost 8000. Going to localhost 8000 still asks me to
Going to localhost 8000 still asks me to sign in. So, I'm going to open a new
sign in. So, I'm going to open a new terminal, cd into my weather data
terminal, cd into my weather data project, and let me do docker ps to see
project, and let me do docker ps to see what's running. And let me docker
what's running. And let me docker compose down the service name for
compose down the service name for airflow. And I'm going to docker compose
airflow. And I'm going to docker compose up just airflow. Let's give this a
up just airflow. Let's give this a minute. And here we can get our new
minute. And here we can get our new credentials. So, I'm going to sign into
credentials. So, I'm going to sign into airflow. Let's click on dags. And let's
airflow. Let's click on dags. And let's go to our orchestrator. I'm going to
go to our orchestrator. I'm going to unpause this DAG and refresh. And we see
unpause this DAG and refresh. And we see that it completed its first run. and the
that it completed its first run. and the next one is in five minutes. Now I can
next one is in five minutes. Now I can go back to superset and refresh the
go back to superset and refresh the table and we see that our tables were
table and we see that our tables were created. Now for this version of the
created. Now for this version of the recording I don't know how my weather
recording I don't know how my weather report model went missing. So as a proxy
report model went missing. So as a proxy let's just use stage weather data for
let's just use stage weather data for now and using this table I can create a
now and using this table I can create a data set and create a chart. The chart I
data set and create a chart. The chart I want to use is scatter plot and create
want to use is scatter plot and create chart. Now I can drag my columns like
chart. Now I can drag my columns like weather, time, local to the x-axis and I
weather, time, local to the x-axis and I can drag temperature to metrics and my
can drag temperature to metrics and my aggregation is going to be the sum which
aggregation is going to be the sum which is within the time grain. So let's save
is within the time grain. So let's save this. Let's adjust my time grain to
this. Let's adjust my time grain to minute. Then I can also drag my wind
minute. Then I can also drag my wind speed into metrics and maybe I want to
speed into metrics and maybe I want to take the average of this and save. Now
take the average of this and save. Now let's create chart and then I can go
let's create chart and then I can go ahead and save this and call the chart
ahead and save this and call the chart name weather report and save. And then I
name weather report and save. And then I can go into dashboards and create a new
can go into dashboards and create a new dashboard and either create a new chart
dashboard and either create a new chart or select one of the charts that I
or select one of the charts that I already have. So let's drag this here. I
already have. So let's drag this here. I can name my dashboard as something like
can name my dashboard as something like weather dashboard. And let's click on
weather dashboard. And let's click on more options and edit property and set
more options and edit property and set auto refresh interval. And I could set
auto refresh interval. And I could set my refresh frequency to as often as
my refresh frequency to as often as every 10 seconds to get essentially
every 10 seconds to get essentially realtime data. But since our Airflow DAG
realtime data. But since our Airflow DAG is running every 5 minutes, I just need
is running every 5 minutes, I just need to refresh this every five minutes. So
to refresh this every five minutes. So let's save and then save this dashboard.
let's save and then save this dashboard. And we've created our chart. And this is
And we've created our chart. And this is how we can visualize our data. Let's go
how we can visualize our data. Let's go back to the terminal and kill this
back to the terminal and kill this process and clear the terminal. And so
process and clear the terminal. And so we have everything set up, which means
we have everything set up, which means all I need to do to run all of these
all I need to do to run all of these tools is just simply docker compose up
tools is just simply docker compose up and sign in to the respective
and sign in to the respective applications.