CPUPS 流式训练示例

在之前的参数服务器概述中曾经提到,由于推荐搜索场景的特殊性,在训练过程中使用到的训练数据通常不是固定的集合,而是随时间流式的加入到训练过程中,这种训练方式称为流式训练。

与传统的固定数据集的训练相比,在大规模稀疏参数场景下,流式训练有如下特点:

  1. 在线上系统提供服务的过程中,训练程序一直运行,等待线上服务产生的数据,并在数据准备完成后进行增量训练。

  2. 数据一般按时间组织,每个 Pass 的训练数据对应一个或几个时间分片(文件夹),并在该时间分片的数据生成结束后,在对应时间分片的文件夹中添加一个空文件用于表示数据准备完成。

  3. 线上服务产生的数据不断进入训练系统,导致稀疏参数不断增加,在模型精度不受影响的情况下,为控制模型总存储,增加稀疏参数统计量自动统计、稀疏参数准入、退场、增量保存等功能。

在学习流式训练具体使用方法之前,建议先详细阅读参数服务器快速开始章节,了解参数服务器的基本使用方法。

流式训练的完整代码示例参见:PaddleRec,具体的目录结构如下:

├── tools
    ├── static_ps_online_trainer.py      # 流式训练主脚本
    ├── utils                            # 流式训练所需各种工具函数封装
        ├── static_ps
            ├── flow_helper.py           # 流式训练所需 utils,包括保存、加载等
            ├── metric_helper.py         # 分布式指标计算所需 utils
            ├── time_helper.py           # 训练耗时计算所需 utils
            ├── program_helper.py        # 模型引入、分布式 strategy 生成所需 util
├── models                               # 存放具体模型组网和配置
    ├── rank
        ├── slot_dnn                     # 模型示例目录
            ├── net.py                   # mlp 具体组网
            ├── static_model.py          # 静态图训练调用(在 net.py 基础上封装)
            ├── config_online.yaml       # 流式训练配置文件
            ├── queuedataset_reader.py   # 数据处理脚本

1 模型组网

为实现稀疏参数统计量自动统计,在组网时需要注意以下两点:

  1. embedding 层需使用 paddle.static.nn.sparse_embedding ,该算子为大规模稀疏参数模型特定的 embedding 算子,支持对稀疏参数统计量自动统计。

  2. 稀疏参数的统计量,目前特指稀疏特征的展现量(show)和点击量(click),在组网时定义两个变量,指明特征是否展现和点击,通常取值为 0 或者 1, sparse_embedding 中通过 entry 参数传入一个 ShowClickEntry ,指明这两个变量(show 和 click)的名字。

# static_model.py
# 构造 show/click 对应的 data,变量名需要与 entry 中的名称一致
show = paddle.static.data(
    name="show", shape=[None, 1], dtype="int64")
label = paddle.static.data(
    name="click", shape=[None, 1], dtype="int64")

# net.py
# ShowClickEntry 接收的 show/click 变量数据类型为 float32,需做 cast 处理
show_cast = paddle.cast(show, dtype='float32')
click_cast = paddle.cast(click, dtype='float32')

# 构造 ShowClickEntry,指明展现和点击对应的变量名
self.entry = paddle.distributed.ShowClickEntry(show_cast.name,
                                               click_cast.name)
emb = paddle.static.nn.sparse_embedding(
    input=s_input,
    size=[self.dict_dim, self.emb_dim],
    padding_idx=0,
    entry=self.entry,   # 在 sparse_embedding 中传入 entry
    param_attr=paddle.ParamAttr(name="embedding"))

2 数据读取

本章节主要讲解流式训练中的数据组织形式、数据等待方式、数据拆分方式等内容,涉及到的概念如下:

  1. 数据分片:将数据按照固定的时间间隔组织成多个分片,一段时间间隔中的所有训练数据称为一个数据分片。

  2. Pass:把当前训练集的所有数据全部训练一遍,通常一个 Pass 对应一个或多个数据分片。

涉及到的具体配置如下:

名称

类型

取值

是否必须

作用描述

train_data_dir

string

任意

训练数据所在目录(如果数据存于 afs 或 hdfs 上,以 afs:/或 hdfs:/开头)

split_interval

int

任意

数据落盘分片间隔时间(分钟)

split_per_pass

int

任意

训练一个 Pass 包含多少个分片的数据

start_day

string

任意

训练开始的日期(例:20190720)

end_day

string

任意

训练结束的日期(例:20190720)

data_donefile

string

任意

用于探测当前分片数据是否落盘完成的标识文件

data_sleep_second

int

任意

当前分片数据尚未完成的等待时间

prefetch

bool

任意

是否开启数据 prefetch,即在当前 pass 训练过程中预读取下一个 pass 的数据

2.1 数据组织形式

在训练数据目录下,再建立两层目录,第一层目录对应训练数据的日期(8 位),第二层目录对应训练数据的具体时间(4 位,前两位为小时,后两位为分钟),并且需要与配置文件中的 split_interval 配置对应。 例如:train_data_dir 配置为“data”目录,split_interval 配置为 5,则具体的目录结构如下:

├── data
    ├── 20190720              # 训练数据的日期
        ├── 0000              # 训练数据的时间(第 1 个分片),0 时 0 分 - 0 时 5 分时间内的数据
            ├── data_part1    # 具体的训练数据文件
            ├── ......
        ├── 0005              # 训练数据的时间(第 2 个分片),0 时 5 分 - 0 时 10 分时间内的数据
            ├── data_part1    # 具体的训练数据文件
            ├── ......
        ├── 0010              # 训练数据的时间(第 3 个分片),0 时 10 分 - 0 时 15 分时间内的数据
            ├── data_part1    # 具体的训练数据文件
            ├── ......
        ├── ......
        ├── 2355              # 训练数据的时间(该日期下最后 1 个分片),23 时 55 分 - 24 时时间内的数据
            ├── data_part1    # 具体的训练数据文件
            ├── ......

根据 split_interval 和 split_per_pass 这两个配置项,在训练之前生成每个 Pass 所需要的数据分片列表,具体实现如下:

# 该方法定义在 tools/utils/static_ps/flow_helper.py 中
def get_online_pass_interval(split_interval, split_per_pass,
                            is_data_hourly_placed):
    split_interval = int(split_interval)
    split_per_pass = int(split_per_pass)
    splits_per_day = 24 * 60 // split_interval
    pass_per_day = splits_per_day // split_per_pass
    left_train_hour = 0
    right_train_hour = 23

    start = 0
    split_path = []
    for i in range(splits_per_day):
        h = start // 60
        m = start % 60
        if h < left_train_hour or h > right_train_hour:
            start += split_interval
            continue
        if is_data_hourly_placed:
            split_path.append("%02d" % h)
        else:
            split_path.append("%02d%02d" % (h, m))
        start += split_interval

    start = 0
    online_pass_interval = []
    for i in range(pass_per_day):
        online_pass_interval.append([])
        for j in range(start, start + split_per_pass):
            online_pass_interval[i].append(split_path[j])
        start += split_per_pass

    return online_pass_interval

# 根据 split_interval 和 split_per_pass,在训练之前生成每个 Pass 所需要的数据分片列表
self.online_intervals = get_online_pass_interval(
          self.split_interval, self.split_per_pass, False)

例如:split_interval 配置为 5,split_per_pass 配置为 2,即数据分片时间间隔为 5 分钟,每个 Pass 的训练数据包含 2 个分片,则 online_intervals 数组的具体值为:[[0000, 0005], [0005, 0010], ..., [2350, 2355]]。

2.2 数据等待方式

如果在训练过程中,需要等待数据准备完成,则需要配置 data_donefile 选项。

开启数据等待后,当数据目录中存在 data_donefile 配置对应的文件(一般是一个空文件)时,才会对该目录下的数据执行后续操作,否则,等待 data_sleep_second 时间后,重新探测是否存在 data_donefile 文件。

2.3 数据拆分方式

由于参数服务器中存在多个训练 Worker,为保证每个训练 Worker 只训练数据集中的一部分,需要使用 fleet.util.get_file_shard() 对训练集进行拆分

# 该方法定义在 tools/utils/static_ps/flow_helper.py 中
def file_ls(path_array, client):
    # 获取 path 数组下的所有文件
    # 如果数据存在 hdfs/afs 上,需要使用 hadoop_client
    result = []
    for path in path_array:
        if is_local(path):
            cur_path = os.listdir(path)
        else:
            cur_path = client.ls_dir(path)[1]
        if len(cur_path) > 0:
            result += [os.path.join(path, i) for i in cur_path]
    logger.info("file ls result = {}".format(result))
    return result

cur_path = []
for i in self.online_intervals[pass_index - 1]:
    # p 为一个具体的数据分片目录,例如:"data/20190720/0000"
    p = os.path.join(train_data_path, day, str(i))
    if self.data_donefile:
      # 数据等待策略生效,如果目录下无 data_donefile 文件,需等待 data_sleep_second 后再探测
      cur_donefile = os.path.join(p, self.data_donefile)
      data_ready(cur_donefile, self.data_sleep_second,
                self.hadoop_client)
    # cur_path 存储当前 Pass 下的所有数据目录,对应一个或多个数据分片文件夹
    # 例如:["data/20190720/0000", "data/20190720/0005"]
    cur_path.append(p)

# 获取当前数据分片下的所有数据文件
global_file_list = file_ls(cur_path, self.hadoop_client)
# 将数据文件拆分到每个 Worker 上
my_file_list = fleet.util.get_file_shard(global_file_list)

2.4 数据读取

流式训练通常采用 InMemoryDataset 来读取数据,InMemoryDataset 会将当前 Worker 中的所有数据全部加载到内存,并支持秒级全局打散等功能。

# 创建 InMemoryDataset
dataset = paddle.distributed.InMemoryDataset()

# InMemoryDataset 初始化
dataset.init(use_var=self.input_data,
             pipe_command=self.pipe_command,
             batch_size=batch_size,
             thread_num=thread_num)

# 设置文件列表为拆分到当前 Worker 的 file_list
dataset.set_filelist(my_file_list)

# 将训练数据加载到内存
dataset.load_into_memory()
# 数据全局打散
dataset.global_shuffle(fleet, shuffle_thread_num)
# 获取当前 Worker 在全局打散之后的训练数据样例数
shuffle_data_size = dataset.get_shuffle_data_size(fleet)

# 省略具体的训练过程

# 在当前 Pass 训练结束后,InMemoryDataset 需调用 release_memory()方法释放内存
dataset.release_memory()

2.5 数据预读取

由于数据读取是 IO 密集型任务,而模型训练是计算密集型任务,为进一步提升整体训练性能,可以将数据读取和模型训练两个阶段做 overlap 处理,即在上一个 pass 训练过程中预读取下一个 pass 的数据。

具体地,可以使用 dataset 的以下两个 API 进行数据预读取操作: 1. preload_into_memory() :创建 dataset 后,使用该 API 替换 load_into_memory() ,在当前 pass 的训练过程中,预读取下一个 pass 的训练数据。 2. wait_preload_done() :在下一个 pass 训练之前,调用 wait_preload_done() ,等待 pass 训练数据全部读取完毕,进行训练。

3 模型训练及预测

模型训练及预测使用 exe.train_from_dataset()exe.infer_from_dataset() 接口即可,本章节讲解一下在训练和预测过程中计算分布式指标上的一些细节以及如何利用 debug 模式下的 dump 功能打印模型计算的中间结果。

3.1 分布式指标计算

在之前的参数服务器概述中曾经提到,由于参数服务器存在多个训练节点,因此在计算指标时,需要汇总所有节点的全量数据,进行全局指标计算。

除此之外,分布式全局指标计算还需要注意以下两点:

  1. 参数服务器的训练节点一般会存在多个线程同时进行训练,而所有线程共享指标计算所需的中间变量,这就可能导致中间变量的累计计数不准确,因此需要让每个线程拥有自己独立的中间变量。

  2. 指标计算所需的中间变量在整个训练过程中会持续累计计数,因此需要在合适的位置进行清零操作,避免当前指标计算受之前累计计数的影响。

同样是以 AUC 指标为例,全局 AUC 指标计算示例如下:

# 该方法定义在 tools/utils/static_ps/metric_helper.py 中
def set_zero(var_name,
             scope=fluid.global_scope(),
             place=fluid.CPUPlace(),
             param_type="int64"):
    # 对变量进行清零操作
    param = scope.var(var_name).get_tensor()
    param_array = np.zeros(param._get_dims()).astype(param_type)
    param.set(param_array, place)

# 组网阶段,AUC 算子在计算 auc 指标同时,返回正负样例中间统计结果(stat_pos, stat_neg)
auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, stat_neg] = \
    paddle.static.auc(input=pred, label=label)

strategy = fleet.DistributedStrategy()
strategy.a_sync = True

# 获取计算指标所需的中间变量的 name 列表,并将其配置到 strategy 的 stat_var_names 选项中
stat_var_names = [stat_pos.name, stat_neg.name]
strategy.trainer_desc_configs = {"stat_var_names": stat_var_names}

# 省略具体训练过程

# 训练结束后,利用 AUC 算子返回的中间计算结果,以及 fleet 提供的分布式指标计算接口,完成全局 AUC 计算。
global_auc = fleet.metrics.auc(stat_pos, stat_neg)

# 指标计算所需的中间变量清零
set_zero(stat_pos.name)
set_zero(stat_neg.name)

3.2 Dump 功能

Debug 模式下的 dump 功能主要为了解决以下两个问题:

  1. 在训练过程中希望打印模型计算的中间结果,用于监控模型是否收敛等情况。

  2. 为减轻线上推理服务的计算压力,在召回或者匹配模型中,一般需要将 doc 侧的向量预先计算出来,灌入向量搜索引擎(例如 milvus)中。因此需要在流式训练过程中加入预测阶段打印 doc 侧的向量计算结果。

# 该方法定义在 tools/utils/static_ps/program_helper.py 中
def set_dump_config(program, dump_config):
    # 配置 dump 相关信息
    if dump_config.get("dump_fields_path") is not None:
        # 打印出的中间结果存放路径
        program._fleet_opt["dump_fields_path"] = dump_config.get(
            "dump_fields_path")
    if dump_config.get("dump_fields") is not None:
        # 需要打印的中间层变量名
        program._fleet_opt["dump_fields"] = dump_config.get("dump_fields")
    if