global_gather

paddle.distributed.utils. global_gather ( x, local_count, global_count, group=None, use_calc_stream=True ) [源代码]

根据global_count将x的数据收集到n_expert * world_size个expert,然后根据local_count接收数据。 其中expert是用户定义的专家网络,n_expert是指每张卡拥有的专家网络数目,world_size是指运行网络的显卡数目。

参数

  • x (Tensor) - 输入Tensor。Tensor的数据类型必须是float16、float32、 float64、int32、int64。

  • local_count (Tensor) - 拥有n_expert * world_size个数据的Tensor,用于表示有多少数据接收。Tensor的数据类型必须是int64。

  • global_count (Tensor) - 拥有n_expert * world_size个数据的Tensor,用于表示有多少数据发送。Tensor的数据类型必须是int64。

  • group (Group, 可选) - new_group返回的Group实例,或者设置为None表示默认地全局组。默认值:None。

  • use_calc_stream (bool,可选) - 标识使用计算流还是通信流。默认值:True,表示用计算流。

返回

Tensor, 从所有expert接收的数据。

代码示例

# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],                                        dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
    local_count = np.array([2, 1, 1, 1])
    global_count = np.array([2, 1, 1, 1])
else:
    local_count = np.array([1, 1, 2, 1])
    global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count)
print(a)
# out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]]
# out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]]
a.stop_gradient = False
c = a * a
c.backward()
print("local_input_buf.grad", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]