IterableDataset

class paddle.io.IterableDataset [source]

An abstract class to encapsulate methods and behaviors of iterable datasets.

All datasets in iterable-style (datasets whose samples can only be fetched one by one, sequentially, like a Python iterator) should be subclasses of paddle.io.IterableDataset. All subclasses should implement the following method:

__iter__: yield sample sequentially. This method is required by reading dataset sample in paddle.io.DataLoader.

Note

Do not implement __getitem__ or __len__ in an IterableDataset; neither should be called on iterable-style datasets.

see paddle.io.DataLoader.

Examples

import numpy as np
from paddle.io import IterableDataset

# define a random iterable-style dataset
# NOTE: iterable-style datasets must subclass IterableDataset (not the
# map-style Dataset base) — only __iter__ is implemented, and
# __getitem__/__len__ must not be defined.
class RandomDataset(IterableDataset):
    """Yield `num_samples` random (image, label) pairs, one at a time."""

    def __init__(self, num_samples):
        self.num_samples = num_samples

    def __iter__(self):
        # Samples are produced sequentially; there is no random access.
        for i in range(self.num_samples):
            image = np.random.random([784]).astype('float32')
            label = np.random.randint(0, 9, (1, )).astype('int64')
            yield image, label

dataset = RandomDataset(10)
for img, lbl in dataset:
    print(img, lbl)

When num_workers > 0, each worker process holds its own copy of the dataset object and will yield the whole set of samples, so every sample in the dataset is repeated num_workers times. If each sample should be yielded only once, there are two ways to configure a different copy of the dataset in each worker process and avoid duplicate data among workers, shown below. Both methods rely on the worker information that can be obtained inside a worker process via paddle.io.get_worker_info.

Example 1: splitting data copy in each worker in __iter__

import math
import numpy as np
import paddle.fluid as fluid
from paddle.io import IterableDataset, DataLoader, get_worker_info

class SplitedIterableDataset(IterableDataset):
    """Yield each integer in [start, end) as a 1-element array,
    sharding the range across DataLoader worker processes."""

    def __init__(self, start, end):
        self.start = start
        self.end = end

    def __iter__(self):
        info = get_worker_info()
        if info is None:
            # Single-process loading: this copy covers the whole range.
            lo, hi = self.start, self.end
        else:
            # Split [start, end) into num_workers contiguous chunks of
            # ceil(size / num_workers); the last chunk may be shorter.
            chunk = int(math.ceil(
                (self.end - self.start) / float(info.num_workers)))
            lo = self.start + info.id * chunk
            hi = min(lo + chunk, self.end)

        for value in range(lo, hi):
            yield np.array([value])

place = fluid.CPUPlace()
with fluid.dygraph.guard(place):
    dataset = SplitedIterableDataset(start=2, end=9)
    loader = DataLoader(dataset,
                        places=place,
                        num_workers=2,
                        batch_size=1,
                        drop_last=True)

    print(list(loader))
    # outputs: [2, 5, 3, 6, 4, 7]

Example 2: splitting data copy in each worker by worker_init_fn

import math
import numpy as np
import paddle.fluid as fluid
from paddle.io import IterableDataset, DataLoader, get_worker_info

class RangeIterableDataset(IterableDataset):
    """Yield each integer in [start, end) as a 1-element array."""

    def __init__(self, start, end):
        self.start = start
        self.end = end

    def __iter__(self):
        yield from (np.array([value])
                    for value in range(self.start, self.end))

place = fluid.CPUPlace()
with fluid.dygraph.guard(place):
    dataset = RangeIterableDataset(start=2, end=9)

    def worker_init_fn(worker_id):
        # Runs once inside each worker process: shrink this worker's
        # private copy of the dataset to its own contiguous shard.
        info = get_worker_info()

        ds = info.dataset
        lo, hi = ds.start, ds.end
        per_worker = int(
            math.ceil((hi - lo) / float(info.num_workers)))

        ds.start = lo + info.id * per_worker
        ds.end = min(ds.start + per_worker, hi)

    loader = DataLoader(dataset,
                        places=place,
                        num_workers=2,
                        batch_size=1,
                        drop_last=True,
                        worker_init_fn=worker_init_fn)

    print(list(loader))
    # outputs: [2, 5, 3, 6, 4, 7]