How to push sparse data into kv_store?


#1

I tried to push sparse data into kv_store, but the program reported an error.

Later, I checked the documentation and found that kv_store does not support pushing sparse arrays.

What should I do if I want to implement this operation?

P.S. I want to implement the distributed algorithm MemSgd (paper available at https://arxiv.org/abs/1809.07599).
In each iteration, the worker transmits only a sparse gradient to the server to update the model.
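
For readers unfamiliar with the paper, the worker-side step can be sketched roughly as follows. This is a minimal NumPy sketch, not MXNet code: the helper name `topk_with_memory` and its parameters `memory` and `lr` are made up for illustration, and it assumes top-k selection by absolute value as the sparsifier.

```python
import numpy as np


def topk_with_memory(grad, memory, keep_num, lr=1.0):
    """Hypothetical helper: choose what to send and what to keep locally."""
    # Add the scaled gradient to whatever was left unsent in earlier steps.
    acc = (memory + lr * grad).reshape(-1)
    # Positions of the keep_num entries with the largest absolute value
    # (argpartition does not sort them, which is fine for transmission).
    indices = np.argpartition(np.abs(acc), -keep_num)[-keep_num:]
    values = acc[indices]
    # Everything that was not selected stays in memory for the next step.
    new_memory = acc.copy()
    new_memory[indices] = 0.0
    return values, indices, new_memory.reshape(grad.shape)


grad = np.array([0.1, -3.0, 0.5, 2.0])
memory = np.zeros_like(grad)
values, indices, memory = topk_with_memory(grad, memory, keep_num=2)
```

Only `values` and `indices` would need to go to the server; the unsent remainder is carried over in `memory`.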


#2

KVStore supports NDArray and RowSparseNDArray. What does your sparse data look like? Can you send a small example so that I can reproduce the error?


#3

It’s a CSRNDArray.
KvStore doesn’t support it, but I have solved this problem.
The problem I am facing now is that creating a CSRNDArray is relatively slow.

from mxnet import nd


def update(name, input, stored):
    # stored has shape (num_device, keep_num*2)
    # input has shape (1, keep_num*2+1)
    # input[0] is the device id
    # input[1:keep_num+1] holds the values of the sparse gradient
    # input[keep_num+1:] holds the indices of the sparse gradient
    stored[input[0]] = input[1:]


def _recover_grad_topk(pull_array, keep_num, array_size, recover_shape):
    # pull_array has shape (num_device, keep_num*2)
    # in each row, [:keep_num] holds the values of the sparse gradient
    # and [keep_num:] holds the indices of the sparse gradient
    grad_topk_sum = nd.sparse.zeros('csr', shape=(1, array_size), ctx=pull_array.context)
    for grad_topk_array in pull_array:
        # pass the shape explicitly so every per-device matrix matches grad_topk_sum
        grad_topk_sparse = nd.sparse.csr_matrix(
            (grad_topk_array[:keep_num],
             grad_topk_array[keep_num:].astype('int32'),
             [0, keep_num]),
            shape=(1, array_size), ctx=grad_topk_array.context)
        grad_topk_sum = grad_topk_sum + grad_topk_sparse
    # convert the summed CSR matrix back to a dense array of the original shape
    agg_grad_topk_array = nd.sparse.cast_storage(grad_topk_sum, stype='default').reshape(recover_shape)
    return agg_grad_topk_array

...

kvstore.pull(name, pull_array_list, priority=-index)
for k, (arg, pull_array) in enumerate(zip(arg_list, pull_array_list)):
    # recover the dense aggregated gradient for the copy on device k
    agg_grad_topk_array = _recover_grad_topk(pull_array, keep_num, arg.size, arg.shape)


But the recovery process is time-consuming.
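
One direction that might be worth trying is to skip the per-device CSR construction entirely and scatter-add all (index, value) pairs into a dense buffer in a single call. The sketch below is NumPy only, not MXNet; `recover_grad_topk_dense` is a made-up name, and it assumes each row of the pulled array is laid out as [values, indices] as in the comments above. Whether an equivalent NDArray operator is available, and whether it is actually faster on your setup, would need to be checked.

```python
import numpy as np


def recover_grad_topk_dense(pull_array, keep_num, array_size, recover_shape):
    """Hypothetical NumPy stand-in for the CSR-based recovery."""
    # Flatten the value and index halves of every device row at once.
    values = pull_array[:, :keep_num].reshape(-1)
    indices = pull_array[:, keep_num:].astype(np.int64).reshape(-1)
    # bincount with weights sums the values that share an index,
    # which is the same aggregation as adding the per-device sparse rows.
    dense = np.bincount(indices, weights=values, minlength=array_size)
    return dense.reshape(recover_shape)


# Two devices, keep_num = 2, gradient of 6 elements with shape (2, 3).
pull_array = np.array([[1.0, 2.0, 0, 3],
                       [4.0, 5.0, 3, 5]])
agg = recover_grad_topk_dense(pull_array, keep_num=2, array_size=6, recover_shape=(2, 3))
```

Index 3 appears on both devices here, so its two contributions are summed in the result.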