Skip to content

Commit d30e7c8

Browse files
authored
[FSTORE-1141] Mage-AI Tutorial (#226)
* Mage Tutorial for fraud use case
1 parent a386b1a commit d30e7c8

21 files changed

+1153
-0
lines changed

integrations/mage_ai/functions.py

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import numpy as np
2+
3+
# Define a function to compute Haversine distance between consecutive coordinates
4+
def haversine(long, lat):
    """Return the haversine central angle between consecutive coordinates.

    Every element is compared with the element immediately before it
    (obtained through ``shift()``), so the first element of the result has
    no predecessor and evaluates to NaN.

    Args:
        long: Longitude values; must provide ``shift()`` (e.g. a pandas Series).
        lat: Latitude values aligned with ``long``.

    Returns:
        ``2 * arcsin(sqrt(a + b))`` evaluated element-wise. No sphere radius
        is applied, so the result is an angle rather than a length.
    """
    # NOTE(review): the values are fed directly into np.sin / np.cos, so the
    # inputs are presumably already in radians -- confirm against the caller.
    prev_long = long.shift()
    prev_lat = lat.shift()

    # Offsets between each point and its predecessor.
    delta_long = prev_long - long
    delta_lat = prev_lat - lat

    # The two additive terms of the haversine formula.
    lat_term = np.sin(delta_lat / 2.0) ** 2
    long_term = np.cos(lat) * np.cos(prev_lat) * np.sin(delta_long / 2.0) ** 2

    return 2 * np.arcsin(np.sqrt(lat_term + long_term))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
.DS_Store
2+
.file_versions
3+
.gitkeep
4+
.log
5+
.logs/
6+
.mage_temp_profiles
7+
.preferences.yaml
8+
.variables/
9+
__pycache__/
10+
docker-compose.override.yml
11+
logs/
12+
mage-ai.db
13+
mage_data/
14+
secrets/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import hopsworks
2+
from mage_ai.data_preparation.shared.secrets import get_secret_value
3+
if 'custom' not in globals():
4+
from mage_ai.data_preparation.decorators import custom
5+
if 'test' not in globals():
6+
from mage_ai.data_preparation.decorators import test
7+
8+
9+
@custom
def transform_custom(*args, **kwargs):
    """
    Get or create the 'transactions_view' feature view in the Hopsworks feature store.

    The view joins selected columns of the 'transactions' feature group with
    the windowed-aggregates feature group (minus 'cc_num'), uses
    'fraud_label' as the label and maps the 'label_encoder' transformation
    function onto 'category'.

    Args:
        args: The output from any upstream parent blocks (if applicable); unused.
        kwargs: Additional keyword arguments; unused.

    Returns:
        None. The block only has side effects (feature view creation and a
        status message on stdout).
    """
    # Window length that is part of the aggregates feature group name.
    window_len = "4h"

    project = hopsworks.login(
        api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
    )

    fs = project.get_feature_store()

    trans_fg = fs.get_feature_group(
        name="transactions",
        version=1,
    )

    window_aggs_fg = fs.get_feature_group(
        name=f"transactions_{window_len}_aggs",
        version=1,
    )

    # Select features for training data.
    query = trans_fg.select(
        [
            "fraud_label",
            "category",
            "amount",
            "age_at_transaction",
            "days_until_card_expires",
            "loc_delta",
        ]
    ).join(window_aggs_fg.select_except(["cc_num"]))

    # Map features to transformations.
    transformation_functions = {
        "category": fs.get_transformation_function(name="label_encoder"),
    }

    # Get or create the 'transactions_view' feature view; the returned
    # object is not needed here, only the side effect.
    fs.get_or_create_feature_view(
        name='transactions_view',
        version=1,
        query=query,
        labels=["fraud_label"],
        transformation_functions=transformation_functions,
    )

    print('✅ Done')

    # The original `return print(...)` always returned None; keep that
    # contract but make it explicit.
    # NOTE(review): test_output in this file asserts `output is not None`;
    # if the framework passes this return value to it, that check fails --
    # confirm the intended output.
    return None
60+
61+
@test
def test_output(output, *args) -> None:
    """
    Verify that the block produced an output.

    Raises an AssertionError with the message 'The output is undefined'
    when `output` is None.
    """
    has_output = output is not None
    assert has_output, 'The output is undefined'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import hopsworks
2+
from mage_ai.data_preparation.shared.secrets import get_secret_value
3+
if 'data_exporter' not in globals():
4+
from mage_ai.data_preparation.decorators import data_exporter
5+
6+
7+
@data_exporter
def inference(data, *args, **kwargs):
    """
    Start the "fraud" deployment, request one prediction, then stop the deployment.

    Args:
        data: The output from the upstream parent block (unused)
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        None. The prediction is only printed.
    """
    project = hopsworks.login(
        api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
    )

    # Get Hopsworks Model Serving
    ms = project.get_model_serving()

    # Get deployment object
    deployment = ms.get_deployment("fraud")

    # Start the deployment and wait for it to be running, with a maximum waiting time of 480 seconds
    deployment.start(await_running=480)

    try:
        # Make predictions using the deployed model
        predictions = deployment.predict(
            inputs=[4700702588013561],
        )
        print(f'⛳️ Prediction: {predictions}')
    finally:
        # Stop the deployment even when the prediction request fails, so a
        # started deployment is not left running after an error.
        deployment.stop()
        print('🔮 Deployment is stopped!')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import hopsworks
2+
import xgboost as xgb
3+
import pandas as pd
4+
import os
5+
from sklearn.metrics import confusion_matrix
6+
from sklearn.metrics import f1_score
7+
from matplotlib import pyplot
8+
import seaborn as sns
9+
import joblib
10+
from hsml.schema import Schema
11+
from hsml.model_schema import ModelSchema
12+
from mage_ai.data_preparation.shared.secrets import get_secret_value
13+
if 'data_exporter' not in globals():
14+
from mage_ai.data_preparation.decorators import data_exporter
15+
16+
17+
def prepare_training_data(X_train, X_test, y_train, y_test):
    """Order both splits by 'datetime' and remove that column from the features.

    Args:
        X_train: Training features; must contain a 'datetime' column.
        X_test: Test features; must contain a 'datetime' column.
        y_train: Training targets sharing the index of ``X_train``.
        y_test: Test targets sharing the index of ``X_test``.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)`` where the features are
        sorted by 'datetime' with that column dropped, and the targets are
        reindexed to the sorted feature order.
    """

    def _chronological(features, labels):
        # sort_values returns a new frame, so the caller's frame keeps its
        # original order and its 'datetime' column.
        ordered = features.sort_values("datetime")
        # Align the targets with the sorted feature rows.
        aligned = labels.reindex(ordered.index)
        return ordered.drop(columns=["datetime"]), aligned

    train_features, train_labels = _chronological(X_train, y_train)
    test_features, test_labels = _chronological(X_test, y_test)

    return train_features, test_features, train_labels, test_labels
37+
38+
39+
@data_exporter
def train_model(data, *args, **kwargs):
    """
    Train an XGBoost classifier for fraud detection and save it in the Hopsworks Model Registry.

    Steps: fetch the 'transactions_view' feature view, create a train/test
    split, fit the classifier, compute the macro F1 score and the confusion
    matrix on the test split, write the model and the confusion-matrix
    heatmap to a local directory, and register that directory as the
    "fraud" model.

    Args:
        data: The output from the upstream parent block (unused)
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        None. Results are written to disk and to the model registry.
    """
    TEST_SIZE = 0.2

    project = hopsworks.login(
        api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
    )

    fs = project.get_feature_store()

    # Get the 'transactions_view' feature view
    feature_view = fs.get_feature_view(
        name='transactions_view',
        version=1,
    )

    X_train, X_test, y_train, y_test = feature_view.train_test_split(
        description='transactions fraud training dataset',
        test_size=TEST_SIZE,
    )

    X_train, X_test, y_train, y_test = prepare_training_data(
        X_train,
        X_test,
        y_train,
        y_test,
    )

    # Keep a copy of the training features in the working directory.
    X_train.to_csv('X_train.csv')

    # Create an XGBoost classifier and fit it to the training data
    model = xgb.XGBClassifier()
    model.fit(X_train, y_train)

    # Only the test predictions are evaluated, so the training split is
    # not predicted again.
    y_pred_test = model.predict(X_test)

    # Compute f1 score
    metrics = {
        "f1_score": f1_score(y_test, y_pred_test, average='macro'),
    }

    # Calculate and print the confusion matrix for the test predictions
    results = confusion_matrix(y_test, y_pred_test)
    print(results)

    # Create a DataFrame for the confusion matrix results
    df_cm = pd.DataFrame(
        results,
        ['True Normal', 'True Fraud'],
        ['Pred Normal', 'Pred Fraud'],
    )

    # Create a heatmap using seaborn with annotations and grab its figure
    cm = sns.heatmap(df_cm, annot=True)
    fig = cm.get_figure()

    # Describe the model inputs and outputs from the training data
    model_schema = ModelSchema(
        input_schema=Schema(X_train.values),
        output_schema=Schema(y_train),
    )

    # Directory for the model and related artifacts; exist_ok makes the
    # call safe when the directory is already there.
    model_dir = "quickstart_fraud_model"
    os.makedirs(model_dir, exist_ok=True)

    # Save the trained XGBoost classifier to a joblib file in the specified directory
    joblib.dump(model, os.path.join(model_dir, 'xgboost_model.pkl'))

    # Save the confusion matrix heatmap figure to an image file in the specified directory
    fig.savefig(os.path.join(model_dir, "confusion_matrix.png"))

    # seaborn draws on the current matplotlib axes; closing the figure
    # releases it and keeps a later run in the same process from drawing
    # on top of this heatmap.
    pyplot.close(fig)

    # Get the model registry
    mr = project.get_model_registry()

    # Create a Python model named "fraud" in the model registry
    fraud_model = mr.python.create_model(
        name="fraud",
        metrics=metrics,  # Specify the metrics used to evaluate the model
        model_schema=model_schema,  # Use the previously defined model schema
        input_example=[4700702588013561],  # Provide an input example for testing deployments
        description="Quickstart Fraud Predictor",  # Add a description for the model
    )

    # Upload the contents of the model directory to the registry
    fraud_model.save(model_dir)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import hopsworks
2+
import os
3+
import time
4+
from mage_ai.data_preparation.shared.secrets import get_secret_value
5+
if 'data_exporter' not in globals():
6+
from mage_ai.data_preparation.decorators import data_exporter
7+
8+
9+
@data_exporter
def deploy_model(data, *args, **kwargs):
    """
    Deploys the trained XGBoost classifier.

    Fetches version 1 of the "fraud" model from the model registry, uploads
    the local "predictor_script.py" to the project's "Models" dataset and
    creates a deployment named "fraud" that uses that script.

    Args:
        data: The output from the upstream parent block (unused)
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        None.
    """
    project = hopsworks.login(
        api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
    )

    # Get the model registry
    mr = project.get_model_registry()

    # Get model object
    fraud_model = mr.get_model(
        name="fraud",
        version=1,
    )
    print('Model is here!')

    # Get the dataset API from the project
    dataset_api = project.get_dataset_api()

    # Upload "predictor_script.py" to the "Models" directory, and allow overwriting
    uploaded_file_path = dataset_api.upload(
        "predictor_script.py",
        "Models",
        overwrite=True,
    )

    # Construct the full path to the uploaded predictor script
    predictor_script_path = os.path.join(
        "/Projects",
        project.name,
        uploaded_file_path,
    )

    # Deploy the fraud model; the returned deployment object is not used
    # further in this block.
    fraud_model.deploy(
        name="fraud",  # Specify the deployment name
        script_file=predictor_script_path,  # Provide the path to the predictor script
    )

    # Fixed pause after creating the deployment.
    print("Deployment is warming up...")
    time.sleep(45)

0 commit comments

Comments
 (0)