Source code for gobbli.experiment.base

import logging
import os
from abc import ABC
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import ray

from gobbli.dataset.base import BaseDataset
from gobbli.model.sklearn import SKLearnClassifier
from gobbli.util import generate_uuid, gobbli_dir, is_dir_empty, write_metadata


def experiment_dir() -> Path:
    return gobbli_dir() / "experiment"
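# Illustrative sketch (not executed here): assuming the default gobbli root
# lives under the user's home directory, the experiment directory resolves to
# something like:
#
# >>> experiment_dir()
# PosixPath('/home/user/.gobbli/experiment')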
def init_worker_env(
    gobbli_dir: Optional[Path] = None, log_level: Union[int, str] = logging.WARNING
) -> logging.Logger:
    """
    Initialize the environment on a ray worker.

    Args:
      gobbli_dir: Used as the value of the GOBBLI_DIR environment variable;
        determines where gobbli data is stored on the worker's filesystem.
      log_level: Level for logging coming from the worker.
    """
    if gobbli_dir is not None:
        os.environ["GOBBLI_DIR"] = str(gobbli_dir)
    logging.basicConfig(level=log_level)

    # Provide a logger for our workers.
    # Workers should only log using loggers they've created to avoid
    # logger pickling, which generally doesn't work.
    # https://stackoverflow.com/questions/55272066/how-can-i-use-the-python-logging-in-ray
    return logging.getLogger(__name__)
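# Sketch of how init_worker_env() might be called from inside a ray task; the
# gobbli_dir path below is purely illustrative:
#
# @ray.remote
# def train_task():
#     logger = init_worker_env(
#         gobbli_dir=Path("/tmp/gobbli-worker"), log_level=logging.INFO
#     )
#     logger.info("worker environment initialized")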
def init_gpu_config() -> Tuple[bool, str]:
    """
    Determine the GPU configuration from the current ray environment on a worker.

    Returns:
      2-tuple: whether a GPU should be used and a comma-separated string
      containing the IDs of the GPUs that should be used.
    """
    try:
        gpu_ids = ray.get_gpu_ids()
    except Exception as e:
        # This message is either 'ray.get_gpu_ids() currently does not work in PYTHON MODE'
        # or '... in LOCAL MODE' depending on the version of ray installed
        if "ray.get_gpu_ids() currently does not work in" in str(e):
            gpu_ids = []
        else:
            raise
    use_gpu = len(gpu_ids) > 0
    nvidia_visible_devices = ",".join(str(i) for i in gpu_ids)
    return use_gpu, nvidia_visible_devices
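# Sketch of consuming init_gpu_config() inside a GPU-reserving ray task.
# Assigning the returned ID string to NVIDIA_VISIBLE_DEVICES is an assumption
# about how callers apply it (e.g. when launching Docker containers):
#
# @ray.remote(num_gpus=1)
# def gpu_task():
#     use_gpu, nvidia_visible_devices = init_gpu_config()
#     if use_gpu:
#         os.environ["NVIDIA_VISIBLE_DEVICES"] = nvidia_visible_devices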
def get_worker_ip() -> str:
    """
    Determine the IP address of the current ray worker.

    Returns:
      A string containing the IP address.
    """
    global_worker = ray.worker.global_worker
    return getattr(global_worker, "node_ip_address", "127.0.0.1")
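# Sketch: one way get_worker_ip() can be used to detect whether a task was
# scheduled off the master node (an illustrative pattern, not part of this
# function's contract):
#
# master_ip = get_worker_ip()          # called on the driver
# ...
# if get_worker_ip() != master_ip:     # called inside a task
#     raise RuntimeError("task scheduled on a remote node")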
class BaseExperiment(ABC):
    """
    Base class for all derived Experiments.
    """

    _METADATA_FILENAME = "gobbli-experiment-meta.json"

    def __init__(
        self,
        model_cls: Any,
        dataset: Union[Tuple[List[str], List[str]], BaseDataset],
        test_dataset: Optional[Tuple[List[str], List[str]]] = None,
        data_dir: Optional[Path] = None,
        name: Optional[str] = None,
        param_grid: Optional[Dict[str, List[Any]]] = None,
        task_num_cpus: int = 1,
        task_num_gpus: int = 0,
        worker_gobbli_dir: Optional[Path] = None,
        worker_log_level: Union[int, str] = logging.WARNING,
        limit: Optional[int] = None,
        overwrite_existing: bool = False,
        ignore_ray_initialized_error: bool = False,
        distributed: bool = False,
        ray_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """
        Construct an experiment.

        Args:
          model_cls: The class of model to be used for the experiment.
          dataset: Dataset to be used for the experiment.  Can be either a 2-tuple
            containing a list of texts and a corresponding list of labels or a
            :class:`gobbli.dataset.base.BaseDataset`.
          test_dataset: An optional separate dataset to be used for calculating test
            metrics.  If passed, should be a 2-tuple containing a list of texts and a
            corresponding list of labels.  If not passed, a test dataset will be
            automatically split out of the `dataset`.
          data_dir: Optional path to a directory used to store data for the experiment.
            If not given, a directory under GOBBLI_DIR will be created and used.
          name: A descriptive name for the experiment, used to label directories in
            the filesystem.  If not passed, a random name will be generated and used.
            The name must be unique (i.e., there should not be another experiment
            with the same name).
          param_grid: Optional grid of parameters.  If passed, it should be a
            dictionary with keys being valid parameter names for the passed model and
            values being lists of parameter values.  Every combination of parameter
            values will be tried in the experiment, and the results for the best
            combination will be returned.  If not passed, only the model's default
            parameters will be used.
          task_num_cpus: Number of CPUs to reserve per task.
          task_num_gpus: Number of GPUs to reserve per task.
          worker_gobbli_dir: Directory to use for gobbli file storage by workers.
          worker_log_level: Logging level to use for logs output by workers running
            training tasks.
          limit: Read up to this many rows from the passed dataset.  Useful for
            debugging.
          overwrite_existing: If True, don't fail if there's an existing experiment
            in the same directory.
          ignore_ray_initialized_error: If True, don't error when a ray connection is
            already initialized; instead, shut it down and restart it with the passed
            `ray_kwargs`.
          distributed: If True, run the ray cluster assuming workers are distributed
            over multiple nodes.  This requires model weights for all trials to fit
            in the ray object store, which requires a lot of memory.  If False, run
            the ray cluster assuming all workers are on the master node, and weights
            will be passed around as filepaths; an error will be thrown if a remote
            worker tries to run a task.
          ray_kwargs: Dictionary containing keyword arguments to be passed directly
            to :func:`ray.init`.  By default, a new ray cluster will be initialized
            on the current node using all available CPUs and no GPUs, but these
            arguments can be used to connect to a remote cluster, limit resource
            usage, and much more.
""" self.model_cls = model_cls self.worker_gobbli_dir = worker_gobbli_dir self.name = name if self.name is None: self.name = generate_uuid() if data_dir is None: self._data_dir = experiment_dir() / self.__class__.__name__ / self.name else: self._data_dir = data_dir self._data_dir.mkdir(parents=True, exist_ok=True) if not overwrite_existing and not is_dir_empty(self._data_dir): raise ValueError(f"Experiment already exists for name '{self.name}'") if isinstance(dataset, BaseDataset): self.X = dataset.X_train() + dataset.X_test() self.y = dataset.y_train() + dataset.y_test() elif isinstance(dataset, tuple): if len(dataset) != 2: raise ValueError( f"`dataset` must be a 2-tuple, got length {len(dataset)}" ) self.X, self.y = dataset else: raise TypeError(f"Invalid type for dataset: {type(dataset)}") self.X_test = None # type: Optional[List[str]] self.y_test = None # type: Optional[List[str]] if test_dataset is not None: if not (isinstance(dataset, tuple) and len(dataset) == 2): raise ValueError(f"`test_dataset` must be a 2-tuple") self.X_test, self.y_test = test_dataset if limit is not None: self.X = self.X[:limit] self.y = self.y[:limit] self.param_grid = param_grid if param_grid is None: self.param_grid = {} self.task_num_cpus = task_num_cpus self.task_num_gpus = task_num_gpus self.worker_log_level = worker_log_level self.distributed = distributed if self.model_cls is SKLearnClassifier and distributed: raise ValueError( "The scikit-learn classifier is not supported for distributed " "experiments, since it needs to load a pickle from a file path " "which may not be on a given worker node." ) _ray_kwargs = ray_kwargs if _ray_kwargs is None: _ray_kwargs = {} self.is_ray_local_mode = _ray_kwargs.get("local_mode", False) # We may have an existing ray connection active -- throw an error or # clear it out to ensure it's re-initialized with the passed params if ray.is_initialized(): if ignore_ray_initialized_error: ray.shutdown() else: raise RuntimeError( "A ray connection is already initialized. To ignore this error" " and shut down the existing connection, pass" " `ignore_ray_initialized_error=True`." ) ray.init(**_ray_kwargs) metadata = { "model": model_cls.__name__, "len_X": len(self.X), "len_y": len(self.y), "param_grid": self.param_grid, } write_metadata(metadata, self.metadata_path) @property def metadata_path(self) -> Path: """ Returns: The path to the experiment's metadata file containing information about the experiment parameters. """ return self.data_dir() / BaseExperiment._METADATA_FILENAME
    def data_dir(self) -> Path:
        """
        Returns:
          The main data directory unique to this experiment.
        """
        return self._data_dir
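# Minimal usage sketch for a concrete subclass. `ClassificationExperiment` and
# `MajorityClassifier` are assumed to be available elsewhere in gobbli; the
# dataset and arguments below are illustrative only:
#
# from gobbli.experiment import ClassificationExperiment
# from gobbli.model.majority import MajorityClassifier
#
# exp = ClassificationExperiment(
#     model_cls=MajorityClassifier,
#     dataset=(["good film", "awful film"], ["pos", "neg"]),
#     name="demo-experiment",
# )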