Efficient Deep Learning Training on the Cloud with Small Files

Here I describe an approach to efficiently train deep learning models on machine learning cloud platforms (e.g., IBM Watson Machine Learning) when the training dataset consists of a large number of small files (e.g., in JPEG format) and is stored in an object store like IBM Cloud Object Storage (COS). As an example, I train a PyTorch model using the Oxford flowers dataset. Although Oxford flowers is a small dataset, it is representative of the problems that one will encounter with large datasets like ImageNet1K.

Visualizations of the Oxford flowers dataset, used as an example for training a deep learning job on the cloud

This work was initiated when the performance of training jobs with datasets of many small files stored on COS was reported to be very poor. Both the initial set-up time (before the job starts running and producing logs) and the runtime of the job (e.g., per epoch time) were increased. In particular, the problem was observed when running PyTorch models with ImageNet1K stored as single JPEG files on COS. The PyTorch model code is available here: https://github.com/pytorch/examples/tree/master/imagenet.

In this post I first explain how data loading and batching work in PyTorch. I then add shared memory and show how the number of workers and the amount of available shared memory influence training time and performance. Finally, I migrate the dataset into a single file in the Hierarchical Data Format (HDF5) and measure performance again, showing a >40x improvement. In the conclusion I discuss how this gain could be increased further.

Steps in training

There are three major steps involved in data loading and processing in the training phase: we first preprocess the data, then load the dataset, and finally iterate over the images in it during training. PyTorch provides many tools to facilitate data pre-processing, loading, and iteration, in particular through the classes torch.utils.data.Dataset and torch.utils.data.DataLoader (see https://pytorch.org/docs/stable/data.html). Dataset is an abstract class. A custom dataset class inherits from it and overrides two methods:

  • __len__, so that len(dataset) returns the size of the dataset
  • __getitem__, to support indexing such that dataset[i] returns the i-th sample
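
As a minimal sketch of this protocol, the hypothetical SquaresDataset class below implements both methods for a toy in-memory dataset. The class name and its contents are illustrative assumptions and do not come from the original code. The import falls back to a plain base class when PyTorch is not installed, so the snippet should run either way.

```python
# Minimal sketch of a map-style dataset; SquaresDataset is a hypothetical example.
try:
    from torch.utils.data import Dataset
except ImportError:  # assumption: fall back so the sketch runs without PyTorch
    Dataset = object


class SquaresDataset(Dataset):
    """Toy dataset whose i-th sample is the pair (i, i * i)."""

    def __init__(self, size):
        self.size = size

    def __len__(self):
        # Called by len(dataset)
        return self.size

    def __getitem__(self, index):
        # Called by dataset[index]
        if not 0 <= index < self.size:
            raise IndexError(index)
        return index, index * index


dataset = SquaresDataset(5)
print(len(dataset), dataset[3])
```

A real image dataset would read and decode a file inside __getitem__, which is exactly where the per-sample cost discussed in this post arises.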

Torchvision already provides Dataset implementations for many popular datasets (see https://pytorch.org/docs/stable/torchvision/datasets.html).

All datasets in torchvision.datasets are subclasses of torch.utils.data.Dataset with preimplemented __getitem__ and __len__ methods.

In the PyTorch ImageNet code, the torchvision.datasets.ImageFolder class is instantiated in line 119:

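import torchvision.datasets as datasets
import torchvision.transforms as transforms

# Per-channel ImageNet mean and standard deviation, defined earlier in main.py
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

# traindir is the path to the training images, defined earlier in main.py
train_dataset = datasets.ImageFolder(
    traindir,
    transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
)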

 

ImageFolder is a subclass of Dataset provided by torchvision.datasets. Various transforms can be applied to the images as they are loaded. The class torchvision.transforms.Compose(transforms) takes a list of transform objects as input; they are applied sequentially to each image in traindir whenever the dataset is sampled, not when the dataset object is created. The code above specifies four transforms, applied to each image in this order:

  • RandomResizedCrop(224)
  • RandomHorizontalFlip()
  • ToTensor()
  • normalize

After train_dataset is instantiated, an instance of torch.utils.data.DataLoader is created in line 133:

train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers, pin_memory=True, sampler=train_sampler
)

 

The for loop in line 185 iterates over the training dataset.

for i, (input, target) in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)

        target = target.cuda(non_blocking=True)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        prec1, prec5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), input.size(0))
        top1.update(prec1[0], input.size(0))
        top5.update(prec5[0], input.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

 

The DataLoader class combines a dataset and a sampler. It is an iterable which supports batching, shuffling, and parallel loading of data using multiprocessing workers. If shuffle is true, the dataset is reshuffled at the beginning of every training epoch. A batch of images is then loaded into memory using num_workers subprocesses; num_workers=0 means the data is loaded in the main process. Before a batch of images is loaded into memory, all the transforms in transforms.Compose() are applied to each image, and each of these transforms adds to the data loading time. Furthermore, the time consumed by this step is linear in the number of images (the batch size) to be read from COS, transformed, and loaded at each iteration. After the data is loaded, a forward pass and a backward pass follow.
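
To make the batching logic concrete, here is a simplified single-process sketch in plain Python. It is not how PyTorch implements DataLoader; the helpers compose and iterate_batches are hypothetical names, and the "images" are plain integers standing in for the 1020 training images.

```python
import random


def compose(transforms):
    """Return a function applying each transform in order (in the spirit of Compose)."""
    def apply(sample):
        for transform in transforms:
            sample = transform(sample)
        return sample
    return apply


def iterate_batches(dataset, batch_size, shuffle, transform, seed=None):
    """Yield lists of transformed samples, one list per batch."""
    indices = list(range(len(dataset)))
    if shuffle:
        # With seed=None each call (each epoch) draws a fresh permutation.
        random.Random(seed).shuffle(indices)
    for start in range(0, len(indices), batch_size):
        batch_indices = indices[start:start + batch_size]
        # Cost is linear in the batch size: one read and transform per sample.
        yield [transform(dataset[i]) for i in batch_indices]


# 1020 stand-in samples, matching the size of the Oxford flowers training set.
dataset = list(range(1020))
transform = compose([lambda x: x * 2, lambda x: x + 1])
batches = list(iterate_batches(dataset, batch_size=64, shuffle=True,
                               transform=transform, seed=0))
print(len(batches), len(batches[0]), len(batches[-1]))
```

With 1020 samples and a batch size of 64 this gives 16 batches, the last one holding 60 samples, which matches the "number of batches = 16" reported in the tables further down.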

To benchmark the performance of PyTorch on an image dataset, we first run main.py with the Oxford flowers dataset, which has 102 classes with 10 images per class in both the training and the validation set. The default model is resnet18. We instrumented main.py to record the time spent in different stages:

  1. model_loading: Time to load the neural network model into GPU memory.
  2. data_preparation: Time to create instances of the Dataset and DataLoader classes.
  3. epoch_time: Time to finish one training epoch. At the beginning of each training epoch, Python restarts the train_loader iterator by calling train_loader.__iter__(), which returns an object with a .next() method (__next__() in Python 3). Python then calls this method to get the first and each subsequent value used by the for loop. In each iteration, after the data is loaded into memory, a forward and a backward pass are executed. Thus, the time for one iteration (batch_time) includes both data loading and compute time. epoch_time is the sum of batch_time over all the batches in one epoch.
  • data loading: Time to sample images for one iteration using the DataLoader iterator and to load them into memory.
  • batch_time: Time to load data for one iteration and to compute the forward and backward pass.
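
The split between data loading time and batch_time can be sketched as follows. This is an illustrative reconstruction in Python 3, not the instrumented script itself; timed_epoch and the compute callback are hypothetical names, and the loop spells out the iter()/next() calls that a for loop performs implicitly.

```python
import time


def timed_epoch(loader, compute):
    """Return (total data-loading time, total batch time, number of batches)."""
    data_time = 0.0
    batch_time = 0.0
    batches = 0
    iterator = iter(loader)           # what the for loop does at the start of an epoch
    end = time.perf_counter()
    while True:
        try:
            batch = next(iterator)    # data loading happens here
        except StopIteration:
            break
        data_time += time.perf_counter() - end
        compute(batch)                # stand-in for the forward and backward pass
        batch_time += time.perf_counter() - end
        end = time.perf_counter()
        batches += 1
    return data_time, batch_time, batches


# Usage with a toy "loader" of two batches and sum() as the compute step.
data_time, batch_time, batches = timed_epoch([[1, 2], [3, 4]], sum)
print(batches, data_time <= batch_time)
```

Because batch_time is measured from the same starting point as the data loading time, it always includes it, which is why the difference between the two can be read as compute time.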

Performance using dataset with JPEG files

We first quantify the performance of PyTorch with images as JPEG files. Each job is run for 20 epochs in a Kubernetes pod with 1 Nvidia Tesla P100 GPU, 8 CPUs, and 24GiB of memory. The dataset is stored in a COS bucket which is locally mounted on the pod. We first tried to run main.py with default parameters using the command:

python main-timing.py --epochs 20 --batch-size 64 /mnt/oxford-flowers

 

The job crashed after the first iteration with the following error:

ERROR: Unexpected bus error encountered in worker. This might be caused by insufficient shared memory (shm).
Epoch: [0][0/16]	Time 4.430 (4.430)	Data 3.707 (3.707)	Loss 6.9139 (6.9139)	Prec@1 0.000 (0.000)	Prec@5 0.000 (0.000)
ERROR: Unexpected bus error encountered in worker. This might be caused by insufficient shared memory (shm).
Traceback (most recent call last):
  File "main-timing.py", line 333, in <module>
    main()
  File "main-timing.py", line 166, in main
    train(train_loader, model, criterion, optimizer, epoch)
  File "main-timing.py", line 205, in train
    for i, (input, target) in enumerate(train_loader):
  File "/opt/conda/lib/python2.7/site-packages/torch/utils/data/dataloader.py", line 280, in __next__
    idx, batch = self._get_batch()
  File "/opt/conda/lib/python2.7/site-packages/torch/utils/data/dataloader.py", line 259, in _get_batch
    return self.data_queue.get()
  File "/opt/conda/lib/python2.7/Queue.py", line 168, in get
    self.not_empty.wait()
  File "/opt/conda/lib/python2.7/threading.py", line 340, in wait
    waiter.acquire()
  File "/opt/conda/lib/python2.7/site-packages/torch/utils/data/dataloader.py", line 178, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 170) is killed by signal: Bus error.

 

Next, we set num_workers to 0 and ran the following command:

python main_jpg-timing.py --epochs 20 --workers 0 --batch-size 64 /mnt/oxford-flowers

 

This time the job ran to completion. The instrumented code main_jpg-timing.py is simply main.py with hooks to get time for each step and can be downloaded here: https://github.ibm.com/dl-res/autolauncher/blob/master/main_jpg-timing.py.

The table below shows total execution time, average epoch_time, average batch_time, data_loading, and average GPU utilization for each run. Also shown are the top-1 and top-5 accuracy after each training epoch. The accuracy is calculated on the validation set which also has 1020 images (102 classes with 10 images each). Note that total_execution_time can be approximated as:

total_execution_time = model_loading + data_preparation + average_epoch_time * number_epochs

 

So, with the default pod settings, we can only run the 0-worker configuration with this dataset. The average GPU utilization is extremely low (<2%) and the job runtime is 3033.43 sec. The code uses multiprocessing for data loading when workers > 0, and the worker processes depend on shared memory. On checking the shared memory of the pod (run df -h inside the pod), it turned out to be only 64M. We next increased the shared memory of the pod by adding:

spec:
 volumes:
 - name: shm
   emptyDir:
     medium: Memory
 containers:
 - image: pytorch/pytorch:0.4.1-cuda9-cudnn7-devel
   volumeMounts:
     - mountPath: /dev/shm
       name: shm

 

in the pod deployment YAML (with shared memory mount) and deployed the pod using kubectl create.
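
To confirm from inside the pod that the larger shared memory mount took effect, the size of /dev/shm can also be read from Python. This is a small sketch using only the standard library; shm_size_mib is a hypothetical helper name, and it reports the same total that df -h shows for the mount.

```python
import os
import shutil


def shm_size_mib(path="/dev/shm"):
    """Return the total size of the filesystem holding `path`, in MiB."""
    return shutil.disk_usage(path).total / (1024 * 1024)


# /dev/shm exists on Linux; skip the check on systems without it.
if os.path.isdir("/dev/shm"):
    print(f"/dev/shm: {shm_size_mib():.0f} MiB")
```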

Now we can run the job with multiple workers. The table below summarizes the results:

batch size = 64, epochs = 20, number of batches = 16 (times in seconds)

shm   workers  total_exec_time  av_epoch_time  av_epoch_time*num_epochs  av_gpu_util  Prec@1  Prec@5
64M   0        3033.43          150.66         3013.2                    1.52%        28.33   58.43
32G   0        2917.70          142.09         2841.8                    1.46%        23.24   52.75
32G   1        2334.081         115.26         2305.2                    1.88%        25.69   53.73
32G   2        1340.48          63.31          1266.2                    4.29%        24.61   52.84
32G   4        688.24           30.80          616                       7.46%        22.059  51.471
32G   8        432.95           20.62          412.4                     12.51%       24.71   56.57
32G   16       307.96           14.47          289.4                     12.45%       22.255  51.373
32G   32       269.60           12.40          248                       15.30%       25.392  54.020
32G   64       358.82           16.78          335.6                     10.10%       25.980  55.980

 

As we increase the number of worker processes, av_epoch_time gets smaller and hence so does the job's total_exec_time. For example, running with 32 workers, we see a 91% drop (2917.70 to 269.60 sec) in job execution time compared to running with 0 workers, and a 10.5x increase in average GPU utilization (1.46% to 15.30%). The trend does not continue indefinitely: going from 32 to 64 workers, total_exec_time rises again to 358.82 sec.
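
The percentages quoted above can be re-derived from the table values with a few lines of arithmetic. The variable names below are illustrative; the numbers are copied from the 32G rows for 0 and 32 workers.

```python
# Values copied from the 32G rows of the table above.
exec_time = {0: 2917.70, 32: 269.60}   # total_exec_time in seconds
gpu_util = {0: 1.46, 32: 15.30}        # av_gpu_util in percent

drop = (exec_time[0] - exec_time[32]) / exec_time[0] * 100
speedup = exec_time[0] / exec_time[32]
util_gain = gpu_util[32] / gpu_util[0]

print(f"drop: {drop:.1f}%  speedup: {speedup:.1f}x  GPU util gain: {util_gain:.1f}x")
```

Expressed as a ratio rather than a percentage drop, the 32-worker run is roughly 10.8 times faster than the 0-worker run.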

Next we look at average epoch time. Observe that:

average_epoch_time = average_train_time + average_test_time
average_train_time = average_train_batch_time * number_train_batches_per_epoch
average_test_time = average_test_batch_time * number_test_batches_per_epoch
train_batch_time = train_data_loading_time + train_compute_time
test_batch_time = test_data_loading_time + test_compute_time

 

The table below shows the decomposition of average_epoch_time into average_train_time and average_test_time. Train and test time are affected similarly by the increase in workers. While the compute time (train_compute_time and test_compute_time) is largely unaffected by the number of workers, the data loading time in both the train and the test phase drops significantly with more workers.

shared memory = 32G, batch size = 64, epochs = 20, number of batches = 16 (times in seconds)

workers  av_train_time  av_test_time  av_train_data_load_time  av_train_compute_time  data_load_overhead
0        67.898         74.196        4.374                    0.061                  98.6%
1        56.831         58.426        3.444                    0.041                  98.8%
2        29.676         33.635        2.029                    0.040                  98.07%
4        15.395         15.409        0.961                    0.075                  92.8%
8        9.022          11.602        0.659                    0.075                  89.8%
16       9.468          5.005         0.430                    0.086                  83.3%
32       6.586          5.809         0.449                    0.087                  83.8%
64       7.106          9.677         0.598                    0.076

 

Note that these numbers are all from one run for each configuration. For more accurate estimates of time one needs to run several jobs (for each configuration) and average out the time.
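
The data_load_overhead column is not defined explicitly, but the reported values are consistent with data loading time divided by the sum of data loading and compute time. The sketch below applies that inferred formula (an assumption, not something stated in the original) to the per-iteration times from the table.

```python
# (av_train_data_load_time, av_train_compute_time) per worker count, from the table above.
rows = {
    0: (4.374, 0.061),
    1: (3.444, 0.041),
    2: (2.029, 0.040),
    4: (0.961, 0.075),
    8: (0.659, 0.075),
    16: (0.430, 0.086),
    32: (0.449, 0.087),
    64: (0.598, 0.076),
}


def data_load_overhead(load, compute):
    """Share of a training iteration spent loading data, in percent (inferred formula)."""
    return 100.0 * load / (load + compute)


for workers, (load, compute) in rows.items():
    print(f"{workers:>2} workers: {data_load_overhead(load, compute):.1f}%")
```

Under this assumption, the 64-worker row, whose overhead is missing from the table, would come out at roughly 88.7%.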

Performance using HDF5 dataset

We next converted our JPEG dataset into an HDF5 file. The code to prepare an HDF5 file from JPEG files is here:

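import h5py

with h5py.File('dataset.hdf5', 'w') as hdf5_file:
    ...  # elided in the original; presumably iterates over the JPEG files and decodes each into `image`
    # Store the image as its own gzip-compressed dataset, keyed by its name
    hdf5_dataset = hdf5_file.create_dataset(
        name=str(current_name),
        data=image,
        shape=(image_height, image_width, image_channels),
        maxshape=(image_height, image_width, image_channels),
        compression="gzip",
        compression_opts=9
    )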

 

For PyTorch to be able to load data from an HDF5 file, we need a new subclass of the Dataset class that returns the images in the HDF5 file as tensors. The code for this can be found in main_hdf5-timing.py, which is available here: https://github.ibm.com/dl-res/autolauncher/blob/master/main_hdf5-timing.py.
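
For readers without access to that script, here is a minimal sketch of what such a subclass could look like. It is not the code from main_hdf5-timing.py: the class name HDF5ImageDataset, the one-dataset-per-image layout, and the "label" attribute used to carry the class index are all assumptions made for illustration. It returns arrays and leaves tensor conversion to the optional transform.

```python
# Sketch only: HDF5ImageDataset and the "label" attribute are assumptions,
# not the code from main_hdf5-timing.py.
try:
    from torch.utils.data import Dataset
except ImportError:  # lets the sketch run without PyTorch installed
    Dataset = object


class HDF5ImageDataset(Dataset):
    """Serve images stored as one HDF5 dataset per image."""

    def __init__(self, path, transform=None):
        import h5py  # imported lazily so defining the class does not need h5py
        self.path = path
        self.transform = transform
        self._file = None  # opened on first access in __getitem__
        with h5py.File(path, "r") as f:
            self.keys = sorted(f.keys())

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, index):
        import h5py
        if self._file is None:
            self._file = h5py.File(self.path, "r")
        entry = self._file[self.keys[index]]
        image = entry[()]  # height x width x channels array
        label = int(entry.attrs.get("label", -1))  # -1 if no label was stored
        if self.transform is not None:
            image = self.transform(image)
        return image, label
```

Opening the file lazily inside __getitem__ rather than in __init__ is a commonly used pattern when a dataset is shared with worker processes, since each process then opens its own handle.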

We first ran with default shared memory settings for 0 workers:

python main_hdf5-timing.py --epochs 20 --workers 0 --batch-size 64 /mnt/oxford-flowers

 

This time the job ran to completion.

Next, when we tried to run with workers > 0, the job again crashed with the same insufficient shared memory (shm) error that we got before with the JPEG dataset. Again, we deployed a pod with increased shared memory and ran jobs with multiple workers. The jobs ran fine for a small number of workers (0 and 1). The table below shows the results from these runs.

shm   workers  total_exec_time  av_epoch_time  av_epoch_time*num_epochs  av_gpu_util  Prec@1  Prec@5
64M   0        107.560          4.936          98.72                     35.88%       21.67   51.96
126G  0        108.226          4.944          98.88                     26.46%       22.35   50.39
126G  1        83.5             3.688          73.76                     41.46%       26.47   52.65

 

shared memory = 126G, batch size = 64, epochs = 20, number of batches = 16

 

workers  av_train_time  av_test_time  av_train_data_load_time  av_train_compute_time  data_load_overhead
0        2.66           2.284         0.120                    0.040                  75%
1        1.880          1.808         0.074                    0.041                  64.35%

 

Conclusion

The performance of training jobs depends on the storage and the file format. We found that with COS, datasets in a single HDF5 file perform much better (higher GPU utilization and reduced runtime) than datasets consisting of many small JPEG files. The performance should improve further if we can run multiple workers with HDF5 files, which we are currently investigating. The goal is to prevent GPU starvation (that is, to achieve greater than 95% utilization) by increasing the number of workers. The findings in this study are based on the Oxford flowers dataset but are also applicable to larger image datasets and most likely to other modalities as well.