Elastic distributed training usage and examples

Define your elastic distributed training by using the FabricModel class, with examples.

FabricModel class and train method usage

The FabricModel class is used to perform elastic distributed training given a model, data loader, loss function, and optimizer. Parameters of the FabricModel class include:
  • model: The name of the model that is trained.
  • datasets_function: A tuple of data loading functions. The first item in the tuple loads the training data. The second item loads the testing data.
  • loss_function: The loss function to use to train the model.
  • optimizer: The optimizer to use to train the model.
  • driver_logger: The LoggerCallback to use for the driver.
  • worker_logger: The LoggerCallback to use for the worker.
Example:
edt_m = FabricModel(model, datasets_function, loss_function, optimizer, driver_logger=custom_logger, worker_logger=custom_logger)
The train method is used in elastic distributed training to train a model. Parameters of the train method include:
  • epoch_number: The number of epochs to perform during training. Must be an integer.
  • batch_size: The batch size to use per GPU during training. Must be an integer.
  • engines_number: The maximum number of GPUs to use during training. The effective batch size becomes batch_size * engines_number. This parameter cannot be used with the effective_batch_size argument and must be an integer.
Example:
edt_m.train(epoch_number, batch_size, engines_number)
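The relationship between batch_size, engines_number, and the effective batch size can be checked with plain arithmetic before a job is submitted. The following is a minimal sketch that does not call the library; the helper name effective_batch_size and the check that both values are at least 1 are assumptions made for illustration.

```python
def effective_batch_size(batch_size, engines_number):
    """Return batch_size * engines_number after checking both arguments.

    Hypothetical helper, not part of the FabricModel API. The train method
    requires integers; the "at least 1" check is an added assumption.
    """
    for name, value in (("batch_size", batch_size),
                        ("engines_number", engines_number)):
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError("%s must be an integer" % name)
        if value < 1:
            raise ValueError("%s must be at least 1" % name)
    return batch_size * engines_number


# 32 samples per GPU on up to 4 GPUs gives an effective batch size of 128.
print(effective_batch_size(32, 4))
```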

Defining your own driver_logger and worker_logger

You can define a custom logger class that implements three methods (log_train_metrics, log_test_metrics, and on_train_end) to trace the training and test logs. These methods can use a custom ELog class, which saves training and test data to a log. Pass the logger as the driver_logger and worker_logger parameters of the FabricModel class. log_train_metrics is called at the end of each training batch, log_test_metrics is called at the end of each test batch, and on_train_end is called at the end of training.
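The three hooks can be tried out without the library by using a stand-in base class. The following sketch rests on several assumptions: the LoggerCallback class here is a local stub rather than the one from the callbacks module, RecordingLogger and simulate_run are hypothetical names, and the loop only imitates a plausible call order (all training batches, then test batches, then the end of training). The framework's real scheduling may interleave these calls differently.

```python
class LoggerCallback(object):
    """Local stand-in for the library's LoggerCallback (assumption)."""

    def log_train_metrics(self, loss, acc, completed_batch, worker=0):
        pass

    def log_test_metrics(self, loss, acc, completed_batch, worker=0):
        pass

    def on_train_end(self):
        pass


class RecordingLogger(LoggerCallback):
    """Hypothetical logger that keeps every event in memory."""

    def __init__(self):
        self.events = []

    def log_train_metrics(self, loss, acc, completed_batch, worker=0):
        self.events.append(("train", completed_batch, loss, acc, worker))

    def log_test_metrics(self, loss, acc, completed_batch, worker=0):
        self.events.append(("test", completed_batch, loss, acc, worker))

    def on_train_end(self):
        self.events.append(("end",))


def simulate_run(logger, train_batches, test_batches):
    """Imitate a driver loop that fires the hooks (illustrative only)."""
    completed = 0
    for loss, acc in train_batches:
        completed += 1
        logger.log_train_metrics(loss, acc, completed)
    for loss, acc in test_batches:
        logger.log_test_metrics(loss, acc, completed)
    logger.on_train_end()
    return logger.events


# Made-up metric values: two training batches, then one test batch.
events = simulate_run(RecordingLogger(),
                      [(0.9, 55.0), (0.7, 68.0)],
                      [(0.8, 60.0)])
for event in events:
    print(event)
```

Keeping events in a list makes the hook order easy to inspect in a unit test before switching to a logger that writes to files.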

Example of a custom logger class:
from __future__ import print_function

import sys
import os

from callbacks import LoggerCallback
from emetrics import EMetrics
from elog import ELog

class CustomLogger(LoggerCallback):
    def __init__(self):
        # Global step counter: number of training batches logged so far.
        self.gs = 0

    def log_train_metrics(self, loss, acc, completed_batch, worker=0):
        # Dividing by 100 assumes that acc arrives as a percentage.
        acc = acc/100.0
        self.gs += 1
        with EMetrics.open() as em:
            em.record(EMetrics.TEST_GROUP,completed_batch,{'loss': loss, 'accuracy': acc})
        with ELog.open() as log:
            log.recordTrain("Train", completed_batch, self.gs, loss, acc, worker)

    def log_test_metrics(self, loss, acc, completed_batch, worker=0):
        acc = acc/100.0
        with ELog.open() as log:
            log.recordTest("Test", loss, acc, worker)

    def on_train_end(self):
        print("--------END TRAINING: completed %d iterations --------" % self.gs)
Example of a custom ELog class:
import time
import os

class ELog(object):

    def __init__(self,subId,f):
        if "TRAINING_ID" in os.environ:
            self.trainingId = os.environ["TRAINING_ID"]
        elif "DLI_EXECID" in os.environ:
            self.trainingId = os.environ["DLI_EXECID"]
        else:
            self.trainingId = ""
        self.subId = subId
        self.f = f

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    @staticmethod
    def open(subId=None):
        if "LOG_DIR" in os.environ:
            folder = os.environ["LOG_DIR"]
        elif "JOB_STATE_DIR" in os.environ:
            folder = os.path.join(os.environ["JOB_STATE_DIR"],"logs")
        else:
            folder = "/tmp"

        if subId is not None:
            folder = os.path.join(folder, subId)

        if not os.path.exists(folder):
            os.makedirs(folder)

        f = open(os.path.join(folder, "stdout"), "a")
        return ELog(subId,f)

    def recordText(self,text):
        timestr = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        timestr = "["+ timestr + "]"
        if self.f:
            self.f.write(timestr + " " + text + "\n")
            self.f.flush()

    def recordTrain(self,title,iteration,global_steps,loss,accuracy,worker):
        text = title
        text = text + ",        Timestamp: " + str(int(round(time.time() * 1000)))
        text = text + ",        Global steps: " + str(global_steps)
        text = text + ",        Iteration: " + str(iteration)
        text = text + ",        Loss: " + str(float('%.5f' % loss) )
        text = text + ",        Accuracy: " + str(float('%.5f' % accuracy) )
        self.recordText(text)

    def recordTest(self,title,loss,accuracy,worker):
        text = title
        text = text + ",        Timestamp: " + str(int(round(time.time() * 1000)))
        text = text + ",        Loss: " + str(float('%.5f' % loss) )
        text = text + ",        Accuracy: " + str(float('%.5f' % accuracy) )
        self.recordText(text)

    def close(self):
        if self.f:
            self.f.close()
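Each record that recordTrain and recordTest write is a single comma-separated line that starts with a bracketed wall-clock time, so it can be read back with ordinary string handling. The following is a minimal sketch of such a reader; the function name parse_elog_line is hypothetical, and the sample line uses made-up values in the format that the ELog class above produces.

```python
def parse_elog_line(line):
    """Split one ELog record into its time, title, and labeled fields."""
    # recordText prefixes each record with "[YYYY-MM-DD HH:MM:SS] ".
    stamp, _, body = line.strip().partition("] ")
    parts = [part.strip() for part in body.split(",")]
    record = {"time": stamp.lstrip("["), "title": parts[0]}
    for part in parts[1:]:
        # Each remaining field has the form "Label: value".
        key, _, value = part.partition(":")
        record[key.strip()] = value.strip()
    return record


# Made-up sample in the layout written by recordTrain.
sample = ("[2024-01-01 12:00:00] Train,        Timestamp: 1704110400000,"
          "        Global steps: 3,        Iteration: 3,"
          "        Loss: 0.12345,        Accuracy: 0.9")
record = parse_elog_line(sample)
print(record["title"], record["Global steps"], float(record["Loss"]))
```

All values are returned as strings, so callers convert fields such as Loss and Accuracy to float as needed.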