# biome.text.pipeline Module

# Pipeline Class


class Pipeline ()

Manages the configuration and actions of NLP models.

Use Pipeline for creating new models from a configuration or loading a pre-trained model.

Use instantiated Pipelines for training from scratch, fine-tuning, predicting, serving, or exploring predictions.

# Subclasses

  • biome.text.pipeline._BlankPipeline
  • biome.text.pipeline._PipelineCopy
  • biome.text.pipeline._PreTrainedPipeline

# from_yaml Static method


def from_yaml (
  path: str,
  vocab_path: Union[str, NoneType] = None,
)  -> Pipeline

Creates a pipeline from a YAML configuration file

Parameters

path : str
The path to a YAML configuration file
vocab_path : Optional[str]
If provided, the pipeline vocab will be loaded from this path

Returns

pipeline : Pipeline
A configured pipeline
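
A minimal usage sketch, assuming a `pipeline.yml` configuration file exists at the given path (file and folder names are illustrative):

```python
from biome.text import Pipeline

# Load the pipeline definition from a YAML file; the path is illustrative.
pipeline = Pipeline.from_yaml("pipeline.yml")

# Optionally reuse a previously saved vocabulary:
# pipeline = Pipeline.from_yaml("pipeline.yml", vocab_path="vocabulary")
```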

# from_config Static method


def from_config (
  config: Union[PipelineConfiguration, dict],
  vocab_path: Union[str, NoneType] = None,
)  -> Pipeline

Creates a pipeline from a PipelineConfiguration object or a configuration dictionary

Parameters

config : Union[PipelineConfiguration, dict]
A PipelineConfiguration object or a configuration dict
vocab_path : Optional[str]
If provided, the pipeline vocabulary will be loaded from this path

Returns

pipeline : Pipeline
A configured pipeline
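
A sketch using a configuration dict. The exact keys (name, head, features, etc.) are illustrative and depend on the tokenizer, features, and task head you configure:

```python
from biome.text import Pipeline

# Illustrative configuration dict; adjust the head (and features) to your task.
config = {
    "name": "sentiment_classifier",
    "head": {
        "type": "TextClassification",
        "labels": ["negative", "positive"],
    },
}
pipeline = Pipeline.from_config(config)
```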

# from_pretrained Static method


def from_pretrained (
  path: str,
  **kwargs,
)  -> Pipeline

Loads a pre-trained pipeline from a model.tar.gz file path

Parameters

path : str
The path to the model.tar.gz file of a pre-trained Pipeline

Returns

pipeline : Pipeline
A configured pipeline
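
A sketch for loading a pre-trained model; the archive path is illustrative:

```python
from biome.text import Pipeline

# Load a previously trained pipeline from its model.tar.gz archive.
pipeline = Pipeline.from_pretrained("output/model.tar.gz")
```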

# Instance variables

var name : str

Gets the pipeline name

var inputs : List[str]

Gets the pipeline input field names

var output : str

Gets the pipeline output field name

var backbone : ModelBackbone

Gets the model backbone of the pipeline

var head : TaskHead

Gets the pipeline task head

var config : PipelineConfiguration

Gets the pipeline configuration

var type_name : str

The pipeline type name, equivalent to the task head name

var trainable_parameters : int

Returns the number of trainable parameters.

At training time, this number can change when freezing/unfreezing certain parameter groups.

var trainable_parameter_names : List[str]

Returns the names of the trainable parameters in the pipeline
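
A short sketch for inspecting these properties on a loaded pipeline (the printed values are illustrative):

```python
pipeline = Pipeline.from_pretrained("output/model.tar.gz")

print(pipeline.name)                  # the pipeline name
print(pipeline.inputs)                # e.g. ["text"] for a single-input classifier
print(pipeline.output)                # the output field name
print(pipeline.trainable_parameters)  # number of trainable parameters
```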

# init_prediction_logger Method


def init_prediction_logger (
  self,
  output_dir: str,
  max_logging_size: int = 100,
) 

Initializes the prediction logging.

If initialized, all predictions will be logged to a file called predictions.json in the output_dir.

Parameters

output_dir : str
Path to the folder in which we create the predictions.json file.
max_logging_size : int
Max disk size to use for prediction logs
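
A sketch; the output directory name is illustrative:

```python
# Log every prediction to <output_dir>/predictions.json
pipeline.init_prediction_logger(output_dir="prediction_logs", max_logging_size=100)
```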

# init_prediction_cache Method


def init_prediction_cache (
  self,
  max_size: int,
)  -> NoneType

Initializes the cache for input predictions

Parameters

max_size : int
Caches the predictions for up to max_size of the most recent inputs
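
A sketch (the cache size is illustrative):

```python
# Cache predictions for the 1000 most recent inputs.
pipeline.init_prediction_cache(max_size=1000)
```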

# find_lr Method


def find_lr (
  self,
  trainer_config: TrainerConfiguration,
  find_lr_config: FindLRConfiguration,
  training_data: Union[DataSource, allennlp.data.dataset_readers.dataset_reader.AllennlpDataset, allennlp.data.dataset_readers.dataset_reader.AllennlpLazyDataset],
) 

Returns a learning rate scan on the model.

It increases the learning rate step by step while recording the losses. For a guide on how to select the learning rate, please refer to this excellent blog post.

Parameters

trainer_config
A trainer configuration
find_lr_config
A configuration for finding the learning rate
training_data
The training data

Returns

(learning_rates, losses)
A list of learning rates and the corresponding losses. Note: the losses are recorded before applying the corresponding learning rate.
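
A minimal sketch of a learning-rate scan. The import path and default constructors for TrainerConfiguration and FindLRConfiguration are assumptions based on the class names used above, and train_dataset stands for training data created elsewhere:

```python
from biome.text.configuration import TrainerConfiguration, FindLRConfiguration  # import path assumed

trainer_config = TrainerConfiguration()   # defaults assumed for illustration
find_lr_config = FindLRConfiguration()    # defaults assumed for illustration

learning_rates, losses = pipeline.find_lr(
    trainer_config=trainer_config,
    find_lr_config=find_lr_config,
    training_data=train_dataset,          # a DataSource or dataset created elsewhere
)

# Pick a learning rate from the region where the loss drops steeply
# but has not yet started to diverge.
```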

# train Method


def train (
  self,
  output: str,
  training: Union[DataSource, allennlp.data.dataset_readers.dataset_reader.AllennlpDataset, allennlp.data.dataset_readers.dataset_reader.AllennlpLazyDataset],
  trainer: Union[TrainerConfiguration, NoneType] = None,
  validation: Union[DataSource, allennlp.data.dataset_readers.dataset_reader.AllennlpDataset, allennlp.data.dataset_readers.dataset_reader.AllennlpLazyDataset, NoneType] = None,
  test: Union[DataSource, allennlp.data.dataset_readers.dataset_reader.AllennlpDataset, allennlp.data.dataset_readers.dataset_reader.AllennlpLazyDataset, NoneType] = None,
  extend_vocab: Union[VocabularyConfiguration, NoneType] = None,
  loggers: List[BaseTrainLogger] = None,
  restore: bool = False,
  quiet: bool = False,
)  -> TrainingResults

Launches a training run with the specified configurations and data sources

Parameters

output : str
The experiment output path
training
The training DataSource or dataset
trainer : Optional[TrainerConfiguration]
A trainer configuration
validation
The validation DataSource or dataset (optional)
test
The test DataSource or dataset (optional)
extend_vocab : Optional[VocabularyConfiguration]
Extends the vocabulary tokens with the provided VocabularyConfiguration
loggers : List[BaseTrainLogger]
A list of loggers that execute a callback before the training, after each epoch, and at the end of the training (see biome.text.logger.MlflowLogger, for example)
restore : bool
If enabled, tries to read previous training status from the output folder and continues the training process
quiet : bool
If enabled, disables most logging messages, keeping only warning and error messages. In any case, all logging info is stored in a file at ${output}/train.log

Returns

Training results information, containing the generated model path and the related metrics
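
A sketch of a training run; the data sources, trainer configuration, and output path are placeholders created elsewhere:

```python
results = pipeline.train(
    output="experiment",          # illustrative output folder
    training=train_ds,            # training DataSource / dataset
    validation=valid_ds,          # optional validation DataSource / dataset
    trainer=trainer_config,       # optional TrainerConfiguration
)
# results is a TrainingResults object with the generated model path
# and the related metrics.
```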
 

# create_dataset Method


def create_dataset (
  self,
  datasource: DataSource,
  lazy: bool = False,
)  -> Union[allennlp.data.dataset_readers.dataset_reader.AllennlpDataset, allennlp.data.dataset_readers.dataset_reader.AllennlpLazyDataset]

Creates a torch Dataset of instances from a data source

Parameters

datasource : DataSource
The source of data
lazy : bool
If enabled, the returned dataset is a subclass of torch.utils.data.IterableDataset

Returns

A torch Dataset containing the instances collection
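
A sketch, assuming ds is a DataSource created elsewhere:

```python
# Build a dataset from the data source and reuse it for training.
train_dataset = pipeline.create_dataset(ds, lazy=False)
results = pipeline.train(output="experiment", training=train_dataset)
```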
 

# predict Method


def predict (
  self,
  *args,
  **kwargs,
)  -> Dict[str, numpy.ndarray]

Returns a prediction given some input data based on the current state of the model

The accepted input is dynamically calculated and can be checked via the self.inputs attribute (print(Pipeline.inputs))

Returns

predictions : Dict[str, numpy.ndarray]
A dictionary containing the predictions and additional information
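
A sketch; the keyword text is an assumed input name, the real names are given by pipeline.inputs:

```python
print(pipeline.inputs)  # check which keyword arguments the head expects

# "text" is an illustrative input name for a single-input classifier.
prediction = pipeline.predict(text="The movie was a complete waste of time.")
```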

# predict_batch Method


def predict_batch (
  self,
  input_dicts: Iterable[Dict[str, Any]],
)  -> List[Dict[str, numpy.ndarray]]

Returns predictions given some input data based on the current state of the model

The predictions will be computed batch-wise, which is faster than calling self.predict for every single input data.

Parameters

input_dicts
The input data. The keys of the dicts must comply with the self.inputs attribute
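
A sketch with an assumed input key text:

```python
batch = [
    {"text": "An instant classic."},
    {"text": "Two hours I will never get back."},
]
predictions = pipeline.predict_batch(batch)
```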

# explain Method


def explain (
  self,
  *args,
  n_steps: int = 5,
  **kwargs,
)  -> Dict[str, Any]

Returns a prediction given some input data, including the attribution of each token to the prediction.

The attributions are calculated by means of the Integrated Gradients method.

The accepted input is dynamically calculated and can be checked via the self.inputs attribute (print(Pipeline.inputs))

Parameters

n_steps : int
The number of steps used when calculating the attribution of each token. If the number of steps is less than 1, the attributions will not be calculated.

Returns

predictions : Dict[str, numpy.ndarray]
A dictionary containing the predictions and attributions
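
A sketch, again assuming a text input name:

```python
# More steps give a finer-grained Integrated Gradients attribution
# at the cost of more compute.
explanation = pipeline.explain(text="The movie was great!", n_steps=10)
```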

# explain_batch Method


def explain_batch (
  self,
  input_dicts: Iterable[Dict[str, Any]],
  n_steps: int = 5,
)  -> List[Dict[str, numpy.ndarray]]

Returns predictions given some input data, including the attribution of each token to the prediction.

The predictions will be computed batch-wise, which is faster than calling self.explain for every single input.

The attributions are calculated by means of the Integrated Gradients method.

The accepted input is dynamically calculated and can be checked via the self.inputs attribute (print(Pipeline.inputs))

Parameters

input_dicts
The input data. The keys of the dicts must comply with the self.inputs attribute
n_steps
The number of steps used when calculating the attribution of each token. If the number of steps is less than 1, the attributions will not be calculated.

Returns

predictions
A list of dictionaries containing the predictions and attributions

# save_vocabulary Method


def save_vocabulary (
  self,
  directory: str,
)  -> NoneType

Saves the pipeline's vocabulary in a directory

Parameters

directory : str
The path to the directory where the vocabulary will be saved

# explore Method


def explore (
  self,
  data_source: DataSource,
  explore_id: Union[str, NoneType] = None,
  es_host: Union[str, NoneType] = None,
  batch_size: int = 50,
  prediction_cache_size: int = 0,
  explain: bool = False,
  force_delete: bool = True,
  **metadata,
)  -> dask.dataframe.core.DataFrame

Launches the Explore UI for a given data source

Running this method inside an IPython notebook will try to render the UI directly in the notebook.

Running this outside a notebook will try to launch the standalone web application.

Parameters

data_source : DataSource
The data source or its yaml file path
explore_id : Optional[str]
A name or id for this explore run, useful for running and keeping track of several explorations
es_host : Optional[str]
The URL to the Elasticsearch host for indexing predictions (default is localhost:9200)
batch_size : int
The batch size for indexing predictions (default is 50)
prediction_cache_size : int
The size of the cache for caching predictions (default is 0)
explain : bool
Whether to extract and return explanations of token importance (default is False)
force_delete : bool
Deletes exploration with the same explore_id before indexing the new explore items (default is True)

Returns

explore_df : dask.dataframe.core.DataFrame
A dask DataFrame with the results of the explore run
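
A sketch, assuming valid_ds is a DataSource created elsewhere and a reachable Elasticsearch instance for indexing:

```python
# Index predictions so they can be inspected in the Explore UI.
pipeline.explore(
    data_source=valid_ds,
    explore_id="baseline-run",   # illustrative run name
    explain=True,                # also compute token attributions
)
```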

# serve Method


def serve (
  self,
  port: int = 9998,
) 

Launches a REST prediction service with the current model

Parameters

port : int
The port on which the prediction service will be running (default: 9998)
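
A sketch:

```python
# Expose the model as a REST prediction service on port 9998.
pipeline.serve(port=9998)
```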

# set_head Method


def set_head (
  self,
  type: Type[TaskHead],
  **kwargs,
) 

Sets a new task head for the pipeline

Call this to reuse the weights and config of a pre-trained model (e.g., language model) for a new task.

Parameters

type : Type[TaskHead]
The TaskHead class to be set for the pipeline (e.g., TextClassification)
**kwargs
The TaskHead specific arguments (e.g., the classification head needs a pooler layer)
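
A sketch of swapping in a classification head on top of a pre-trained backbone. The TextClassification import path and its keyword arguments (labels, pooler) are assumptions used for illustration:

```python
from biome.text.modules.heads import TextClassification  # import path assumed

pipeline = Pipeline.from_pretrained("output/language_model.tar.gz")
pipeline.set_head(
    TextClassification,
    labels=["negative", "positive"],   # assumed head argument
    pooler={"type": "boe"},            # assumed pooler configuration
)
```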
