I’m trying to train a ConvLSTM model in a distributed environment, using two nodes with the same number of GPUs. However, I hit an error during the training phase. Please help me solve it.
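For context, I launch one worker per node with MXNet's launch utility, along the lines of: python tools/launch.py -n 2 --launcher ssh -H hosts python train.py (the hosts file just lists my two machines; names and paths are placeholders).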
Error:
Reproducible example:
import time
import numpy as np
import mxnet as mx
from mxnet import gluon, autograd, nd, kv
# Hyper-parameters
rows = 14
columns = 14
batch_size = 32 * 2   # per-worker batch size
dropout = 0.2
lr = 0.001
n_out = 1
epochs = 1
q = 3                 # depth (number of frames) of each 3-D input volume
horizon = 36

# Synthetic data shaped (samples, channels, depth, rows, columns)
x_train = np.random.random((300, 1, q, rows, columns)).astype(np.float32)
y_train = np.random.random((300, 1)).astype(np.float32)
print(x_train.shape, y_train.shape)
print(type(x_train), type(y_train))
store = kv.create('dist')
print("Total number of workers: %d" % store.num_workers)
print("This worker's rank: %d" % store.rank)
class SplitSampler(gluon.data.sampler.Sampler):
    def __init__(self, length, num_parts=1, part_index=0):
        self.part_len = length // num_parts
        self.start = self.part_len * part_index
        self.end = self.start + self.part_len

    def __iter__(self):
        return iter(range(self.start, self.end))

    def __len__(self):
        return self.part_len
train_iter = gluon.data.DataLoader(
    gluon.data.ArrayDataset(x_train, y_train),
    batch_size=batch_size,
    sampler=SplitSampler(len(x_train), store.num_workers, store.rank))
print(type(train_iter))
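# Encoder-decoder network: two stacked Conv3DLSTM cells, then BatchNorm + Dropout,
# then two Dense layers that regress a single value.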
class Net(gluon.HybridBlock):
    def __init__(self, **kwargs):
        super(Net, self).__init__(**kwargs)
        with self.name_scope():
            self.encoder = gluon.rnn.HybridSequentialRNNCell()
            self.encoder.add(mx.gluon.contrib.rnn.Conv3DLSTMCell(
                input_shape=(1, q, rows, columns), hidden_channels=64,
                activation='relu', i2h_kernel=(5, 5, 5), i2h_pad=(2, 2, 2),
                h2h_kernel=(5, 5, 5), conv_layout='NCDHW'))
            self.encoder.add(mx.gluon.contrib.rnn.Conv3DLSTMCell(
                input_shape=(64, q, rows, columns), hidden_channels=32,
                activation='relu', i2h_kernel=(3, 3, 3), i2h_pad=(1, 1, 1),
                h2h_kernel=(3, 3, 3), conv_layout='NCDHW'))
            self.middle = gluon.nn.HybridSequential()
            self.middle.add(mx.gluon.nn.BatchNorm())
            self.middle.add(mx.gluon.nn.Dropout(dropout))
            self.decoder = gluon.nn.HybridSequential()
            self.decoder.add(mx.gluon.nn.Dense(10, flatten=True))
            self.decoder.add(mx.gluon.nn.Dense(1))

    def hybrid_forward(self, F, x, states=None):
        x, states = self.encoder(x, states)
        x = self.middle(x)
        x = self.decoder(x)
        return x, states
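# MAE/MSE/RMSE over a dataloader; fresh zero states are created for every batch.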
def evaluate_accuracy(model, dataloader):
    eval_metrics = mx.metric.CompositeEvalMetric()
    for child_metric in [mx.metric.MAE(), mx.metric.MSE(), mx.metric.RMSE()]:
        eval_metrics.add(child_metric)
    for data, label in dataloader:
        data = data.as_in_context(ctx)
        label = label.as_in_context(ctx)
        states = model.encoder.begin_state(batch_size=data.shape[0], ctx=ctx)
        preds, states = model(data, states)
        eval_metrics.update(labels=label, preds=preds)
    return eval_metrics.get()
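# Training loop: each worker iterates over its own shard; trainer.step() pushes
# the gradients to the distributed kvstore, which aggregates across workers.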
def fit(model):
    print('Running on {}'.format(ctx))
    for e in range(epochs):
        tick = time.time()
        for data, label in train_iter:
            data = data.as_in_context(ctx)
            label = label.as_in_context(ctx)
            states = model.encoder.begin_state(batch_size=data.shape[0], ctx=ctx)
            with autograd.record():
                # The model returns (output, states): unpack before computing the
                # loss, otherwise L1Loss is handed a tuple instead of an NDArray.
                preds, states = model(data, states)
                losses = [loss1(preds, label)]
            for l in losses:
                l.backward()
            trainer.step(batch_size)
        nd.waitall()
        print('Epoch %s, training time = %.2f sec' % (e, time.time() - tick))
        weight = model.collect_params()
        print(weight)
    return model
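# Each worker process drives a single GPU in this repro.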
ctx = mx.gpu()
net = Net()
net.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
loss1 = mx.gluon.loss.L1Loss()
trainer = mx.gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': lr},
                           kvstore=store, update_on_kvstore=True)
print("Loss before training: ", evaluate_accuracy(net, train_iter))
net = fit(net)
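For completeness: each node has several GPUs, and once the single-GPU version runs I intend to split every batch across the local devices. Below is a minimal sketch of what I have in mind, assuming gluon.utils.split_and_load and re-initializing the parameters on the full device list; none of it is part of the failing repro.

ctx_list = [mx.gpu(i) for i in range(mx.context.num_gpus())]
net.collect_params().initialize(mx.init.Xavier(), ctx=ctx_list, force_reinit=True)
for data, label in train_iter:
    # Slice each batch along axis 0 and copy one slice to every local GPU.
    data_parts = gluon.utils.split_and_load(data, ctx_list, even_split=False)
    label_parts = gluon.utils.split_and_load(label, ctx_list, even_split=False)
    with autograd.record():
        losses = []
        for x, y in zip(data_parts, label_parts):
            s = net.encoder.begin_state(batch_size=x.shape[0], ctx=x.context)
            preds, s = net(x, s)
            losses.append(loss1(preds, y))
    for l in losses:
        l.backward()
    trainer.step(batch_size)   # gradients from all slices are summed by the trainer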