InMemoryDataset
- class paddle.distributed.InMemoryDataset
  - Api_attr: Static Graph
  It loads data into memory and shuffles it before training.
  Examples
      import paddle

      paddle.enable_static()
      dataset = paddle.distributed.InMemoryDataset()
  - update_settings(**kwargs)
    - Api_attr: Static Graph
    Should be called in the user’s Python script to update the settings of a dataset instance.
    - Parameters
      - kwargs – Keyword arguments. The following keys are currently supported in **kwargs, covering both single-node settings and advanced distributed settings:
        - batch_size (int) – Batch size. It takes effect during training. Default is 1.
        - thread_num (int) – Number of threads, i.e. the number of readers. Default is 1.
        - use_var (list) – List of variables that you will use. Default is [].
        - input_type (int) – Input type of the generated input. 0 is for one sample, 1 is for one batch. Default is 0.
        - fs_name (str) – fs name. Default is “”.
        - fs_ugi (str) – fs ugi. Default is “”.
        - pipe_command (str) – Pipe command of the current dataset. It must be a UNIX pipeline command. Default is “cat”.
        - download_cmd (str) – Customized download command. Default is “cat”.
        - data_feed_type (str) – Data feed type used in the C++ code. Default is “MultiSlotInMemoryDataFeed”.
        - queue_num (int) – Number of dataset output queues; training threads get data from these queues. Default is -1, in which case it is set to the thread number in the C++ code.
        - merge_size (int) – Number of instances to merge. If merge_size > 0, merging is done by line id: instances with the same line id are merged after shuffle, and you should parse the line id in the data generator. Default is -1.
        - parse_ins_id (bool) – Whether the Dataset needs to parse ins_id. Default is False.
        - parse_content (bool) – Whether the Dataset needs to parse content. Default is False.
        - fleet_send_batch_size (int) – Fleet send batch size in one RPC. Default is 1024.
        - fleet_send_sleep_seconds (int) – Fleet send sleep time. Default is 0.
        - fea_eval (bool) – Whether the Dataset needs to do feature importance evaluation using slots shuffle. Default is False.
        - candidate_size (int) – If fea_eval is True, the candidate size used in slots shuffle.
 
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=[])
        dataset._init_distributed_settings(
            parse_ins_id=True,
            parse_content=True,
            fea_eval=True,
            candidate_size=10000)
        dataset.update_settings(batch_size=2)
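The pipe_command setting above can be pictured with a small standalone sketch. It assumes the command reads a file's raw content on stdin and writes to stdout the lines that the data feed then parses; the helper name `read_through_pipe` is hypothetical and is not part of Paddle.

```python
import os
import subprocess
import tempfile


def read_through_pipe(path, pipe_command="cat"):
    """Feed the file at `path` to `pipe_command` on stdin and return its output lines."""
    with open(path, "rb") as src:
        result = subprocess.run(
            pipe_command, shell=True, stdin=src,
            stdout=subprocess.PIPE, check=True)
    return result.stdout.decode().splitlines()


# Write a small sample file (hypothetical content, two instances).
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write("2 1 2 2 5 4\n2 2 7 2 1 3\n")
    sample_path = f.name

# "cat" passes every line through unchanged (the documented default).
identity_lines = read_through_pipe(sample_path, "cat")
# Any other UNIX pipeline can filter or transform the lines.
first_line_only = read_through_pipe(sample_path, "head -n 1")
os.remove(sample_path)
```

With the default “cat” the data reaches the reader untouched; a custom pipeline is where preprocessing such as filtering would go.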
  - init(**kwargs)
    - Api_attr: Static Graph
    Should be called only once in the user’s Python script to initialize the settings of a dataset instance.
    - Parameters
      - kwargs – Keyword arguments. The following keys are currently supported in **kwargs:
        - batch_size (int) – Batch size. It takes effect during training. Default is 1.
        - thread_num (int) – Number of threads, i.e. the number of readers. Default is 1.
        - use_var (list) – List of variables that you will use. Default is [].
        - input_type (int) – Input type of the generated input. 0 is for one sample, 1 is for one batch. Default is 0.
        - fs_name (str) – fs name. Default is “”.
        - fs_ugi (str) – fs ugi. Default is “”.
        - pipe_command (str) – Pipe command of the current dataset. It must be a UNIX pipeline command. Default is “cat”.
        - download_cmd (str) – Customized download command. Default is “cat”.
        - data_feed_type (str) – Data feed type used in the C++ code. Default is “MultiSlotInMemoryDataFeed”.
        - queue_num (int) – Number of dataset output queues; training threads get data from these queues. Default is -1, in which case it is set to the thread number in the C++ code.
 
    Examples
        import paddle
        import os

        paddle.enable_static()

        # Write two small input files, one line (instance) each.
        with open("test_queue_dataset_run_a.txt", "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3"
            f.write(data)
        with open("test_queue_dataset_run_b.txt", "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        dataset.set_filelist(
            ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
        dataset.load_into_memory()

        place = paddle.CPUPlace()
        exe = paddle.static.Executor(place)
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)

        exe.train_from_dataset(main_program, dataset)

        # Clean up the temporary input files.
        os.remove("./test_queue_dataset_run_a.txt")
        os.remove("./test_queue_dataset_run_b.txt")
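The data line written in the example above, "2 1 2 2 5 4 2 2 7 2 1 3", can be read as four slots of two ids each. The sketch below shows one way to decode such a line, assuming the layout is a count followed by that many feature ids for each slot in order; `parse_multislot_line` is a hypothetical helper for illustration, not a Paddle function.

```python
def parse_multislot_line(line, slot_names):
    """Split one text line into {slot_name: [feature ids]}.

    Assumed layout: for each slot in order, a count followed by that many ids.
    """
    tokens = [int(tok) for tok in line.split()]
    parsed, pos = {}, 0
    for name in slot_names:
        count = tokens[pos]
        parsed[name] = tokens[pos + 1 : pos + 1 + count]
        if len(parsed[name]) != count:
            raise ValueError(f"slot {name!r} expects {count} ids")
        pos += 1 + count
    if pos != len(tokens):
        raise ValueError("trailing tokens after the last slot")
    return parsed


sample = parse_multislot_line(
    "2 1 2 2 5 4 2 2 7 2 1 3", ["slot1", "slot2", "slot3", "slot4"])
```

Under this assumption, slot1 holds ids 1 and 2, slot2 holds 5 and 4, slot3 holds 2 and 7, and slot4 holds 1 and 3.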
  - set_date(date)
    - Api_attr: Static Graph
    Sets the training date used for pulling sparse parameters and for saving and loading the model. Only used in psgpu.
    - Parameters
      date (str) – Training date in YYYYMMDD format, e.g. 20211111.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        dataset.set_date("20211111")
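Since the date is passed as a plain string, it may help to check it before calling set_date. The following is a minimal sketch, assuming the eight-digit form shown in the example ("20211111", year first); `is_valid_training_date` is a hypothetical helper and not part of Paddle.

```python
from datetime import datetime


def is_valid_training_date(date):
    """Return True if `date` is an 8-digit YYYYMMDD string naming a real calendar day."""
    if len(date) != 8 or not date.isdigit():
        return False
    try:
        datetime.strptime(date, "%Y%m%d")
    except ValueError:
        # Digits are present but do not form a real date (e.g. month 13).
        return False
    return True
```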
  - load_into_memory(is_shuffle=False)
    - Api_attr: Static Graph
    Loads data into memory.
    - Parameters
      is_shuffle (bool) – Whether to use local shuffle. Default is False.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
  - preload_into_memory(thread_num=None)
    - Api_attr: Static Graph
    Loads data into memory asynchronously.
    - Parameters
      thread_num (int) – Number of preload threads.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        # Start loading in the background, then block until it finishes.
        dataset.preload_into_memory()
        dataset.wait_preload_done()
  - wait_preload_done()
    - Api_attr: Static Graph
    Waits until preload_into_memory is done.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.preload_into_memory()
        dataset.wait_preload_done()
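The preload_into_memory / wait_preload_done pair follows a start-then-join pattern. The toy class below is only a conceptual sketch of that pattern using a Python thread; `PreloadSketch` and its fields are hypothetical, and Paddle's own loading is done in its C++ code rather than like this.

```python
import threading


class PreloadSketch:
    """Toy stand-in for a dataset that loads records on a background thread."""

    def __init__(self, records):
        self._source = list(records)
        self.memory = []
        self._thread = None

    def preload_into_memory(self):
        # Start loading and return immediately.
        self._thread = threading.Thread(target=self._load)
        self._thread.start()

    def _load(self):
        self.memory.extend(self._source)

    def wait_preload_done(self):
        # Block until the background load has finished.
        if self._thread is not None:
            self._thread.join()
            self._thread = None


toy = PreloadSketch(["ins-a", "ins-b", "ins-c"])
toy.preload_into_memory()
# Other setup work could run here while loading proceeds.
toy.wait_preload_done()
loaded = toy.memory
```

The point of the split is that work placed between the two calls overlaps with loading; the data should only be read after the wait returns.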
  - local_shuffle()
    - Api_attr: Static Graph
    Local shuffle.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
        dataset.local_shuffle()
  - global_shuffle(fleet=None, thread_num=12)
    - Api_attr: Static Graph
    Global shuffle. It can be used only in distributed mode, i.e. multiple processes on a single machine or multiple machines training together. If you run in distributed mode, you should pass fleet instead of None.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
        dataset.global_shuffle()
    - Parameters
      - fleet (Fleet) – Fleet singleton. Default is None.
      - thread_num (int) – Number of shuffle threads. Default is 12.
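To make the difference between local and global shuffle concrete, here is a single-process toy model in which each "worker" is just a Python list. It is a conceptual sketch only: the random reassignment rule and both function bodies are assumptions for illustration, not Paddle's implementation, which exchanges data between real workers through fleet.

```python
import random


def local_shuffle(worker_data, rng):
    """Shuffle each worker's instances in place; nothing crosses workers."""
    for instances in worker_data:
        rng.shuffle(instances)


def global_shuffle(worker_data, rng):
    """Reassign every instance to a random worker, then shuffle locally."""
    redistributed = [[] for _ in worker_data]
    for instances in worker_data:
        for ins in instances:
            redistributed[rng.randrange(len(worker_data))].append(ins)
    local_shuffle(redistributed, rng)
    return redistributed


workers = [["a0", "a1", "a2"], ["b0", "b1", "b2"]]

# Local shuffle: each worker keeps exactly its own instances.
local_copy = [list(w) for w in workers]
local_shuffle(local_copy, random.Random(1))

# Global shuffle: instances may move to a different worker.
after_global = global_shuffle([list(w) for w in workers], random.Random(0))
```

In this model a local shuffle changes only the order within a worker, while a global shuffle also changes which worker holds each instance, which is why it only makes sense in distributed mode.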
 
 
  - release_memory()
    - Api_attr: Static Graph
    Releases the in-memory data of InMemoryDataset once the data will not be used again.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
        dataset.global_shuffle()
        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        exe.train_from_dataset(main_program, dataset)
        # Free the loaded data after training has finished.
        dataset.release_memory()
  - get_memory_data_size(fleet=None)
    - Api_attr: Static Graph
    Gets the memory data size. Call this function to get the number of instances across all workers after loading into memory.
    Note: This function may degrade performance because it involves a barrier.
    - Parameters
      fleet (Fleet) – Fleet object.
    - Returns
      The size of memory data.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
        print(dataset.get_memory_data_size())
  - get_shuffle_data_size(fleet=None)
    - Api_attr: Static Graph
    Gets the shuffle data size. Call this function to get the number of instances across all workers after local/global shuffle.
    Note: This function may degrade the performance of local shuffle because it involves a barrier. It does not affect global shuffle.
    - Parameters
      fleet (Fleet) – Fleet object.
    - Returns
      The size of shuffle data.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
        dataset.global_shuffle()
        print(dataset.get_shuffle_data_size())
  - slots_shuffle(slots)
    Slots Shuffle. Slots shuffle is a slot-level shuffle method, usually used for sparse features with a large number of instances. The metric (e.g. AUC) obtained while doing slots shuffle on one or several slots is compared with the baseline to evaluate the importance of those slots (features).
    - Parameters
      slots (list[string]) – The set of slots (strings) on which to do slots shuffle.
    Examples
        import paddle

        paddle.enable_static()
        dataset = paddle.distributed.InMemoryDataset()
        dataset._init_distributed_settings(fea_eval=True)
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = paddle.static.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1)
            slots_vars.append(var)
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars)
        filelist = ["a.txt", "b.txt"]
        dataset.set_filelist(filelist)
        dataset.load_into_memory()
        dataset.slots_shuffle(['slot1'])
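The idea behind slots shuffle, scrambling one feature while leaving the rest intact and then comparing the metric with the baseline, can be sketched in plain Python. This is a permutation-style illustration under assumed data structures (each instance is a dict of slot name to feature ids); the function below is hypothetical, ignores the candidate_size setting, and may differ from how Paddle actually performs the shuffle.

```python
import random


def slots_shuffle(instances, slots, seed=None):
    """Shuffle the values of the given slots across instances.

    Each instance is a dict {slot_name: feature ids}. Slots not listed
    keep their original values, so only the chosen features are scrambled.
    """
    rng = random.Random(seed)
    shuffled = [dict(ins) for ins in instances]
    for slot in slots:
        column = [ins[slot] for ins in instances]
        rng.shuffle(column)
        for ins, value in zip(shuffled, column):
            ins[slot] = value
    return shuffled


data = [
    {"slot1": [1], "slot2": [10]},
    {"slot1": [2], "slot2": [20]},
    {"slot1": [3], "slot2": [30]},
]
result = slots_shuffle(data, ["slot1"], seed=0)
```

If the evaluation metric drops noticeably on the shuffled data compared with the baseline, the shuffled slot carried useful signal; if it barely moves, the slot contributes little.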
  - set_filelist(filelist)
    Sets the file list for the current worker. The file list is given as a list of file names (strings).
    Examples
        import paddle

        dataset = paddle.distributed.fleet.DatasetBase()
        dataset.set_filelist(['a.txt', 'b.txt'])
    - Parameters
      filelist (list[str]) – List of input file names.
 
 
