Data loader:

import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading data from API
    """
    url_10 = '<https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-10.csv.gz>'
    url_11 = '<https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-11.csv.gz>'
    url_12 = '<https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2020-12.csv.gz>'
    
    
    # response = requests.get(url)
    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'store_and_fwd_flag': str,
        'PULocationID': pd.Int64Dtype(),
        'DOLocationID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'extra': float,
        'mta_tax': float,
        'tip_amount': float,
        'tolls_amount': float,
        'improvement_surcharge': float,
        'total_amount': float,
        'congestion_surcharge': float 
    }

    parse_dates = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

    df_10 = pd.read_csv(url_10, sep=',', 
            dtype=taxi_dtypes, parse_dates=parse_dates)
    df_11 = pd.read_csv(url_11, sep=',', 
        dtype=taxi_dtypes, parse_dates=parse_dates)
    df_12 = pd.read_csv(url_12, sep=',', 
        dtype=taxi_dtypes, parse_dates=parse_dates)
    
    df = pd.concat([df_10, df_11, df_12])

    return df

@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'

Transformer:

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@transformer
def transform(data, *args, **kwargs):
    print(data.columns)
    old_cols = data.columns
    print(f'What are the existing values of VendorID in the dataset? Ans: {data.VendorID.unique()}')
    data = data[data.passenger_count > 0]
    data = data[data.trip_distance > 0]
    data["lpep_pickup_date"] = data.lpep_pickup_datetime.dt.date
    print(data.columns)
    print(f'Once exported, how many partitions (folders) are present in Google Cloud? Ans: {len(data.lpep_pickup_date.unique())}')
    data.columns = (data.columns
                .str.replace('(?<=[a-z])(?=[A-Z])', '_', regex=True)
                .str.lower()
             )
    new_cols = data.columns
    num_rename_cols = 0
    for i in old_cols:
        if i not in new_cols:
            num_rename_cols += 1
    print(f'How many columns need to be renamed to snake case? Ans: {num_rename_cols}')

    return data

@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert "vendor_id" in list(output.columns), 'vendor_id is not one of the existing values in the column'
    assert (output.passenger_count <= 0).sum() == 0, "passenger_count is 0"
    assert (output.trip_distance <= 0).sum() == 0, "trip_type is 0"

Export to Postgres

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:

    schema_name = 'mage'  # Specify the name of the schema to export data to
    table_name = 'green_taxi'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )

Write data as Parquet files to a bucket in GCP

import pyarrow as pa
import pyarrow.parquet as pq
import os

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/src/terraform-test-412018-ac067f9fe98e.json'

bucket_name = 'mage0928'
object_key = 'nyc_taxi_data.parquet'
project_id = 'terraform-test-412018'
table_name = 'nyc_green_taxi_data'
root_path = f'{bucket_name}/{table_name}'

@data_exporter
def export_data(data, *args, **kwargs):
    table = pa.Table.from_pandas(data)
    gcs = pa.fs.GcsFileSystem()
    pq.write_to_dataset(
        table,
        root_path=root_path,
        partition_cols=['lpep_pickup_date'],
        filesystem=gcs
    )

Schedule pipeline to run daily at 5AM UTC

Untitled