Data Reader

DataFeeder

class paddle.fluid.data_feeder.DataFeeder(feed_list, place, program=None)[source]

DataFeeder converts the data returned by a reader into a data structure that can be fed into Executor. The reader is usually a python generator that returns a list of mini-batch data entries.

Parameters
  • feed_list (list) – Variables or names of Variables that need to be fed.

  • place (CPUPlace | CUDAPlace) – place indicates the device (CPU | GPU) the data will be fed into. If you want to feed data into GPU, please use fluid.CUDAPlace(i) (i represents the GPU id); if you want to feed data into CPU, please use fluid.CPUPlace().

  • program (Program, optional) – The Program that the data will be fed into. If program is None, it will use default_main_program(). Default None.

Raises

ValueError – If some Variables are not in this Program.

Example

import numpy as np
import paddle
import paddle.fluid as fluid

place = fluid.CPUPlace()
def reader():
    # each sample is a pair of flat arrays; DataFeeder reshapes them to
    # match data_1 ([2, 2]) and data_2 ([1, 3])
    for _ in range(4):
        yield np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32')

main_program = fluid.Program()
startup_program = fluid.Program()

with fluid.program_guard(main_program, startup_program):
    data_1 = fluid.data(name='data_1', shape=[None, 2, 2], dtype='float32')
    data_2 = fluid.data(name='data_2', shape=[None, 1, 3], dtype='float32')
    out = fluid.layers.fc(input=[data_1, data_2], size=2)
    # ...
feeder = fluid.DataFeeder([data_1, data_2], place)

exe = fluid.Executor(place)
exe.run(startup_program)

feed_data = feeder.feed(reader())

# print feed_data to view feed results
# print(feed_data['data_1'])
# print(feed_data['data_2'])

outs = exe.run(program=main_program,
                feed=feed_data,
                fetch_list=[out])
print(outs)
feed(iterable)

According to the feed_list of DataFeeder and iterable, converts the input into a data structure that can be fed into Executor.

Parameters

iterable (generator) – a user-defined python generator to read the raw input data

Returns

a dict that contains (variable name - converted tensor) pairs

Return type

dict

Example

# In this example, the reader (a generator) returns samples of 3 ndarrays each.
# The feed API converts each ndarray input into a tensor.
# The returned result is a dict with keys: data_1, data_2, data_3.
# result['data_1'] is a LoD-Tensor with shape [5, 2, 1, 3]; 5 is the batch size, and [2, 1, 3] is the real shape of data_1.
# result['data_2'] and result['data_3'] are similar.
import numpy as np
import paddle.fluid as fluid

def reader(limit=5):
    for i in range(1, limit + 1):
        yield np.ones([6]).astype('float32') * i, np.ones([1]).astype('int64') * i, np.random.random([9]).astype('float32')

data_1 = fluid.data(name='data_1', shape=[None, 2, 1, 3])
data_2 = fluid.data(name='data_2', shape=[None, 1], dtype='int64')
data_3 = fluid.data(name='data_3', shape=[None, 3, 3], dtype='float32')
feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace())


result = feeder.feed(reader())
print(result['data_1'])
print(result['data_2'])
print(result['data_3'])
feed_parallel(iterable, num_places=None)

Similar to the feed function, feed_parallel is used with multiple devices (CPU|GPU). Here iterable is a list of python generators. The data returned by each generator in the list will be fed into a separate device.

Parameters
  • iterable (list|tuple) – list of user-defined python generators. The element number should match num_places.

  • num_places (int, optional) – the number of devices. If not provided (None), all available devices on the machine will be used. Default None.

Returns

a generator that yields dicts containing (variable name - converted tensor) pairs; the total number of dicts generated matches num_places

Return type

generator

Note

The number of devices (num_places) should be equal to the number of generators (elements of iterable).

Example

import numpy as np
import paddle.fluid as fluid

def generate_reader(batch_size, base=0, factor=1):
    def _reader():
        for i in range(batch_size):
            yield np.ones([4]) * factor + base, np.ones([4]) * factor + base + 5
    # return a generator object, since feed_parallel expects one per device
    return _reader()

x = fluid.data(name='x', shape=[None, 2, 2])
y = fluid.data(name='y', shape=[None, 2, 2], dtype='float32')

z = fluid.layers.elementwise_add(x, y)

feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace())
place_num = 2
places = [fluid.CPUPlace() for x in range(place_num)]
data = []
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places)

# print a sample feed_parallel result
# for item in list(feeder.feed_parallel([generate_reader(5, 0, 1), generate_reader(3, 10, 2)], 2)):
#     print(item['x'])
#     print(item['y'])

reader_list = [generate_reader(5, 0, 1), generate_reader(3, 10, 2)]
res = exe.run(program=program, feed=list(feeder.feed_parallel(reader_list, 2)), fetch_list=[z])
print(res)
decorate_reader(reader, multi_devices, num_places=None, drop_last=True)

Decorate the reader (generator) to fit multiple devices. The reader generates multiple mini-batches. Each mini-batch will be fed into a single device.

Parameters
  • reader (generator) – a user-defined python generator used to get mini-batches of data. A mini-batch can be regarded as a python generator that returns batches of input entities, just like _mini_batch in the code example below.

  • multi_devices (bool) – indicate whether to use multiple devices or not.

  • num_places (int, optional) – if multi_devices is True, you can specify the number of devices (CPU|GPU) to use; if num_places is None, the function will use all the devices of the current machine. Default None.

  • drop_last (bool, optional) – whether to drop the last round of data if it is not enough to feed all devices. Default True.

Returns

a new generator which returns converted dicts that can be fed into Executor

Return type

generator

Raises

ValueError – If drop_last is False and the data cannot fit devices perfectly.

Example

import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.compiler as compiler

def reader():
    def _mini_batch(batch_size):
        for i in range(batch_size):
            yield np.random.random([16]).astype('float32'), np.random.randint(10, size=[1])

    for _ in range(10):
        yield _mini_batch(np.random.randint(1, 10))

place_num = 3
places = [fluid.CPUPlace() for _ in range(place_num)]

# a simple network sample
data = fluid.data(name='data', shape=[None, 4, 4], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')
hidden = fluid.layers.fc(input=data, size=10)

feeder = fluid.DataFeeder(place=places[0], feed_list=[data, label])
reader = feeder.decorate_reader(reader, multi_devices=True, num_places=3, drop_last=True)

exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())
compiled_prog = compiler.CompiledProgram(
         fluid.default_main_program()).with_data_parallel(places=places)

for i, data in enumerate(reader()):
    # print data if you like
    # print(i, data)
    ret = exe.run(compiled_prog, feed=data, fetch_list=[hidden])
    print(ret)

Reader

At training and testing time, PaddlePaddle programs need to read data. To ease the users' work of writing data reading code, we define that

  • A reader is a function that reads data (from file, network, random number generator, etc) and yields data items.

  • A reader creator is a function that returns a reader function.

  • A reader decorator is a function, which accepts one or more readers, and returns a reader.

  • A batch reader is a function that reads data (from reader, file, network, random number generator, etc) and yields a batch of data items (see the sketch at the end of the Data Reader Interface section below).

Data Reader Interface

Indeed, a data reader doesn't have to be a function that reads and yields data items. It can be any function with no parameters that creates an iterable (anything that can be used in for x in iterable):

iterable = data_reader()

Each element produced by the iterable should be a single entry of data, not a mini-batch. That entry of data could be a single item, or a tuple of items. Items should be of a supported type (e.g., numpy array, or list/tuple of float or int).

An example implementation for single item data reader creator:

import numpy

def reader_creator_random_image(width, height):
    def reader():
        while True:
            yield numpy.random.uniform(-1, 1, size=width*height)
    return reader

An example implementation for multiple item data reader creator:

def reader_creator_random_image_and_label(width, height, label):
    def reader():
        while True:
            yield numpy.random.uniform(-1, 1, size=width*height), label
    return reader
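
As a sketch of the batch reader concept defined above, the following helper groups a reader's entries into fixed-size batches. The name batch_reader_creator is illustrative only, not a paddle API (in practice, the built-in paddle.batch serves this purpose):

def batch_reader_creator(reader, batch_size):
    # illustrative helper, not a paddle API: collects single entries
    # from reader into lists of batch_size items
    def batch_reader():
        batch = []
        for item in reader():
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
    return batch_reader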
paddle.reader.cache(reader)[source]

Cache the reader data into memory.

Be careful that this method may take a long time and consume a lot of memory, because reader() is called only once and all of its data is cached in memory.

Parameters

reader (generator) – a reader object which yields data each time.

Returns

a decorated reader object which yields data from cached memory.

Return type

generator
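
Example

A minimal usage sketch, assuming a small finite reader; the underlying reader() is consumed only once, and later passes replay from the cache:

import paddle.reader

def reader():
    for i in range(3):
        yield i

cached_reader = paddle.reader.cache(reader)
for e in cached_reader():
    print(e)
# the outputs are: 0 1 2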

paddle.reader.map_readers(func, *readers)[source]

Creates a data reader whose output is the return value of a function, computed using the outputs of the input data readers as its arguments.

If the input readers output the following data entries: 2, 3, and the given func is mul(x, y), the resulting reader will output 6.

Parameters
  • func – a function to read data and compute a result; the output of this function will be set as the output of the resulting data reader.

  • readers (Reader|list of Reader) – list of readers whose outputs will be used as arguments of func.

Returns

the resulting data reader (Reader)

Examples

import paddle.reader
d = {"h": 0, "i": 1}
def func(x):
    return d[x]
def reader():
    yield "h"
    yield "i"
map_reader_result = paddle.reader.map_readers(func, reader)
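
Iterating over the mapped reader then yields the function's results:

for x in map_reader_result():
    print(x)
# the outputs are: 0 1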
paddle.reader.buffered(reader, size)[source]

Creates a buffered data reader.

The buffered data reader will read and save data entries into a buffer. Reading from the buffered data reader will proceed as long as the buffer is not empty.

Parameters
  • reader (callable) – The data reader to read from.

  • size (int) – Max buffer size.

Returns

The buffered data reader.

Return type

callable

Examples

import paddle.reader as reader
import time

def reader_creator_10(dur):
    def reader():
        for i in range(10):
            time.sleep(dur)
            yield i
    return reader

for size in range(20):
    b = reader.buffered(reader_creator_10(0), size)
    c = 0
    for i in b():
        assert i == c
        c += 1
    assert c == 10
paddle.reader.compose(*readers, **kwargs)[source]

Creates a data reader whose output is the combination of input readers.

If the input readers output the following data entries: (1, 2), 3, (4, 5), the composed reader will output: (1, 2, 3, 4, 5).

Parameters
  • readers (Reader|list of Reader) – readers that will be composed together.

  • check_alignment (bool, optional) – Indicates whether the input readers are checked for alignment. If True, the input readers are checked for correct alignment; otherwise alignment is not checked and trailing outputs will be discarded. Defaults to True.

Returns

the new data reader (Reader).

Raises

ComposeNotAligned – outputs of the readers are not aligned. This will not be raised if check_alignment is set to False.

Examples

import paddle.fluid as fluid
def reader_creator_10(dur):
    def reader():
        for i in range(10):
            yield i
    return reader
reader = fluid.io.compose(reader_creator_10(0), reader_creator_10(0))
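
Iterating over the composed reader yields the aligned combination of both readers' outputs:

for e in reader():
    print(e)
# the outputs are: (0, 0) (1, 1) ... (9, 9)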
paddle.reader.chain(*readers)[source]

Use the input data readers to create a chained data reader. The newly created reader chains the outputs of the input readers together as its output.

Note:

paddle.reader.chain is an alias of paddle.fluid.io.chain; paddle.fluid.io.chain is recommended.

For example, if three input readers' outputs are as follows: [0, 0, 0], [10, 10, 10], [20, 20, 20], the chained reader will output: [0, 0, 0], [10, 10, 10], [20, 20, 20], one after another.

Parameters

readers (list) – input data readers.

Returns

the new chained data reader.

Return type

callable

Examples

import paddle

def reader_creator_3(start):
    def reader():
        for i in range(start, start + 3):
            yield [i, i, i]
    return reader

c = paddle.reader.chain(reader_creator_3(0), reader_creator_3(10), reader_creator_3(20))
for e in c():
    print(e)
# Output:
# [0, 0, 0]
# [1, 1, 1]
# [2, 2, 2]
# [10, 10, 10]
# [11, 11, 11]
# [12, 12, 12]
# [20, 20, 20]
# [21, 21, 21]
# [22, 22, 22]
paddle.reader.shuffle(reader, buf_size)[source]

paddle.fluid.io.shuffle is recommended; paddle.reader.shuffle is an alias.

This API creates a decorated reader that outputs the shuffled data.

The output data from the original reader will be saved into a buffer and then shuffled. The size of this buffer is determined by the argument buf_size.

Parameters
  • reader (callable) – the original reader whose data will be shuffled.

  • buf_size (int) – the size of the shuffle buffer.

Returns

a decorated reader.

Return type

callable

Examples

import paddle.fluid as fluid

def reader():
    for i in range(5):
        yield i
shuffled_reader = fluid.io.shuffle(reader, 3)
for e in shuffled_reader():
    print(e)
# the outputs are 0~4 in a shuffled (unordered) arrangement
exception paddle.reader.ComposeNotAligned[source]
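
The exception raised by paddle.reader.compose when the outputs of the input readers are not aligned and check_alignment is True.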
paddle.reader.firstn(reader, n)[source]

paddle.fluid.io.firstn is recommended; paddle.reader.firstn is an alias.

This API creates a decorated reader that limits the maximum number of samples the reader can return.

Parameters
  • reader (callable) – the input reader.

  • n (int) – the max number of samples in the reader.

Returns

the decorated reader.

Return type

callable

Examples

import paddle.fluid as fluid

def reader():
    for i in range(100):
        yield i
firstn_reader = fluid.io.firstn(reader, 5)
for e in firstn_reader():
    print(e)
# the outputs are: 0 1 2 3 4
paddle.reader.xmap_readers(mapper, reader, process_num, buffer_size, order=False)[source]

Use multiple threads to map samples from the reader with a user-defined mapper.

Parameters
  • mapper (callable) – A function to map the data from reader.

  • reader (callable) – A data reader which yields the data.

  • process_num (int) – the number of threads used to process the original samples.

  • buffer_size (int) – the size of the queue used to buffer data.

  • order (bool) – Whether to keep the data order from original reader. Default False.

Returns

A decorated reader with data mapping.

Example

import paddle.reader as reader
import time

def reader_creator_10(dur):
    def reader():
        for i in range(10):
            time.sleep(dur)
            yield i
    return reader

def mapper(x):
    return (x + 1)

orders = (True, False)
thread_num = (1, 2, 4, 8, 16)
buffer_size = (1, 2, 4, 8, 16)
for order in orders:
    for t_num in thread_num:
        for size in buffer_size:
            user_reader = reader.xmap_readers(mapper,
                                              reader_creator_10(0),
                                              t_num, size, order)
            for n in range(3):
                result = list()
                for i in user_reader():
                    result.append(i)
                if not order:
                    result.sort()
                for idx, e in enumerate(result):
                    assert e == mapper(idx)
paddle.reader.multiprocess_reader(readers, use_pipe=True, queue_size=1000)[source]

This API uses python multiprocessing to read data from the given readers in parallel, and then uses multiprocessing.Queue or multiprocessing.Pipe to merge the data. A separate process is created for each reader in the readers list; please guarantee that every reader can work independently to avoid conflicts in a parallel environment.

multiprocessing.Queue requires read/write access to /dev/shm, which is not supported on some platforms.

Parameters
  • readers (list(generator) | tuple(generator)) – a list of python generators used to read input data

  • use_pipe (bool, optional) – controls the inner API used for multi-processing; default True, meaning multiprocessing.Pipe is used (recommended)

  • queue_size (int, optional) – only useful when use_pipe is False, i.e. multiprocessing.Queue is used; default 1000. Increasing this value can speed up data reading, at the cost of more memory.

Returns

a new reader which can be run in parallel

Return type

generator

Example

import paddle.fluid as fluid
from paddle.fluid.io import multiprocess_reader
import numpy as np

sample_files = ['sample_file_1', 'sample_file_2']

def fake_input_files():
    # np.savez writes binary data, so the files must be opened in binary mode
    with open(sample_files[0], 'wb') as f:
       np.savez(f, a=np.array([1, 2]), b=np.array([3, 4]), c=np.array([5, 6]), d=np.array([7, 8]))
    with open(sample_files[1], 'wb') as f:
       np.savez(f, a=np.array([9, 10]), b=np.array([11, 12]), c=np.array([13, 14]))


def generate_reader(file_name):
    # load data file
    def _impl():
        data = np.load(file_name)
        for item in sorted(data.files):
            yield data[item],
    return _impl

if __name__ == '__main__':
    # generate sample input files
    fake_input_files()

    with fluid.program_guard(fluid.Program(), fluid.Program()):
        place = fluid.CPUPlace()
        # the first 2 in the shape is the batch size
        image = fluid.data(name='image', dtype='int64', shape=[2, 1, 2])
        fluid.layers.Print(image)
        # print detailed tensor info of image variable

        reader = fluid.io.PyReader(feed_list=[image], capacity=2)

        decorated_reader = multiprocess_reader(
            [generate_reader(sample_files[0]), generate_reader(sample_files[1])], False)

        reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])

        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        for data in reader():
            res = exe.run(feed=data, fetch_list=[image])
            print(res[0])
            # prints the following content in this case
            # [[[1 2]], [[3 4]]]
            # [[[5 6]], [[7 8]]]
            # [[[9 10]], [[11 12]]]
            # [13, 14] is dropped because it cannot form a full batch