fluid.transpiler

DistributeTranspiler

class paddle.fluid.transpiler.DistributeTranspiler(config=None)[source]

DistributeTranspiler

Convert the fluid program to distributed data-parallelism programs. Two modes are supported: parameter server (pserver) mode and nccl2 mode.

In pserver mode, the main_program is transformed to use a remote parameter server for parameter optimization, and the optimization graph is put into a parameter server program.

In nccl2 mode, the transpiler appends an NCCL_ID broadcasting op to startup_program to share the NCCL_ID across the job nodes. After the transpile call, you *must* pass the trainer_id and num_trainers arguments to ParallelExecutor to enable NCCL2 distributed mode.

Examples

import paddle.fluid as fluid

x = fluid.data(name='x', shape=[13], dtype='float32')
y = fluid.data(name='y', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)

cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_loss = fluid.layers.mean(cost)

sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_loss)

# for pserver mode
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4
role = "PSERVER"
t = fluid.DistributeTranspiler()
t.transpile(
     trainer_id, pservers=pserver_endpoints, trainers=trainers)
if role == "PSERVER":
     pserver_program = t.get_pserver_program(current_endpoint)
     pserver_startup_program = t.get_startup_program(current_endpoint,
                                                    pserver_program)
elif role == "TRAINER":
     trainer_program = t.get_trainer_program()

# for nccl2 mode
trainer_num = 2
trainer_id = 0
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
t = fluid.DistributeTranspiler(config=config)
t.transpile(trainer_id=trainer_id, trainers=trainer_endpoints, current_endpoint="192.168.0.1:6174")
exe = fluid.ParallelExecutor(
    use_cuda=True,
    loss_name=avg_loss.name,
    num_trainers=trainer_num,
    trainer_id=trainer_id
)
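
The transpiled programs are ordinary fluid Programs, so they can be run with a plain fluid.Executor. Below is a minimal sketch of how the two roles from the pserver-mode example above are typically launched; the data feeding is only hinted at in comments and would come from your own reader in practice.

# minimal launch sketch for the pserver-mode example above
exe = fluid.Executor(fluid.CPUPlace())
if role == "PSERVER":
    exe.run(pserver_startup_program)
    exe.run(pserver_program)   # blocks inside listen_and_serv, serving parameters
elif role == "TRAINER":
    exe.run(fluid.default_startup_program())
    # loop over your data reader here, feeding x/y for every batch, e.g.
    # exe.run(trainer_program, feed={'x': ..., 'y': ...}, fetch_list=[avg_loss.name])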
transpile(trainer_id, program=None, pservers='127.0.0.1:6174', trainers=1, sync_mode=True, startup_program=None, current_endpoint='127.0.0.1:6174')

Transpile the input program to distributed programs with config and arguments.

Parameters
  • trainer_id (int) – ID of the current trainer worker. If there are n workers, the ID ranges from 0 to n-1.

  • program (Program|None) – program to transpile, default is fluid.default_main_program().

  • startup_program (Program|None) – startup_program to transpile, default is fluid.default_startup_program().

  • pservers (str) – comma-separated ip:port string for the pserver list.

  • trainers (int|str) – in pserver mode this is the number of trainers; in nccl2 mode this is a string of trainer endpoints.

  • sync_mode (bool) – whether to do synchronous training, default is True.

  • current_endpoint (str) – the current endpoint, required when transpiling in nccl2 distributed mode; not used in pserver mode.

Examples

import paddle.fluid as fluid

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    pservers="127.0.0.1:7000,127.0.0.1:7001",
    trainers=2,
    sync_mode=False,
    current_endpoint="127.0.0.1:7000")
get_trainer_program(wait_port=True)

Get the transpiled trainer-side program. Compared with the original program, the trainer-side program has the following differences:

  • Optimizer-related ops are deleted, because parameters are updated on the pserver.

  • After each op that computes the gradient of a parameter, a Send_op and a Recv_op are added.

Parameters
  • wait_port (bool) – whether to wait for the parameter servers to be ready before returning the program; default is True.

Returns

trainer side program.

Return type

Program

Examples

import paddle.fluid as fluid
# this is an example; use endpoints that are available in your environment
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t.transpile(trainer_id, trainers=trainers, pservers=pserver_endpoints)
trainer_program = t.get_trainer_program()
get_pserver_program(endpoint)

Get the parameter server side program. Compared with the original program, the pserver-side program has the following differences:

  • Only optimize-related and communication-related ops are included.

  • Block 0 contains only variable definitions and the listen_and_serv_op.

  • Every variable that needs to be updated has its own dedicated block.

Parameters

endpoint (str) – current parameter server endpoint.

Returns

the program for current parameter server to run.

Return type

Program

Examples

import paddle.fluid as fluid
# this is an example; use endpoints that are available in your environment
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t.transpile(
     trainer_id, pservers=pserver_endpoints, trainers=trainers)
pserver_program = t.get_pserver_program(current_endpoint)
get_pserver_programs(endpoint)

Get the pserver-side main program and startup program for distributed training. The main_program returned by this function is consistent with the return value of get_pserver_program.

Parameters

endpoint (str) – current pserver endpoint.

Returns

(main_program, startup_program), both of type Program

Return type

tuple

Examples

import paddle.fluid as fluid
# this is an example; use endpoints that are available in your environment
pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4
t = fluid.DistributeTranspiler()
t.transpile(
     trainer_id, pservers=pserver_endpoints, trainers=trainers)
pserver_program, pserver_startup_program = t.get_pserver_programs(current_endpoint)
get_startup_program(endpoint, pserver_program=None, startup_program=None)

Deprecated

Get the startup program for the current parameter server. Operator input variables are modified if some variables were split into several blocks.

Parameters
  • endpoint (str) – current pserver endpoint.

  • pserver_program (Program) – deprecated, call get_pserver_program first.

  • startup_program (Program) – deprecated; the startup_program should be passed in when initializing.

Returns

parameter server side startup program.

Return type

Program

Examples

import paddle.fluid as fluid

pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
current_endpoint = "192.168.0.1:6174"
trainer_id = 0
trainers = 4

t = fluid.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
pserver_program = t.get_pserver_program(current_endpoint)
pserver_startup_program = t.get_startup_program(current_endpoint,
                                                pserver_program)

DistributeTranspilerConfig

class paddle.fluid.transpiler.DistributeTranspilerConfig[source]

A configuration class that provides support for distributed jobs created by the transpiler. Some important parameters are explained as follows:

slice_var_up(bool)

Whether to slice variables (Tensors) into blocks for the parameter servers; default is True.

split_method(PSDispatcher)

The method used to dispatch parameters to the servers; RoundRobin or HashName can be used, and the default is RoundRobin. Choose the method that best balances the load across parameter servers.

min_block_size(int)

Minimum number of elements in a split block; default is 8192.

According to https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156, bandwidth is used efficiently when the data size is larger than 2MB. If you want to change it, please make sure you have read the slice_variable function; its definition can be found in https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/fluid/transpiler/distribute_transpiler.py .

Examples

from paddle.fluid.transpiler.ps_dispatcher import RoundRobin
import paddle.fluid as fluid

config = fluid.DistributeTranspilerConfig()
config.slice_var_up = True
config.split_method = RoundRobin
config.min_block_size = 81920
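
The config object takes effect only once it is handed to a transpiler. Below is a minimal sketch of wiring such a config into a pserver-mode transpile; the endpoints and trainer count are placeholder values, not ones the example above prescribes.

# hypothetical endpoints, shown only to illustrate passing the config
t = fluid.DistributeTranspiler(config=config)
t.transpile(trainer_id=0,
            pservers="127.0.0.1:7000,127.0.0.1:7001",
            trainers=2)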

HashName

class paddle.fluid.transpiler.HashName(pserver_endpoints)[source]

Hash variable names to several endpoints using the Python hash() function.

Parameters

pserver_endpoints (list) – list of endpoints (ip:port).

Examples


from paddle.fluid.transpiler import HashName

pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1", "var2", "var3", "var4", "var5"]

hn = HashName(pserver_endpoints)
hn.dispatch(vars)

dispatch(varlist)

Use the HashName method to dispatch variables to the parameter servers.

Parameters

varlist (list) – a list of Variables.

reset()

Reset the step counter to zero.

memory_optimize

paddle.fluid.transpiler.memory_optimize(input_program, skip_opt_set=None, print_log=False, level=0, skip_grads=True)[source]

This API is deprecated since 1.6. Please do not use it. Better memory optimization strategies are enabled by default.

release_memory

paddle.fluid.transpiler.release_memory(input_program, skip_opt_set=None)[source]

This API is deprecated since 1.6. Please do not use it. Better memory optimization strategies are enabled by default.

RoundRobin

class paddle.fluid.transpiler.RoundRobin(pserver_endpoints)[source]

Distribute variables to several endpoints using the round-robin method (https://en.wikipedia.org/wiki/Round-robin_scheduling).

Parameters

pserver_endpoints (list) – list of endpoints (ip:port).

Examples


from paddle.fluid.transpiler import RoundRobin

pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1", "var2", "var3", "var4", "var5"]

rr = RoundRobin(pserver_endpoints)
rr.dispatch(vars)

dispatch(varlist)

Use the RoundRobin method to dispatch variables to the parameter servers.

Parameters

varlist (list) – a list of Variables.

reset()

Reset the step counter to zero.
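
For illustration, here is a minimal sketch of using reset() between two dispatch rounds; the endpoints and variable names are placeholders. Since reset() sets the step counter back to zero, dispatching after it starts again from the first endpoint in the list.

from paddle.fluid.transpiler import RoundRobin

rr = RoundRobin(["127.0.0.1:6007", "127.0.0.1:6008"])
first_round = rr.dispatch(["var1", "var2", "var3"])
rr.reset()                                    # step counter back to zero
second_round = rr.dispatch(["var4", "var5"])  # starts again from the first endpoint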