import warnings
from pathlib import Path
from typing import Any, Dict, Optional
import joblib
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin, is_classifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import GridSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.pipeline import Pipeline
import gobbli.io
from gobbli.model.base import BaseModel
from gobbli.model.context import ContainerTaskContext
from gobbli.model.mixin import EmbedMixin, PredictMixin, TrainMixin
from gobbli.util import assert_type, generate_uuid, multilabel_to_indicator_df
def persist_estimator(estimator: BaseEstimator) -> Path:
    """
    Write the given estimator to a gobbli-managed filepath so it can later be
    loaded from disk by the SKLearnClassifier. Handy when you have an estimator
    in memory and don't want to manage its serialization yourself.

    The file is placed under the SKLearnClassifier model class directory, in a
    ``user_estimators`` subdirectory named by :func:`generate_uuid`.

    Args:
      estimator: The estimator to save.

    Returns:
      The path where the estimator was saved.
    """
    user_estimators_root = SKLearnClassifier.model_class_dir() / "user_estimators"
    target_dir = user_estimators_root / generate_uuid()
    target_dir.mkdir(parents=True, exist_ok=True)

    target_path = target_dir / SKLearnClassifier._TRAIN_OUTPUT_CHECKPOINT
    SKLearnClassifier._dump_estimator(estimator, target_path)
    return target_path
def make_default_tfidf_logistic_regression() -> BaseEstimator:
    """
    Build the default text classification pipeline.

    Returns:
      A pipeline composing a TF-IDF vectorizer (step name ``"tfidf"``) and a
      logistic regression model (step name ``"logreg"``). Both use default
      parameters, except that the logistic regression is seeded with
      ``random_state=1``.
    """
    vectorizer_step = ("tfidf", TfidfVectorizer())
    classifier_step = ("logreg", LogisticRegression(random_state=1))
    return Pipeline([vectorizer_step, classifier_step])
def make_cv_tfidf_logistic_regression(
    grid_params: Optional[Dict[str, Any]] = None
) -> BaseEstimator:
    """
    Build a grid-searched version of the default TF-IDF + logistic regression
    pipeline.

    Args:
      grid_params: Grid search parameters for the pipeline.  Passed directly to
        :class:`sklearn.model_selection.GridSearchCV`.  See
        :func:`make_default_tfidf_logistic_regression` for the names of the
        pipeline components.  If not given, a somewhat reasonable default grid
        is used.

    Returns:
      A 5-fold cross-validated grid search over the default pipeline using the
      specified grid parameters.
    """
    search_space = grid_params
    if search_space is None:
        # Default grid: fixed vectorizer settings, elastic-net logistic
        # regression swept over regularization strength and L1/L2 mix.
        search_space = {
            "tfidf__ngram_range": [(1, 2)],
            "tfidf__min_df": [0.01],
            "tfidf__max_df": [0.95],
            "logreg__C": [0.1, 0.5, 1],
            "logreg__l1_ratio": [0, 0.25, 0.5, 0.75, 1],
            "logreg__solver": ["saga"],
            "logreg__penalty": ["elasticnet"],
        }

    pipeline = make_default_tfidf_logistic_regression()
    return GridSearchCV(
        pipeline, search_space, cv=5, return_train_score=True, verbose=10
    )
# Fragment of the ValueError message that _SafeEstimator.fit looks for in order
# to recognize (and swallow) a failure caused by single-class training data.
# Matching is by substring, so this text must stay in sync with the message
# emitted by the wrapped estimator.
_AT_LEAST_TWO_CLASSES_ERR_MSG = (
"This solver needs samples of at least 2 classes in the data"
)
class _SafeEstimator(BaseEstimator, ClassifierMixin):
    """
    Wrap an arbitrary classifier estimator to catch errors when fitting
    models that require more than 1 class in the data.

    If the wrapped estimator only knows about a single class, predictions are
    produced by the wrapper itself (always that class, with probability 1)
    rather than being delegated to the wrapped estimator.
    """

    def __init__(self, base_estimator: BaseEstimator):
        """
        Args:
          base_estimator: The classifier to wrap.  If it already exposes
            ``classes_`` (e.g. it was fitted before being wrapped), those
            classes are adopted immediately.
        """
        self.base_estimator = base_estimator
        self.classes_: Optional[np.ndarray] = None
        if hasattr(base_estimator, "classes_"):
            self.classes_ = self.base_estimator.classes_

    def _check_classes_known(self):
        """
        Raises:
          ValueError: If no classes have been recorded yet.
        """
        if self.classes_ is None:
            raise ValueError(
                "Can't predict without knowing what the estimator's classes are."
            )

    def fit(self, *args, **kwargs):
        """
        Fit the wrapped estimator, tolerating the "at least 2 classes" error.

        All arguments are forwarded to the wrapped estimator's ``fit``.

        Returns:
          This wrapper, following the scikit-learn convention that ``fit``
          returns ``self``.  This holds in the single-class case too, so
          ``fit(...).predict(...)`` chaining works either way.

        Raises:
          ValueError: Any ValueError from the wrapped estimator other than the
            single-class one.
        """
        try:
            self.base_estimator.fit(*args, **kwargs)
        except ValueError as e:
            if _AT_LEAST_TWO_CLASSES_ERR_MSG not in str(e):
                raise
        finally:
            # Pick up whatever classes the wrapped estimator recorded, even if
            # fitting failed part-way through.
            if hasattr(self.base_estimator, "classes_"):
                self.classes_ = self.base_estimator.classes_
        return self

    def predict_proba(self, X):
        """
        Returns:
          Predicted probabilities.  With a single known class this is a
          ``(len(X), 1)`` array of ones; otherwise the wrapped estimator's
          output.

        Raises:
          ValueError: If the classes are unknown.
        """
        self._check_classes_known()

        if len(self.classes_) == 1:
            return np.ones((len(X), 1))
        return self.base_estimator.predict_proba(X)

    def predict(self, X):
        """
        Returns:
          Predicted labels.  With a single known class this is a 1-D array of
          length ``len(X)`` filled with that class; otherwise the wrapped
          estimator's output.

        Raises:
          ValueError: If the classes are unknown.
        """
        self._check_classes_known()

        if len(self.classes_) == 1:
            # Build the output from the *label's* dtype and one entry per
            # sample.  Deriving dtype/shape from X (np.full_like) would coerce
            # or truncate the label to the input's dtype and mirror X's shape.
            known_classes = np.asarray(self.classes_)
            return np.full(len(X), known_classes[0], dtype=known_classes.dtype)
        return self.base_estimator.predict(X)
class SKLearnClassifier(BaseModel, TrainMixin, PredictMixin):
    """
    Classifier wrapper for `scikit-learn <https://scikit-learn.org/stable/>`__ classifiers.
    Wraps a :class:`sklearn.base.BaseEstimator` which accepts text input and outputs
    predictions.

    Creating an estimator that meets those conditions will generally require
    some use of :class:`sklearn.pipeline.Pipeline` to compose a transform (e.g. a vectorizer
    to vectorize text) and an estimator (e.g. logistic regression).  See the helper functions
    in this module for some examples.  You may also consider wrapping the pipeline with
    :class:`sklearn.model_selection.GridSearchCV` to tune hyperparameters.

    For multilabel classification, the passed estimator will be automatically wrapped in a
    :class:`sklearn.multiclass.OneVsRestClassifier` (unless it already is one).
    """

    # File name used for estimators serialized by this class.
    _TRAIN_OUTPUT_CHECKPOINT = "estimator.joblib"

    def init(self, params: Dict[str, Any]):
        """
        See :meth:`gobbli.model.base.BaseModel.init`.

        SKLearnClassifier parameters:

        - ``estimator_path`` (:obj:`str`): Path to an estimator pickled by joblib.
          The pickle will be loaded, and the resulting object will be used as the estimator.
          If not provided, a default pipeline composed of a TF-IDF vectorizer and a
          logistic regression will be used.

        Raises:
          ValueError: If an unknown parameter is passed or the loaded estimator
            fails validation.
        """
        estimator = None

        for name, value in params.items():
            if name == "estimator_path":
                assert_type(name, value, str)
                estimator = SKLearnClassifier._load_estimator(Path(value))
                SKLearnClassifier._validate_estimator(estimator)
            else:
                raise ValueError(f"Unknown param '{name}'")

        if estimator is None:
            self.estimator = _SafeEstimator(make_default_tfidf_logistic_regression())
        else:
            self.estimator = _SafeEstimator(estimator)

    @staticmethod
    def _load_estimator(estimator_path: Path) -> BaseEstimator:
        """
        Load an estimator previously serialized with joblib.
        """
        # NOTE(security): joblib files are pickles, and unpickling can execute
        # arbitrary code.  Only load estimator files from trusted sources.
        return joblib.load(estimator_path)

    @staticmethod
    def _dump_estimator(estimator: BaseEstimator, estimator_path: Path):
        """
        Serialize the estimator to the given path with joblib.
        """
        joblib.dump(estimator, estimator_path)

    @staticmethod
    def _validate_estimator(estimator: BaseEstimator):
        """
        Run some checks on the given object to determine if it's an estimator which is
        valid for our purposes.

        Raises:
          ValueError: If the object is not a classifier or lacks ``predict_proba``.
        """
        # sklearn has a function that does a lot more intensive checking regarding
        # the interface of a candidate Estimator
        # (sklearn.utils.estimator_checks.check_estimator), but the function
        # doesn't work well for our use case as of version 0.22.  It doesn't properly
        # detect Pipeline X_types based on the first pipeline component and won't
        # test anything that doesn't accept a 2-D numpy array as input.  We'll settle
        # for lax checks here until sklearn has something that works better for us.
        if not is_classifier(estimator):
            raise ValueError(
                "Estimator must be a classifier according to sklearn.base.is_classifier()"
            )

        if not hasattr(estimator, "predict_proba"):
            raise ValueError(
                "Estimator must support the predict_proba() method to fulfill gobbli's "
                "interface requirements for a prediction model."
            )

    def _ensure_one_vs_rest(self):
        """
        Wrap the underlying estimator in a OneVsRestClassifier exactly once.

        Wrapping unconditionally would nest wrappers on repeated multilabel
        training, and would replace an already-fitted wrapper with an unfitted
        one when predicting without a checkpoint.
        """
        base = self.estimator.base_estimator
        if not isinstance(base, OneVsRestClassifier):
            self.estimator.base_estimator = OneVsRestClassifier(base)

    def _build(self):
        """
        No build step required for this model.
        """

    def _train(
        self, train_input: gobbli.io.TrainInput, context: ContainerTaskContext
    ) -> gobbli.io.TrainOutput:
        """
        Fit the estimator on the training data, score it on the training and
        validation data, and write the fitted estimator to the task's host
        output directory.

        Losses are reported as the negative weighted F1 score.

        Args:
          train_input: Training/validation data and settings.  Any checkpoint
            on it is ignored with a warning.
          context: Task context providing the output directory.

        Returns:
          Training metrics, labels, and the path of the saved estimator.
        """
        if train_input.checkpoint is not None:
            warnings.warn(
                "SKLearnClassifier does not support training from an existing "
                "checkpoint, so the passed checkpoint will be ignored."
            )

        # Input must be a numpy array for OneVsRestClassifier case
        X_train = np.array(train_input.X_train)
        X_valid = np.array(train_input.X_valid)

        y_train = train_input.y_train
        y_valid = train_input.y_valid

        labels = train_input.labels()

        if train_input.multilabel:
            self._ensure_one_vs_rest()
            y_train = multilabel_to_indicator_df(train_input.y_train_multilabel, labels)
            y_valid = multilabel_to_indicator_df(train_input.y_valid_multilabel, labels)

        self.estimator.fit(X_train, y_train)

        if train_input.multilabel:
            # The fit method for OneVsRestClassifier uses LabelBinarizer to determine the
            # classes, which doesn't take string column names from a pandas DataFrame,
            # so the classes will come back as integer indexes.  Fix that manually here.
            # Use a numpy array to ensure compatibility with the automatically-created classes.
            np_labels = np.array(labels)
            self.estimator.classes_ = np_labels
            self.estimator.base_estimator.classes_ = np_labels
            self.estimator.base_estimator.label_binarizer_.classes_ = np_labels

        y_train_pred = self.estimator.predict(X_train)
        y_valid_pred = self.estimator.predict(X_valid)

        train_loss = -f1_score(
            y_train, y_train_pred, zero_division="warn", average="weighted"
        )
        valid_loss = -f1_score(
            y_valid, y_valid_pred, zero_division="warn", average="weighted"
        )
        valid_accuracy = accuracy_score(y_valid, y_valid_pred)

        checkpoint_path = (
            context.host_output_dir / SKLearnClassifier._TRAIN_OUTPUT_CHECKPOINT
        )
        # Persist the unwrapped estimator; _predict re-wraps it on load.
        self._dump_estimator(self.estimator.base_estimator, checkpoint_path)

        return gobbli.io.TrainOutput(
            valid_loss=valid_loss,
            valid_accuracy=valid_accuracy,
            train_loss=train_loss,
            labels=labels,
            multilabel=train_input.multilabel,
            checkpoint=checkpoint_path,
        )

    def _predict(
        self, predict_input: gobbli.io.PredictInput, context: ContainerTaskContext
    ) -> gobbli.io.PredictOutput:
        """
        Predict class probabilities for the given input.

        If a checkpoint is supplied, the estimator is loaded from it and
        replaces the current one.  Labels requested by the input which the
        estimator doesn't know about get a probability column of zeros.

        Args:
          predict_input: Data to predict on, expected labels, and optional
            checkpoint.
          context: Task context (unused here).

        Returns:
          Predicted probabilities with one column per label.

        Raises:
          ValueError: If the estimator's classes are unknown.
        """
        if predict_input.checkpoint is not None:
            self.estimator = _SafeEstimator(
                self._load_estimator(predict_input.checkpoint)
            )
        elif predict_input.multilabel:
            # A trained checkpoint on a multilabel problem will already
            # be wrapped with OneVsRestClassifier, so only need to do it
            # if we're predicting without a checkpoint for whatever reason.
            # The helper leaves an existing (possibly fitted) wrapper alone.
            self._ensure_one_vs_rest()

        # Input must be a numpy array for OneVsRestClassifier case
        X = np.array(predict_input.X)
        pred_proba_df = pd.DataFrame(self.estimator.predict_proba(X))

        if self.estimator.classes_ is None:
            raise ValueError(
                "Can't determine column names for predicted probabilities."
            )

        pred_proba_df.columns = self.estimator.classes_.astype("str")

        labels = predict_input.labels
        for label in labels:
            if label not in pred_proba_df.columns:
                pred_proba_df[label] = 0.0

        return gobbli.io.PredictOutput(y_pred_proba=pred_proba_df)
class TfidfEmbedder(BaseModel, EmbedMixin):
    """
    Embedding wrapper for scikit-learn's :class:`sklearn.feature_extraction.text.TfidfVectorizer`.
    Generates "embeddings" composed of TF-IDF vectors.
    """

    def init(self, params: Dict[str, Any]):
        """
        See :meth:`gobbli.model.base.BaseModel.init`.

        TfidfEmbedder parameters will be passed directly to the
        :class:`sklearn.feature_extraction.text.TfidfVectorizer` constructor, which will
        perform its own validation.
        """
        # This should raise an error if there's anything wrong with any of the passed
        # parameters
        self.vec = TfidfVectorizer(**params)

    def _build(self):
        """
        No build step required for this model.
        """

    def _embed(
        self, embed_input: gobbli.io.EmbedInput, context: ContainerTaskContext
    ) -> gobbli.io.EmbedOutput:
        """
        Fit the vectorizer on the input documents and return one dense TF-IDF
        vector per document.

        Args:
          embed_input: Documents to embed.  Any checkpoint on it is ignored
            with a warning.
          context: Task context (unused here).

        Returns:
          One 1-D array per input document.

        Raises:
          ValueError: If no pooling is requested, since only whole-document
            vectors can be produced.
        """
        if embed_input.checkpoint is not None:
            warnings.warn(
                "TfidfEmbedder does not support embedding from an existing "
                "checkpoint, so the passed checkpoint will be ignored."
            )

        if embed_input.pooling == gobbli.io.EmbedPooling.NONE:
            raise ValueError(
                "TfidfEmbedder embeds whole documents, so pooling is required."
            )

        X_vectorized = self.vec.fit_transform(embed_input.X)

        # Flatten each 1 x n_features row with ravel() rather than squeeze():
        # squeeze() would collapse a 1 x 1 row (single-term vocabulary) into a
        # 0-d array instead of a length-1 vector.
        return gobbli.io.EmbedOutput(
            X_embedded=[row.toarray().ravel() for row in X_vectorized]
        )