InMemoryDataset

class paddle.distributed.fleet.dataset.InMemoryDataset
Api_attr

Static Graph

It will load data into memory and shuffle data before training.

Examples

import paddle
paddle.enable_static()
dataset = paddle.distributed.InMemoryDataset()
update_settings ( **kwargs )
Api_attr

Static Graph

Updates the settings of a dataset instance. Call it from your Python script.

Parameters
  • kwargs – Keyword arguments. The following keys are currently supported in **kwargs, covering both single-node settings and advanced distributed settings:

  • batch_size (int) – batch size. It will be effective during training. default is 1.

  • thread_num (int) – thread num, it is the num of readers. default is 1.

  • use_var (list) – list of variables. Variables which you will use. default is [].

  • input_type (int) – the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.

  • fs_name (str) – fs name. default is “”.

  • fs_ugi (str) – fs ugi. default is “”.

  • pipe_command (str) – pipe command of current dataset. Only a UNIX pipeline command can be used. default is “cat”.

  • download_cmd (str) – customized download command. default is “cat”

  • data_feed_type (str) – data feed type used in c++ code. default is “MultiSlotInMemoryDataFeed”.

  • queue_num (int) – Dataset output queue num. Training threads get data from the queues. default is -1, which sets it to the thread number in c++.

  • merge_size (int) – instance size to merge. If merge_size > 0, instances are merged by line id: instances with the same line id are merged after shuffle, so the data generator must parse the line id. default is -1.

  • parse_ins_id (bool) – whether the Dataset needs to parse ins_id. default is False.

  • parse_content (bool) – whether the Dataset needs to parse content. default is False.

  • fleet_send_batch_size (int) – fleet send batch size in one RPC. default is 1024.

  • fleet_send_sleep_seconds (int) – fleet send sleep time. default is 0.

  • fea_eval (bool) – whether the Dataset needs to do feature importance evaluation using slots shuffle. default is False.

  • candidate_size (int) – if fea_eval is set True, set the candidate size used in slots shuffle.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=[])
dataset._init_distributed_settings(
    parse_ins_id=True,
    parse_content=True,
    fea_eval=True,
    candidate_size=10000)
dataset.update_settings(batch_size=2)
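The pipe_command setting can be pictured as a shell pipeline that receives raw file content on stdin and emits the records to be parsed on stdout. The following is a minimal sketch of that idea in plain Python, not Paddle's implementation; the helper name run_pipe_command and the sample "cut" pipeline are hypothetical and only illustrate the concept on a POSIX shell.

```python
import subprocess


def run_pipe_command(pipe_command, raw_text):
    """Feed raw_text to a shell pipeline on stdin and return its stdout lines.

    Hypothetical helper for illustration; Paddle runs the pipe command
    internally when it reads each file.
    """
    result = subprocess.run(
        pipe_command,
        shell=True,          # the command is a UNIX pipeline string
        input=raw_text,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.splitlines()


raw = "2 1 2 2 5 4 2 2 7 2 1 3\n"

# The default "cat" passes every line through unchanged.
passthrough = run_pipe_command("cat", raw)

# Any other pipeline can transform the lines first, e.g. keep three fields.
first_fields = run_pipe_command("cut -d' ' -f1-3", raw)
```

With "cat" the data reaches the parser untouched, which is why it works as a default; a custom pipeline is where preprocessing would go.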
init ( **kwargs )
Api_attr

Static Graph

Initializes the settings of a dataset instance. Call it only once from your Python script.

Parameters
  • kwargs – Keyword arguments. The following keys are currently supported in **kwargs:

  • batch_size (int) – batch size. It will be effective during training. default is 1.

  • thread_num (int) – thread num, it is the num of readers. default is 1.

  • use_var (list) – list of variables. Variables which you will use. default is [].

  • input_type (int) – the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.

  • fs_name (str) – fs name. default is “”.

  • fs_ugi (str) – fs ugi. default is “”.

  • pipe_command (str) – pipe command of current dataset. Only a UNIX pipeline command can be used. default is “cat”.

  • download_cmd (str) – customized download command. default is “cat”

  • data_feed_type (str) – data feed type used in c++ code. default is “MultiSlotInMemoryDataFeed”.

  • queue_num (int) – Dataset output queue num. Training threads get data from the queues. default is -1, which sets it to the thread number in c++.

Examples

import paddle
import os
paddle.enable_static()

with open("test_queue_dataset_run_a.txt", "w") as f:
    data = "2 1 2 2 5 4 2 2 7 2 1 3"
    f.write(data)
with open("test_queue_dataset_run_b.txt", "w") as f:
    data = "2 1 2 2 5 4 2 2 7 2 1 3"
    f.write(data)

slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)

dataset = paddle.distributed.InMemoryDataset()
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
dataset.set_filelist(
    ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
dataset.load_into_memory()

place = paddle.CPUPlace()
exe = paddle.static.Executor(place)
startup_program = paddle.static.Program()
main_program = paddle.static.Program()
exe.run(startup_program)

exe.train_from_dataset(main_program, dataset)

os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt")
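The sample line written to the data files above appears to follow a slot-based text layout in which each slot contributes a count followed by that many values. Under that assumption, the sketch below shows how such a line maps onto the four slots; parse_multislot_line is a hypothetical helper written for illustration and is not part of Paddle.

```python
def parse_multislot_line(line, slot_names):
    """Split one text line into per-slot value lists.

    Assumed layout (not confirmed by this page): for every slot, one
    integer count followed by that many integer values.
    """
    tokens = line.split()
    parsed = {}
    pos = 0
    for name in slot_names:
        count = int(tokens[pos])
        values = [int(tok) for tok in tokens[pos + 1:pos + 1 + count]]
        if len(values) != count:
            raise ValueError(f"slot {name!r} is truncated")
        parsed[name] = values
        pos += 1 + count
    if pos != len(tokens):
        raise ValueError("unexpected trailing tokens")
    return parsed


sample = parse_multislot_line(
    "2 1 2 2 5 4 2 2 7 2 1 3",
    ["slot1", "slot2", "slot3", "slot4"],
)
# sample == {"slot1": [1, 2], "slot2": [5, 4], "slot3": [2, 7], "slot4": [1, 3]}
```

Read this way, the 12 tokens split evenly into four slots of two values each, which matches the four variables passed as use_var.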
load_into_memory ( )
Api_attr

Static Graph

Load data into memory

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
preload_into_memory ( thread_num=None )
Api_attr

Static Graph

Load data into memory in async mode

Parameters

thread_num (int) – preload thread num

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.preload_into_memory()
dataset.wait_preload_done()
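The preload_into_memory / wait_preload_done pair follows a familiar start-then-join pattern: loading begins in background threads, and the caller blocks only when the data is actually needed. The sketch below mirrors that pattern in plain Python under stated assumptions; ToyPreloader and load_source are hypothetical names and say nothing about how Paddle implements preloading.

```python
from concurrent.futures import ThreadPoolExecutor


def load_source(lines):
    """Stand-in for reading one file: strip whitespace from each record."""
    return [line.strip() for line in lines]


class ToyPreloader:
    """Hypothetical illustration of an asynchronous load followed by a wait."""

    def __init__(self, thread_num=2):
        self._pool = ThreadPoolExecutor(max_workers=thread_num)
        self._futures = []
        self.records = []

    def preload_into_memory(self, sources):
        # Returns immediately; the loading runs in the worker threads.
        self._futures = [self._pool.submit(load_source, src) for src in sources]

    def wait_preload_done(self):
        # Blocks until every background load has finished.
        for future in self._futures:
            self.records.extend(future.result())
        self._futures = []
        self._pool.shutdown()


preloader = ToyPreloader(thread_num=2)
preloader.preload_into_memory([["a1\n", "a2\n"], ["b1\n"]])
# Other setup work could run here while loading proceeds.
preloader.wait_preload_done()
```

The benefit of the split is the gap between the two calls, where unrelated setup can overlap with loading.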
wait_preload_done ( )
Api_attr

Static Graph

Wait until preload_into_memory is done.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.preload_into_memory()
dataset.wait_preload_done()
local_shuffle ( )
Api_attr

Static Graph

Local shuffle

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.local_shuffle()
global_shuffle ( fleet=None, thread_num=12 )
Api_attr

Static Graph

Global shuffle. Global shuffle can be used only in distributed mode, i.e. when multiple processes on a single machine or on multiple machines train together. When running in distributed mode, pass the fleet instance instead of None.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.global_shuffle()
Parameters
  • fleet (Fleet) – fleet singleton. Default None.

  • thread_num (int) – shuffle thread num. Default is 12.
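The difference between local_shuffle and global_shuffle can be illustrated with a small simulation: a local shuffle reorders records inside each worker, while a global shuffle may also move records between workers. This is a conceptual sketch in plain Python, assuming each worker holds a list of records; the function names are reused only for readability and the code is not Paddle's implementation.

```python
import random


def local_shuffle(workers, seed=0):
    """Shuffle each worker's records in place of their own list only."""
    rng = random.Random(seed)
    shuffled = []
    for records in workers:
        copy = list(records)
        rng.shuffle(copy)
        shuffled.append(copy)
    return shuffled


def global_shuffle(workers, seed=0):
    """Send every record to a randomly chosen worker, then shuffle locally."""
    rng = random.Random(seed)
    redistributed = [[] for _ in workers]
    for records in workers:
        for record in records:
            redistributed[rng.randrange(len(workers))].append(record)
    for records in redistributed:
        rng.shuffle(records)
    return redistributed


workers = [["a1", "a2", "a3"], ["b1", "b2", "b3"]]
after_local = local_shuffle(workers)
after_global = global_shuffle(workers)
```

After the local shuffle every worker still owns exactly its original records; after the global shuffle only the union of all records is preserved, which is why the latter needs a distributed setup.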

release_memory ( )
Api_attr

Static Graph

Release the in-memory data of the InMemoryDataset once the data will not be used again.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.global_shuffle()
exe = paddle.static.Executor(paddle.CPUPlace())
startup_program = paddle.static.Program()
main_program = paddle.static.Program()
exe.run(startup_program)
exe.train_from_dataset(main_program, dataset)
dataset.release_memory()
get_memory_data_size ( fleet=None )
Api_attr

Static Graph

Get the memory data size. Call this function after loading data into memory to get the number of instances across all workers.

Note

This function may degrade performance because it involves a barrier.

Parameters

fleet (Fleet) – Fleet Object.

Returns

The size of memory data.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
print(dataset.get_memory_data_size())
get_shuffle_data_size ( fleet=None )
Api_attr

Static Graph

Get the shuffle data size. Call this function after a local or global shuffle to get the number of instances across all workers.

Note

This function may degrade the performance of local shuffle because it involves a barrier. It does not affect global shuffle.

Parameters

fleet (Fleet) – Fleet Object.

Returns

The size of shuffle data.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.global_shuffle()
print(dataset.get_shuffle_data_size())
slots_shuffle ( slots )

Slots Shuffle is a slot-level shuffle method, usually used for sparse features with a large number of instances. It evaluates the importance of slots (features) by comparing a metric, e.g. AUC, obtained while shuffling one or several slots against the baseline.

Parameters

slots (list[string]) – the set of slots(string) to do slots shuffle.

Examples

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
dataset._init_distributed_settings(fea_eval=True)
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
    var = paddle.static.data(
        name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    slots_vars.append(var)
dataset.init(
    batch_size=1,
    thread_num=2,
    input_type=1,
    pipe_command="cat",
    use_var=slots_vars)
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.slots_shuffle(['slot1'])
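The idea behind slot-level shuffling can be sketched outside Paddle: the values of the chosen slots are permuted across instances while every other slot stays attached to its original instance, so any drop in the metric can be attributed to the shuffled slots. The code below is a simplified permutation-style illustration with hypothetical data; it does not reproduce Paddle's internals, which this page only describes through the fea_eval and candidate_size settings.

```python
import random


def slots_shuffle(instances, slots, seed=0):
    """Permute the values of the given slots across instances.

    Simplified stand-in for illustration; each instance is a dict that
    maps a slot name to its list of feature values.
    """
    rng = random.Random(seed)
    shuffled = [dict(ins) for ins in instances]
    for slot in slots:
        column = [ins[slot] for ins in instances]
        rng.shuffle(column)
        for ins, value in zip(shuffled, column):
            ins[slot] = value
    return shuffled


instances = [
    {"slot1": [1], "slot2": [10]},
    {"slot1": [2], "slot2": [20]},
    {"slot1": [3], "slot2": [30]},
]
shuffled = slots_shuffle(instances, ["slot1"])
```

Evaluating the trained model on the shuffled instances and comparing the metric with the unshuffled baseline gives a rough importance signal for slot1.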
set_filelist ( filelist )

Set file list in current worker. The filelist is indicated by a list of file names (string).

Examples

import paddle
dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_filelist(['a.txt', 'b.txt'])
Parameters

filelist (list[str]) – list of file names of inputs.