Data loader:
import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Load the green taxi trip data for October-December 2020 directly
    from the compressed CSVs published on GitHub.
    """
    url_10 = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-10.csv.gz'
    url_11 = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-11.csv.gz'
    url_12 = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-12.csv.gz'

    # Nullable Int64 keeps the ID columns integer-typed even when values
    # are missing; plain int64 cannot represent NaN.
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float,
    }

    # Parse the pickup/dropoff columns as datetimes instead of strings.
    parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    df_10 = pd.read_csv(url_10, sep=',', dtype=taxi_dtypes, parse_dates=parse_dates)
    df_11 = pd.read_csv(url_11, sep=',', dtype=taxi_dtypes, parse_dates=parse_dates)
    df_12 = pd.read_csv(url_12, sep=',', dtype=taxi_dtypes, parse_dates=parse_dates)

    # Stack the three months into one DataFrame; ignore_index avoids
    # duplicated row indices across the concatenated files.
    return pd.concat([df_10, df_11, df_12], ignore_index=True)
@test
def test_output(output, *args) -> None:
    """
    Test the output of the block.
    """
    assert output is not None, 'The output is undefined'
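A note on the dtype map above: pd.Int64Dtype() is pandas' nullable integer type; with the default int64, any missing value would force the column to float. A tiny illustration (not part of the pipeline):

import pandas as pd

print(pd.Series([1, None]).dtype)                         # float64 -- NaN forces a float fallback
print(pd.Series([1, None], dtype=pd.Int64Dtype()).dtype)  # Int64 -- stays integer, missing value becomes <NA>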
Transformer:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
@transformer
def transform(data, *args, **kwargs):
    old_cols = data.columns
    print(f'What are the existing values of VendorID in the dataset? Ans: {data.VendorID.unique()}')

    # Keep only rides with at least one passenger and a positive distance;
    # .copy() avoids pandas' SettingWithCopyWarning on the assignment below.
    data = data[(data.passenger_count > 0) & (data.trip_distance > 0)].copy()

    # Date-only column used later as the partition key for the GCS export.
    data['lpep_pickup_date'] = data.lpep_pickup_datetime.dt.date
    print(f'Once exported, how many partitions (folders) are present in Google Cloud? Ans: {len(data.lpep_pickup_date.unique())}')

    # Insert an underscore at every lowercase-to-uppercase boundary,
    # then lowercase everything: VendorID -> vendor_id.
    data.columns = (data.columns
                    .str.replace('(?<=[a-z])(?=[A-Z])', '_', regex=True)
                    .str.lower())

    new_cols = data.columns
    num_rename_cols = sum(1 for col in old_cols if col not in new_cols)
    print(f'How many columns need to be renamed to snake case? Ans: {num_rename_cols}')
    return data
@test
def test_output(output, *args) -> None:
    """
    Test the output of the block.
    """
    assert 'vendor_id' in list(output.columns), 'vendor_id is not among the columns'
    assert (output.passenger_count <= 0).sum() == 0, 'There are rides with passenger_count <= 0'
    assert (output.trip_distance <= 0).sum() == 0, 'There are rides with trip_distance <= 0'
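The rename regex deserves a closer look: it inserts an underscore only where a lowercase letter is immediately followed by an uppercase one, so runs of capitals stay together. A quick standalone check (illustrative):

import pandas as pd

cols = pd.Index(['VendorID', 'RatecodeID', 'PULocationID', 'DOLocationID'])
print(cols.str.replace('(?<=[a-z])(?=[A-Z])', '_', regex=True).str.lower())
# Index(['vendor_id', 'ratecode_id', 'pulocation_id', 'dolocation_id'], dtype='object')

Note that PULocationID becomes pulocation_id (not pu_location_id), because no lowercase letter precedes the L.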
Export to Postgres:
from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'mage'        # schema to export data to
    table_name = 'green_taxi'   # table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'      # connection profile defined in io_config.yaml

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,          # don't write the DataFrame index as a column
            if_exists='replace',  # drop and recreate the table if it already exists
        )
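The 'dev' profile referenced above has to exist in io_config.yaml. A minimal sketch of what that profile can look like (the env_var names are placeholders for your own connection settings):

dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
  POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
  POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
  POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
  POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"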
Write data as Parquet files to a bucket in GCP:
import os

import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

# Service-account key mounted into the Mage container; pyarrow's GCS client
# picks it up via the standard GOOGLE_APPLICATION_CREDENTIALS variable.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/src/terraform-test-412018-ac067f9fe98e.json'

bucket_name = 'mage0928'
project_id = 'terraform-test-412018'
table_name = 'nyc_green_taxi_data'
root_path = f'{bucket_name}/{table_name}'


@data_exporter
def export_data(data, *args, **kwargs):
    table = pa.Table.from_pandas(data)
    gcs = pa.fs.GcsFileSystem()

    # Write one Hive-style folder per pickup date,
    # e.g. lpep_pickup_date=2020-10-01/.
    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['lpep_pickup_date'],
        filesystem=gcs,
    )
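To sanity-check the export, the partitioned dataset can be read back from the bucket (a quick sketch, assuming the credentials variable set above is still in the environment):

import pyarrow.fs
import pyarrow.parquet as pq

gcs = pyarrow.fs.GcsFileSystem()
table = pq.read_table(root_path, filesystem=gcs)
print(table.num_rows)  # should match the row count of the transformed DataFrame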
Schedule the pipeline to run daily at 5 AM UTC: in the Mage UI, create a new trigger for the pipeline, choose type Schedule, and set a custom cron expression of 0 5 * * * (minute 0, hour 5, every day).
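Mage can also keep triggers in code: a triggers.yaml file stored next to the pipeline is picked up automatically. A hedged sketch of such a file (field names follow the Mage docs; the trigger name and start_time are placeholders, so verify against your Mage version):

triggers:
- name: daily_5am_utc
  schedule_type: time
  schedule_interval: '0 5 * * *'
  start_time: 2024-01-01
  status: active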