# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from contextlib import AbstractContextManager
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

import torch
from torch.nn import Module
from torch.optim import Optimizer
from typing_extensions import override

from lightning.fabric.accelerators import Accelerator
from lightning.fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning.fabric.plugins.precision import Precision
from lightning.fabric.strategies.ddp import DDPStrategy
from lightning.fabric.strategies.registry import _StrategyRegistry
from lightning.fabric.strategies.strategy import _Sharded
from lightning.fabric.utilities.imports import _raise_enterprise_not_available
from lightning.fabric.utilities.types import _PATH

if TYPE_CHECKING:
from deepspeed import DeepSpeedEngine
from torch.optim.lr_scheduler import _LRScheduler


# TODO(fabric): Links in the docstrings to PL-specific deepspeed user docs need to be replaced.
class DeepSpeedStrategy(DDPStrategy, _Sharded):
DEEPSPEED_ENV_VAR = "PL_DEEPSPEED_CONFIG_PATH"

def __init__(
self,
accelerator: Optional[Accelerator] = None,
zero_optimization: bool = True,
stage: int = 2,
remote_device: Optional[str] = None,
offload_optimizer: bool = False,
offload_parameters: bool = False,
offload_params_device: str = "cpu",
nvme_path: str = "/local_nvme",
params_buffer_count: int = 5,
params_buffer_size: int = 100_000_000,
max_in_cpu: int = 1_000_000_000,
offload_optimizer_device: str = "cpu",
optimizer_buffer_count: int = 4,
block_size: int = 1048576,
queue_depth: int = 8,
single_submit: bool = False,
overlap_events: bool = True,
thread_count: int = 1,
pin_memory: bool = False,
sub_group_size: int = 1_000_000_000_000,
contiguous_gradients: bool = True,
overlap_comm: bool = True,
allgather_partitions: bool = True,
reduce_scatter: bool = True,
allgather_bucket_size: int = 200_000_000,
reduce_bucket_size: int = 200_000_000,
zero_allow_untested_optimizer: bool = True,
logging_batch_size_per_gpu: Optional[int] = None,
config: Optional[Union[_PATH, dict[str, Any]]] = None,
logging_level: int = logging.WARN,
parallel_devices: Optional[list[torch.device]] = None,
cluster_environment: Optional[ClusterEnvironment] = None,
loss_scale: float = 0,
initial_scale_power: int = 16,
loss_scale_window: int = 1000,
hysteresis: int = 2,
min_loss_scale: int = 1,
partition_activations: bool = False,
cpu_checkpointing: bool = False,
contiguous_memory_optimization: bool = False,
synchronize_checkpoint_boundary: bool = False,
load_full_weights: bool = False,
precision: Optional[Precision] = None,
process_group_backend: Optional[str] = None,
timeout: Optional[timedelta] = default_pg_timeout,
exclude_frozen_parameters: bool = False,
) -> None:
"""Provides capabilities to run training using the DeepSpeed library, with training optimizations for large
billion parameter models. `For more information: https://pytorch-
lightning.readthedocs.io/en/stable/advanced/model_parallel.html#deepspeed`.
.. warning:: This is an :ref:`experimental <versioning:Experimental API>` feature.
Defaults have been set to enable ZeRO-Offload and some have been taken from the link below.
These defaults have been set generally, but may require tuning for optimum performance based on your model size.
`For more information: https://www.deepspeed.ai/docs/config-json/#zero-optimizations-for-fp16-training`.
Arguments:
zero_optimization: Enable ZeRO optimization. This is compatible with either ``precision="16-mixed"`` or
``precision="bf16-mixed"``.
stage: Different stages of the ZeRO Optimizer. 0 is disabled,
1 is optimizer state partitioning, 2 is optimizer+gradient state partitioning,
3 is optimizer+gradient+parameter partitioning using the infinity engine.
remote_device: Device to instantiate the model on initially (``cpu`` or ``nvme``). Defaults to GPU.
offload_optimizer: Enable offloading optimizer memory and computation to CPU or NVMe
based on ``offload_optimizer_device``.
offload_parameters: When using ZeRO Stage 3, enable offloading parameter memory and computation
to CPU or NVMe based on ``offload_params_device``.
offload_params_device: When offloading parameters, the device to offload to: ``cpu`` or ``nvme``.
offload_optimizer_device: When offloading optimizer state, the device to offload to:
``cpu`` or ``nvme``.
params_buffer_count: Number of buffers in buffer pool for
parameter offloading when ``offload_params_device`` is ``nvme``.
params_buffer_size: Size of buffers in buffer pool for parameter offloading
when ``offload_params_device`` is ``nvme``.
max_in_cpu: Number of parameter elements to maintain in CPU memory when offloading to NVMe is enabled.
nvme_path: Filesystem path for NVMe device for optimizer/parameter state offloading.
optimizer_buffer_count: Number of buffers in buffer pool for optimizer state offloading
when ``offload_optimizer_device`` is set to ``nvme``.
This should be at least the number of states maintained per parameter by the optimizer.
For example, Adam optimizer has 4 states (parameter, gradient, momentum, and variance).
block_size: When using NVMe Offloading, the I/O block size in bytes.
queue_depth: When using NVMe Offloading, the I/O queue depth.
single_submit: When using NVMe Offloading,
submit requests to the storage device as multiple individual requests,
as opposed to one block of requests.
overlap_events: When using NVMe Offloading,
submit requests to the storage device in an overlapped fashion
without waiting for completion of earlier requests.
thread_count: When using NVMe Offloading,
intra-request parallelism for each read/write submitted by a user thread.
pin_memory: When using ZeRO stage 3, pin optimizer state memory on CPU.
This could boost throughput at the cost of extra memory overhead.
sub_group_size: When using ZeRO stage 3, defines the number of parameters
within a sub group to offload at a time.
Smaller numbers require more communication, but improve memory efficiency.
contiguous_gradients: Copies gradients to a contiguous buffer as they are produced.
Avoids memory fragmentation during the backward pass. Useful when training large models.
overlap_comm: Overlap the reduction (synchronization) of gradients with the backwards computation.
This is a speed optimization when training across multiple GPUs/machines.
allgather_partitions: All-gather updated parameters at the end of the training step,
instead of using a series of broadcast collectives.
reduce_scatter: Use reduce/scatter instead of allreduce to average gradients.
allgather_bucket_size: Number of elements to all-gather at once.
Used to limit the memory required for larger model sizes, at the cost of some speed.
reduce_bucket_size: Number of elements to reduce at once.
Used to limit the memory required for larger model sizes, at the cost of some speed.
zero_allow_untested_optimizer: Allow untested optimizers to be used with ZeRO. Currently, Adam is the only
optimizer officially supported by DeepSpeed when using ZeRO.
logging_batch_size_per_gpu: Config used in DeepSpeed to calculate verbose timing for logging
on a per-sample-per-second basis (only displayed if ``logging_level=logging.INFO``).
To obtain accurate logs when using datasets that do not support batch samplers,
set this to the actual per-GPU batch size.
config: Pass in a DeepSpeed-formatted config dict,
or a path to a DeepSpeed config: https://www.deepspeed.ai/docs/config-json.
All defaults will be ignored if a config is passed in.
logging_level: Set the logging level for DeepSpeed.
loss_scale: Loss scaling value for FP16 training.
0.0 results in dynamic loss scaling, otherwise static.
initial_scale_power: Power of the initial dynamic loss scale value. Loss scale is computed
by ``2^initial_scale_power``.
loss_scale_window: Window in which to raise/lower the dynamic FP16 loss scaling value.
hysteresis: Delay shift in dynamic FP16 loss scaling.
min_loss_scale: The minimum FP16 dynamic loss scaling value.
partition_activations: Enables activation partitioning when used with ZeRO stage 3 and model parallelism.
Still requires you to wrap your forward functions in ``deepspeed.checkpointing.checkpoint``.
See `deepspeed tutorial
<https://www.deepspeed.ai/tutorials/megatron/#deepspeed-activation-checkpoints-optional>`_.
cpu_checkpointing: Offloads partitioned activations to CPU if ``partition_activations`` is enabled.
contiguous_memory_optimization: Copies partitioned activations so that they are contiguous in memory.
Not supported by all models.
synchronize_checkpoint_boundary: Insert :func:`torch.cuda.synchronize` at each checkpoint boundary.
load_full_weights: True when loading a single checkpoint file containing the model state dict
when using ZeRO Stage 3. This differs from the DeepSpeed checkpoint format, which contains shards
per worker.
exclude_frozen_parameters: Exclude frozen parameters when saving checkpoints.
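
A minimal usage sketch (the accelerator, device count, and precision values below are illustrative and
assume a multi-GPU CUDA machine with ``deepspeed`` installed):

Example::

    from lightning.fabric import Fabric

    strategy = DeepSpeedStrategy(stage=3, offload_optimizer=True)
    fabric = Fabric(accelerator="cuda", devices=2, precision="bf16-mixed", strategy=strategy)
    fabric.launch()

The registered shorthand names (e.g. ``strategy="deepspeed_stage_2_offload"``) select this class with
preset arguments; see :meth:`register_strategies`.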
"""
_raise_enterprise_not_available()
from pytorch_lightning_enterprise.strategies.deepspeed import (
DeepSpeedStrategyFabric as EnterpriseDeepSpeedStrategy,
)
super().__init__(
accelerator=accelerator,
parallel_devices=parallel_devices,
cluster_environment=cluster_environment,
precision=precision,
process_group_backend=process_group_backend,
)
self.deepspeed_impl = EnterpriseDeepSpeedStrategy(
outer_object=self,
accelerator=accelerator,
zero_optimization=zero_optimization,
stage=stage,
remote_device=remote_device,
offload_optimizer=offload_optimizer,
offload_parameters=offload_parameters,
offload_params_device=offload_params_device,
nvme_path=nvme_path,
params_buffer_count=params_buffer_count,
params_buffer_size=params_buffer_size,
max_in_cpu=max_in_cpu,
offload_optimizer_device=offload_optimizer_device,
optimizer_buffer_count=optimizer_buffer_count,
block_size=block_size,
queue_depth=queue_depth,
single_submit=single_submit,
overlap_events=overlap_events,
thread_count=thread_count,
pin_memory=pin_memory,
sub_group_size=sub_group_size,
contiguous_gradients=contiguous_gradients,
overlap_comm=overlap_comm,
allgather_partitions=allgather_partitions,
reduce_scatter=reduce_scatter,
allgather_bucket_size=allgather_bucket_size,
reduce_bucket_size=reduce_bucket_size,
zero_allow_untested_optimizer=zero_allow_untested_optimizer,
logging_batch_size_per_gpu=logging_batch_size_per_gpu,
config=config,
logging_level=logging_level,
parallel_devices=parallel_devices,
cluster_environment=cluster_environment,
loss_scale=loss_scale,
initial_scale_power=initial_scale_power,
loss_scale_window=loss_scale_window,
hysteresis=hysteresis,
min_loss_scale=min_loss_scale,
partition_activations=partition_activations,
cpu_checkpointing=cpu_checkpointing,
contiguous_memory_optimization=contiguous_memory_optimization,
synchronize_checkpoint_boundary=synchronize_checkpoint_boundary,
load_full_weights=load_full_weights,
precision=precision,
process_group_backend=process_group_backend,
timeout=timeout,
exclude_frozen_parameters=exclude_frozen_parameters,
)

@property
def zero_stage_3(self) -> bool:
return self.deepspeed_impl.zero_stage_3

@property
@override
def distributed_sampler_kwargs(self) -> dict[str, int]:
return self.deepspeed_impl.distributed_sampler_kwargs

@property
def model(self) -> "DeepSpeedEngine":
return self.deepspeed_impl._deepspeed_engine

@override
def setup_module_and_optimizers(
self, module: Module, optimizers: list[Optimizer], scheduler: Optional["_LRScheduler"] = None
) -> tuple["DeepSpeedEngine", list[Optimizer], Any]:
"""Set up a model and multiple optimizers together, along with an optional learning rate scheduler. Currently,
only a single optimizer is supported.
Return:
The model wrapped into a :class:`deepspeed.DeepSpeedEngine`, a list with a single
deepspeed optimizer, and an optional learning rate scheduler.
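
A sketch of the expected call pattern (``model`` and ``optimizer`` stand in for your own objects):

Example::

    engine, optimizers, scheduler = strategy.setup_module_and_optimizers(model, [optimizer])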
"""
return self.deepspeed_impl.setup_module_and_optimizers(
module=module, optimizers=optimizers, scheduler=scheduler
)

@override
def setup_module(self, module: Module) -> "DeepSpeedEngine":
"""Set up a module for inference (no optimizers).
For training, see :meth:`setup_module_and_optimizers`.
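
A sketch (``model`` stands in for your own :class:`torch.nn.Module`):

Example::

    engine = strategy.setup_module(model)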
"""
return self.deepspeed_impl.setup_module(module=module)

@override
def setup_optimizer(self, optimizer: Optimizer) -> Optimizer:
"""Optimizers can only be set up jointly with the model in this strategy.
Please use :meth:`setup_module_and_optimizers` to set up both module and optimizer together.
"""
return self.deepspeed_impl.setup_optimizer(optimizer=optimizer)

@override
def module_init_context(self, empty_init: Optional[bool] = None) -> AbstractContextManager:
return self.deepspeed_impl.module_init_context(empty_init=empty_init)

@override
def module_sharded_context(self) -> AbstractContextManager:
return self.deepspeed_impl.module_sharded_context()

@override
def save_checkpoint(
self,
path: _PATH,
state: dict[str, Union[Module, Optimizer, Any]],
storage_options: Optional[Any] = None,
filter: Optional[dict[str, Callable[[str, Any], bool]]] = None,
) -> None:
"""Save model, optimizer, and other state in a checkpoint directory.
Args:
path: A path to where the files should be saved
state: A dictionary with contents to be saved. If the dict contains modules or optimizers, their
state-dict will be retrieved and converted automatically.
storage_options: Unused by this strategy, since it doesn't use a ``CheckpointIO`` plugin.
filter: Unsupported.
Raises:
TypeError:
If the unused ``storage_options`` gets passed.
ValueError:
When no :class:`deepspeed.DeepSpeedEngine` objects were found in the state, or when multiple
:class:`deepspeed.DeepSpeedEngine` objects were found.
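
A minimal sketch (``engine`` is the module returned by :meth:`setup_module_and_optimizers`; the path and
extra keys are illustrative):

Example::

    strategy.save_checkpoint(
        path="checkpoints/step-1000",
        state={"model": engine, "optimizer": optimizer, "step": 1000},
    )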
"""
return self.deepspeed_impl.save_checkpoint(
path=path, state=state, storage_options=storage_options, filter=filter
)

@override
def load_checkpoint(
self,
path: _PATH,
state: Optional[Union[Module, Optimizer, dict[str, Union[Module, Optimizer, Any]]]] = None,
strict: bool = True,
weights_only: Optional[bool] = None,
) -> dict[str, Any]:
"""Load the contents from a checkpoint and restore the state of the given objects.
Args:
path: A path to where the file is located
state: A dictionary of objects whose state will be restored in-place from the checkpoint path.
This should contain exactly one model, and the model must already be set up by DeepSpeed.
strict: Whether to enforce that the keys in `state` match the keys in the checkpoint.
Returns:
Dictionary with the state inside DeepSpeed's engine
Raises:
ValueError:
If no state is provided, when no :class:`deepspeed.DeepSpeedEngine` objects were found in the
state, or when multiple :class:`deepspeed.DeepSpeedEngine` objects were found.
RuntimeError:
If DeepSpeed was unable to load the checkpoint due to missing files or because the checkpoint is
not in the expected DeepSpeed format.
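
A minimal sketch (``engine`` must already be set up by DeepSpeed; the path is illustrative and points at
a DeepSpeed checkpoint directory):

Example::

    extra_state = strategy.load_checkpoint(
        path="checkpoints/step-1000",
        state={"model": engine, "optimizer": optimizer},
    )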
"""
return self.deepspeed_impl.load_checkpoint(path=path, state=state, strict=strict)

@override
def clip_gradients_norm(
self,
module: "DeepSpeedEngine",
optimizer: Optimizer,
max_norm: Union[float, int],
norm_type: Union[float, int] = 2.0,
error_if_nonfinite: bool = True,
) -> torch.Tensor:
return self.deepspeed_impl.clip_gradients_norm(
module=module,
optimizer=optimizer,
max_norm=max_norm,
norm_type=norm_type,
error_if_nonfinite=error_if_nonfinite,
)

@override
def clip_gradients_value(
self, module: "DeepSpeedEngine", optimizer: Optimizer, clip_val: Union[float, int]
) -> None:
return self.deepspeed_impl.clip_gradients_value(module=module, optimizer=optimizer, clip_val=clip_val)

@classmethod
@override
def register_strategies(cls, strategy_registry: _StrategyRegistry) -> None:
strategy_registry.register("deepspeed", cls, description="Default DeepSpeed Strategy")
strategy_registry.register("deepspeed_stage_1", cls, description="DeepSpeed with ZeRO Stage 1 enabled", stage=1)
strategy_registry.register(
"deepspeed_stage_1_offload",
cls,
description="DeepSpeed with ZeRO Stage 1 and optimizer CPU Offload",
stage=1,
offload_optimizer=True,
)
strategy_registry.register("deepspeed_stage_2", cls, description="DeepSpeed with ZeRO Stage 2 enabled", stage=2)
strategy_registry.register(
"deepspeed_stage_2_offload",
cls,
description="DeepSpeed ZeRO Stage 2 and CPU Offload",
stage=2,
offload_optimizer=True,
)
strategy_registry.register("deepspeed_stage_3", cls, description="DeepSpeed ZeRO Stage 3", stage=3)
strategy_registry.register(
"deepspeed_stage_3_offload",
cls,
description="DeepSpeed ZeRO Stage 3 and CPU Offload",
stage=3,
offload_optimizer=True,
offload_parameters=True,
)
strategy_registry.register(
"deepspeed_stage_3_offload_nvme",
cls,
description="DeepSpeed ZeRO Stage 3 and NVMe Offload",
stage=3,
offload_optimizer=True,
offload_parameters=True,
remote_device="nvme",
offload_params_device="nvme",
offload_optimizer_device="nvme",
)

@override
def setup_environment(self) -> None:
return self.deepspeed_impl.setup_environment()

@override
def _setup_distributed(self) -> None:
return self.deepspeed_impl._setup_distributed()

@property
def config(self) -> dict[str, Any]:
return self.deepspeed_impl.config

@config.setter
def config(self, config: dict[str, Any]) -> None:
self.deepspeed_impl.config = config

@property
def load_full_weights(self) -> bool:
return self.deepspeed_impl.load_full_weights