In the previous lecture, we logged metrics, parameters, and numerous artifacts, but we haven't logged a model yet. You could simply save a model in a .pkl file, but MLflow goes beyond that: it provides a standardized format called an MLflow Model, which defines how a model, its dependencies, and its code are saved. This is essential for downstream tasks like real-time serving, which will be covered later in the course.
A model can be logged using one of the mlflow.&lt;flavor&gt; APIs, for example mlflow.sklearn.log_model().
To demonstrate logging, we'll start by training a scikit-learn pipeline (called the basic model) and logging it using the sklearn flavor. We'll walk through the notebooks/lecture4.train_register_basic_model.py code from the course GitHub repo.
Since we're interacting with MLflow, we need to set up the tracking and registry URIs just as we did in lecture 3:
import mlflow
import os
from dotenv import load_dotenv

def is_databricks():
    return "DATABRICKS_RUNTIME_VERSION" in os.environ

if not is_databricks():
    load_dotenv()
    profile = os.environ["PROFILE"]
    mlflow.set_tracking_uri(f"databricks://{profile}")
    mlflow.set_registry_uri(f"databricks-uc://{profile}")
Then we'll load the project configuration, initialize the SparkSession, and define the tags we'll need to tag the MLflow run and the registered model:
from pyspark.sql import SparkSession

from marvel_characters.config import ProjectConfig, Tags

config = ProjectConfig.from_yaml(config_path="../project_config_marvel.yml", env="dev")
spark = SparkSession.builder.getOrCreate()
tags = Tags(**{"git_sha": "abcd12345", "branch": "main"})
We'll need these to initialize an instance of the BasicModel class. Then we load the data, prepare features, train, and log the model:
from marvel_characters.models.basic_model import BasicModel

basic_model = BasicModel(config=config, tags=tags, spark=spark)

basic_model.load_data()
basic_model.prepare_features()
basic_model.train()
basic_model.log_model()
Let's go through the logic behind the BasicModel class to understand what's happening. After the class gets initialized, we set certain class attributes such as features, target, parameters, and model name.
We load the train and the test set using PySpark, and we'll need these PySpark DataFrames later to log the model input, together with the Delta table version we retrieve. We also use the toPandas() command to create pandas DataFrames, which are used for model training and evaluation.
Note that the toPandas() command is rather inefficient, and if your dataset is large, you may want to look for alternatives, such as using the deltalake package and external credential vending in the way described in an earlier article. Logging input data in that case can be quite challenging.
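For reference, a minimal sketch of that alternative might look like the code below; the table URI and storage options are placeholders for the location and temporary credentials obtained through credential vending, as described in that earlier article:
# Sketch only: reading a Delta table into pandas without Spark, assuming credentials
# were obtained via Unity Catalog credential vending (placeholders below).
from deltalake import DeltaTable as DeltaLakeTable

table_uri = "abfss://<container>@<storage-account>.dfs.core.windows.net/<path-to-train_set>"  # placeholder
storage_options = {}  # placeholder: temporary credentials returned by credential vending

dt = DeltaLakeTable(table_uri, storage_options=storage_options)
train_set = dt.to_pandas()          # pandas DataFrame, no toPandas() on Spark needed
train_data_version = dt.version()   # Delta table version, analogous to DeltaTable.history() below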
import mlflow
import pandas as pd
from delta.tables import DeltaTable
from lightgbm import LGBMClassifier
from loguru import logger
from mlflow import MlflowClient
from mlflow.models import infer_signature
from pyspark.sql import SparkSession
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

from marvel_characters.config import ProjectConfig, Tags
class BasicModel:
    """A basic model class for Marvel character survival prediction using LightGBM."""

    def __init__(self, config: ProjectConfig, tags: Tags, spark: SparkSession) -> None:
        self.config = config
        self.spark = spark
        self.tags = tags.to_dict()

        # Extract settings from the config
        self.num_features = self.config.num_features
        self.cat_features = self.config.cat_features
        self.target = self.config.target
        self.parameters = self.config.parameters
        self.catalog_name = self.config.catalog_name
        self.schema_name = self.config.schema_name
        self.experiment_name = self.config.experiment_name_basic
        self.model_name = f"{self.catalog_name}.{self.schema_name}.marvel_character_model_basic"
    def load_data(self) -> None:
        """Load training and testing data from Delta tables."""
        logger.info("🔄 Loading data from Databricks tables...")
        self.train_set_spark = self.spark.table(f"{self.catalog_name}.{self.schema_name}.train_set")
        self.train_set = self.train_set_spark.toPandas()
        self.test_set_spark = self.spark.table(f"{self.catalog_name}.{self.schema_name}.test_set")
        self.test_set = self.test_set_spark.toPandas()
        self.X_train = self.train_set[self.num_features + self.cat_features]
        self.y_train = self.train_set[self.target]
        self.X_test = self.test_set[self.num_features + self.cat_features]
        self.y_test = self.test_set[self.target]
        self.eval_data = self.test_set[self.num_features + self.cat_features + [self.target]]

        train_delta_table = DeltaTable.forName(
            self.spark, f"{self.catalog_name}.{self.schema_name}.train_set")
        self.train_data_version = str(train_delta_table.history().select("version").first()[0])
        test_delta_table = DeltaTable.forName(
            self.spark, f"{self.catalog_name}.{self.schema_name}.test_set")
        self.test_data_version = str(test_delta_table.history().select("version").first()[0])
        logger.info("✅ Data successfully loaded.")
The next method defined in the class is prepare_features(), which defines the sklearn pipeline consisting of two steps: encoding categorical variables with a custom encoder, CatToIntTransformer, and the LGBMClassifier.
LightGBM supports integer-encoded categorical features, which typically performs better than one-hot encoding. A custom encoder is necessary to make sure the LightGBM model treats the integer-encoded features as categorical features, and that previously unseen categories get the value -1 assigned to avoid errors while computing predictions.
You may notice that the CatToIntTransformer class is defined inside the prepare_features method. While this isn't ideal from a design standpoint, it keeps the model self-contained, and we don't need to log our own package together with the model if we want to use the model for downstream tasks. We'll show a better way to handle private dependencies when we discuss a custom pyfunc model later in this article.
    def prepare_features(self) -> None:
        """Encode categorical features and define a preprocessing pipeline."""
        logger.info("🔄 Defining preprocessing pipeline...")

        class CatToIntTransformer(BaseEstimator, TransformerMixin):
            """Transformer that encodes categorical columns as integer codes for LightGBM.

            Unknown categories at transform time are encoded as -1.
            """

            def __init__(self, cat_features: list[str]) -> None:
                """Initialize the transformer with categorical feature names."""
                self.cat_features = cat_features
                self.cat_maps_ = {}

            def fit(self, X: pd.DataFrame, y=None) -> "CatToIntTransformer":
                """Fit the transformer to the DataFrame X."""
                self.fit_transform(X)
                return self

            def fit_transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
                """Fit and transform the DataFrame X."""
                X = X.copy()
                for col in self.cat_features:
                    c = pd.Categorical(X[col])
                    # Build mapping: {category: code}
                    self.cat_maps_[col] = dict(zip(c.categories, range(len(c.categories)), strict=False))
                    X[col] = X[col].map(lambda val, col=col: self.cat_maps_[col].get(val, -1)).astype("category")
                return X

            def transform(self, X: pd.DataFrame) -> pd.DataFrame:
                """Transform the DataFrame X by encoding categorical features as integers."""
                X = X.copy()
                for col in self.cat_features:
                    X[col] = X[col].map(lambda val, col=col: self.cat_maps_[col].get(val, -1)).astype("category")
                return X

        preprocessor = ColumnTransformer(
            transformers=[("cat", CatToIntTransformer(self.cat_features), self.cat_features)],
            remainder="passthrough",
        )
        self.pipeline = Pipeline(
            steps=[("preprocessor", preprocessor),
                   ("classifier", LGBMClassifier(**self.parameters))])
        logger.info("✅ Preprocessing pipeline defined.")
The train() method fits the pipeline, and the log_model() method logs the model with all the required information:
- The signature is inferred from the model input (X_train) and the model output (the result of running the predict function on the pipeline), and passed when logging the model. If the signature is not provided, we would not be able to register the model in Unity Catalog later.
- Input datasets (train and test sets, together with the Delta table version) are logged under the MLflow run to ensure that we can get the exact version of the data used for training and evaluation, even if the data was modified later, thanks to the time travel functionality of Delta tables. Remember to set a proper retention interval on the Delta table (the default is 7 days), otherwise you may not be able to access the exact version of the table if the VACUUM command was executed. Most accounts have predictive optimization enabled by default, which means that Databricks automatically executes VACUUM as part of the optimization process.
    def train(self) -> None:
        """Train the model."""
        logger.info("🚀 Starting training...")
        self.pipeline.fit(self.X_train, self.y_train)

    def log_model(self) -> None:
        """Log the model using MLflow."""
        mlflow.set_experiment(self.experiment_name)
        with mlflow.start_run(tags=self.tags) as run:
            self.run_id = run.info.run_id

            signature = infer_signature(model_input=self.X_train,
                                        model_output=self.pipeline.predict(self.X_train))
            train_dataset = mlflow.data.from_spark(
                self.train_set_spark,
                table_name=f"{self.catalog_name}.{self.schema_name}.train_set",
                version=self.train_data_version,
            )
            mlflow.log_input(train_dataset, context="training")
            test_dataset = mlflow.data.from_spark(
                self.test_set_spark,
                table_name=f"{self.catalog_name}.{self.schema_name}.test_set",
                version=self.test_data_version,
            )
            mlflow.log_input(test_dataset, context="testing")
            self.model_info = mlflow.sklearn.log_model(
                sk_model=self.pipeline,
                artifact_path="lightgbm-pipeline-model",
                signature=signature,
                input_example=self.X_test[0:1],
            )
            result = mlflow.models.evaluate(
                self.model_info.model_uri,
                self.eval_data,
                targets=self.config.target,
                model_type="classifier",
                evaluators=["default"],
            )
            self.metrics = result.metrics
Notice that we don't log any metrics explicitly. The metrics get computed and logged under the same run by the mlflow.models.evaluate() function, which requires a model URI, evaluation data, targets, the model type, and evaluators to run. Here, we use the default evaluators, which means that the standard metrics from the default evaluator get logged:
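The original screenshot of the logged metrics isn't reproduced here; a simple way to inspect them is to print what the evaluation returned (the metric names in the comment are typical for a binary classifier and may differ between MLflow versions):
# Metrics computed by mlflow.models.evaluate() and logged under the run
for name, value in basic_model.metrics.items():
    print(f"{name}: {value}")
# Typically includes accuracy_score, precision_score, recall_score, f1_score,
# roc_auc and log_loss for a binary classifier.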
After the model is logged, we can get the logged model using the model ID (we can also use the model ID in the model URI to load the model):
logged_model = mlflow.get_logged_model(basic_model.model_info.model_id)
model = mlflow.sklearn.load_model(f"models:/{basic_model.model_info.model_id}")
This was not possible before MLflow 3, which introduced the concept of the LoggedModel. We also now have a separate Models tab under the MLflow experiments in the UI. Let's examine the LoggedModel class. On purpose, I removed some metrics from the representation (in fact, there is a separate entry for each metric shown in the table from the UI earlier).
It's possible to access the model's metrics and parameters (we have not logged any parameters) directly from the LoggedModel class, which was only possible via the MLflow run in earlier versions of MLflow:
logged_model.params
logged_model.metrics
We still need the run object to retrieve the information about the dataset inputs that were used to train and to evaluate the model:
run = mlflow.get_run(basic_model.run_id)
inputs = run.inputs.dataset_inputs

training_input = next((x for x in inputs if len(x.tags) > 0 and x.tags[0].value == 'training'), None)
training_source = mlflow.data.get_source(training_input)
training_source.load()

testing_input = next((x for x in inputs if len(x.tags) > 0 and x.tags[0].value == 'testing'), None)
testing_source = mlflow.data.get_source(testing_input)
testing_source.load()
The BasicModel class has another method, register_model(), which registers the model in Unity Catalog, together with the provided tags.
    def register_model(self) -> None:
        """Register model in Unity Catalog."""
        logger.info("🔄 Registering the model in UC...")
        registered_model = mlflow.register_model(
            model_uri=self.model_info.model_uri,
            name=self.model_name,
            tags=self.tags,
        )
        logger.info(f"✅ Model registered as version {registered_model.version}.")

        latest_version = registered_model.version
        client = MlflowClient()
        client.set_registered_model_alias(
            name=self.model_name,
            alias="latest-model",
            version=latest_version,
        )
        return latest_version
Notice that we set the "latest-model" alias to make it easy to find the latest version of the registered model. "latest" is a reserved value for the alias and can't be used, and models can't be referenced by "latest" either.
Searching for model versions is pretty hard otherwise: you can only search by model name or alias. Searching using filter strings is not supported when the model is registered in Unity Catalog.
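As a quick illustration (a sketch using the MlflowClient on the names defined above), this is roughly what looking up versions of the basic model looks like:
from mlflow import MlflowClient

client = MlflowClient()
model_name = f"{config.catalog_name}.{config.schema_name}.marvel_character_model_basic"

# Look up the version behind the alias we set in register_model()
version_by_alias = client.get_model_version_by_alias(name=model_name, alias="latest-model")
print(version_by_alias.version)

# In Unity Catalog, search_model_versions only supports filtering by the model name
versions = client.search_model_versions(filter_string=f"name = '{model_name}'")
print([v.version for v in versions])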
The model signature in MLflow defines how different interfaces interact with the model. For instance, it defines the payload of the endpoint if the model gets served using Databricks model serving.
We've just registered a sklearn pipeline. If we deploy it behind an endpoint and query it, we'll get an output in the format {"predictions": [0]}. A pyfunc model flavor becomes useful if we want to modify the model payload.
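To make that concrete, here is an illustrative sketch (not from the course repo) of querying such an endpoint; the endpoint name and token are placeholders, and serving itself is covered later in the course:
import requests

# Placeholders: a hypothetical serving endpoint for the registered sklearn pipeline
url = "https://<workspace-host>/serving-endpoints/marvel-character-basic/invocations"
token = "<databricks-token>"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

# The signature determines what the endpoint accepts; here we send a single row of features
payload = {"dataframe_records": X_test[0:1].to_dict(orient="records")}
response = requests.post(url, headers=headers, json=payload)
print(response.json())  # the raw sklearn pipeline returns something like {"predictions": [0]}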
There are other scenarios where you may want to use a pyfunc. For example, if we need to access other systems (for example, a database) to return predictions, or if model serving requires specific artifacts (other files or even other models).
Essentially, we're using pyfunc as a wrapper (in a certain sense, it's similar to the functionality of a FastAPI app). Keeping the definition of the payload separate from the model itself is convenient: we can easily modify the pyfunc wrapper definition without touching the registered model itself.
Let's demonstrate how a pyfunc wrapper can be used. Under the custom_model module of the marvel-characters package, we defined the MarvelModelWrapper class. It has a load_context method which loads the basic model we trained earlier. The basic model gets loaded from the context, which gets saved together with the logged pyfunc model when we run the mlflow.pyfunc.log_model() function.
Notice that the predict method uses the adjust_predictions function defined outside of the MarvelModelWrapper, which means that the marvel_characters package must now be logged together with the pyfunc wrapper.
from datetime import datetime

import mlflow
import numpy as np
import pandas as pd
from mlflow import MlflowClient
from mlflow.models import infer_signature
from mlflow.pyfunc import PythonModelContext
from mlflow.utils.environment import _mlflow_conda_env

from marvel_characters.config import Tags


def adjust_predictions(predictions):
    return {"Survival prediction": ["alive" if pred == 1 else "dead" for pred in predictions]}


class MarvelModelWrapper(mlflow.pyfunc.PythonModel):
    def load_context(self, context: PythonModelContext) -> None:
        self.model = mlflow.sklearn.load_model(
            context.artifacts["lightgbm-pipeline"]
        )

    def predict(self, context: PythonModelContext, model_input: pd.DataFrame | np.ndarray) -> dict:
        predictions = self.model.predict(model_input)
        return adjust_predictions(predictions)
Let's take a look at the log_register_model method. It takes the code_paths argument, which contains a local path to the marvel_characters package wheel. We use this list to define the conda_env: the location of the wheel inside the artifacts folder (it will be saved in the code folder) is defined as a dependency.
Both code_paths and conda_env must be passed as arguments to the mlflow.pyfunc.log_model() function. Here, we also pass artifacts, which is a dictionary that contains the basic model URI.
    def log_register_model(self, wrapped_model_uri: str, pyfunc_model_name: str,
                           experiment_name: str, tags: Tags, code_paths: list[str],
                           input_example: pd.DataFrame) -> None:
        mlflow.set_experiment(experiment_name=experiment_name)
        with mlflow.start_run(run_name=f"wrapper-lightgbm-{datetime.now().strftime('%Y-%m-%d')}",
                              tags=tags.to_dict()):
            additional_pip_deps = []
            for package in code_paths:
                whl_name = package.split("/")[-1]
                additional_pip_deps.append(f"code/{whl_name}")
            conda_env = _mlflow_conda_env(additional_pip_deps=additional_pip_deps)

            signature = infer_signature(model_input=input_example,
                                        model_output={"Survival prediction": ["alive"]})
            model_info = mlflow.pyfunc.log_model(
                python_model=self,
                name="pyfunc-wrapper",
                artifacts={"lightgbm-pipeline": wrapped_model_uri},
                signature=signature,
                code_paths=code_paths,
                conda_env=conda_env,
            )
            client = MlflowClient()
            registered_model = mlflow.register_model(
                model_uri=model_info.model_uri,
                name=pyfunc_model_name,
                tags=tags.to_dict(),
            )
            latest_version = registered_model.version
            client.set_registered_model_alias(
                name=pyfunc_model_name,
                alias="latest-model",
                version=latest_version,
            )
            return latest_version
The pyfunc wrapper gets registered in the same way as the basic model; here we also set the "latest-model" alias. This is how we log and register the pyfunc wrapper in notebooks/lecture4.train_register_custom_model.py:
from importlib.metadata import version

marvel_characters_v = version("marvel_characters")
code_paths = [f"../dist/marvel_characters-{marvel_characters_v}-py3-none-any.whl"]

client = MlflowClient()
wrapped_model_version = client.get_model_version_by_alias(
    name=f"{config.catalog_name}.{config.schema_name}.marvel_character_model_basic",
    alias="latest-model")

test_set = spark.table(f"{config.catalog_name}.{config.schema_name}.test_set").toPandas()
X_test = test_set[config.num_features + config.cat_features]

pyfunc_model_name = f"{config.catalog_name}.{config.schema_name}.marvel_character_model_custom"
wrapper = MarvelModelWrapper()
wrapper.log_register_model(wrapped_model_uri=f"models:/{wrapped_model_version.model_id}",
                           pyfunc_model_name=pyfunc_model_name,
                           experiment_name=config.experiment_name_custom,
                           input_example=X_test[0:1],
                           tags=tags,
                           code_paths=code_paths)
After the pyfunc model is logged and registered, we can see in the UI how its artifacts are saved. We can find the basic model's artifacts in the artifacts folder, and the package wheel in the code folder. Notice that the wheel is referenced in requirements.txt. When the environment gets created, all the dependencies of our own package get installed.
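If you want to inspect that layout programmatically instead of in the UI, a small sketch like the following downloads the logged artifacts and prints the file tree (the folder names in the comments are the ones we expect based on the description above):
import os
import mlflow

# Download the pyfunc model's artifacts locally and print the directory tree
local_path = mlflow.artifacts.download_artifacts(f"models:/{pyfunc_model_name}@latest-model")
for root, _, files in os.walk(local_path):
    for file in files:
        print(os.path.relpath(os.path.join(root, file), local_path))
# Expect the wrapped basic model under artifacts/, the wheel under code/,
# and the wheel referenced in requirements.txt.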
The model can be loaded using the mlflow.pyfunc.load_model() function. If we want to access the original MarvelModelWrapper class and its attributes, we must use the unwrap_python_model() method.
We can run the predict function after loading the model. However, this doesn't guarantee that the model will load successfully at the serving step, because we're relying on our current environment.
loaded_pyfunc_model = mlflow.pyfunc.load_model(f"models:/{pyfunc_model_name}@latest-model")
unwrapped_model = loaded_pyfunc_model.unwrap_python_model()

unwrapped_model.predict(context=None, model_input=X_test[0:1])
There is a more reliable way that mimics the creation of the model serving environment. Note that this code only runs from a Databricks environment and wouldn't work in VS Code.
predictions = mlflow.models.predict(
    f"models:/{pyfunc_model_name}@latest-model",
    X_test[0:1])
In this lecture, we went beyond logging metrics and parameters, and logged and registered a model with MLflow. We made sure to capture the model signature, dataset versions, and tags containing the code version (for now, just a dummy value) so our runs are fully reproducible.
We registered the model in Unity Catalog and wrapped it in a pyfunc to control the output and bundle additional dependencies for serving.
Next up, we'll dive into model serving architectures and see how all of this comes together in production.