EPL DOCUMENTATION¶
EPL (Easy Parallel Library) 是一个高效易用的分布式大模型训练框架。
概览¶
Easy Parallel Library (EPL) 是一个高效易用的分布式模型训练框架。 EPL提供了简单易用的API来表达各种并行化策略, 用户仅需几行代码就可以轻松支持各种模型的高性能分布式训练。
EPL深度集成了各种训练优化技术,帮助更多的用户低成本,高性能,轻松开启大模型训练。
支持各种并行化策略及混合并行,用户仅通过转换并行化接口来实现不同并行化策略训练。
支持各种显存优化技术,包含自动Gradient Checkpoint,ZERO,CPU Offload技术等,帮助用户用更少的资源训练更大的模型。
支持通信优化技术,实现高效的分布式扩展性。
EPL助力了最大的中文多模态模型M6实现大规模分布式训练,通过512卡即可训练10万亿参数模型。
使用EPL添加分布式策略¶
通过添加几行代码,用户即可实现不同的并行化策略。完整的API介绍和并行化例子详见API。 你也可以参考使用教程 来训练EPL模型库例子。
数据并行
+ import epl
+ epl.init()
+ with epl.replicate(device_count=1):
model()
流水并行
+ import epl
+
+ config = epl.Config({"pipeline.num_micro_batch": 4})
+ epl.init(config)
+ with epl.replicate(device_count=1, name="stage_0"):
model_part1()
+ with epl.replicate(device_count=1, name="stage_1"):
model_part2()
在上述例子中,模型被切分成2部分,用户可以通过配置pipeline.num_micro_batch
参数来设定Pipeline的micro batch数量。
算子拆分
+ import epl
+ config = epl.Config({"cluster.colocate_split_and_replicate": True})
+ epl.init(config)
+ with epl.replicate(8):
resnet()
+ with epl.split(8):
classification()
在上述例子中,我们对ResNet模型部分进行数据并行,对分类层进行算子拆分。
Citation¶
@misc{jia2021whale,
title={Whale: Scaling Deep Learning Model Training to the Trillions},
author={Xianyan Jia and Le Jiang and Ang Wang and Jie Zhang and Xinyuan Li and Wencong Xiao and Langshi chen and Yong Li and Zhen Zheng and Xiaoyong Liu and Wei Lin},
year={2021},
eprint={2011.09208},
archivePrefix={arXiv},
primaryClass={cs.DC}
}
联系我们¶
欢迎给我们提issue, 或者加入EPL官方钉钉群。
安装¶
本文档介绍如何搭建EPL的运行环境。
依赖¶
TensorFlow-GPU 1.15
从源码安装¶
基于NVIDIA TF1.15镜像¶
nvidia-docker run -ti --gpus all --name build_epl_with_nvtf1.15_21.12 --net host --ipc host -v /mnt:/mnt nvcr.io/nvidia/tensorflow:21.12-tf1-py3 bash
# clone and install EPL
git clone https://github.com/alibaba/EasyParallelLibrary.git
cd EasyParallelLibrary
pip install .
基于TensorFlow TF1.15镜像¶
nvidia-docker run -ti --gpus all --name build_epl_with_tf1.15 --net host --ipc host -v /mnt:/mnt tensorflow/tensorflow:1.15.5-gpu-py3 bash
# install nccl
apt update
apt install libnccl2 libnccl-dev
# clone and install EPL
git clone https://github.com/alibaba/EasyParallelLibrary.git
cd EasyParallelLibrary
pip install .
快速开始¶
我们将通过一个简单的模型示例来演示如何使用EPL来实现一个分布式训练程序。
EPL 分布式策略表达¶
用户首先需要在本地模型定义文件local_model.py
上添加分布式策略的定义。
下面这个例子展示了一个通过添加三行代码实现数据并行的例子。
# local_model.py
import numpy as np
import tensorflow as tf
+ import epl
+ epl.init()
+ epl.set_default_strategy(epl.replicate(1))
# Define model
num_x = np.random.randint(0, 10, (500, 20)).astype(dtype=np.float32)
num_y = np.random.randint(0, 10, 500).astype(dtype=np.int64)
dataset = tf.data.Dataset.from_tensor_slices((num_x, num_y)).batch(10).repeat(1)
iterator = dataset.make_initializable_iterator()
tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)
x, labels = iterator.get_next()
logits = tf.layers.dense(x, 2)
logits = tf.layers.dense(logits, 10)
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
global_step = tf.train.get_or_create_global_step()
optimizer = tf.train.MomentumOptimizer(learning_rate=0.001, momentum=0.9)
train_op = optimizer.minimize(loss, global_step=global_step)
# Training session
with tf.train.MonitoredTrainingSession() as sess:
for i in range(10):
train_loss, _, step = sess.run([loss, train_op, global_step])
print("Iteration %s , Loss: %s ." % (step, train_loss))
print("Train Finished.")
启动分布式训练¶
定义好模型之后,用户需要提供一个本地单级单卡启动的训练脚本,比如run.sh
.
# run.sh
python local_model.py
通过下面的脚本我们可以拉起一个单机两卡的数据并行训练任务。
epl-launch --num_workers 1 --gpu_per_worker 2 run.sh
并行化接口¶
本文档主要介绍了EPL并行化原语接口定义,和接口使用的注意事项。
在开始介绍接口定义之前,用户需要了解以下基本概念:
模型副本 :用户定义的单机单卡模型(不包含任何并行化和GA操作)。
micro batch size(mb): 一个模型副本训练迭代一步学习的samples数量。
num_micro_batch: 一个模型副本 GA或pipeline 累计的micro batch数量。
global batch size: 假设我们对一个模型副本做数据并行操作,并行度为N,则global batch size为
N * mb * num_micro_batch
.TaskGraph: TaskGraph是一个并行化子图。
如果没有特殊说明,EPL默认用户定义的batch size为 micro batch size
。
Parallel Strategy 原语¶
EPL通过strategy annotation的方式来划分模型为多个TaskGraph
,并在此基础上进行并行化。
EPL有两类strategy:replicate
和 split
。每个strategy会定义一个TaskGraph
。
replicate¶
replicate
可以实现模型的数据并行计算,即将模型复制多份,每份模型副本消费不同数据,来实现计算的并行化。 replicate
scope下的子模型会构成一个TaskGraph
。
当整个模型都标记了
replicate
,当前只有一个TaskGraph做复制,就是传统的数据并行模式。当部分模型标记了
replicate
, EPL会对这部分TaskGraph做复制。
接口定义:
replicate(device_count=None, name=None)
Args |
Required |
Description |
---|---|---|
device_count |
True |
|
name |
strategy name |
对于数据并行,一个模型副本用一张卡来计算,EPL会根据当前资源总数推断出全局的副本数。
当device_count
大于1的时候,EPL在做模型复制的时候会对micro batch size进行拆分,平均到device_count
张卡上,保持用户模型的micro batch size保持不变。
示例:
import epl
epl.init()
with epl.replicate(device_count=1):
model()
上面这个例子是一个数据并行的例子,每个模型副本用一张卡来计算。如果用户申请了8张卡,就是一个并行度为8的数据并行任务。
split¶
split
可以实现模型的tensor拆分计算。split
scope下定义的子模型会构成一个TaskGraph
,该TaskGraph
会被拆分后放置在多张卡上计算。
接口定义:
split(device_count=None, name=None)
Args |
Required |
Description |
---|---|---|
device_count |
True |
split 对应的taskgraph拆分到device_count张卡上计算 |
name |
strategy name |
示例:
import epl
epl.init()
with epl.split(device_count=8):
model()
上述例子将模型拆分到8张卡上做计算,如果用户申请了16张卡,EPL会默认在拆分模型外面嵌套一层数据并行。
set_default_strategy¶
除了两个基本的并行化接口replicate
和split
外,EPL也提供了一个设置默认strategy的辅助接口,
如果用户调用set_default_strategy方法,会设置一个默认的并行化策略和对应的TaskGraph
。
这个接口可以帮助模型并行化表达更简洁,同时更灵活地表达出复杂的并行策略。
接口定义:
set_default_strategy(strategy)
Args |
Required |
Description |
---|---|---|
strategy |
True |
并行化策略。 |
示例:
import epl
epl.init()
epl.set_default_strategy(epl.replicate(device_count=1))
model()
上述例子设置了一个默认的replicate
策略,通过这种方式也可以实现模型的数据并行。
接口使用说明与要求¶
不同并行化strategy生成的TaskGraph默认会放置在不同Device上。
Strategy annotation不允许嵌套。
用户只需标记模型前向代码,backward和apply自动与Forward colocate在对应的
TaskGraph
里。
关于如何使用并行化接口实现更多灵活的并行化策略,比如Pipeline,混合并行等,您可以继续阅读 并行化例子。
接口使用范例¶
本文档主要介绍如何使用EPL的并行化接口实现常见的并行化策略,以及复杂的混合并行。
数据并行¶
import epl
epl.init()
with epl.replicate(device_count=1):
model()
上面这个例子是一个数据并行的例子,每个模型副本用一张卡来计算。如果用户申请了8张卡,就是一个并行度为8的数据并行任务。
流水并行¶
import epl
config = epl.Config({"pipeline.num_micro_batch": 4})
epl.init(config)
with epl.replicate(device_count=1, name="stage_0"):
model_part1()
with epl.replicate(device_count=1, name="stage_1"):
model_part2()
在上述例子中,模型被切分成2个 TaskGraph
"stage_0"和"stage_1",用户可以通过配置pipeline.num_micro_batch
参数来设定Pipeline的micro batch数量。
在这个例子里,"stage_0"和"stage_1"组成一个模型副本,共需要2张GPU卡。如果用户申请了8张卡,EPL会自动在pipeline外嵌套一层并行度为4的数据并行(4个pipeline副本并行执行)。
算子拆分¶
算子拆分 - 大规模图像分类¶
import epl
config = epl.Config({"cluster.colocate_split_and_replicate": True})
epl.init(config)
with epl.replicate(8):
resnet()
with epl.split(8):
classification()
上述是一个大规模图像分类的例子,在这个例子中,对图像特征部分采用数据并行,对分类层采用算子拆分的方式。
为了减少两个TaskGraph直接的通信开销,我们可以通过设置cluster.colocate_split_and_replicate
参数将两个TaskGraph放置在相同的卡上(默认不同的TaskGraph会放置在不同的卡上)。
算子拆分 - MOE Transformer¶
import epl
config = epl.Config({"cluster.colocate_split_and_replicate": True})
epl.init(config)
total_gpu_num = epl.Env.get().cluster.total_gpu_num
epl.set_default_strategy(epl.replicate(total_gpu_num))
AttentionAndGating()
with epl.split(total_gpu_num):
MOE_Variable_Define()
MOE_Calculation_Define()
在上述例子中,我们实现了一个简单的MOE模型,通过设置set_default_strategy
设置默认的并行化策略为replicate
,
并对MOE部分进行计算的拆分。
配置¶
用户可以通过配置项开启各种优化功能。目前可以通过环境变量或者配置接口的方式来更改默认配置。
以下配置表格包含
Param Key: 参数名, 在EPL中,参数名的命名规则为"param_category.attribute",
param_category
为参数的分类,比如pipeline
,attribute
为每个参数类别下的配置属性,比如num_micro_batch
。Type: 参数类型
Default: 默认值
Description: 解释
接口定义:
Config(param_dict=None)
Args |
Type |
Required |
Description |
---|---|---|---|
param_dict |
dict |
False |
参数字典,key为参数名,value为参数值。 |
示例:
import epl
config = epl.Config({"pipeline.num_micro_batch": 4})
epl.init(config)
在上述例子中用户通过构造一个dict类型的参数字典,来修改参数配置。具体的参数描述请查阅下文的参数列表。
Pipeline 配置¶
Param Key |
Type |
Default |
Description |
---|---|---|---|
"pipeline.num_micro_batch" |
integer |
1 |
Pipeline number of micro batches. |
"pipeline.num_stages" |
integer |
None |
如果开启了自动Pipeline, 可以配置pipeline的stage数。 |
"pipeline.strategy" |
str |
"PreferBackward" |
Pipeline调度策略,可选策略为 ["PreferBackward", "PreferForward"] |
Gradient Checkpoint (GC) 配置¶
Gradient checkpoint通过重算换显存的方式来降低训练过程中的峰值显存。
Param Key |
Type |
Default |
Description |
---|---|---|---|
"gradient_checkpoint.type" |
str |
"" |
Gradient checkpoint选点方式,现提供两种方式, "collection": 用户指定GC tensor, "auto": epl 自动选点 |
"gradient_checkpoint.end_taskgraph" |
integer |
-1 |
当开启auto GC,用于指定GC的结束taskgraph。 |
"gradient_checkpoint.check_gradients" |
bool |
False |
校验GC生成的梯度的正确性。 |
代码示例:
自动GC选点, 对于Transformer类模型,推荐使用auto GC的方式。
import epl
# Enable auto GC.
config = epl.Config({"gradient_checkpoint.type": "auto"})
epl.init(config)
手动选点
import tensorflow as tf
import epl
config = epl.Config({"gradient_checkpoint.type": "collection"})
epl.init(config)
# 手动指定checkpoint tensor
tensor = op1()
tf.add_to_collection("checkpoints", tensor)
Zero 配置¶
在数据并行的场景下,每个卡上会存放一个模型副本,optimizer state等,这些信息在每张卡上都是一样,存在很大的冗余量。当模型变大,很容易超出单卡的显存限制。
在分布式场景下,我们可以通过类似zero的思路,将optimizer state和gradient分片存在不同的卡上,从而减少单卡的persistent memory占用。
Param Key |
Type |
Default |
Description |
---|---|---|---|
"zero.level" |
str |
"" |
ZERO开启级别,目前EPL支持 level设置为 "v1", 对optimizer states和gradients进行拆分。 |
import epl
# 打开Zero
config = epl.Config({"zero.level": "v1"})
epl.init(config)
注意:
epl zero只能应用于数据并行部分。
目前不支持zero组合gradient accumulation使用。
支持的GPU cluster为多机一卡的场景,即多个worker,每个worker一张卡。
Offload 配置¶
当模型参数量很大,超出GPU显存限制,我们可以通过CPU Offload,利用内存来扩大单卡可以训练的模型规模。 epl可以通过设置offload.level来实现offload。
"v0": offload所有的参数到CPU上。
Param Key |
Type |
Default |
Description |
---|---|---|---|
"offload.level" |
str |
"" |
offload level. |
示例
import epl
config = epl.Config({"offload.level": "v0"})
epl.init(config)
Memory-efficient AMP 配置¶
TF原生的AMP设计会在显存中保留一份FP16的weight,对于参数量很大的模型,会额外增加显存占用。为了让AMP显存开销更友好,EPL实现了一版memory-efficient AMP, 通过实时转换和释放的方式来节省显存。
用户可以通过配置amp.level 参数来开启EPL的AMP。
Param Key |
Type |
Default |
Description |
---|---|---|---|
"amp.level" |
str |
"" |
Auto mixed precision level, currently only support O1. |
"amp.debug_log" |
bool |
False |
Enable amp debug log. |
"amp.loss_scale" |
integer/str |
"dynamic" |
Loss scale for amp, can be "dynamic" or number(for fix). |
示例
import epl
config = epl.Config({"amp.level": "O1", "amp.loss_scale": "dynamic"})
# fixed loss scaling
config = epl.Config({"amp.level": "O1", "amp.loss_scale": 128})
epl.init(config)
Optimizer 配置¶
训练任务在做参数更新的时候(optimizer apply), 对于一些有较多临时tensor buffer的optimizer实现,容易消耗较多的显存。可以通过配置num_apply_group参数实现分组apply的方式节省显存消耗。
Param Key |
Type |
Default |
Description |
---|---|---|---|
"optimizer.num_apply_group" |
integer |
1 |
Number of gradient apply groups. |
示例
import epl
config = epl.Config({"optimizer.num_apply_group": 30})
epl.init(config)
Cluster 配置¶
Param Key |
Type |
Default |
Description |
---|---|---|---|
"cluster.device_place_prefer_intra_node" |
bool |
True |
Prefer placing one model replica within node. |
"cluster.run_visible_devices" |
str |
"" |
Visible devices for session. Usually, its value is setted by scheduler. |
"cluster.colocate_split_and_replicate" |
bool |
False |
如果cluster.colocate_split_and_replicate设为True,不同的taskgraph会共享相同的device。 |
通信配置¶
Param Key |
Type |
Default |
Description |
---|---|---|---|
"communication.num_communicators" |
integer |
2 |
通信线程池的communicator个数。 |
"communication.sparse_as_dense" |
bool |
False |
是否将sparse tensor转换为dense tensor进行通信。 |
"communication.max_splits" |
integer |
5 |
最大通信梯度融合的分组数。 |
"communication.fp16" |
bool |
False |
是否开启fp16参数通信。 |
"communication.fp16_scale" |
integer |
128 |
开启fp16参数通信后,为防止梯度消失问题,梯度scale系数。 |
"communication.clip_after_allreduce" |
bool |
False |
选择通信后进行梯度Clip,还是在梯度Clip后进行通信。 |
"communication.gradients_reduce_method" |
str |
"mean" |
梯度AllReduce的方式,可以是 "mean" 和 "sum"。 |
IO配置¶
Param Key |
Type |
Default |
Description |
---|---|---|---|
"io.slicing" |
bool |
False |
是否自动对数据进行分片。 |
"io.unbalanced_io_slicing" |
bool |
False |
允许数据分片切分时worker分配的文件数目不相同,部分worker会多分配1个训练文件。 |
"io.drop_last_files" |
bool |
False |
对文件列表进行均匀切分,丢弃多余的文件。 |
Auto Parallel配置¶
Param Key |
Type |
Default |
Description |
---|---|---|---|
"auto.auto_parallel" |
bool |
False |
是否打开自动并行化(目前还在实验阶段)。 |
Env¶
本文档主要介绍EPL的Env中获取常用的运行时信息。
你可以通过epl.Env.get()
获取当前的env对象。
cluster¶
cluster包含当前分布式任务的集群信息。
Attribute |
Type |
Description |
---|---|---|
|
int |
worker数量 |
|
int |
当前worker的index |
示例
import epl
env = epl.Env.get()
worker_num = env.cluster.worker_num
worker_index = env.cluster.worker_index
config¶
config包含当前epl的配置信息。
示例
import epl
env = epl.Env.get()
config = env.config
数据并行¶
本节将介绍如何通过EPL来对ResNet-50
模型做数据并行分布式训练。
通过添加以下几行代码,EPL即可将本地训练程序转换成分布式训练程序。
+ import epl
+ epl.init()
+ epl.set_default_strategy(epl.replicate(device_count=1))
ResNet50()
training_session()
用户可以通过以下脚本来启动一个2卡的数据并行训练任务。
epl-launch --num_workers 2 --gpu_per_worker 1 scripts/train_dp.sh
scripts/train_dp.sh
是一个本地的训练脚本。
完整的训练代码可以参考EPL ResNet Example。
流水并行¶
本节将介绍如何通过EPL来对Bert模型做 Pipeline 分布式训练。
训练准备¶
这个例子采用的Bert模型代码基于Google官方的Bert Repo https://github.com/google-research/bert 。
下载预训练Bert模型文件¶
wget https://storage.googleapis.com/bert_models/2018_10_18/uncased_L-12_H-768_A-12.zip
unzip uncased_L-12_H-768_A-12.zip
准备数据集¶
mkdir data
cd data
wget https://rajpurkar.github.io/SQuAD-explorer/dataset/train-v1.1.json
wget https://rajpurkar.github.io/SQuAD-explorer/dataset/dev-v1.1.json
wget https://raw.githubusercontent.com/allenai/bi-att-flow/master/squad/evaluate-v1.1.py
分布式 Bert 流水并行训练¶
用户仅需要添加几行并行化策略和配置代码,即可实现Bert的流水并行训练策略。
+ import epl
+ epl.init(epl.Config({"pipeline.num_micro_batch": 4}))
# model annotation
+ epl.set_default_strategy(epl.replicate(1))
model_stage0()
+ epl.set_default_strategy(epl.replicate(1))
model_stage1()
用户可以通过以下脚本来启动一个2个stage的流水并行训练任务。
epl-launch --num_workers 1 --gpu_per_worker 2 scripts/train_bert_base_dp.sh
完整的训练代码可以参考 EPL Bert Example。
模型验证¶
完成训练后,你可以通过以下脚本来得到验证结果。
SQUAD_DIR=data
python $SQUAD_DIR/evaluate-v1.1.py $SQUAD_DIR/dev-v1.1.json ${output_dir}/predictions.json
在模型训练2 Epoch后,预期会得到 f1 ~= 88.0, exact_match ~= 79.8。
MoE算子拆分并行¶
本节将介绍如何通过EPL来实现 MoE (Mixture of Experts) transformer 模型训练。
训练准备¶
模型代码将基于tensor2tensor的组件。
准备数据集¶
t2t-datagen --data_dir=data --tmp_dir=data/original/dataset --problem=translate_ende_wmt32k
或者,通过在scripts/train_moe_t5.sh
脚本中设置FLAGS.generate_data
来自动下载和准备数据。
详细的准备流程可以参考 tensor2tensor文档.
分布式训练¶
EPL仅需添加几行代码来实现 MoE 算子拆分并行,如下所示:
+ import epl
+ config = epl.Config({"cluster.colocate_split_and_replicate": True})
+ epl.init(config)
+ epl.set_default_strategy(epl.replicate(total_gpu_num))
AttentionAndGating()
+ with epl.split(total_gpu_num):
MOE_Variable_Define()
MOE_Calculation_Define()
用户可以通过以下脚本来启动一个2卡的MOE算子拆分并行训练任务。
epl-launch --num_workers 2 --gpu_per_worker 1 scripts/train_moe_t5.sh
完整的训练代码可以参考 EPL MOE Example。