3.4 Part 4: Deploy with the Data Store

3.4.1 Explanation of the case

The goal of this use case is to walk you through the platform with a simple and basic machine learning application, using the iris dataset. You can run this example directly on the platform to understand the mechanics presented.

This page shows how to add inputs and outputs to the pipeline and endpoint you created in the previous part.

Prerequisites: you should have completed the previous part, since this page builds on the pipeline and endpoint created there.

3.4.2 Drawbacks of the previous part

In Part 3, we built a complete machine learning pipeline to train and evaluate a model on the iris dataset. This pipeline takes some data as input for model training and outputs the predictions of the trained model on that same data.

We have learned a lot! However, this approach is not very realistic, and there are some issues that we would like to solve:

  • Model reusability: each time we make a prediction, we retrain the same model, which is not efficient at all. In a typical machine learning application, we would rather train a model once and then reuse it whenever we have to make a prediction.

  • External sources: so far we have conveniently used the iris dataset included in sklearn, but in a real-life application the training dataset would typically be stored in a database that we read from at training time.

  • Input flexibility: with a dedicated train pipeline at our disposal, we might also want to pass some training parameters and get some model evaluation metrics in return, so that we can easily train and compare different models and decide which one to use for the prediction part.

3.4.2.1 How to handle these cases with the platform?

Within the Craft AI platform, this translates into having two different pipelines (see the figure below):

  • A first pipeline for training that:

    • reads the iris dataset from a database,

    • trains a model using the given model parameters,

    • stores the trained model on the Data Store (a file storage that is introduced below),

    • returns a JSON with the model evaluation metrics.

  • A second pipeline for prediction that:

    • retrieves the trained model,

    • uses input data to serve the predictions, the iris species in this case.

To achieve all of that, we are going to build an app to train and deploy a model for the iris dataset with the following architecture:

../_images/step4_0.png

This might seem like a big step up from the previous part, but you will see how simple it is to build this application with the platform.

3.4.3 Storing Data on the Platform

At this point, you have all the tools you need on the platform to build this application, except for the storage shared by the two pipelines.

That’s where the Data Store comes into play! The Data Store is a file storage on which you can upload and download an unlimited number of files and organize them as you see fit, using the SDK. All your steps can download files from and upload files to the Data Store.

3.4.3.1 Pushing the dataset to the Data Store

In our case the first thing we want to do is to upload the iris dataset to the Data Store. You can do so with the upload_data_store_object function from the SDK like so:

from io import BytesIO
from sklearn import datasets
import pandas as pd

iris = datasets.load_iris(as_frame=True)
iris_df = pd.concat([iris.data, iris.target], axis=1)

file_buffer = BytesIO(iris_df.to_parquet())
sdk.upload_data_store_object(
    filepath_or_buffer=file_buffer,
    object_path_in_datastore="get_started/dataset/iris.parquet"
)

This function takes two parameters:

  • filepath_or_buffer: the path of the file you want to upload, or a file-like object. Note that in this case, to avoid writing the dataset locally, we wrote it to a BytesIO buffer, which is a file-like object. The dataset is written in parquet format using the pandas to_parquet() function (an alternative using a local file path is sketched after this list).

  • object_path_in_datastore: The path where the file will be stored in the Data Store.
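
If you prefer to write the dataset to a local file first, the same upload_data_store_object function also accepts a plain file path, as mentioned above. A minimal sketch, reusing iris_df and sdk from the previous snippet (the local file name is chosen for this example):

# Alternative: write the dataset to a local file, then upload it by path
iris_df.to_parquet("iris.parquet")
sdk.upload_data_store_object(
    filepath_or_buffer="iris.parquet",
    object_path_in_datastore="get_started/dataset/iris.parquet",
)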

We can then check that the dataset was pushed to the Data Store using the list_data_store_objects function:

sdk.list_data_store_objects()

3.4.4 Building the new Train Pipeline

Since we want to have separate train and predict pipelines, we have to write a dedicated train function.

../_images/step4_1.png

Let’s create a dedicated folder and a file in which to write the new train function.

For reference, here is the structure on which the code of this part is based:

.
├── src
│   └── iris_train.py
└── requirements.txt

We will fill the requirements.txt file later, with the libraries needed by our function.

3.4.4.1 Implement the code

We can reuse most of the code of the previous part for the model training but we need to add a few things:

  • The new inputs we want for our train pipeline as function parameters:

    def train(
        train_val_prop: float,
        model_parameters: dict,
        dataset_path: str,
        model_output_path: str,
    ):
    

    Here we specify as inputs:

    • train_val_prop: the proportion of the dataset to use for the train set,

    • model_parameters: the hyperparameters,

    • dataset_path: the path of the dataset on the Data Store. This way, we can choose which dataset to train our model on at execution time, without having to build a new step to train on a different version of the dataset.

    • model_output_path: where we want to store the model.

  • Instantiate a Craft AI SDK object to use the Data Store:

    sdk = CraftAiSdk()
    

    Note that you can instantiate the SDK without parameters from the code of a step, because the token and the environment URL are available there as environment variables.

  • The code to retrieve the iris dataset from the Data Store:

    file_buffer = BytesIO()
    sdk.download_data_store_object(
        object_path_in_datastore=dataset_path, filepath_or_buffer=file_buffer
    )
    dataset_df = pd.read_parquet(file_buffer)
    

    Here we use the download_data_store_object function of the SDK. We first instantiate an empty file-like object, file_buffer, and download the dataset into it. This way, we do not have to write the file to disk. We then read it as a dataframe using pd.read_parquet(), since we stored the dataset as a parquet file on the Data Store.

  • The code to push the trained model on the Data Store:

    model_buffer = BytesIO()
    joblib.dump(knn, model_buffer)
    model_buffer.seek(0)
    sdk.upload_data_store_object(model_buffer, model_output_path)
    

    Here, we use joblib.dump to serialize our trained model knn before uploading it to the Data Store with the upload_data_store_object function.

  • The code to compute the evaluation metrics and return them as the new output, a dictionary of evaluation metrics:

    mean_accuracy = knn.score(X_val, y_val)
    metrics_dict = {"accuracy": mean_accuracy}
    return {"metrics": metrics_dict}
    

All in all this gives the following function:

# Imports needed at the top of src/iris_train.py
from io import BytesIO

import joblib
import numpy as np
import pandas as pd
from craft_ai_sdk import CraftAiSdk
from sklearn.neighbors import KNeighborsClassifier


def train(
    train_val_prop: float,
    model_parameters: dict,
    dataset_path: str,
    model_output_path: str,
):
    sdk = CraftAiSdk()

    # Download the dataset from the Data Store into an in-memory buffer
    file_buffer = BytesIO()
    sdk.download_data_store_object(dataset_path, file_buffer)
    dataset_df = pd.read_parquet(file_buffer)

    X = dataset_df.loc[:, dataset_df.columns != "target"].values
    y = dataset_df.loc[:, "target"].values

    # Split the dataset into train and validation sets
    np.random.seed(0)
    indices = np.random.permutation(len(X))

    n_train_samples = int(train_val_prop * len(X))
    train_indices = indices[:n_train_samples]
    val_indices = indices[n_train_samples:]

    X_train = X[train_indices]
    y_train = y[train_indices]
    X_val = X[val_indices]
    y_val = y[val_indices]

    # The model_parameters dict is passed directly to the
    # KNeighborsClassifier constructor for maximum flexibility
    knn = KNeighborsClassifier(**model_parameters)
    knn.fit(X_train, y_train)

    mean_accuracy = knn.score(X_val, y_val)
    metrics_dict = {"accuracy": mean_accuracy}

    # Serialize the trained model and upload it to the Data Store
    model_buffer = BytesIO()
    joblib.dump(knn, model_buffer)
    model_buffer.seek(0)
    sdk.upload_data_store_object(model_buffer, model_output_path)

    return {"metrics": metrics_dict}
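
If you want to sanity-check this function before creating the step, you can call it locally. A minimal sketch, assuming you run it from the root of the repository, that the SDK token and environment URL are set as environment variables (as they are inside a step), and that the dataset has been uploaded as above; the model output path used here is only an example:

# Hypothetical local test of the train() function
from src.iris_train import train

result = train(
    train_val_prop=0.8,
    model_parameters={"n_neighbors": 10},
    dataset_path="get_started/dataset/iris.parquet",
    model_output_path="get_started/models/local_test_model.joblib",
)
print(result["metrics"])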

We fill the requirements.txt file with the required libraries:

joblib==1.2.0
numpy==1.19.5
pandas==1.4.2
scikit_learn==1.2.1
pyarrow==11.0.0
fastparquet==2023.2.0
craft-ai-sdk==0.15.0

Warning

Since we updated our code, we must add and commit our changes with Git and push them to GitHub so that the platform can take them into account!

3.4.4.2 Create Train Step & Pipeline

Inputs & Outputs definition

We also need to declare the inputs and outputs of the step to the platform as usual, using the Input and Output objects:

train_val_prop_input = Input(name="train_val_prop", data_type="number")
model_parameters_input = Input(name="model_parameters", data_type="json")
dataset_input = Input(name="dataset_path", data_type="string")
model_output_path = Input(name="model_output_path", data_type="string")
train_iris_inputs = [
    train_val_prop_input,
    model_parameters_input,
    dataset_input,
    model_output_path,
]

metrics_file_output = Output(name="metrics", data_type="json")
train_iris_outputs = [metrics_file_output]

Note that in this case, the trained model is not an output of the step, since it is uploaded directly to the Data Store in the code of the step.

Step & Pipeline creation

We can now create the train step and pipeline with the following SDK command:

step = sdk.create_step(
    step_name="part-4-iris-train-step",
    # The path must be relative to the root of the repo
    function_path="src/iris_train.py",
    function_name="train",
    inputs=train_iris_inputs,
    outputs=train_iris_outputs,
    container_config={
        "requirements_path": "requirements.txt"
    },
)
pipeline = sdk.create_pipeline(
    pipeline_name="part-4-iris-train-pipeline",
    step_name="part-4-iris-train-step",
)

We now have built a train pipeline that will allow us to:

  • Choose our training set at execution time

  • Experiment with different model parameters, using the model_parameters input

  • Store the model on the Data Store for future use

  • Have some evaluation metrics to select the best model

And we did all that with less than 40 additional lines of code.

3.4.4.3 Deploy Train Pipeline and use it

The train pipeline is now just waiting to be deployed!

Deployment Output Mapping definition & Endpoint creation

To deploy this pipeline, you just have to specify the output mapping, the name of the pipeline you want to deploy, and the name of the endpoint you want to create, in the create_deployment function:

output_mappings = [
    OutputDestination(
        step_output_name='metrics',
        endpoint_output_name='metrics')
]

endpoint = sdk.create_deployment(
    pipeline_name="part-4-iris-train-pipeline",
    deployment_name="part-4-iris-train-endpoint",
    execution_rule="endpoint",
    outputs_mapping=output_mappings
)

print(endpoint)

In this case, we declare to the platform that we want the metrics output of the pipeline to be stored in the JSON response of the endpoint we are creating.

Endpoint triggering

Now we can call this endpoint with a standard HTTP request like we did in the previous part. Just fill in the inputs with the values you want for this execution.

endpoint_url = sdk.base_environment_url + "/endpoints/" + endpoint["name"]
inputs = {
        "dataset_path": "get_started/dataset/iris.parquet",
    "train_val_prop": 0.8,
    "model_parameters": {"n_neighbors": 10},
    "model_output_path": "get_started/models/test_model.joblib"
}
endpoint_token = endpoint["endpoint_token"]

request = requests.post(endpoint_url, headers={"Authorization": f"EndpointToken {endpoint_token}"}, json=inputs)
request.json()

We can see in the metrics field of the JSON output that the model has an accuracy of about 96.7%.
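
If you prefer to extract that value programmatically rather than read it from the raw response, here is a minimal sketch, assuming the response nests pipeline outputs under an outputs key, as in the predict example later in this part:

# Extract the accuracy from the endpoint's JSON response
metrics = request.json()["outputs"]["metrics"]
print(f"Validation accuracy: {metrics['accuracy']:.3f}")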

We can easily train another model with different parameters by changing the value of n_neighbors from 10 to 1, for instance:

endpoint_url = sdk.base_environment_url + "/endpoints/" + endpoint["name"]
inputs = {
        "dataset_path": "get_started/dataset/iris.parquet",
    "train_val_prop": 0.8,
    "model_parameters": {"n_neighbors": 1},
    "model_output_path": "get_started/models/test_model_2.joblib"
}
endpoint_token = endpoint["endpoint_token"]

request = requests.post(endpoint_url, headers={"Authorization": f"EndpointToken {endpoint_token}"}, json=inputs)
request.json()

With these parameters, the accuracy drops to 93.3% so we might want to keep the first model.

You can check that these two models have been stored in the Data Store with the list_data_store_objects function of the SDK:

sdk.list_data_store_objects()
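
You can also download a stored model back from the Data Store to inspect it locally, using the same download_data_store_object function as in the train code. A minimal sketch, reusing the BytesIO and joblib imports from above:

# Download the first trained model and inspect its parameters locally
model_buffer = BytesIO()
sdk.download_data_store_object(
    object_path_in_datastore="get_started/models/test_model.joblib",
    filepath_or_buffer=model_buffer,
)
knn = joblib.load(model_buffer)
print(knn.get_params())  # should show n_neighbors=10 for the first model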

3.4.5 The Predict Pipeline

Now that we have a model we are satisfied with, we are going to build the predict pipeline, which uses a trained model to make predictions on new input data, and deploy it to make our iris species prediction app available to the world.

../_images/step4_2.png

We will create a dedicated function for the predict step.

For reference, here is the structure on which the code of this part is based:

.
├── src
│   ├── iris_train.py
│   └── iris_predict.py
└── requirements.txt

3.4.5.1 Implement the code

As always we have to write the code that will be executed for the predict pipeline. In this case the function is quite simple:

from io import BytesIO
import joblib
import pandas as pd
from craft_ai_sdk import CraftAiSdk

def predict(model_path: str, input_data: dict):
    sdk = CraftAiSdk()

    file_buffer = BytesIO()
    sdk.download_data_store_object(model_path, file_buffer)
    model = joblib.load(file_buffer)

    # Rebuild a DataFrame from the input JSON, one row per sample
    input_dataframe = pd.DataFrame.from_dict(input_data, orient="index")
    output_predictions = model.predict(input_dataframe)

    return {"predictions": output_predictions.tolist()}

It takes two arguments: model_path, the path on the Data Store of the model we want to use for prediction, and input_data, the data on which we want to make the predictions.

We then download the model as we did with the dataset in the train step, and use it to make predictions on input_data, which is expected to have the same format as in the previous part.

Finally, we return a predictions output as a list of predictions.

Warning

Since we updated our code, we must add and commit our changes with Git and push them to GitHub so that the platform can take them into account!

3.4.5.2 Create Predict Step & Pipeline

Inputs & Outputs definition

Then we specify the inputs and outputs of the step:

model_path_input = Input(name="model_path", data_type="string")
input_data_input = Input(name="input_data", data_type="json")
predict_inputs = [
    model_path_input,
    input_data_input,
]

predictions_output = Output(name="predictions", data_type="json")
predict_outputs = [predictions_output]

Step & Pipeline creation

We declare the predict step to the platform, putting together the function to execute, the inputs, and the outputs.

step = sdk.create_step(
    step_name="part-4-iris-predict-step",
    function_path="src/iris_predict.py",
    function_name="predict",
    inputs=predict_inputs,
    outputs=predict_outputs,
    container_config={
        "requirements_path": "requirements.txt"
    }
)

We also create the corresponding pipeline:

pipeline = sdk.create_pipeline(
    step_name="part-4-iris-predict-step",
    pipeline_name="part-4-iris-predict-pipeline"
)

3.4.5.3 Deploy Predict Pipeline and use it

Now that our predict pipeline is created on the platform, we are ready to deploy it!

Deployment Input & Output Mappings definition

However, if we were to deploy it like the previous pipeline, the endpoint would expect all of its inputs to be given in the HTTP request. Concretely, this means that we would ask the end user of our app to specify the path of the model on the Data Store, which they of course know nothing about. We do not want our users to be responsible for choosing the model used for prediction.

The good news is that you do not have to plug every input of the pipeline into an input of the endpoint.

The InputSource object allows you to map an input of the pipeline to either:

  • The input of the endpoint

  • A constant value

If you do not specify any input mapping for an input, the endpoint will expect it to be passed in the body of the request under the same name as the input. That is why, for the previous endpoint deployments, we did not specify any input mapping and passed the pipeline's inputs directly in the request.
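
An input mapping can also simply rename an input, exposing it to the endpoint caller under a different name. The sketch below illustrates the idea; the endpoint_input_name parameter name is our assumption and is not taken from this guide, so check the SDK reference before using it:

# Hedged sketch: expose the pipeline's "input_data" input as an endpoint
# input named "flowers" (endpoint_input_name is assumed, not confirmed here)
renamed_input = InputSource(
    step_input_name="input_data",
    endpoint_input_name="flowers",
)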

We can now create a constant input mapping to specify the path of the model we want to deploy and create the deployment like so:

input_mappings = [
    InputSource(
        step_input_name="model_path",
        constant_value="get_started/models/test_model.joblib")
]

output_mappings = [
    OutputDestination(
        step_output_name="predictions",
        endpoint_output_name="iris_species")
]

Endpoint creation

endpoint = sdk.create_deployment(
    pipeline_name="part-4-iris-predict-pipeline",
    deployment_name="part-4-iris-predict-endpoint",
    execution_rule="endpoint",
    inputs_mapping=input_mappings,
    outputs_mapping=output_mappings
)

Endpoint triggering

We can now call our endpoint like in the previous part.

We first prepare the input data on which we want to do the prediction:

np.random.seed(0)
indices = np.random.permutation(150)
iris_X, iris_y = datasets.load_iris(return_X_y=True, as_frame=True)
iris_X_test = iris_X.loc[indices[90:120],:]
input_json_data = iris_X_test.to_dict(orient="index")
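
For reference, to_dict(orient="index") produces a nested dictionary keyed by the original row index, which is exactly the structure the predict step turns back into a DataFrame with pd.DataFrame.from_dict(..., orient="index"). A quick way to check the shape:

# Illustration: input_json_data maps each row index to a dict of feature values,
# e.g. {<row_index>: {"sepal length (cm)": ..., "sepal width (cm)": ..., ...}, ...}
first_index = next(iter(input_json_data))
print(first_index, input_json_data[first_index])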

Then, we call the endpoint as we already did before. Note that we do not have to specify the path of the model on the Data Store, only the input data.

endpoint_url = sdk.base_environment_url + "/endpoints/" + endpoint["name"]

inputs = {"input_data": input_json_data}
endpoint_token = endpoint["endpoint_token"]

request = requests.post(endpoint_url, headers={"Authorization": f"EndpointToken {endpoint_token}"}, json=inputs)
request.json()["outputs"]["iris_species"]

Note: constant input mappings can also be used to freeze some parameters and make them static. For instance, in a train pipeline, you could freeze the dataset path if it is not supposed to change, or the model parameters once you have found optimal values.

3.4.6 Conclusion

You are now able to:

  • Use the Data Store on the platform to upload and download files, models, images, etc.

  • Deploy some code to Production within a pipeline in a few lines of code.

  • Execute your pipeline via an endpoint that is accessible from outside with a secured token.

  • Make your inputs flexible: set constant values so that users do not have to fill them in, or let users provide input values via the endpoint directly.