ParallelExecutor

api_attr: declarative programming (static graph)

class paddle.fluid.ParallelExecutor(use_cuda, loss_name=None, main_program=None, share_vars_from=None, exec_strategy=None, build_strategy=None, num_trainers=1, trainer_id=0, scope=None)

The ParallelExecutor is an upgraded version of fluid.Executor that supports multi-node model training and testing based on the data-parallel mode. In data-parallel mode, ParallelExecutor broadcasts the parameters from Node0 to the other nodes during construction and copies the input Program from Node0 to the other nodes, so that the initial state on every node is the same. Each node runs the model independently; the parameter gradients are aggregated across the nodes during backward computation, and then each node updates its parameters independently.

If you use the GPU to run the model, i.e. use_cuda=True, a node refers to a GPU, and ParallelExecutor automatically uses all GPU resources available on the current machine. You can also restrict the available GPUs through an environment variable, for example, to use GPU0 and GPU1: export CUDA_VISIBLE_DEVICES=0,1. If the model runs on the CPU, i.e. use_cuda=False, a node refers to a CPU device. Note: in this case you need to set the number of CPU devices manually through the CPU_NUM environment variable, for example, export CPU_NUM=4; if this variable is not set, the executor adds it to the environment and sets it to 1.
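
For quick reference, the shell exports mentioned above can also be set from Python before the ParallelExecutor is created. A minimal sketch (the particular device IDs and CPU count are only examples):

import os

# Equivalent to `export CUDA_VISIBLE_DEVICES=0,1`: only GPU0 and GPU1 are
# visible to the process. Set this before any CUDA context is created.
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'

# Equivalent to `export CPU_NUM=4`: use 4 CPU devices when use_cuda=False.
# If CPU_NUM is not set, the executor sets it to 1.
os.environ['CPU_NUM'] = '4'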

Parameters
  • use_cuda (bool) – Whether to use CUDA or not.

  • loss_name (str) – This parameter is the name of the loss variable of the model. Note: in data-parallel model training, you must set loss_name; otherwise, the results may be wrong. The default is None.

  • main_program (Program) – This parameter represents the Program to be executed. If this parameter is not provided (i.e. it is None), the program will be set to fluid.default_main_program(). The default is None.

  • share_vars_from (ParallelExecutor) – If share_vars_from is set, the current ParallelExecutor will share the parameters with the ParallelExecutor specified by share_vars_from. This parameter needs to be set when model testing is required during model training, and the data parallel mode is used for training and testing. Since ParallelExecutor will only distribute parameter variables to other devices when it is first executed, the ParallelExecutor specified by share_vars_from must be run before the current ParallelExecutor. The default is None.

  • exec_strategy (ExecutionStrategy) – exec_strategy specifies the options that can be changed when running the current model, such as the thread pool size. For more information about exec_strategy, please refer to fluid.ExecutionStrategy. The default is None.

  • build_strategy (BuildStrategy) – By configuring build_strategy, we can optimize the computational graph, such as fusing operators in the graph and optimizing memory during execution. For more information about build_strategy, please refer to fluid.BuildStrategy. The default is None. A sketch showing both exec_strategy and build_strategy in use follows this parameter list.

  • num_trainers (int) – This parameter needs to be set in GPU distributed training. If the parameter value is greater than 1, NCCL will be initialized across multiple nodes. Each node should have the same number of GPUs. The default is 1.

  • trainer_id (int) – This parameter needs to be set when performing GPU distributed training, and it must be used together with the num_trainers parameter. trainer_id indicates the “rank” of the current node; counting starts from 0. The default is 0.

  • scope (Scope) – Specifies the scope in which the program is executed. The default is fluid.global_scope().
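
The following is a minimal sketch of passing exec_strategy and build_strategy to the constructor. The particular option values (num_threads, num_iteration_per_drop_scope, fuse_elementwise_add_act_ops) are illustrative assumptions rather than recommended settings; see fluid.ExecutionStrategy and fluid.BuildStrategy for the full list of options.

import paddle.fluid as fluid
import os

use_cuda = True
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
if not use_cuda:
    os.environ['CPU_NUM'] = str(2)

train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
    data = fluid.data(name='X', shape=[None, 1], dtype='float32')
    hidden = fluid.layers.fc(input=data, size=10)
    loss = fluid.layers.mean(hidden)
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

fluid.Executor(place).run(startup_program)

# Illustrative strategy settings (assumptions, not required values).
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = 4                     # size of the thread pool
exec_strategy.num_iteration_per_drop_scope = 100  # how often local scopes are dropped

build_strategy = fluid.BuildStrategy()
build_strategy.fuse_elementwise_add_act_ops = True  # one example of operator fusion

train_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                   main_program=train_program,
                                   loss_name=loss.name,
                                   exec_strategy=exec_strategy,
                                   build_strategy=build_strategy)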

Returns

The initialized ParallelExecutor object.

Return type

ParallelExecutor

Raises

TypeError – If share_vars_from is provided but is not a ParallelExecutor object.

Notes

  1. If you only use ParallelExecutor for multi-card testing, you do not need to set loss_name or share_vars_from.

  2. If you need to both train and test a model with ParallelExecutor, share_vars_from must be set when building the ParallelExecutor used for testing. Otherwise, the parameters used for testing will be inconsistent with those used for training.

Examples

import paddle.fluid as fluid
import numpy
import os

use_cuda = True
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()

# NOTE: If you use CPU to run the program, you need
# to specify the CPU_NUM; otherwise, fluid will use
# all the logical cores as the CPU_NUM. In that
# case, the batch size of the input should be
# greater than CPU_NUM; if it is not, the process
# will fail with an exception.
if not use_cuda:
    os.environ['CPU_NUM'] = str(2)

exe = fluid.Executor(place)

train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
    data = fluid.data(name='X', shape=[None, 1], dtype='float32')
    hidden = fluid.layers.fc(input=data, size=10)
    loss = fluid.layers.mean(hidden)
    test_program = fluid.default_main_program().clone(for_test=True)
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

startup_program.random_seed = 1
exe.run(startup_program)

train_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                   main_program=train_program,
                                   loss_name=loss.name)
test_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                  main_program=test_program,
                                  share_vars_from=train_exe)

x = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = train_exe.run(feed={"X": x},
                           fetch_list=[loss.name])

loss_data, = test_exe.run(feed={"X": x},
                          fetch_list=[loss.name])

run(fetch_list, feed=None, feed_dict=None, return_numpy=True)

This interface is used to run the current model. It should be noted that the executor will execute all the operators in the Program and will not prune operators from the Program according to the fetch_list.

Parameters
  • fetch_list (list) – This parameter represents the variables that need to be returned after the model runs. The default is None.

  • feed (list|dict) – This parameter represents the input variables of the model. For single-card training, feed is a dict; for multi-card training, feed can be a dict or a list. If feed is a dict, the data in the feed will be split and sent to the multiple devices (CPU/GPU), that is, the input data is distributed evenly across the devices, so the number of samples in the current mini-batch must be greater than the number of places. If feed is a list, each element of the list is copied directly to one device, so the length of the list must equal the number of places. The default is None.

  • feed_dict – Alias for feed parameter, for backward compatibility. This parameter has been deprecated. Default None.

  • return_numpy (bool) – This parameter indicates whether to convert the fetched variables (the variables specified in fetch_list) to numpy.ndarray. If it is False, the return value is a list of LoDTensor (a short illustration appears at the end of the example below). The default is True.

Returns

The fetched result list.

Return type

List

Raises

ValueError – If feed is a list but its length does not equal the number of active places, or if any of its elements is not a dict.

Notes

  1. If the feed parameter is a dict, the input data will be evenly distributed to the different cards. For example, if two GPUs run the model and the input contains 3 samples, i.e. [0, 1, 2], GPU0 receives 1 sample, i.e. [0], and GPU1 receives 2 samples, i.e. [1, 2]. If the number of samples is less than the number of devices, the program throws an exception, so when running the model you should make sure that the number of samples in the last batch of the data set is greater than the number of CPU cores or GPU cards; if it is smaller, it is recommended to discard that batch.

  2. If more than one CPU core or GPU card is available, the fetch results for the same variable (the variables in fetch_list) are concatenated along dimension 0 across the different devices.

Examples

import paddle.fluid as fluid
import numpy
import os

use_cuda = True
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()

# NOTE: If you use CPU to run the program, you need
# to specify the CPU_NUM; otherwise, fluid will use
# all the logical cores as the CPU_NUM. In that
# case, the batch size of the input should be
# greater than CPU_NUM; if it is not, the process
# will fail with an exception.
if not use_cuda:
    os.environ['CPU_NUM'] = str(2)

exe = fluid.Executor(place)

train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
    data = fluid.data(name='X', shape=[None, 1], dtype='float32')
    hidden = fluid.layers.fc(input=data, size=10)
    loss = fluid.layers.mean(hidden)
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

exe.run(startup_program)

train_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                   main_program=train_program,
                                   loss_name=loss.name)

# If the feed is a dict:
# the input will be split across the devices. If there are two devices,
# each device will process an input with shape (5, 1).
x = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = train_exe.run(feed={"X": x},
                           fetch_list=[loss.name])
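# According to Note 2 above, the fetched results from the different devices
# are concatenated along dimension 0, so loss_data contains one mean-loss
# value per device rather than a single scalar.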

# If the feed is a list:
# each device will process one element of the list.
# The 1st device will process an input with shape (10, 1),
# the 2nd device will process an input with shape (9, 1).
#
# You can use exe.device_count to get the number of devices.
x2 = numpy.random.random(size=(9, 1)).astype('float32')
loss_data, = train_exe.run(feed=[{"X": x}, {"X": x2}],
                           fetch_list=[loss.name])
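
# A brief sketch of return_numpy=False (an illustrative addition): the
# fetched results come back as a list of LoDTensor instead of numpy
# arrays; numpy.array() can convert them back when needed.
loss_tensor, = train_exe.run(feed={"X": x},
                             fetch_list=[loss.name],
                             return_numpy=False)
loss_array = numpy.array(loss_tensor)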

drop_local_exe_scopes()

Drop the local execution scopes immediately. To avoid frequent allocation and release of temporary variables, ParallelExecutor drops the local execution scopes only after several iterations; the num_iteration_per_drop_scope option in fluid.ExecutionStrategy specifies how many iterations pass between two drops. If num_iteration_per_drop_scope is 100 but you want to drop the local execution scopes after 50 iterations, you can call this interface manually.

Returns

None

Examples

import paddle.fluid as fluid
import numpy
import os

use_cuda = True
# NOTE: If you use CPU to run the program, you need
# to specify the CPU_NUM; otherwise, fluid will use
# all the logical cores as the CPU_NUM. In that
# case, the batch size of the input should be
# greater than CPU_NUM; if it is not, the process
# will fail with an exception.
if not use_cuda:
    os.environ['CPU_NUM'] = str(2)

train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
    data = fluid.data(name='X', shape=[None, 1], dtype='float32')
    hidden = fluid.layers.fc(input=data, size=10)
    loss = fluid.layers.mean(hidden)

place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)

parallel_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                      main_program=train_program,
                                      loss_name=loss.name)

x = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = parallel_exe.run(feed={"X": x},
                              fetch_list=[loss.name])

# Drop the local execution scopes immediately instead of waiting for the
# automatic drop that happens every num_iteration_per_drop_scope iterations.
parallel_exe.drop_local_exe_scopes()