Source code for gobbli.model.bert.model

import json
import shutil
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

import gobbli.io
from gobbli.docker import maybe_mount, run_container
from gobbli.model.base import BaseModel
from gobbli.model.context import ContainerTaskContext
from gobbli.model.mixin import EmbedMixin, PredictMixin, TrainMixin
from gobbli.util import (
    assert_in,
    assert_type,
    download_archive,
    escape_line_delimited_texts,
)


def _preprocess_text(text_series: pd.Series) -> pd.Series:
    """
    Preprocess a Series of text for a BERT model.
    """
    return text_series.str.replace(r"\s", " ", regex=True)
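
# Illustrative note for the helper above: BERT's input files are line- and
# tab-delimited, so embedded newlines and tabs must be collapsed to spaces
# before writing, e.g. (a minimal sketch):
#
#   _preprocess_text(pd.Series(["two\nlines"]))  # -> pd.Series(["two lines"])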


def _df_to_train_tsv(df: pd.DataFrame, output_file: Path):
    """
    Write a dataframe with "Text" and "Label" columns to the given
    output file in the format expected by BERT for a TSV file for
    training/validation.
    """
    ordered_df = pd.DataFrame(
        OrderedDict(
            (
                ("Label", df["Label"]),
                ("a", np.repeat("a", df.shape[0])),
                ("Text", _preprocess_text(df["Text"])),
            )
        )
    )
    ordered_df.to_csv(output_file, sep="\t", index=True, header=False)
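
# The resulting file follows the CoLA-style format BERT's run_classifier.py
# expects for training data: four tab-separated columns and no header row,
# i.e. index, label, an unused filler column, and the preprocessed text.
# Illustrative contents:
#
#   0    positive    a    this is a document
#   1    negative    a    another document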


def _write_labels(labels: List[Any], output_file: Path):
    """
    Write the given set of labels to the given file.
    """
    output_file.write_text(escape_line_delimited_texts(labels))


def _read_predictions(labels: List[Any], output_file: Path) -> pd.DataFrame:
    """
    Read predictions from the BERT model into a dataframe containing the predicted
    probability for each label for each observation.
    """
    return pd.read_csv(output_file, sep="\t", names=labels)
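
# test_results.tsv holds one row per observation with one tab-separated
# probability per label, so the dataframe returned above looks like
# (illustrative values):
#
#        positive  negative
#   0        0.98      0.02
#   1        0.15      0.85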


def _read_embeddings(
    output_file: Path, pooling: gobbli.io.EmbedPooling
) -> Tuple[List[np.ndarray], Optional[List[List[str]]]]:
    """
    Read embeddings from the BERT model into a list of ndarrays containing the
    embedding values.  Also return a corresponding list of token lists if no
    pooling was applied.
    """
    embeddings = []  # type: List[np.ndarray]
    doc_tokens = []  # type: List[List[str]]
    with open(output_file, "r") as f:
        for line in f:
            line_json = json.loads(line)
            line_tokens = []  # type: List[str]
            line_layers = []  # type: List[List[float]]
            for token_info in line_json["features"]:
                line_tokens.append(token_info["token"])
                # Only take the first layer output; we don't currently support a
                # way to combine multiple layers
                line_layers.append(token_info["layers"][0]["values"])

            # Apply pooling, if necessary
            line_embedding = np.array(line_layers)
            if pooling == gobbli.io.EmbedPooling.MEAN:
                line_embedding = np.mean(line_embedding, axis=0)

            embeddings.append(line_embedding)
            doc_tokens.append(line_tokens)

    # Don't return tokens if we're doing any pooling, since
    # the pooled results combine all the tokens
    tokens = None
    if pooling == gobbli.io.EmbedPooling.NONE:
        tokens = doc_tokens

    return embeddings, tokens
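
# Each line of the JSONL file parsed above is assumed to follow the structure
# emitted by BERT's extract_features.py, roughly:
#
#   {"features": [{"token": "[CLS]",
#                  "layers": [{"index": -2, "values": [0.1, ...]}]},
#                 ...]}
#
# With mean pooling, each document yields a single (hidden_size,) vector; with
# no pooling, it yields a (num_tokens, hidden_size) matrix plus the token list.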


BERT_MODEL_ARCHIVES = {
    "bert-base-uncased": "https://storage.googleapis.com/bert_models/2018_10_18/uncased_L-12_H-768_A-12.zip",
    "bert-base-cased": "https://storage.googleapis.com/bert_models/2018_10_18/cased_L-12_H-768_A-12.zip",
    "bert-large-uncased": "https://storage.googleapis.com/bert_models/2018_10_18/uncased_L-24_H-1024_A-16.zip",
    "bert-large-cased": "https://storage.googleapis.com/bert_models/2018_10_18/cased_L-24_H-1024_A-16.zip",
    "bert-large-whole-word-masking-uncased": "https://storage.googleapis.com/bert_models/2019_05_30/wwm_uncased_L-24_H-1024_A-16.zip",
    "bert-large-whole-word-masking-cased": "https://storage.googleapis.com/bert_models/2019_05_30/wwm_cased_L-24_H-1024_A-16.zip",
    "bert-base-multilingual-cased": "https://storage.googleapis.com/bert_models/2018_11_23/multi_cased_L-12_H-768_A-12.zip",
    "bert-base-chinese": "https://storage.googleapis.com/bert_models/2018_11_03/chinese_L-12_H-768_A-12.zip",
    "clinical-bert-cased": "https://www.dropbox.com/s/8armk04fu16algz/pretrained_bert_tf.tar.gz?dl=1",
    "biobert-cased": "https://github.com/naver/biobert-pretrained/releases/download/v1.1-pubmed/biobert_v1.1_pubmed.tar.gz",
    "scibert-uncased": "https://s3-us-west-2.amazonaws.com/ai2-s2-research/scibert/tensorflow_models/scibert_scivocab_uncased.tar.gz",
    "scibert-cased": "https://s3-us-west-2.amazonaws.com/ai2-s2-research/scibert/tensorflow_models/scibert_scivocab_cased.tar.gz",
    "ncbi-bert-base-pubmed-uncased": "https://ftp.ncbi.nlm.nih.gov/pub/lu/Suppl/NCBI-BERT/NCBI_BERT_pubmed_uncased_L-12_H-768_A-12.zip",
    "ncbi-bert-base-pubmed-mimic-uncased": "https://ftp.ncbi.nlm.nih.gov/pub/lu/Suppl/NCBI-BERT/NCBI_BERT_pubmed_mimic_uncased_L-12_H-768_A-12.zip",
    "ncbi-bert-large-pubmed-uncased": "https://ftp.ncbi.nlm.nih.gov/pub/lu/Suppl/NCBI-BERT/NCBI_BERT_pubmed_uncased_L-24_H-1024_A-16.zip",
    "ncbi-bert-large-pubmed-mimic-uncased": "https://ftp.ncbi.nlm.nih.gov/pub/lu/Suppl/NCBI-BERT/NCBI_BERT_pubmed_mimic_uncased_L-24_H-1024_A-16.zip",
}  # type: Dict[str, str]
"""
A mapping from model names to archives.
See `the BERT repo <https://github.com/google-research/bert>`__ for guidelines on when
to use which model.  "bert-base-uncased" is a safe default for most situations.
Larger models require more time and GPU memory to run.
"""


class BERT(BaseModel, TrainMixin, PredictMixin, EmbedMixin):
    """
    Classifier/embedding wrapper for Google Research's BERT:
    https://github.com/google-research/bert
    """

    _BUILD_PATH = Path(__file__).parent

    _TRAIN_INPUT_FILE = "train.tsv"
    _VALID_INPUT_FILE = "dev.tsv"
    _TEST_INPUT_FILE = "test.tsv"
    _LABELS_INPUT_FILE = "labels.tsv"

    _TEST_OUTPUT_FILE = "test_results.tsv"

    _EMBEDDING_INPUT_FILE = "input.txt"
    _EMBEDDING_OUTPUT_FILE = "embeddings.jsonl"
    def init(self, params: Dict[str, Any]):
        """
        See :meth:`gobbli.model.base.BaseModel.init`.

        BERT parameters:

        - ``max_seq_length`` (:obj:`int`): The maximum total input sequence
          length after WordPiece tokenization.  Sequences longer than this
          will be truncated, and sequences shorter than this will be padded.
          Default: 128
        - ``bert_model`` (:obj:`str`): Name of a pretrained BERT model to use.
          See :obj:`BERT_MODEL_ARCHIVES` for a listing of available BERT models.
        """
        self.max_seq_length = 128
        self.bert_model = "bert-base-uncased"

        for name, value in params.items():
            if name == "max_seq_length":
                assert_type(name, value, int)
                self.max_seq_length = value
            elif name == "bert_model":
                assert_in(name, value, set(BERT_MODEL_ARCHIVES.keys()))
                self.bert_model = value
            else:
                raise ValueError(f"Unknown param '{name}'")
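
    # A hypothetical usage sketch (illustrative, not part of this module):
    # gobbli model constructors forward keyword params to init(), so the
    # parameters above are typically supplied like:
    #
    #   model = BERT(max_seq_length=256, bert_model="bert-base-cased")
    #   model.build()  # download pretrained weights, build the Docker image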
    @property
    def weights_dir(self) -> Path:
        """
        Returns:
          Directory containing pretrained weights for this instance.
        """
        return self.class_weights_dir / self.bert_model

    @property
    def image_tag(self) -> str:
        """
        Returns:
          The Docker image tag to be used for the BERT container.
        """
        device = "gpu" if self.use_gpu else "cpu"
        return f"gobbli-bert-classifier-{device}"

    @property
    def do_lower_case(self) -> bool:
        """
        Returns:
          Whether the BERT tokenizer should lowercase its input.
        """
        return "uncased" in self.bert_model

    def _build(self):
        # Download data if we don't already have it.
        # Download into a temp dir and move the result into the destination dir
        # to ensure partial downloads don't leave corrupted state.
        if not self.weights_dir.exists():
            with tempfile.TemporaryDirectory() as tmpdir:
                tmp_weights_dir = Path(tmpdir) / self.weights_dir.name
                tmp_weights_dir.mkdir()
                self.logger.info("Downloading pre-trained weights.")
                download_archive(
                    BERT_MODEL_ARCHIVES[self.bert_model],
                    tmp_weights_dir,
                    junk_paths=True,
                )
                shutil.move(tmp_weights_dir, self.weights_dir)
                self.logger.info("Weights downloaded.")

        # Build the docker image
        self.docker_client.images.build(
            path=str(BERT._BUILD_PATH),
            tag=self.image_tag,
            **self._base_docker_build_kwargs,
        )

    @staticmethod
    def _get_checkpoint(
        user_checkpoint: Optional[Path], context: ContainerTaskContext
    ) -> Tuple[Optional[Path], Path, str]:
        """
        Determine the host checkpoint directory, container checkpoint directory,
        and checkpoint name using the user-requested checkpoint (if any) and
        the container context.

        Args:
          user_checkpoint: An optional checkpoint passed in by the user.  If the
            user doesn't pass one, use the default pretrained checkpoint.
          context: The container context to create the checkpoint in.

        Returns:
          A 3-tuple: the host checkpoint directory (if any), the container
          checkpoint directory, and the checkpoint name.
        """
        if user_checkpoint is None:
            # Default BERT weights
            host_checkpoint_dir = None
            checkpoint_name = "bert_model.ckpt"
            container_checkpoint_dir = BaseModel._CONTAINER_WEIGHTS_PATH
        else:
            # Trained weights, which will be mounted in the container
            host_checkpoint_dir = user_checkpoint.parent
            checkpoint_name = user_checkpoint.name
            container_checkpoint_dir = context.container_root_dir / "checkpoint"

        return host_checkpoint_dir, container_checkpoint_dir, checkpoint_name

    def _write_train_input(self, train_input: gobbli.io.TrainInput, input_dir: Path):
        """
        Write the given gobbli input into the format expected by BERT.

        Make sure the given directory exists first.
        """
        train_path = input_dir / BERT._TRAIN_INPUT_FILE
        valid_path = input_dir / BERT._VALID_INPUT_FILE

        train_df = pd.DataFrame(
            {"Text": train_input.X_train, "Label": train_input.y_train_multiclass}
        )
        valid_df = pd.DataFrame(
            {"Text": train_input.X_valid, "Label": train_input.y_valid_multiclass}
        )

        _df_to_train_tsv(train_df, train_path)
        _df_to_train_tsv(valid_df, valid_path)

        labels_path = input_dir / BERT._LABELS_INPUT_FILE
        _write_labels(train_input.labels(), labels_path)

    def _train(
        self, train_input: gobbli.io.TrainInput, context: ContainerTaskContext
    ) -> gobbli.io.TrainOutput:
        if train_input.multilabel:
            raise ValueError(
                "gobbli BERT model doesn't support multilabel classification."
            )

        self._write_train_input(train_input, context.host_input_dir)

        # Determine checkpoint to use
        host_checkpoint_dir, container_checkpoint_dir, checkpoint_name = self._get_checkpoint(
            train_input.checkpoint, context
        )

        labels = train_input.labels()

        cmd = (
            "bash -c 'python run_classifier.py"
            " --task_name=cola"
            " --do_train=true"
            " --do_eval=true"
            f" --data_dir={context.container_input_dir}"
            f" --vocab_file={BaseModel._CONTAINER_WEIGHTS_PATH}/vocab.txt"
            f" --bert_config_file={BaseModel._CONTAINER_WEIGHTS_PATH}/bert_config.json"
            f" --init_checkpoint={container_checkpoint_dir / checkpoint_name}"
            f" --max_seq_length={self.max_seq_length}"
            f" --train_batch_size={train_input.train_batch_size}"
            f" --eval_batch_size={train_input.valid_batch_size}"
            " --learning_rate=2e-5"
            f" --do_lower_case={self.do_lower_case}"
            f" --num_train_epochs={train_input.num_train_epochs}"
            f" --output_dir={context.container_output_dir}'"
        )

        run_kwargs = self._base_docker_run_kwargs(context)

        # Mount the checkpoint in the container if needed
        maybe_mount(
            run_kwargs["volumes"], host_checkpoint_dir, container_checkpoint_dir
        )

        container_logs = run_container(
            self.docker_client, self.image_tag, cmd, self.logger, **run_kwargs
        )

        # Parse the generated evaluation results file
        results_file = context.host_output_dir / "eval_results.txt"
        eval_results = {}  # type: Dict[str, Union[int, float]]
        with open(results_file, "r") as f:
            for line in f:
                key, str_val = line.split(" = ")
                if key == "global_step":
                    val: Union[int, float] = int(str_val)
                else:
                    val = float(str_val)
                eval_results[key] = val

        return gobbli.io.TrainOutput(
            valid_loss=eval_results["eval_loss"],
            valid_accuracy=eval_results["eval_accuracy"],
            train_loss=eval_results["loss"],
            labels=labels,
            multilabel=False,
            checkpoint=context.host_output_dir
            / f"model.ckpt-{eval_results['global_step']}",
            _console_output=container_logs,
        )

    def _write_predict_input(
        self, predict_input: gobbli.io.PredictInput, input_dir: Path
    ):
        """
        Write the given gobbli prediction input into the format expected by BERT.

        Make sure the given directory exists first.
        """
        test_path = input_dir / BERT._TEST_INPUT_FILE

        test_df = pd.DataFrame({"sentence": predict_input.X})
        test_df["sentence"] = _preprocess_text(test_df["sentence"])
        test_df.index.name = "id"
        test_df.to_csv(test_path, sep="\t", index=True, header=True)

        labels_path = input_dir / BERT._LABELS_INPUT_FILE
        _write_labels(predict_input.labels, labels_path)

    def _predict(
        self, predict_input: gobbli.io.PredictInput, context: ContainerTaskContext
    ) -> gobbli.io.PredictOutput:
        self._write_predict_input(predict_input, context.host_input_dir)

        # Determine checkpoint to use
        host_checkpoint_dir, container_checkpoint_dir, checkpoint_name = self._get_checkpoint(
            predict_input.checkpoint, context
        )

        cmd = (
            "bash -c 'python run_classifier.py"
            " --task_name=cola"
            " --do_predict=true"
            f" --data_dir={context.container_input_dir}"
            f" --vocab_file={BaseModel._CONTAINER_WEIGHTS_PATH}/vocab.txt"
            f" --bert_config_file={BaseModel._CONTAINER_WEIGHTS_PATH}/bert_config.json"
            f" --predict_batch_size={predict_input.predict_batch_size}"
            f" --do_lower_case={self.do_lower_case}"
            f" --init_checkpoint={container_checkpoint_dir / checkpoint_name}"
            f" --max_seq_length={self.max_seq_length}"
            f" --output_dir={context.container_output_dir}'"
        )

        run_kwargs = self._base_docker_run_kwargs(context)

        # Mount the checkpoint in the container if needed
        maybe_mount(
            run_kwargs["volumes"], host_checkpoint_dir, container_checkpoint_dir
        )

        container_logs = run_container(
            self.docker_client, self.image_tag, cmd, self.logger, **run_kwargs
        )

        return gobbli.io.PredictOutput(
            y_pred_proba=_read_predictions(
                predict_input.labels, context.host_output_dir / BERT._TEST_OUTPUT_FILE
            ),
            _console_output=container_logs,
        )

    def _write_embed_input(self, embed_input: gobbli.io.EmbedInput, input_dir: Path):
        """
        Write the given gobbli embedding input into the format expected by BERT.

        Make sure the given directory exists first.
        """
        input_dir.mkdir(exist_ok=True, parents=True)

        input_path = input_dir / BERT._EMBEDDING_INPUT_FILE
        input_path.write_text(escape_line_delimited_texts(embed_input.X))

    def _embed(
        self, embed_input: gobbli.io.EmbedInput, context: ContainerTaskContext
    ) -> gobbli.io.EmbedOutput:
        self._write_embed_input(embed_input, context.host_input_dir)

        # Determine checkpoint to use
        host_checkpoint_dir, container_checkpoint_dir, checkpoint_name = self._get_checkpoint(
            embed_input.checkpoint, context
        )

        # Use the second-to-last layer for embeddings as suggested:
        # https://github.com/hanxiao/bert-as-service#q-why-not-the-last-hidden-layer-why-second-to-last
        cmd = (
            "bash -c 'python extract_features.py"
            f" --input_file={context.container_input_dir / BERT._EMBEDDING_INPUT_FILE}"
            f" --output_file={context.container_output_dir / BERT._EMBEDDING_OUTPUT_FILE}"
            f" --vocab_file={BaseModel._CONTAINER_WEIGHTS_PATH}/vocab.txt"
            f" --bert_config_file={BaseModel._CONTAINER_WEIGHTS_PATH}/bert_config.json"
            f" --init_checkpoint={container_checkpoint_dir / checkpoint_name}"
            f" --do_lower_case={self.do_lower_case}"
            " --layers=-2"
            f" --max_seq_length={self.max_seq_length}"
            f" --batch_size={embed_input.embed_batch_size}'"
        )

        run_kwargs = self._base_docker_run_kwargs(context)

        # Mount the checkpoint in the container if needed
        maybe_mount(
            run_kwargs["volumes"], host_checkpoint_dir, container_checkpoint_dir
        )

        container_logs = run_container(
            self.docker_client, self.image_tag, cmd, self.logger, **run_kwargs
        )

        X_embedded, embed_tokens = _read_embeddings(
            context.host_output_dir / BERT._EMBEDDING_OUTPUT_FILE, embed_input.pooling
        )

        return gobbli.io.EmbedOutput(
            X_embedded=X_embedded,
            embed_tokens=embed_tokens,
            _console_output=container_logs,
        )