import json
import warnings
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
import gobbli.io
from gobbli.docker import maybe_mount, run_container
from gobbli.model.base import BaseModel
from gobbli.model.context import ContainerTaskContext
from gobbli.model.mixin import EmbedMixin, PredictMixin, TrainMixin
from gobbli.util import assert_type, escape_line_delimited_texts
class SpaCyModel(BaseModel, TrainMixin, PredictMixin, EmbedMixin):
    """
    gobbli interface for spaCy language models which allows for training
    and prediction via the
    `TextCategorizer pipeline component <https://spacy.io/api/textcategorizer>`__
    and static embeddings via `Vectors <https://spacy.io/api/vectors>`__.
    """

    # Directory containing the Dockerfile for the spaCy image (next to this module).
    _BUILD_PATH = Path(__file__).parent

    # Filenames used to exchange data with the container via mounted
    # input/output directories.
    _TRAIN_INPUT_FILE = "train.tsv"
    _VALID_INPUT_FILE = "dev.tsv"
    _TEST_INPUT_FILE = "test.tsv"
    _LABELS_INPUT_FILE = "labels.tsv"

    _TRAIN_OUTPUT_CHECKPOINT = "checkpoint"
    _VALID_OUTPUT_FILE = "valid_results.json"
    _TEST_OUTPUT_FILE = "test_results.tsv"

    _EMBEDDING_INPUT_FILE = "input.tsv"
    _EMBEDDING_OUTPUT_FILE = "embeddings.jsonl"

    # Mount point inside the container for cached spaCy downloads.
    _CONTAINER_CACHE_DIR = Path("/cache")

    def init(self, params: Dict[str, Any]):
        """
        See :meth:`gobbli.model.base.BaseModel.init`.

        spaCy parameters:

        - ``model`` (:obj:`str`): Name of a spaCy model to use.
          Available values are in `the spaCy model docs <https://spacy.io/models>`__ and
          `the spacy-transformers docs <https://github.com/explosion/spacy-transformers>`__.
        - ``architecture`` (:obj:`str`): Model architecture to use.
          Available values are in `the spaCy API docs <https://spacy.io/api/textcategorizer#architectures>`__.
          This is ignored if using a spacy-transformers model.
        - ``dropout`` (:obj:`float`): Dropout proportion for training.
        - ``full_pipeline`` (:obj:`bool`): If True, enable the full spaCy language pipeline
          (including tagging, parsing, and named entity recognition) for the TextCategorizer
          model used in training and prediction. This makes training/prediction much slower
          but theoretically provides more information to the model. This is ignored if using a
          spacy-transformers model.

        Note that gobbli relies on spaCy to perform validation on these parameters,
        so initialization errors may not be caught until model runtime.
        """
        # Defaults, overridden below by any user-supplied params.
        self.model = "en_core_web_lg"
        self.architecture = "ensemble"
        self.dropout = 0.2
        self.full_pipeline = False

        for name, value in params.items():
            if name == "model":
                self.model = value
            elif name == "architecture":
                self.architecture = value
            elif name == "dropout":
                # Only dropout/full_pipeline are type-checked here; model and
                # architecture strings are validated by spaCy at runtime.
                assert_type(name, value, float)
                self.dropout = value
            elif name == "full_pipeline":
                assert_type(name, value, bool)
                self.full_pipeline = value
            else:
                raise ValueError(f"Unknown param '{name}'")

    @property
    def image_tag(self) -> str:
        """
        Returns:
          The Docker image tag to be used for the spaCy container.
        """
        return "gobbli-spacy"

    def _build(self):
        """
        Build the Docker image for this model, passing the spaCy model name
        as a build arg so the model is downloaded/installed at image build time.
        """
        # Add the spaCy model to the image build so it's properly installed
        base_build_kwargs = deepcopy(self._base_docker_build_kwargs)
        if "buildargs" not in base_build_kwargs:
            base_build_kwargs["buildargs"] = {}
        base_build_kwargs["buildargs"]["model"] = self.model

        self.docker_client.images.build(
            path=str(SpaCyModel._BUILD_PATH), tag=self.image_tag, **base_build_kwargs
        )

    @staticmethod
    def _get_checkpoint(
        user_checkpoint: Optional[Path], context: ContainerTaskContext
    ) -> Tuple[Optional[Path], Optional[Path]]:
        """
        Determines the host checkpoint directory and container checkpoint directory
        using the user-requested checkpoint (if any) and the container context.

        Args:
          user_checkpoint: An optional checkpoint passed in by the user.  If the user doesn't
            pass one, use the default pretrained checkpoint.
          context: The container context to create the checkpoint in.

        Returns:
          A 2-tuple: the host checkpoint directory (if any) and
          the container checkpoint directory (if any)
        """
        if user_checkpoint is None:
            # No user checkpoint -- nothing to mount; the stock model is used.
            host_checkpoint_dir = None
            container_checkpoint_dir = None
        else:
            host_checkpoint_dir = user_checkpoint
            container_checkpoint_dir = context.container_root_dir / "checkpoint"

        return host_checkpoint_dir, container_checkpoint_dir

    def _get_model(self, container_checkpoint_dir: Optional[Path]) -> Union[str, Path]:
        """
        Determine the model to pass to the run_spacy script.  If we don't have a
        checkpoint, we'll use our stock model.  Otherwise, we should use the
        checkpoint.
        """
        if container_checkpoint_dir is None:
            return self.model
        else:
            return container_checkpoint_dir

    @property
    def host_cache_dir(self) -> Path:
        """
        Directory to be used for downloaded spaCy files.
        Should be the same across all instances of the class, since these are
        generally static model weights that can be reused.
        """
        cache_dir = SpaCyModel.model_class_dir() / "cache"
        cache_dir.mkdir(exist_ok=True, parents=True)
        return cache_dir

    def _write_input(
        self, X: List[str], labels: Optional[List[List[str]]], input_path: Path
    ):
        """
        Write the given input texts and (optionally) labels to the file pointed to by
        ``input_path``.
        """
        df = pd.DataFrame({"Text": X})

        if labels is not None:
            df["Label"] = labels

        df.to_csv(input_path, sep="\t", index=False)

    def _write_labels(self, labels: List[str], labels_path: Path):
        """
        Write the given labels to the file pointed at by ``labels_path``.
        """
        labels_path.write_text(escape_line_delimited_texts(labels))

    def _train(
        self, train_input: gobbli.io.TrainInput, context: ContainerTaskContext
    ) -> gobbli.io.TrainOutput:
        """
        Run a training job in the spaCy container and collect its results.

        Writes train/validation data and the label set into the context's
        host input directory, runs ``run_spacy.py train`` in the container,
        then reads back the JSON validation results.
        """
        if train_input.valid_batch_size != gobbli.io.TrainInput.valid_batch_size:
            warnings.warn(
                "The spaCy model doesn't batch validation data, so the validation "
                "batch size parameter will be ignored."
            )

        self._write_input(
            train_input.X_train,
            train_input.y_train_multilabel,
            context.host_input_dir / SpaCyModel._TRAIN_INPUT_FILE,
        )
        self._write_input(
            train_input.X_valid,
            train_input.y_valid_multilabel,
            context.host_input_dir / SpaCyModel._VALID_INPUT_FILE,
        )

        labels = train_input.labels()
        self._write_labels(
            labels, context.host_input_dir / SpaCyModel._LABELS_INPUT_FILE
        )

        # Determine checkpoint to use
        host_checkpoint_dir, container_checkpoint_dir = self._get_checkpoint(
            train_input.checkpoint, context
        )

        cmd = (
            "python3 run_spacy.py"
            " train"
            f" --input-dir {context.container_input_dir}"
            f" --output-dir {context.container_output_dir}"
            f" --model {self._get_model(container_checkpoint_dir)}"
            f" --architecture {self.architecture}"
            f" --cache-dir {SpaCyModel._CONTAINER_CACHE_DIR}"
            f" --train-batch-size {train_input.train_batch_size}"
            f" --num-train-epochs {train_input.num_train_epochs}"
            f" --dropout {self.dropout}"
        )

        if self.full_pipeline:
            cmd += " --full-pipeline"

        if train_input.multilabel:
            cmd += " --multilabel"

        run_kwargs = self._base_docker_run_kwargs(context)

        # Mount the checkpoint in the container if needed
        maybe_mount(
            run_kwargs["volumes"], host_checkpoint_dir, container_checkpoint_dir
        )

        # Mount the cache directory
        maybe_mount(
            run_kwargs["volumes"], self.host_cache_dir, SpaCyModel._CONTAINER_CACHE_DIR
        )

        container_logs = run_container(
            self.docker_client, self.image_tag, cmd, self.logger, **run_kwargs
        )

        # Read in the generated evaluation results
        with open(context.host_output_dir / SpaCyModel._VALID_OUTPUT_FILE, "r") as f:
            results = json.load(f)

        return gobbli.io.TrainOutput(
            valid_loss=results["mean_valid_loss"],
            valid_accuracy=results["valid_accuracy"],
            train_loss=results["mean_train_loss"],
            labels=labels,
            multilabel=train_input.multilabel,
            checkpoint=context.host_output_dir / SpaCyModel._TRAIN_OUTPUT_CHECKPOINT,
            _console_output=container_logs,
        )

    def _read_predictions(self, predict_path: Path) -> pd.DataFrame:
        """
        Read the container's tab-separated prediction output into a DataFrame.
        """
        return pd.read_csv(predict_path, sep="\t")

    def _predict(
        self, predict_input: gobbli.io.PredictInput, context: ContainerTaskContext
    ) -> gobbli.io.PredictOutput:
        """
        Run a prediction job in the spaCy container and return the
        predicted probabilities read from its output file.
        """
        if (
            predict_input.predict_batch_size
            != gobbli.io.PredictInput.predict_batch_size
        ):
            warnings.warn(
                "The spaCy model doesn't batch prediction data, so the prediction "
                "batch size parameter will be ignored."
            )

        self._write_input(
            predict_input.X, None, context.host_input_dir / SpaCyModel._TEST_INPUT_FILE
        )

        labels = predict_input.labels
        self._write_labels(
            labels, context.host_input_dir / SpaCyModel._LABELS_INPUT_FILE
        )

        host_checkpoint_dir, container_checkpoint_dir = self._get_checkpoint(
            predict_input.checkpoint, context
        )

        cmd = (
            "python3 run_spacy.py"
            " predict"
            f" --input-dir {context.container_input_dir}"
            f" --output-dir {context.container_output_dir}"
            f" --model {self._get_model(container_checkpoint_dir)}"
            f" --architecture {self.architecture}"
            f" --cache-dir {SpaCyModel._CONTAINER_CACHE_DIR}"
        )

        run_kwargs = self._base_docker_run_kwargs(context)

        # Mount the checkpoint in the container if needed
        maybe_mount(
            run_kwargs["volumes"], host_checkpoint_dir, container_checkpoint_dir
        )

        # Mount the cache directory
        maybe_mount(
            run_kwargs["volumes"], self.host_cache_dir, SpaCyModel._CONTAINER_CACHE_DIR
        )

        container_logs = run_container(
            self.docker_client, self.image_tag, cmd, self.logger, **run_kwargs
        )

        return gobbli.io.PredictOutput(
            y_pred_proba=self._read_predictions(
                context.host_output_dir / SpaCyModel._TEST_OUTPUT_FILE
            ),
            _console_output=container_logs,
        )

    def _read_embeddings(
        self, embed_path: Path, pooling: gobbli.io.EmbedPooling
    ) -> Tuple[List[np.ndarray], Optional[List[List[str]]]]:
        """
        Parse the JSONL embedding output file.

        Returns:
          A 2-tuple: one embedding array per document, and -- only when
          pooling is NONE (per-token embeddings) -- the tokens for each
          document; otherwise ``None``.
        """
        embeddings = []  # type: List[np.ndarray]
        doc_tokens = []  # type: List[List[str]]

        with open(embed_path, "r") as f:
            for line in f:
                line_json = json.loads(line)
                embeddings.append(np.array(line_json["embedding"]))
                # Tokens are only emitted by the container when no pooling
                # is applied, so only collect them in that case.
                if pooling == gobbli.io.EmbedPooling.NONE:
                    doc_tokens.append(line_json["tokens"])

        tokens = None
        if pooling == gobbli.io.EmbedPooling.NONE:
            tokens = doc_tokens

        return embeddings, tokens

    def _embed(
        self, embed_input: gobbli.io.EmbedInput, context: ContainerTaskContext
    ) -> gobbli.io.EmbedOutput:
        """
        Run an embedding job in the spaCy container and return the
        embeddings (and tokens, when unpooled) read from its output.
        """
        self._write_input(
            embed_input.X,
            None,
            context.host_input_dir / SpaCyModel._EMBEDDING_INPUT_FILE,
        )

        if embed_input.checkpoint is not None:
            warnings.warn(
                "The spaCy model vectors can't be fine-tuned, so custom "
                "checkpoints are ignored when generating embeddings."
            )

        cmd = (
            "python3 run_spacy.py"
            " embed"
            f" --input-dir {context.container_input_dir}"
            f" --output-dir {context.container_output_dir}"
            f" --model {self.model}"
            f" --architecture {self.architecture}"
            f" --cache-dir {SpaCyModel._CONTAINER_CACHE_DIR}"
            f" --embed-pooling {embed_input.pooling.value}"
        )

        run_kwargs = self._base_docker_run_kwargs(context)

        # Mount the cache directory
        maybe_mount(
            run_kwargs["volumes"], self.host_cache_dir, SpaCyModel._CONTAINER_CACHE_DIR
        )

        container_logs = run_container(
            self.docker_client, self.image_tag, cmd, self.logger, **run_kwargs
        )

        X_embedded, embed_tokens = self._read_embeddings(
            context.host_output_dir / SpaCyModel._EMBEDDING_OUTPUT_FILE,
            embed_input.pooling,
        )

        return gobbli.io.EmbedOutput(
            X_embedded=X_embedded,
            embed_tokens=embed_tokens,
            _console_output=container_logs,
        )