DataLoader

class paddle.fluid.io.DataLoader(dataset, feed_list=None, places=None, return_list=False, batch_sampler=None, batch_size=1, shuffle=False, drop_last=False, collate_fn=None, num_workers=0, use_buffer_reader=True, use_shared_memory=True, timeout=0, worker_init_fn=None)[source]

DataLoader provides an iterator which iterates over the given dataset once, in the order defined by batch_sampler.

DataLoader supports single-process and multi-process data loading. Multi-process workers will be used to load data asynchronously if num_workers is set to a positive number.

DataLoader currently only supports map-style datasets (from which a sample can be retrieved by a given index). For map-style datasets, please see paddle.io.Dataset.

For batch_sampler, please see paddle.io.BatchSampler.

Parameters
  • dataset (Dataset) – the dataset to load data from; should be an instance of a subclass of paddle.io.Dataset.

  • feed_list (list(Variable)|tuple(Variable)) – feed variable list. The variables should be created by fluid.data(). feed_list must be set if return_list is False. Default None.

  • places (list(Place)|tuple(Place)) – a list of Place objects to put the data onto. places must be set in both static graph and dynamic graph mode; in dynamic graph mode, only a single place is supported. Default None.

  • return_list (bool) – whether the return value on each device is presented as a list. If return_list=False, the return value on each device is a dict of str -> LoDTensor, where each key is the name of a fed variable. If return_list=True, the return value on each device is a list(LoDTensor). return_list can only be True in dynamic graph mode. Default False.

  • batch_sampler (BatchSampler) – an instance of paddle.io.BatchSampler to generate batch indices to draw samples from dataset and combine a batch. Default None.

  • batch_size (int) – the number of samples in a mini-batch; a substitution parameter for batch_sampler. If batch_sampler is not set, a default paddle.io.BatchSampler will be used, initialized with batch_size, shuffle and drop_last. Default 1.

  • shuffle (bool) – whether to shuffle the index order before generating batch indices; a substitution parameter for batch_sampler, see batch_size. Default False.

  • drop_last (bool) – whether to drop the last incomplete batch when the dataset size is not divisible by the batch size; a substitution parameter for batch_sampler, see batch_size. Default False.

  • collate_fn (callable) – function to generate mini-batch data by merging the sample list. None means each field of the samples is simply stacked along axis 0 (same as np.stack(..., axis=0)). Default None.

  • num_workers (int) – the number of subprocesses used to load data; 0 means no subprocess is used and data is loaded in the main process. Default 0.

  • use_buffer_reader (bool) – whether to use a buffered reader. If use_buffer_reader=True, the DataLoader will prefetch the next batch asynchronously, which speeds up data feeding while occupying a little more CPU or GPU memory, i.e., the memory of one batch of input data. Default True.

  • use_shared_memory (bool) – whether to use shared memory to speed up putting data into the inter-process queue. Set use_shared_memory to True only when the shared memory space on your machine (e.g. the space of '/dev/shm' on Linux) is large enough. Shared memory is only enabled in multi-process mode (num_workers > 0). Default True.

  • timeout (int) – the timeout value for getting data from the output queue of subprocesses. Default 0.

  • worker_init_fn (callable) – init function which, if not None, will be called with the worker id when each subprocess starts. Default None.

Returns

an iterable object for iterating over the data

Return type

DataLoader
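
The collate_fn contract is framework-agnostic: it receives the list of samples drawn for one batch and must return the merged batch. The default (collate_fn=None) stacks each field along axis 0, so every sample in a batch must share the same shape; a custom function can pad first. A minimal NumPy sketch (pad_collate_fn and its zero-padding scheme are illustrative assumptions, not something DataLoader prescribes):

```python
import numpy as np

def pad_collate_fn(batch):
    # batch is a list of (sequence, label) samples whose sequences
    # may have different lengths.
    seqs, labels = zip(*batch)
    max_len = max(len(s) for s in seqs)
    # Zero-pad each sequence to the longest length in the batch,
    # then stack along axis 0 as the default collate would.
    padded = np.stack(
        [np.pad(s, (0, max_len - len(s)), mode='constant') for s in seqs],
        axis=0)
    return padded, np.stack(labels, axis=0)

# Three variable-length samples merged into one (3, 3) padded batch.
samples = [(np.array([1., 2.]), 0),
           (np.array([3.]), 1),
           (np.array([4., 5., 6.]), 2)]
batch, labels = pad_collate_fn(samples)
print(batch.shape)      # (3, 3)
print(labels.tolist())  # [0, 1, 2]
```

Such a function would be passed as DataLoader(..., collate_fn=pad_collate_fn); with fixed-shape samples like the examples below, the default stacking behavior suffices.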

Examples

import numpy as np
import paddle.fluid as fluid
from paddle.io import Dataset, BatchSampler, DataLoader

BATCH_NUM = 20
BATCH_SIZE = 16
EPOCH_NUM = 4

IMAGE_SIZE = 784
CLASS_NUM = 10

USE_GPU = False # whether to use GPU to run the model

# define a random dataset
class RandomDataset(Dataset):
    def __init__(self, num_samples):
        self.num_samples = num_samples

    def __getitem__(self, idx):
        image = np.random.random([IMAGE_SIZE]).astype('float32')
        label = np.random.randint(0, CLASS_NUM, (1, )).astype('int64')
        return image, label

    def __len__(self):
        return self.num_samples

# get places
places = fluid.cuda_places() if USE_GPU else fluid.cpu_places()

# -------------------- static graph ---------------------

def simple_net(image, label):
    fc_tmp = fluid.layers.fc(image, size=CLASS_NUM)
    cross_entropy = fluid.layers.softmax_with_cross_entropy(fc_tmp, label)
    loss = fluid.layers.reduce_mean(cross_entropy)
    sgd = fluid.optimizer.SGD(learning_rate=1e-3)
    sgd.minimize(loss)
    return loss

image = fluid.data(name='image', shape=[None, IMAGE_SIZE], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')

loss = simple_net(image, label)

exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())

prog = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)

dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)

loader = DataLoader(dataset,
                    feed_list=[image, label],
                    places=places,
                    batch_size=BATCH_SIZE,
                    shuffle=True,
                    drop_last=True,
                    num_workers=2)

for e in range(EPOCH_NUM):
    for i, data in enumerate(loader()):
        l = exe.run(prog, feed=data, fetch_list=[loss], return_numpy=True)
        print("Epoch {} batch {}: loss = {}".format(e, i, l[0][0]))

# -------------------------------------------------------

# --------------------- dygraph mode --------------------

class SimpleNet(fluid.dygraph.Layer):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.fc = fluid.dygraph.nn.Linear(IMAGE_SIZE, CLASS_NUM, act='softmax')

    def forward(self, image, label=None):
        return self.fc(image)

with fluid.dygraph.guard(places[0]):
    simple_net = SimpleNet()
    opt = fluid.optimizer.SGD(learning_rate=1e-3,
                              parameter_list=simple_net.parameters())

    loader = DataLoader(dataset,
                        places=places[0],
                        batch_size=BATCH_SIZE,
                        shuffle=True,
                        drop_last=True,
                        num_workers=2)

    for e in range(EPOCH_NUM):
        for i, (image, label) in enumerate(loader()):
            out = simple_net(image)
            loss = fluid.layers.cross_entropy(out, label)
            avg_loss = fluid.layers.reduce_mean(loss)
            avg_loss.backward()
            opt.minimize(avg_loss)
            simple_net.clear_gradients()
            print("Epoch {} batch {}: loss = {}".format(e, i, np.mean(loss.numpy())))

# -------------------------------------------------------
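
Both examples above load with num_workers=2. When workers perform random augmentation, a common worker_init_fn pattern is to derive a distinct, reproducible seed per worker so the subprocesses do not emit identical random streams. A minimal sketch of that pattern (BASE_SEED is an assumed experiment-level constant, not part of the DataLoader API):

```python
import numpy as np

BASE_SEED = 1234  # assumed experiment-level seed, not part of the DataLoader API

def worker_init_fn(worker_id):
    # Called once in each loader subprocess with that worker's id;
    # seeding per worker keeps random augmentations different across
    # workers but reproducible across runs.
    np.random.seed(BASE_SEED + worker_id)

# Simulate two workers: the same worker id reproduces its stream,
# while different ids diverge.
worker_init_fn(0)
first_draw_w0 = np.random.random()
worker_init_fn(1)
first_draw_w1 = np.random.random()
worker_init_fn(0)
repeat_draw_w0 = np.random.random()
print(first_draw_w0 == repeat_draw_w0)  # True
```

The function would be passed as DataLoader(..., num_workers=2, worker_init_fn=worker_init_fn); in single-process mode (num_workers=0) no subprocess starts, so it is not called.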
static from_generator(feed_list=None, capacity=None, use_double_buffer=True, iterable=True, return_list=False, use_multiprocess=False, drop_last=True)

Note

The framework ensures that the data loading order of DataLoader is exactly the same as the user-defined data source.

Create a DataLoader object for loading data from a Python generator. Data will be prefetched by a Python thread and pushed into a queue asynchronously.

The created DataLoader object provides three methods to set the data source: set_sample_generator, set_sample_list_generator and set_batch_generator. Please see the following example code for their usage.

If iterable = True, the created DataLoader object is a Python generator object, which can be iterated with a for loop.

If iterable = False, the created DataLoader object provides start() and reset() methods to control the data reading process. This mode is designed to be compatible with the fluid.layers.py_reader interface. When using iterable=False, users can easily migrate code from fluid.layers.py_reader to fluid.io.DataLoader.

Parameters
  • feed_list (list(Variable)|tuple(Variable)) – feed variable list. The variables should be created by fluid.data().

  • capacity (int) – capacity of the queue maintained in DataLoader. The unit is batch number. Set larger capacity if your reader is fast.

  • use_double_buffer (bool) – whether to use double_buffer_reader. If use_double_buffer=True, the DataLoader will prefetch the next batch asynchronously, which speeds up data feeding while occupying a little more CPU or GPU memory, i.e., the memory of one batch of input data.

  • iterable (bool) – whether the created DataLoader is iterable.

  • return_list (bool) – whether the return value on each device is presented as a list. It is only valid when iterable=True. If return_list=False, the return value on each device is a dict of str -> LoDTensor, where each key is the name of a fed variable. If return_list=True, the return value on each device is a list(LoDTensor). It is recommended to use return_list=False in static graph mode and return_list=True in dygraph mode.

  • use_multiprocess (bool) – whether to use multiple processes to speed up data loading in dygraph mode. Note: this parameter can only be used in dygraph mode; in static graph mode it has no effect. The default value is False.

  • drop_last (bool) – whether to drop the last batches when their number is less than the number of CPU cores/GPU cards. The default value is True. In the training phase, users should not set drop_last=False, because all CPU cores/GPU cards must read data from the DataLoader. In the inference phase, users can set drop_last=False so that the last batches, whose number is less than the number of CPU cores/GPU cards, can still be tested.

Returns

the created DataLoader object.

Return type

loader (DataLoader)
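
In multi-device execution each run step consumes one batch per device, so the generator's batches are grouped by the device count and drop_last decides whether an incomplete final group is discarded. A pure-Python sketch of that grouping (group_by_device is a hypothetical illustration, not a Paddle API):

```python
def group_by_device(batches, num_devices, drop_last):
    """Split a stream of batches into per-step groups, one batch per device."""
    groups = [batches[i:i + num_devices]
              for i in range(0, len(batches), num_devices)]
    if drop_last and groups and len(groups[-1]) < num_devices:
        groups = groups[:-1]  # discard the group smaller than the device count
    return groups

# 3 batches on 2 devices: the third batch only survives with drop_last=False.
batches = ['b0', 'b1', 'b2']
print(group_by_device(batches, 2, drop_last=True))   # [['b0', 'b1']]
print(group_by_device(batches, 2, drop_last=False))  # [['b0', 'b1'], ['b2']]
```

This is the behavior Examples 2 demonstrates with 2 CPU cores and 3 batches.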

Examples 1:

import paddle.fluid as fluid
import numpy as np

BATCH_NUM = 10
BATCH_SIZE = 16
EPOCH_NUM = 4

CLASS_NUM = 10

ITERABLE = True # whether the created DataLoader object is iterable
USE_GPU = False # whether to use GPU

DATA_FORMAT = 'batch_generator' # data format of data source user provides

def simple_net(image, label):
    fc_tmp = fluid.layers.fc(image, size=CLASS_NUM)
    cross_entropy = fluid.layers.softmax_with_cross_entropy(fc_tmp, label)
    loss = fluid.layers.reduce_mean(cross_entropy)
    sgd = fluid.optimizer.SGD(learning_rate=1e-3)
    sgd.minimize(loss)
    return loss

def get_random_images_and_labels(image_shape, label_shape):
    image = np.random.random(size=image_shape).astype('float32')
    label = np.random.randint(0, CLASS_NUM, size=label_shape).astype('int64')
    return image, label

# If the data generator yields one sample each time,
# use DataLoader.set_sample_generator to set the data source.
def sample_generator_creator():
    def __reader__():
        for _ in range(BATCH_NUM * BATCH_SIZE):
            image, label = get_random_images_and_labels([784], [1])
            yield image, label

    return __reader__

# If the data generator yields a list of samples each time,
# use DataLoader.set_sample_list_generator to set the data source.
def sample_list_generator_creator():
    def __reader__():
        for _ in range(BATCH_NUM):
            sample_list = []
            for _ in range(BATCH_SIZE):
                image, label = get_random_images_and_labels([784], [1])
                sample_list.append([image, label])

            yield sample_list

    return __reader__

# If the data generator yields a batch each time,
# use DataLoader.set_batch_generator to set the data source.
def batch_generator_creator():
    def __reader__():
        for _ in range(BATCH_NUM):
            batch_image, batch_label = get_random_images_and_labels([BATCH_SIZE, 784], [BATCH_SIZE, 1])
            yield batch_image, batch_label

    return __reader__

# If DataLoader is iterable, use for loop to train the network
def train_iterable(exe, prog, loss, loader):
    for _ in range(EPOCH_NUM):
        for data in loader():
            exe.run(prog, feed=data, fetch_list=[loss])

# If DataLoader is not iterable, use start() and reset() method to control the process
def train_non_iterable(exe, prog, loss, loader):
    for _ in range(EPOCH_NUM):
        loader.start() # call DataLoader.start() before each epoch starts
        try:
            while True:
                exe.run(prog, fetch_list=[loss])
        except fluid.core.EOFException:
            loader.reset() # call DataLoader.reset() after catching EOFException

def set_data_source(loader, places):
    if DATA_FORMAT == 'sample_generator':
        loader.set_sample_generator(sample_generator_creator(), batch_size=BATCH_SIZE, drop_last=True, places=places)
    elif DATA_FORMAT == 'sample_list_generator':
        loader.set_sample_list_generator(sample_list_generator_creator(), places=places)
    elif DATA_FORMAT == 'batch_generator':
        loader.set_batch_generator(batch_generator_creator(), places=places)
    else:
        raise ValueError('Unsupported data format')

image = fluid.data(name='image', shape=[None, 784], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')

# Define DataLoader
loader = fluid.io.DataLoader.from_generator(feed_list=[image, label], capacity=16, iterable=ITERABLE)

# Define network
loss = simple_net(image, label)

# Set data source of DataLoader
#
# If DataLoader is iterable, places must be given and the number of places must be the same as the number of devices.
#  - If you are using GPU, call `fluid.cuda_places()` to get all GPU places.
#  - If you are using CPU, call `fluid.cpu_places()` to get all CPU places.
#
# If DataLoader is not iterable, places can be None.
places = fluid.cuda_places() if USE_GPU else fluid.cpu_places()
set_data_source(loader, places)

exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())

prog = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(loss_name=loss.name)

if loader.iterable:
    train_iterable(exe, prog, loss, loader)
else:
    train_non_iterable(exe, prog, loss, loader)


# Users can set return_list=True in dygraph mode.
with fluid.dygraph.guard(places[0]):
    loader = fluid.io.DataLoader.from_generator(capacity=2, return_list=True)
    set_data_source(loader, places[0])
    for image, label in loader():
        relu = fluid.layers.relu(image)
        assert image.shape == [BATCH_SIZE, 784]
        assert label.shape == [BATCH_SIZE, 1]
        assert relu.shape == [BATCH_SIZE, 784]

Examples 2:

import paddle.fluid as fluid
import numpy as np
import os

# We use 2 CPU cores to run inference network
os.environ['CPU_NUM'] = '2'

# The data source has only 3 batches, which cannot be
# divided evenly among the CPU cores
def batch_generator():
    for i in range(3):
        yield np.array([i+1]).astype('float32'),

x = fluid.data(name='x', shape=[None], dtype='float32')
y = x * x

def run_inference(drop_last):
    loader = fluid.io.DataLoader.from_generator(feed_list=[x],
            capacity=8, drop_last=drop_last)
    loader.set_batch_generator(batch_generator, fluid.cpu_places())

    exe = fluid.Executor(fluid.CPUPlace())
    prog = fluid.CompiledProgram(fluid.default_main_program())
    prog = prog.with_data_parallel()

    result = []
    for data in loader():
        each_ret, = exe.run(prog, feed=data, fetch_list=[y])
        result.extend(each_ret)
    return result

# Set drop_last to True, so that the last batch whose
# number is less than CPU core number would be discarded.
print(run_inference(drop_last=True)) # [1.0, 4.0]

# Set drop_last to False, so that the last batch whose
# number is less than CPU core number can be tested.
print(run_inference(drop_last=False)) # [1.0, 4.0, 9.0]
static from_dataset(dataset, places, drop_last=True)

Create an iterable DataLoader object for loading data from a Dataset. Dataset is currently only supported on Linux.

Parameters
  • dataset (InMemoryDataset|QueueDataset) – the dataset object.

  • places (list(CUDAPlace)|list(CPUPlace)) – the places to which the result data should be converted.

  • drop_last (bool) – whether to drop the last batch whose sample number is less than the batch size. If drop_last = True, it will be dropped; if drop_last = False, it will be kept.

Returns

the created DataLoader object, which can be treated as a Python generator.

Return type

loader (DataLoader)

Examples

import paddle.fluid as fluid

image = fluid.data(name='image', shape=[None, 784], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')

dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
dataset.set_batch_size(32)
dataset.set_filelist(['a.txt', 'b.txt', 'c.txt'])
dataset.set_use_var([image, label])
dataset.set_pipe_command('cat')

loader = fluid.io.DataLoader.from_dataset(dataset, fluid.cpu_places())