Asynchronous Data Reading

Besides synchronous data reading, we provide DataLoader. DataLoader performs better than Take Numpy Array as Training Data, because data reading and model training run asynchronously when DataLoader is in use, and it can cooperate with double_buffer_reader to further improve data reading performance. Moreover, double_buffer_reader performs the transformation from CPU Tensor to GPU Tensor asynchronously, which improves data reading efficiency to some extent.

Create DataLoader Object

You can create a DataLoader object as follows:

import paddle.fluid as fluid

image = fluid.data(name='image', dtype='float32', shape=[None, 784])
label = fluid.data(name='label', dtype='int64', shape=[None, 1])

ITERABLE = True

data_loader = fluid.io.DataLoader.from_generator(
    feed_list=[image, label], capacity=64, use_double_buffer=True, iterable=ITERABLE)

In the code above:

  • feed_list is the list of input variables;

  • capacity is the buffer size of the DataLoader object in batches;

  • use_double_buffer is True by default, which means double_buffer_reader is used. It is recommended because it improves data reading speed;

  • iterable is True by default, which means the DataLoader object can be iterated over with a for-range loop. When iterable = True, the DataLoader is decoupled from the Program, so defining a DataLoader object does not change the Program; when iterable = False, the DataLoader inserts operators related to data reading into the Program.

Attention: Program.clone() (see Program) cannot copy DataLoader objects. If you want to create multiple DataLoader objects (such as two different DataLoaders for the training and inference periods respectively), you have to define them separately. If you need to share the model parameters of the training and testing periods while using DataLoader, you can use fluid.unique_name.guard().

Notes: Paddle uses different names to distinguish different variables, and the names are generated by the counter in the unique_name module, which increases by one every time a variable name is generated. fluid.unique_name.guard() resets this counter, ensuring that variable names are identical across repeated uses of fluid.unique_name.guard(), so that parameters can be shared.
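
As a minimal sketch of this behavior, fluid.unique_name.generate() can be used to observe the counter directly:

import paddle.fluid as fluid

with fluid.unique_name.guard():
    print(fluid.unique_name.generate('fc'))  # 'fc_0'
    print(fluid.unique_name.generate('fc'))  # 'fc_1'

with fluid.unique_name.guard():
    # The counter has been reset, so the same names are generated again.
    print(fluid.unique_name.generate('fc'))  # 'fc_0'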

An example of configuring the networks for the training and testing periods with DataLoader is as follows:

import paddle
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist

def network():
    image = fluid.data(name='image', dtype='float32', shape=[None, 784])
    label = fluid.data(name='label', dtype='int64', shape=[None, 1])
    loader = fluid.io.DataLoader.from_generator(feed_list=[image, label], capacity=64)

    # Define model.
    fc = fluid.layers.fc(image, size=10)
    xe = fluid.layers.softmax_with_cross_entropy(fc, label)
    loss = fluid.layers.reduce_mean(xe)
    return loss, loader

# Create main program and startup program for training.
train_prog = fluid.Program()
train_startup = fluid.Program()

with fluid.program_guard(train_prog, train_startup):
    # Use fluid.unique_name.guard() to share parameters with test network.
    with fluid.unique_name.guard():
        train_loss, train_loader = network()
        adam = fluid.optimizer.Adam(learning_rate=0.01)
        adam.minimize(train_loss)

# Create main program and startup program for testing.
test_prog = fluid.Program()
test_startup = fluid.Program()
with fluid.program_guard(test_prog, test_startup):
    # Use fluid.unique_name.guard() to share parameters with train network
    with fluid.unique_name.guard():
        test_loss, test_loader = network()

Configure data source of DataLoader object

The data source of a DataLoader object is set by set_sample_generator(), set_sample_list_generator(), or set_batch_generator(). All three methods receive a Python generator as their parameter. The differences among them are:

  • The generator passed to set_sample_generator() should yield data of the form [img_1, label_1], where img_1 and label_1 are the data of a single sample, of Numpy array type.

  • The generator passed to set_sample_list_generator() should yield data of the form [(img_1, label_1), (img_2, label_2), ..., (img_n, label_n)], where img_i and label_i are the data of a single sample, of Numpy array type, and n is the batch size.

  • The generator passed to set_batch_generator() should yield data of the form [batched_imgs, batched_labels], where batched_imgs and batched_labels are the data of a whole batch, of Numpy array or LoDTensor type.
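
As a minimal sketch (hypothetical generators yielding dummy zero-valued data), the three kinds of generators take the following forms:

import numpy as np

def sample_generator():       # for set_sample_generator(): one sample per yield
    for _ in range(8):
        yield np.zeros([784], dtype='float32'), np.zeros([1], dtype='int64')

def sample_list_generator():  # for set_sample_list_generator(): one list of samples per yield
    for _ in range(8):
        yield [(np.zeros([784], dtype='float32'), np.zeros([1], dtype='int64'))] * 32

def batch_generator():        # for set_batch_generator(): one batched pair per yield
    for _ in range(8):
        yield np.zeros([32, 784], dtype='float32'), np.zeros([32, 1], dtype='int64')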

Please note that when using DataLoader for multi-GPU card (or multi-CPU core) training, the actual total batch size is the batch size yielded by the user's generator multiplied by the number of devices. For example, if the generator yields batches of 32 samples and training runs on 4 GPU cards, the effective batch size is 32 × 4 = 128.

When iterable = True (the default), the places parameter must be passed to these three methods to specify whether the data are converted to CPU Tensors or GPU Tensors. When iterable = False, there is no need to pass the places parameter.

For example, suppose we have two readers: fake_sample_reader, which returns one sample's data at a time, and fake_batch_reader, which returns one batch's data at a time.

import paddle.fluid as fluid
import numpy as np

# Declare sample reader.
def fake_sample_reader():
    for _ in range(100):
        sample_image = np.random.random(size=(784, )).astype('float32')
        sample_label = np.random.randint(low=0, high=10, size=(1, )).astype('int64')  # labels in [0, 9]
        yield sample_image, sample_label

# Declare batch reader.
def fake_batch_reader():
    batch_size = 32
    for _ in range(100):
        batch_image = np.random.random(size=(batch_size, 784)).astype('float32')
        batch_label = np.random.randint(low=0, high=10, size=(batch_size, 1)).astype('int64')  # labels in [0, 9]
        yield batch_image, batch_label

image1 = fluid.data(name='image1', dtype='float32', shape=[None, 784])
label1 = fluid.data(name='label1', dtype='int64', shape=[None, 1])

image2 = fluid.data(name='image2', dtype='float32', shape=[None, 784])
label2 = fluid.data(name='label2', dtype='int64', shape=[None, 1])

image3 = fluid.data(name='image3', dtype='float32', shape=[None, 784])
label3 = fluid.data(name='label3', dtype='int64', shape=[None, 1])

The corresponding DataLoaders are defined as follows:

import paddle
import paddle.fluid as fluid

ITERABLE = True
USE_CUDA = True
USE_DATA_PARALLEL = True

if ITERABLE:
    # If DataLoader is iterable, places should be set.
    if USE_DATA_PARALLEL:
        # Use all GPU cards or 8 CPU cores to train.
        places = fluid.cuda_places() if USE_CUDA else fluid.cpu_places(8)
    else:
        # Use single GPU card or CPU core.
        places = fluid.cuda_places(0) if USE_CUDA else fluid.cpu_places(1)
else:
    # If DataLoader is not iterable, places shouldn't be set.
    places = None

# Use sample reader to configure data source of DataLoader.
data_loader1 = fluid.io.DataLoader.from_generator(feed_list=[image1, label1], capacity=10, iterable=ITERABLE)
data_loader1.set_sample_generator(fake_sample_reader, batch_size=32, places=places)

# Use sample reader + fluid.io.batch to configure data source of DataLoader.
data_loader2 = fluid.io.DataLoader.from_generator(feed_list=[image2, label2], capacity=10, iterable=ITERABLE)
sample_list_reader = fluid.io.batch(fake_sample_reader, batch_size=32)
sample_list_reader = fluid.io.shuffle(sample_list_reader, buf_size=64) # Shuffle data if needed.
data_loader2.set_sample_list_generator(sample_list_reader, places=places)

# Use batch reader to configure data source of DataLoader.
data_loader3 = fluid.io.DataLoader.from_generator(feed_list=[image3, label3], capacity=10, iterable=ITERABLE)
data_loader3.set_batch_generator(fake_batch_reader, places=places)

Train and test model with DataLoader

Examples of using DataLoader to train and test models are as follows:

  • Step 1, we set up the training and testing networks, define the corresponding DataLoader objects, and configure their data sources.

import paddle
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist
import six

ITERABLE = True

def network():
    # Create data holder.
    image = fluid.data(name='image', dtype='float32', shape=[None, 784])
    label = fluid.data(name='label', dtype='int64', shape=[None, 1])

    # Create DataLoader object.
    reader = fluid.io.DataLoader.from_generator(feed_list=[image, label], capacity=64, iterable=ITERABLE)

    # Define model.
    fc = fluid.layers.fc(image, size=10)
    xe = fluid.layers.softmax_with_cross_entropy(fc, label)
    loss = fluid.layers.reduce_mean(xe)
    return loss, reader

# Create main program and startup program for training.
train_prog = fluid.Program()
train_startup = fluid.Program()

# Define training network.
with fluid.program_guard(train_prog, train_startup):
    # Use fluid.unique_name.guard() to share parameters with test network.
    with fluid.unique_name.guard():
        train_loss, train_loader = network()
        adam = fluid.optimizer.Adam(learning_rate=0.01)
        adam.minimize(train_loss)

# Create main program and startup program for testing.
test_prog = fluid.Program()
test_startup = fluid.Program()

# Define testing network.
with fluid.program_guard(test_prog, test_startup):
    # Use fluid.unique_name.guard() to share parameters with train network
    with fluid.unique_name.guard():
        test_loss, test_loader = network()

place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)

# Run startup_program for initialization.
exe.run(train_startup)
exe.run(test_startup)

# Compile programs.
train_prog = fluid.CompiledProgram(train_prog).with_data_parallel(loss_name=train_loss.name)
test_prog = fluid.CompiledProgram(test_prog).with_data_parallel(share_vars_from=train_prog)

# Configure data source of DataLoader.
places = fluid.cuda_places() if ITERABLE else None

train_loader.set_sample_list_generator(
    fluid.io.shuffle(fluid.io.batch(mnist.train(), 512), buf_size=1024), places=places)

test_loader.set_sample_list_generator(fluid.io.batch(mnist.test(), 512), places=places)

  • Step 2, we choose different ways to run the network according to whether the DataLoader object is iterable or not.

If iterable = True, the DataLoader object is a Python generator that can be iterated over directly with a for-range loop. Each element it returns is passed to the executor through the feed parameter of exe.run().

def run_iterable(program, exe, loss, data_loader):
    for data in data_loader():
        loss_value = exe.run(program=program, feed=data, fetch_list=[loss])
        print('loss is {}'.format(loss_value))

for epoch_id in six.moves.range(10):
    run_iterable(train_prog, exe, train_loss, train_loader)
    run_iterable(test_prog, exe, test_loss, test_loader)

If iterable = False, call the start() method of the DataLoader object before each epoch starts. Since exe.run() throws a fluid.core.EOFException at the end of each epoch, catch this exception and call the reset() method to reset the DataLoader's state before starting the next epoch. When iterable = False, there is no need to pass the feed parameter to exe.run(). The specific usage is as follows:

def run_non_iterable(program, exe, loss, data_loader):
    data_loader.start()
    try:
        while True:
            loss_value = exe.run(program=program, fetch_list=[loss])
            print('loss is {}'.format(loss_value))
    except fluid.core.EOFException:
        print('End of epoch')
        data_loader.reset()

for epoch_id in six.moves.range(10):
    run_non_iterable(train_prog, exe, train_loss, train_loader)
    run_non_iterable(test_prog, exe, test_loss, test_loader)