Data parallelism for ConvLSTM

I’m trying to train ConvLSTM model in a distributed environment. I’m using two nodes with the same number of GPUs. However, I faced an error during the training phase. Please, help me to solve it.

Error:


Reproducible example:

import pandas as pd
import mxnet as mx
from mxnet import gluon, autograd, nd
from mxnet.gluon import nn, rnn
from mxnet.gluon.data import ArrayDataset, DataLoader
from mxnet import kv, nd
import time
import numpy as np

rows=14
columns=14
batch_size=32*2
dropout=0.2
lr=0.001
n_out=1
epochs=1
q=3
horizon=36


x_train = np.random.random((300,1,q,rows,columns)).astype(np.float32)
y_train = np.random.random((300,1)).astype(np.float32)
print(x_train.shape, y_train.shape)
print(type(x_train), type(y_train))

store = kv.create('dist')
print("Total number of workers: %d" % store.num_workers)
print("This worker's rank: %d" % store.rank)


class SplitSampler(gluon.data.sampler.Sampler):

    def __init__(self, length, num_parts=1, part_index=0):
        self.part_len = length // num_parts
        self.start = self.part_len * part_index
        self.end = self.start + self.part_len

    def __iter__(self):
        indices = list(range(self.start, self.end))
        return iter(indices)

    def __len__(self):
        return self.part_len


train_iter = gluon.data.DataLoader(gluon.data.ArrayDataset(x_train, y_train), batch_size=batch_size, sampler=SplitSampler(len(x_train), store.num_workers, store.rank))
print(type(train_iter))


class Net(gluon.HybridBlock):

    def __init__(self, **kwargs):
        super(Net, self).__init__(**kwargs)
        with self.name_scope():
            self.encoder = gluon.rnn.HybridSequentialRNNCell()
            self.encoder.add(mx.gluon.contrib.rnn.Conv3DLSTMCell(input_shape=(1,q, rows, columns),hidden_channels=64, activation='relu',i2h_kernel=(5,5,5),i2h_pad=(2,2,2), h2h_kernel=(5,5,5),conv_layout='NCDHW'))
            self.encoder.add(mx.gluon.contrib.rnn.Conv3DLSTMCell(input_shape=(64,q, rows, columns),hidden_channels=32, activation='relu',i2h_kernel=(3,3,3),i2h_pad=(1,1,1), h2h_kernel=(3,3,3),conv_layout='NCDHW'))
            self.middle = gluon.nn.HybridSequential()
            self.middle.add(mx.gluon.nn.BatchNorm())
            self.middle.add(mx.gluon.nn.Dropout(dropout))
            self.decoder =  gluon.nn.HybridSequential()
            self.decoder.add(mx.gluon.nn.Dense(10,flatten=True))
            self.decoder.add(mx.gluon.nn.Dense(1))
    
    def hybrid_forward(self, F, x,states=None):
        x,states= self.encoder(x, states)       
        x= self.middle(x)   
        x = self.decoder(x)          
        return x,states

def evaluate_accuracy(model, dataloader):
    eval_metrics_1 = mx.metric.MAE()
    eval_metrics_2 = mx.metric.MSE()
    eval_metrics_3 = mx.metric.RMSE()
    eval_metrics = mx.metric.CompositeEvalMetric()
    for child_metric in [eval_metrics_1, eval_metrics_2, eval_metrics_3]:
        eval_metrics.add(child_metric)

    for i, (data, label) in enumerate(dataloader):
        data = data.as_in_context(ctx)
        label = label.as_in_context(ctx)
        states =model.encoder.begin_state( batch_size=data.shape[0],ctx=ctx)
        preds,states=model(data,states)

        eval_metrics.update(labels=label, preds=preds)
    return eval_metrics.get()


def fit(model):
    print('Running on {}'.format(ctx))
    for e in range(epochs):
        tick = time.time()
        for batch in train_iter:
            data=batch[0]
            label=batch[1]
            data=data.as_in_context(ctx)
            label=label.as_in_context(ctx)
            states =net.encoder.begin_state( batch_size=data.shape[0],ctx=ctx)
            with autograd.record():
                losses = [loss1(model(data,states), label)]
            for l in losses:
               l.backward() 
            trainer.step(batch_size)
        nd.waitall()
        print('Epoch %s, training time = %.2f sec'%(e, time.time()-tick))
    weight = net.collect_params()
    print(weight)
    return model


ctx = mx.gpu()
net=Net()
net.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
loss1 = mx.gluon.loss.L1Loss()
trainer = mx.gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': lr},kvstore=store,update_on_kvstore=True)
print(" Loss before training:  ", evaluate_accuracy(net, train_iter))
net=fit(net)

This problem is not related to distributed environment per say - you can reproduce it with only 1 machine involved.

The problem is that your model returns a tuple of output and states:

    def hybrid_forward(self, F, x,states=None):
        x,states= self.encoder(x, states)       
        x= self.middle(x)   
        x = self.decoder(x)          
        return x,states

And you pass this as an input to the loss:

       with autograd.record():
            losses = [loss1(model(data,states), label)]

where loss expects the output only.

To fix the problem, just rewrite the code above like below and the error will go away:

        with autograd.record():
            output, state = model(data, states)
            losses = [loss1(output, label)]
1 Like

Thank you so much for your help.

however, is it right to get a different loss on each node?

image

when I use the same environment for LSTM(states are internally managed) model i get same loss values from both of the nodes

Yes, that is normal as there is always randomness involved in Neural Networks. For example, you generate data randomly in your test scenario, and when it runs on different machines the values are going to be different. And in the real life, you use distributed training for processing different chunks of data, so losses are going to be different. Moreover, layers of the NN are also initialized using random number generator, so it also introduces difference in losses.

If you want to have same losses you may want to fix random seeds of mxnet, numpy and python math modules + have same data for both machines:

import mxnet as mx
import numpy as np
import random

mx.random.seed(128)
np.random.seed(128)
random.seed(128)