PipelineOptimizer

class paddle.fluid.optimizer.PipelineOptimizer(optimizer, cut_list=None, place_list=None, concurrency_list=None, queue_size=30, sync_steps=1, start_cpu_core_id=0)[source]

Pipeline Optimizer

Train with pipeline mode. The program will be split according to cut_list.

If the length of cut_list is k, the whole program (including the backward part) will be split into 2*k-1 sections.

So place_list and concurrency_list must also have length 2*k-1.
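
For example, the snippet below is a minimal sketch of this length relation only (not a full program); the complete example at the end of this page uses the same numbers:

# With k cut points in cut_list, the program (forward and backward parts)
# is split into 2*k - 1 sections.
k = 2                     # len(cut_list), as in the example below
num_sections = 2 * k - 1  # = 3
# place_list and concurrency_list must therefore also contain 3 entries,
# e.g. [CPUPlace(), CUDAPlace(0), CPUPlace()] and [1, 1, 4].
assert num_sections == 3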

Note: Although the asynchronous mode is used in pipeline training to speed it up, the final performance depends heavily on the training progress of each pipeline.

We will try the synchronous mode in the future.

Parameters
  • optimizer (Optimizer) – The base optimizer, such as SGD.

  • cut_list (list of Variable list) – The cut variables of the main_program, at which the program is split into sections.

  • place_list (list of Place) – The place on which each section will run.

  • concurrency_list (list of int) – The concurrency degree of each section.

  • queue_size (int) – Each section consumes scopes from its in-scope queue and produces scopes to its out-scope queue. This parameter specifies the size of these scope queues. [Optional. Default: 30].

  • sync_steps (int) – The synchronization steps between different cards. [Optional. Default: 1].

  • start_cpu_core_id (int) – The id of the first CPU core to use. [Optional. Default: 0].

Examples

import paddle.fluid as fluid
import paddle.fluid.layers as layers

# Build a toy network: two embeddings -> concat -> fc -> mean loss
x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10, 2], is_sparse=False)
emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby", learning_rate=0.9), size=[10, 2], is_sparse=False)
concat = layers.concat([emb_x, emb_y], axis=1)
fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
loss = layers.reduce_mean(fc)
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
# cut_list has length 2, so the program is split into 3 sections;
# place_list and concurrency_list therefore have 3 entries each.
optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
        cut_list=[[emb_x, emb_y], [loss]],
        place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
        concurrency_list=[1, 1, 4],
        queue_size=2,
        sync_steps=1,
        )
optimizer.minimize(loss)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var([x, y])
dataset.set_batch_size(1) # set your own batch size, e.g. 32
dataset.set_filelist(filelist)
exe.train_from_dataset(
            fluid.default_main_program(),
            dataset,
            thread=2,
            debug=False,
            fetch_list=[],
            fetch_info=[],
            print_period=1)