Distributed training questions


#1

Dear all,

I am getting my hands dirty with asynchronous distributed training. All looks good, and the suggested tutorial is great. Based on it, I have a few questions; perhaps someone can help.

This function splits the data:

import random
from mxnet import gluon

class SplitSampler(gluon.data.sampler.Sampler):
    """ Split the dataset into `num_parts` parts and sample from the part with index `part_index`
    Parameters
    ----------
    length: int
      Number of examples in the dataset
    num_parts: int
      Partition the data into multiple parts
    part_index: int
      The index of the part to read from
    """
    def __init__(self, length, num_parts=1, part_index=0):
        # Compute the length of each partition
        self.part_len = length // num_parts
        # Compute the start index for this partition
        self.start = self.part_len * part_index
        # Compute the end index for this partition
        self.end = self.start + self.part_len

    def __iter__(self):
        # Extract examples between `start` and `end`, shuffle and return them.
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.part_len
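
For reference, a minimal sketch (not from the tutorial) of how I am wiring this sampler into a DataLoader; the toy dataset, batch size and part indices are placeholders, and in the real run num_parts/part_index would come from kv.num_workers/kv.rank:

from mxnet import gluon, nd

# Toy stand-in dataset: 1000 examples with 4 features each (illustration only).
dataset = gluon.data.ArrayDataset(nd.random.uniform(shape=(1000, 4)),
                                  nd.random.uniform(shape=(1000,)))

# Worker 0 out of 4 workers; each worker only ever sees its own contiguous slice.
sampler = SplitSampler(len(dataset), num_parts=4, part_index=0)
loader = gluon.data.DataLoader(dataset, batch_size=32, sampler=sampler)
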
  1. The case where the data cannot be evenly divided by the number of workers is not handled by this function. The DataLoader provides the option last_batch, but I don't know how this works with a sampler in the distributed setting. I can always modify the self.start and self.end indices to describe the dataset accurately, but I don't know how this will behave in the distributed setting. For example, from the function above I see that shuffling always happens within the partition of data that belongs to a specific machine (part_index), so I must always shuffle within the range of indices that belong to a particular worker.

  2. Since this is going to be multi-machine/multi-GPU training, I am going to use dist_async (and not `dist_device_async` - is this correct, based on the discussion here?)

  3. A general question about the dist_sync and dist_async modes: my understanding is that dist_sync behaves like single-machine training but uses multiple machines to increase the effective batch size, aggregating gradients from all machines. So dist_sync is used when we want a larger batch size. With dist_async, each worker updates the weights independently of the other machines, so there is no increase in the batch size used for the gradient evaluation; however, because there are many machines, the model trains faster (as many more updates as there are machines). Is this correct? Could someone please verify? I am a bit confused by the guidelines here (a minimal sketch of how I plan to create the kvstore follows below).
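
For concreteness, here is a minimal sketch of how I plan to create the kvstore and hand it to the Trainer (the Dense layer and the optimizer settings are placeholders):

import mxnet as mx
from mxnet import gluon

# 'dist_sync' aggregates gradients across workers every step (larger effective batch),
# while 'dist_async' lets each worker push its updates independently.
kv = mx.kv.create('dist_async')

net = gluon.nn.Dense(10)   # stand-in model for illustration
net.initialize()

trainer = gluon.Trainer(net.collect_params(), 'sgd',
                        {'learning_rate': 0.01},
                        kvstore=kv)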

Thank you very much for your time, and apologies for the silly questions (the inner geek speaks within me :slight_smile: )


#2
  • You can let the last worker read the additional data at the end and set last_batch to rollover. That should work, though I haven't tried it. But note that when the other workers complete an epoch, the last worker won't have completed its epoch yet. You need to be careful about that if you are writing logic that runs at the end of every epoch (see the sketch after this list).

  • There should be no difference in accuracy between dist_device_async and dist_async. In my experience I haven't seen dist_device_* giving better performance than dist_*, probably because the parameters need to be sent over the network anyway and that cost dominates. I would suggest sticking with ‘dist_async’ or ‘dist_sync’. This has the added advantage of freeing up some space on the GPU, since you won't be using GPU memory for the parameter server.

  • That is correct. Note also that in ‘dist_async’ some updates from a worker can get overwritten by other workers, since the updates are not synchronized. Studies have shown faster convergence when using ‘dist_sync’, and I've seen ‘dist_sync’ used more frequently than ‘dist_async’. If it were me, I would use ‘dist_sync’ unless there is a specific reason to use ‘dist_async’ (for example, when there are so many machines that the synchronization cost becomes the bottleneck; for a small number of machines this is not the case).
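
A minimal, untested sketch of the first suggestion: let the last partition absorb the leftover examples and pass last_batch='rollover' to the DataLoader (the toy dataset and batch size are only for illustration):

import random
from mxnet import gluon, nd

class UnevenSplitSampler(gluon.data.sampler.Sampler):
    """Like SplitSampler above, but the last part keeps the leftover examples."""
    def __init__(self, length, num_parts=1, part_index=0):
        part_len = length // num_parts
        self.start = part_len * part_index
        # The last partition runs to the end of the dataset.
        self.end = length if part_index == num_parts - 1 else self.start + part_len

    def __iter__(self):
        indices = list(range(self.start, self.end))
        random.shuffle(indices)   # shuffle only within this worker's slice
        return iter(indices)

    def __len__(self):
        return self.end - self.start

# Toy stand-in dataset with 250 examples; 250 // 4 = 62, so the last worker gets 64.
dataset = gluon.data.ArrayDataset(nd.arange(1000).reshape(250, 4))
# 'rollover' carries an incomplete final batch over to the next epoch.
loader = gluon.data.DataLoader(dataset, batch_size=32,
                               sampler=UnevenSplitSampler(len(dataset), num_parts=4, part_index=3),
                               last_batch='rollover')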


#3

Thank you very much @indu for your reply. Extremely appreciated.

I am doing tests with dist_async, because I want to keep the batch size small. LeCun suggests:

[image: quote from Yann LeCun recommending small minibatch sizes]

and this is why I want to experiment with a small batch size first. It seems that DCASGD is the best available optimizer for this at the moment.
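
For reference, a minimal sketch of how I intend to plug DCASGD (MXNet's delay-compensated async SGD) into the Trainer; the Dense layer and learning rate are placeholders:

import mxnet as mx
from mxnet import gluon

kv = mx.kv.create('dist_async')

net = gluon.nn.Dense(10)   # stand-in model for illustration
net.initialize()

trainer = gluon.Trainer(net.collect_params(), 'dcasgd',
                        {'learning_rate': 0.01},
                        kvstore=kv)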

Again, many thanks!
Foivos


#4

Hi again, I have some more questions as I am trying to make this run. I am working on an HPC cluster managed by the SLURM scheduler. I have implemented multi-GPU training on a single node, so I can efficiently split and train my data, etc.; this is well tested. I can also run distributed hyperparameter optimization using ray, which works great. In that setup each node uses 4 GPUs, but the runs are (as far as mxnet is concerned) completely independent.

Now, I am trying to use mxnet/tools/launch.py to start distributed training of my model. I have the following problems/questions.

  1. The script I am using to launch my job is the following (currently testing):
#!/bin/bash -l

#SBATCH --job-name="DSTR"
#SBATCH -t 00:03:30
#SBATCH --nodes=4
#SBATCH --cpus-per-task=28
#SBATCH --gres=gpu:4
#SBATCH --mem=128gb


./get_nodes_ip.sh > workers_ip.txt


srun python /data/dia021/Software/mxnet/tools/launch.py  -n $(wc -l < workers_ip.txt) -s  $(wc -l < workers_ip.txt) -H workers_ip.txt --sync-dst-dir /home/dia021/Projects/isprs_potsdam/distributed  --launcher ssh "python main.py"

The script get_nodes_ip.sh is a small script that writes the workers' IP addresses (or node names, as suggested by the tutorial; I tried both) to a text file. So, for example, the contents of workers_ip.txt are:

b001
b002
b003
b004

With this I am assigning 4 workers and 4 servers. My understanding is that with this configuration I will have 4 nodes, each running a single parameter server and a single worker. However, when I do that I end up with 16 instances of the code running. To make this clearer: each run of main.py writes a directory containing output files for monitoring training, and I see a variable number of these directories, not the 4 I expected (sometimes 16 = 4x4, sometimes 22 != 4x4). I suspect the problem may be the srun command, but I am not sure. So here is my question: the number of workers, -n 4, refers to the total number of workers and not the number of workers per node, is that correct? The same for the servers: -s 4 is the total number of (parameter) servers, right?

  2. If I understand correctly, the tool mxnet/tools/launch.py is supposed to launch as many processes as there are workers. So in this particular example I should get 4 instances of my code (as I expect), each consuming 4 GPUs per worker. So I suspect something is definitely going wrong between SLURM and launch.py, right? (To check what is actually launched, I am thinking of logging each process's role and rank; see the sketch after this list.)

  3. Following the above, the “print screen” message I get after launching the job, until it is killed, is:

2018-07-13 20:12:50,786 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-13 20:12:50,855 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-13 20:12:50,867 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-13 20:12:50,868 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed

What is weird in this message (to me) is that all 4 lines refer to the same node. However, this is not always the case (sometimes a line refers to a different node). Is this normal?

  4. Despite the fact that I do see directories being created, there is no content in them. Normally my script writes a txt file with all the information, but now nothing is happening. Any ideas?

  5. Is there a tutorial on launching mxnet distributed training from within a Python environment? Any pointers are most appreciated. In addition, could I use another manager for distributed training - say spark or ray - with the same efficiency as dmlc? Or will mxnet performance drop, making it best to use dmlc?
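
The sketch mentioned in question 2: a few lines I am thinking of adding at the top of main.py to record what each launched process actually is (the file name is arbitrary; DMLC_ROLE is the environment variable set by the dmlc launcher):

import os
import mxnet as mx

kv = mx.kv.create('dist_async')
role = os.environ.get('DMLC_ROLE', 'unknown')   # 'worker', 'server' or 'scheduler'

# One file per worker process, so I can count how many workers really started.
with open('proc_%s_rank%d.txt' % (role, kv.rank), 'w') as f:
    f.write('role=%s rank=%d num_workers=%d\n' % (role, kv.rank, kv.num_workers))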

Thank you very much for your time.