Cuda malloc when going distributed

python

#1

Dear all,

I have a model for semantic segmentation that I can successfully train in parallel in a single node using a batch size of 32 (8 input images per GPU, 4 GPUs in total, P100-SXM2). When going distributed, based on the example found in the tutorials I cannot use a batch size larger than 8 (2 images/GPU). If I do, I get cuda malloc error. The command I use to launch the distributed training is:

python /SomeDir/Software/mxnet/tools/launch.py  -n $(wc -l < workers_ip.txt) -s  $(wc -l < workers_ip.txt)  -H workers_ip.txt   --sync-dst-dir /SomeDir/  --launcher ssh  "python main.py"

I assume this behavior has to do with communication of results between nodes (hence the increased memory footprint), and perhaps the number (or ratio?) of workers and servers? Any ideas if it is possible to fix this?

Thank you very much for your time,
Foivos


#2

Hi @feevos,

What kvstore are you using in these cases? Just want to confirm this is a fair comparison.

Also make sure you’ve got some form of blocking operation in your training loop. Something like a logging a metric would do the trick. Otherwise the next few batches of data could be sent to the GPU before the first batch has completed. Sometimes this is good for performance, but for it’s not what you want for memory constrained training.


#3

Thank you for your reply @thomelane.

I am using store = kv.create('dist'). I added a checkpoint every update iteration (printing out training loss basically, is this enough?). I don’t know if it makes a difference, but this is the strategy I follow in my code:

  1. I declare a global variable store = kv.create('dist')
  2. I create a dictionary (that takes as input the above global variable) of parameters that includes all parameters as well as some critical modules as members, like the data generator (gluon.data.DataLoader), the network etc.
  3. This dictionary is then fed in some class that is responsible for training/checkpointing and so on. It works great in single node and I can use it as well to train in multiple nodes if I reduce the batch size. The training consumes all GPUs per node. The dictionary prints some log messages on screen (like depth below, run ID, software version etc). The network I use has ~160M params.

Thank you very much for your time.

PS The error message I get is this, when I use batch size that runs on single node:

dia021@bracewell-login:~/Projects/isprs_potsdam/Run_large_batch_TanimotoDual_MTSK/B256_dist> cat slurm-13527256.out 
2019-01-15 14:32:48,435 INFO rsync /home/dia021/Projects/isprs_potsdam/Run_large_batch_TanimotoDual_MTSK/B256_dist/ -> 10.141.1.69:/home/dia021/Projects/isprs_potsdam/Run_large_batch_TanimotoDual_MTSK/B256_dist
2019-01-15 14:32:48,435 INFO rsync /home/dia021/Projects/isprs_potsdam/Run_large_batch_TanimotoDual_MTSK/B256_dist/ -> 10.141.1.70:/home/dia021/Projects/isprs_potsdam/Run_large_batch_TanimotoDual_MTSK/B256_dist
/data/dia021/Software/anaconda3/lib/python3.6/site-packages/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py:47: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
depth:= 0, nfilters: 32
depth:= 1, nfilters: 64
depth:= 2, nfilters: 128
depth:= 3, nfilters: 256
depth:= 4, nfilters: 512
depth:= 5, nfilters: 1024
depth:= 6, nfilters: 2048
depth:= 7, nfilters: 1024
depth:= 8, nfilters: 512
depth:= 9, nfilters: 256
depth:= 10, nfilters: 128
depth:= 11, nfilters: 64
depth:= 12, nfilters: 32
/data/dia021/Software/anaconda3/lib/python3.6/site-packages/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py:47: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
depth:= 0, nfilters: 32
depth:= 1, nfilters: 64
depth:= 2, nfilters: 128
depth:= 3, nfilters: 256
depth:= 4, nfilters: 512
depth:= 5, nfilters: 1024
depth:= 6, nfilters: 2048
depth:= 7, nfilters: 1024
depth:= 8, nfilters: 512
depth:= 9, nfilters: 256
depth:= 10, nfilters: 128
depth:= 11, nfilters: 64
depth:= 12, nfilters: 32
/data/dia021/Software/anaconda3/lib/python3.6/site-packages/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py:47: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
[14:33:03] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[14:33:03] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[14:33:15] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[14:33:15] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[14:35:01] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[14:35:02] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:97: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
------------------------------------------
DL-RunID:= adbe870a-9fc8-44cb-83ee-34e4a4051fb4
------------------------------------------
------------------------------------------
mxnet version:= 1.5.0
------------------------------------------
------------------------------------------
phaino git version:= 451d900eebf383d334943b4cb3ee616a4f7f852b
------------------------------------------
------------------------------------------
Available GPUs: [gpu(0), gpu(1), gpu(2), gpu(3)]
------------------------------------------
0.5786752118305727
Traceback (most recent call last):
  File "main.py", line 26, in <module>
    trainer.train()
  File "/data/dia021/Software/phaino/utils/agoge_base.py", line 316, in train
    avgLoss = self.step_epoch(e,lr_policy)
  File "/data/dia021/Software/phaino/utils/agoge_base.py", line 271, in step_epoch
    avgLoss = self.step_batch(i, data,label)
  File "/data/dia021/Software/phaino/utils/agoge_base.py", line 225, in step_batch
    losses = self._forward_backward_step(tdata,tlabel, _trainer)
  File "/data/dia021/Software/phaino/utils/agoge_base.py", line 200, in _forward_backward_step
    self.trainer.step(self.config[C.C_BATCH_SIZE] * self.config[C.C_UPDT_DELAY_RATE])  # _data are from split_and_load, therefore, the 2nd axis is the batch size. 
  File "/data/dia021/Software/mxnet/gluon/trainer.py", line 325, in step
    self._init_params()
  File "/data/dia021/Software/mxnet/gluon/trainer.py", line 152, in _init_params
    self._kvstore.init(idx, param_arrays[0])
  File "/data/dia021/Software/mxnet/kvstore.py", line 158, in init
    check_call(_LIB.MXKVStoreInit(self.handle, mx_uint(len(ckeys)), ckeys, cvals))
  File "/data/dia021/Software/mxnet/base.py", line 252, in check_call
    raise MXNetError(py_str(_LIB.MXGetLastError()))
mxnet.base.MXNetError: [14:35:04] src/storage/./pinned_memory_storage.h:61: Check failed: e == cudaSuccess || e == cudaErrorCudartUnloading CUDA: out of memory

Stack trace returned 10 entries:
[bt] (0) /data/dia021/Software/mxnet/libmxnet.so(+0x405bea) [0x2aaab10c4bea]
[bt] (1) /data/dia021/Software/mxnet/libmxnet.so(+0x406201) [0x2aaab10c5201]
[bt] (2) /data/dia021/Software/mxnet/libmxnet.so(+0x33c808f) [0x2aaab408708f]
[bt] (3) /data/dia021/Software/mxnet/libmxnet.so(+0x33c813c) [0x2aaab408713c]
[bt] (4) /data/dia021/Software/mxnet/libmxnet.so(+0x33ce61f) [0x2aaab408d61f]
[bt] (5) /data/dia021/Software/mxnet/libmxnet.so(void mxnet::CopyFromToDnsImpl<mshadow::gpu, mshadow::cpu>(mxnet::NDArray const&, mxnet::NDArray const&, mxnet::RunContext)+0x33a) [0x2aaab3b4994a]
[bt] (6) /data/dia021/Software/mxnet/libmxnet.so(void mxnet::CopyFromToImpl<mshadow::gpu, mshadow::cpu>(mxnet::NDArray const&, mxnet::NDArray const&, mxnet::RunContext, std::vector<mxnet::Resource, std::allocator<mxnet::Resource> > const&)+0x45d) [0x2aaab3b63a6d]
[bt] (7) /data/dia021/Software/mxnet/libmxnet.so(+0x2ea4b8b) [0x2aaab3b63b8b]
[bt] (8) /data/dia021/Software/mxnet/libmxnet.so(+0x2c78304) [0x2aaab3937304]
[bt] (9) /data/dia021/Software/mxnet/libmxnet.so(+0x2c7f183) [0x2aaab393e183]


terminate called without an active exception
bash: line 1:  9245 Aborted                 (core dumped) python main.py
Exception in thread Thread-8:
Traceback (most recent call last):
  File "/data/dia021/Software/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/data/dia021/Software/anaconda3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/data/dia021/Software/dmlc_tracker/ssh.py", line 62, in run
    subprocess.check_call(prog, shell = True)
  File "/data/dia021/Software/anaconda3/lib/python3.6/subprocess.py", line 291, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'ssh -o StrictHostKeyChecking=no 10.141.1.70 -p 22 'export OMP_NUM_THREADS=1; export LD_LIBRARY_PATH=/apps/cuda/9.2.88/lib64:/apps/cuda/9.2.88/extras/CUPTI/lib64:/apps/intel/fc/16.0.4.258/compiler/lib/intel64:/apps/intel/cc/16.0.4.258/tbb/lib/intel64/gcc4.4:/apps/intel/cc/16.0.4.258/compiler/lib/intel64:/opt/intel/mic/coi/device-linux-release/lib:/opt/intel/mic/myo/lib:/cm/local/apps/cuda-driver/libs/current/lib64:/cm/shared/apps/slurm/17.11.8/lib64; export DMLC_NUM_WORKER=2; export DMLC_NUM_SERVER=2; export DMLC_PS_ROOT_URI=10.141.1.69; export DMLC_PS_ROOT_PORT=9091; export DMLC_ROLE=worker; export DMLC_NODE_HOST=10.141.1.70; cd /home/dia021/Projects/isprs_potsdam/Run_large_batch_TanimotoDual_MTSK/B256_dist; python main.py'' returned non-zero exit status 134.


#4

Regarding logging, what you mention is sufficient to block the training iteration until complete before starting the next one.

Regarding the memory usage, if you were using a ‘device’ strategy, then I could understand more memory usage on the GPUs as the aggregation is done on GPU rather than CPU, but you’re using ‘dist’ (which is ‘dist_sync’) which means the aggregation is done on the CPU.

Just to confirm, when using dist_sync, the batch size means batch size per node. Is it possible that the batch size wasn’t configured to take this into account. E.g. the batch size is “halving” on two nodes (say) and each node is simply getting half as much data or something similar because the batch size wasn’t multiplied by 2?

dist_sync : Behaves similarly to local but with one major difference. With dist_sync , batch-size now means the batch size used on each machine. So if there are n machines and we use batch size b then dist_sync behaves like local with batch size n * b .

What modifications did you make to the example indicated?

Thanks!
Vishaal


#5

Hi @VishaalKapoor, thank you very much for your answer. So I just created a new version of my code, that I built explicitely on top of cifar10 distributed example. It runs on a single node, but when I go distributed everything hangs. For this debugging I am using a batch_size_per_gpu = 4, and a total batch_size = 4*4=16. The batch size for the validation loss is 4, i.e. the batch_size_per_gpu. The code runs successfully (single node) also for batch_size_per_gpu=6

This is my code, it defers from the original in the following ways: different network called from external library (works great, debugged etc), different dataset (derived from gluon dataset), different transformations of the dataset and normalization, custom loss function (works great, debugged etc). The following code runs successfully in a single node (local HPC), with 4xP100 gpus, with either batch size per gpu of 4 or 6. With the same batch size (per gpu) of 4 or 6 in distributed training returns cuda malloc error. In addition, if I use batch_size_per_gpu = 2, the code hangs, meaning that the gpus are not working - although there is memory ocupied. This is what I see when the code hangs:


image

This is the code:

#!/usr/bin/env python

from __future__ import print_function
import random, sys

import mxnet as mx
from mxnet import autograd, gluon, kv, nd
from mxnet.gluon.model_zoo import vision

import numpy as np


# @@@@@@@@@@@@@@@@ Foivos @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
from phaino.nn.loss.loss import *
from phaino.models.resunet_d7_causal_mtskcolor_ddist import *
import os
import sys
sys.path.append(r'/home/dia021/Projects/isprs_potsdam/src/')
import uuid 
import pathlib 
from ISPRSDataset import *
from ISPRSNormal import *
from phaino.datagen.semseg_aug_cv2 import * # dataset augmentor 
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@


# Create a distributed key-value store
store = kv.create('dist') ## ORIGINAL FOR DISTRIBUTED 
#store = kv.create('device') ## FOR SINGLE MACHINE 

# @@@@@@@@@@@@@@@@@@ Foivos @@@@@@@@@@@@@@@@@@@@@@@@@@
NClasses = 6
nfilters_init = 32
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

# How many epochs to run the training
epochs = 4
# batch_per_gpu
batch_size_per_gpu = 2 # 4 works - single machine, 6 works after calculating some scalar on cpu 
# How many GPUs per machine
gpus_per_machine = 4
# Effective batch size across all GPUs - single machine 
batch_size = batch_size_per_gpu * gpus_per_machine
# Number of cpus:
num_cpus = 8


# Create the context (a list of all GPUs to be used for training)
ctx = [mx.gpu(i) for i in range(gpus_per_machine)]



class SplitSampler(gluon.data.sampler.Sampler):
    """ Split the dataset into `num_parts` parts and sample from the part with index `part_index`

    Parameters
    ----------
    length: int
      Number of examples in the dataset
    num_parts: int
      Partition the data into multiple parts
    part_index: int
      The index of the part to read from
    """
    def __init__(self, length, num_parts=1, part_index=0):
        # Compute the length of each partition
        self.part_len = length // num_parts
        # Compute the start index for this partition
        self.start = self.part_len * part_index
        # Compute the end index for this partition
        self.end = self.start + self.part_len

    def __iter__(self):
        # Extract examples between `start` and `end`, shuffle and return them.
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.part_len


# @@@@@@@@@@@@@@@@@ Foivos Datasets/DataGens @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
params_range = ParamsRange() # Parameters range for CV data augmentation 
tnorm =  ISPRSNormal()
ttransform   = SemSegAugmentor_CV(params_range = params_range, norm = None, one_hot = True) #Default for semseg problems 
data_dir =  r'/data/dia021/isprs_potsdam/Data/'
RUN_ID =  str(uuid.uuid4())
dataset_train  = ISPRSDataset(root=  data_dir, 
                              mode = 'train' , 
                              mtsk = True,
                              color = True,
                              norm= tnorm, 
                              transform= ttransform )

dataset_val = ISPRSDataset(root= data_dir, 
                           mode = 'val', 
                           mtsk = True,
                           color = True,
                           norm= tnorm, 
                           transform=None) 

# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@


# Load the training data
train_data = gluon.data.DataLoader(dataset_train,
                                   batch_size,
                                   sampler=SplitSampler(len(dataset_train), store.num_workers, store.rank),
                                   last_batch='discard',
                                   num_workers = num_cpus)

# Load the test data 
test_data = gluon.data.DataLoader(dataset_val,
                                  batch_size_per_gpu, 
                                  shuffle=False,
                                  last_batch='discard',
                                  num_workers = num_cpus)

# Use ResNet from model zoo
net = ResUNet_d7(_nfilters_init = nfilters_init, _NClasses= NClasses)

# Initialize the parameters with Xavier initializer
net.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
# ########################### Foivos Loss function 
TrainLoss = Tanimoto_wth_dual()
def val_loss (pred, label):
    return 1. - TrainLoss(pred,label)


def loss_Tnmt(_prediction,_label):
    """
    The output of the loss is a tensor of dimensions (_label.shape[0],)
    """
    skip=6

    pred_segm  = _prediction[0]
    pred_bound = _prediction[1]
    pred_dists = _prediction[2]

     
    pred_c = _prediction[3]  

        
    label_segm  = _label[:,:skip,:,:]
    label_bound = _label[:,skip:2*skip,:,:]
    label_dists = _label[:,2*skip:-3,:,:]

     
    label_c = _label[:,-3:,:,:]



    loss_segm = 1.0  - TrainLoss(pred_segm,   label_segm)
    loss_bound = 1.0 - TrainLoss(pred_bound, label_bound)
    loss_dists = 1.0 - TrainLoss(pred_dists, label_dists)   #self.loss2(pred_dists,label_dists) 

    loss_c = 1.0 - TrainLoss(pred_c, label_c)

     
    return (loss_segm+loss_bound+loss_dists+loss_c)/4.0



# Use Adam optimizer. Ask trainer to use the distributer kv store.
trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': .001}, kvstore=store)

# @@@@@@@@@@@@@@@@@@ Foivos @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
def eval_mcc(data_iterator, net):
    skip = 6
    mcc = mx.metric.MCC(average='micro')
    avgLoss = 0.0
    for i, data in enumerate(data_iterator):
        d, l = data
        d = nd.array(d)
        l = nd.array(l)
        l = l[:,:skip,:,:]

        # Use single GPU for validation
        data = d.as_in_context(ctx[0])
        label = l.as_in_context(ctx[0])
        with autograd.predict_mode():
            prediction, _ , _ , _ = net(data)
            tloss = val_loss(prediction, label)
        avgLoss += nd.mean(tloss).asscalar()


        if (l.shape[1] == prediction.shape[1]):
            tl = nd.reshape(data=nd.flatten(l),shape=(-3))
        else:
            tl = nd.transpose(nd.one_hot(nd.squeeze(l),depth = NClasses ),axes=[0,3,1,2])
            tl = nd.reshape(data=nd.flatten(l),shape=(-3))
        tp = nd.reshape(data=nd.flatten(prediction.as_in_context(mx.cpu())),shape=(-3))
        tp_2D = nd.stack( 1.-tp , tp, axis=-1)
        
        mcc.update(labels = tl, preds = tp_2D)
    
    avgLoss /= (i+1.0)

    return avgLoss, mcc.get()[1]
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@






# We'll use cross entropy loss since we are doing multiclass classification
loss = loss_Tnmt # gluon.loss.SoftmaxCrossEntropyLoss()



# Run one forward and backward pass on multiple GPUs
def forward_backward(net, data, label):

    # Ask autograd to remember the forward pass
    with autograd.record():
        # Compute the loss on all GPUs
        losses = [loss(net(X), Y) for X, Y in zip(data, label)]

    # Run the backward pass (calculate gradients) on all GPUs
    tsum = 0.0
    for l in losses:
        l.backward()
        # Calculate mean loss, only to avoid memory flooding 
        tsum += nd.mean(l.as_in_context(mx.cpu())).asscalar()

    #return tsum / float(len(losses))

# Train a batch using multiple GPUs
def train_batch(batch, ctx, net, trainer):

    # Split and load data into multiple GPUs
    data = batch[0]
    data = gluon.utils.split_and_load(data, ctx)

    # Split and load label into multiple GPUs
    label = batch[1]
    label = gluon.utils.split_and_load(label, ctx)

    # Run the forward and backward pass
    forward_backward(net, data, label)

    # Update the parameters
    this_batch_size = batch[0].shape[0]
    trainer.step(this_batch_size)



# Run as many epochs as required
print ("Initiating training, RUN-ID:{}".format(RUN_ID))

#test_accuracy = evaluate_accuracy(test_data, net)
#test_accuracy, test_loss = eval_mcc(test_data, net)
#print("First test evaluation, Test_mcc %f: test_Tnmt: %f" % (test_accuracy,test_loss))
#sys.stdout.flush()
for epoch in range(epochs):


    # Iterate through batches and run training using multiple GPUs
    for batch in train_data:
        # Train the batch using multiple GPUs
        train_batch(batch, ctx, net, trainer)


    # Print test accuracy after every epoch
    #test_accuracy = evaluate_accuracy(test_data, net)
    test_accuracy, test_loss = eval_mcc(test_data, net)
    print("Epoch %d: Test_mcc %f: test_Tnmt: %f" % (epoch, test_accuracy,test_loss))
    sys.stdout.flush()

A similar “hang” of the code happens even in single node if I use the following definition for the average inside the forward_backward function:

# Run one forward and backward pass on multiple GPUs
def forward_backward(net, data, label):

    # Ask autograd to remember the forward pass
    with autograd.record():
        # Compute the loss on all GPUs
        losses = [loss(net(X), Y) for X, Y in zip(data, label)]

    # Run the backward pass (calculate gradients) on all GPUs
    tsum = 0.0
    for l in losses:
        l.backward()
        # Calculate mean loss, only to avoid memory flooding 
        tsum += nd.mean(l.as_in_context(mx.cpu())).asscalar() # This works OK
        #tsum += nd.mean(l).asscalar() # This makes the code hang (gpus are idle), with memory occupied. 

do you think this is a bug in mxnet (version 1.5.0)?

Thank you very much for your time,
Foivos