Pulling the parameters every 4 iterations does not work

#1

I introduced comm_buf_copy[key] to keep the updated parameters pulled from the server. See my previous discussion for details; the figure here is only a supplement.
Note: I used the first 600 batches as a warm-up. When I pull back the updated parameters every 3 iterations, training converges to the target accuracy. However, when I pull them back every 4 iterations, the training accuracy drops to about 0.1, even though it increases during the warm-up phase.
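
The pull schedule described above (pull on every iteration during warm-up, then only every k-th iteration) can be written as a single predicate. This is a minimal sketch for illustration only: ShouldPull and its parameters are hypothetical names, not part of MXNet.

```cpp
#include <cstddef>

// Hypothetical helper (not MXNet API): decides whether the parameters are
// pulled back from the server at a given iteration.
//   warmup_iters: initial iterations that always pull (600 in this thread)
//   period:       after warm-up, pull only when iteration % period == 0
bool ShouldPull(std::size_t iteration, std::size_t warmup_iters, std::size_t period) {
  if (iteration < warmup_iters) {
    return true;  // warm-up phase: pull every iteration
  }
  return iteration % period == 0;  // afterwards: pull every `period` iterations
}
```

With period 3 this matches the `% 3 == 0` branch in the code posted later in this thread; with period 4 it matches the setting that fails. Because 600 is divisible by both 3 and 4, the first pull after warm-up happens at iteration 600 in both cases.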
In my view, the accuracy should not decrease after the warm-up phase. What is wrong with the values in comm_buf_copy[key]? When I pull the updated parameters every 4 iterations, does the comm_->Broadcast(key, comm_buf_copy[key], grouped_vals[i], priority) operation fail to pass the correct values to the arg_list, or are the values of comm_buf_[key] being changed?

#2

Can you share your full code and show how you are updating pull_firtst_time? Are you sure you push all the parameters every 4 iterations? I also think there should be an easier way to achieve this without modifying the C++ code. For example, you could have two trainers, one with a device kvstore and one with a dist kvstore, and update the dist kvstore only every n iterations.
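
The general idea of updating locally every iteration and synchronizing with a shared store only every n iterations can be sketched without any framework. This is not MXNet code: SharedStore, PeriodicSyncTrainer and their members are hypothetical, the local copy loosely stands in for the device-kvstore trainer, and Sync() loosely stands in for stepping the dist-kvstore trainer. It illustrates the scheme under those assumptions and is not necessarily exactly what was suggested.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the dist kvstore: weights shared by all workers.
struct SharedStore {
  std::vector<double> weights;
};

// Applies a plain SGD step to a local copy every iteration and
// synchronizes with the shared store every `sync_period` iterations.
class PeriodicSyncTrainer {
 public:
  PeriodicSyncTrainer(SharedStore* store, std::size_t sync_period)
      : store_(store),
        local_(store->weights),
        anchor_(store->weights),
        period_(sync_period) {}

  void Step(const std::vector<double>& grad, double lr) {
    for (std::size_t j = 0; j < local_.size(); ++j) {
      local_[j] -= lr * grad[j];  // local update on every iteration
    }
    ++iteration_;
    if (iteration_ % period_ == 0) {
      Sync();  // only every `period_` iterations
    }
  }

  const std::vector<double>& weights() const { return local_; }
  std::size_t sync_count() const { return syncs_; }

 private:
  void Sync() {
    // "Push": add the change accumulated locally since the last sync.
    for (std::size_t j = 0; j < local_.size(); ++j) {
      store_->weights[j] += local_[j] - anchor_[j];
    }
    // "Pull": continue from the shared weights and record them as the new anchor.
    local_ = store_->weights;
    anchor_ = local_;
    ++syncs_;
  }

  SharedStore* store_;
  std::vector<double> local_;   // weights updated every iteration
  std::vector<double> anchor_;  // weights at the last sync
  std::size_t period_;
  std::size_t iteration_ = 0;
  std::size_t syncs_ = 0;
};
```

With a single worker this reduces to ordinary SGD. With several workers sharing one SharedStore, each worker adds its own accumulated change in Sync(), so the shared weights receive the sum of all workers' local progress, while each worker keeps applying its own updates between syncs.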

#4

pull_firtst_time starts at 0 and is incremented by 1 for every key processed in the pull loop below. In fact, the parameters are pushed every iteration; I only pull back the updated parameters every 4 iterations. Pulling every 3 iterations works, but pulling every 4 iterations breaks training.

        for (size_t i = 0; i < uniq_keys.size(); ++i) {
            int key = uniq_keys[i];
            // use the same array for merging to guarantee that pull always happens
            // after the previous push on this key
            auto& recv_buf = comm_buf_[key];
            auto& recv_buf_copy = comm_buf_copy[key]; // xym edit 4-8

            const auto storage_type = grouped_vals[i][0]->storage_type();
            CHECK_EQ(storage_type, kDefaultStorage)
                    << "Expected stype of value to be kDefaultStorage";
            if (recv_buf.is_none()) {
                // this may happen the first time a non-rank-0 worker pulls the weight.
                recv_buf = NDArray(grouped_vals[i][0]->shape(), pinned_ctx_,
                                   true, grouped_vals[i][0]->dtype());
            }

            if (recv_buf_copy.is_none()) {
                // allocate the local copy of the pulled weights on first use.
                recv_buf_copy = NDArray(grouped_vals[i][0]->shape(), pinned_ctx_,
                                        true, grouped_vals[i][0]->dtype());
            }

            auto pull_from_servers = [this, key, recv_buf](
                    RunContext rctx, Engine::CallbackOnComplete cb) {
                // convert to ps keys
                size_t size = recv_buf.shape().Size();
                const int dtype = recv_buf.dtype();
                const int num_bytes = mshadow::mshadow_sizeof(dtype);
                PSKV& pskv = (gradient_compression_->get_type() == CompressionType::kNone) ?
                             EncodeDefaultKey(key, size, num_bytes) :
                             EncodeCompressedKey(key, size, false, num_bytes);
                char* data = static_cast<char*> (recv_buf.data().dptr_);
                // false means not to delete data when SArray is deleted
                auto vals = new ps::SArray<char>(data, size * num_bytes, false);
                // issue pull
                RequestType mode = (gradient_compression_->get_type() != CompressionType::kNone) ?
                                   RequestType::kCompressedPushPull : RequestType::kDefaultPushPull;
                const int cmd = GetCommandType(mode, dtype);
                CHECK_NOTNULL(ps_worker_)->ZPull(
                        pskv.keys, vals, &pskv.lens, cmd, [vals, cb](){ delete vals; cb(); });
            };

            // iteration index: pull_firtst_time is incremented once per key
            const auto iteration = pull_firtst_time / parameter_count;
            // pull on every iteration during warm-up, then only every 3rd iteration
            if (iteration < 600 || iteration % 3 == 0) {
                CHECK_NOTNULL(Engine::Get())->PushAsync(
                        pull_from_servers,
                        pinned_ctx_,
                        {},
                        {recv_buf.var()},
                        FnProperty::kNormal,
                        priority,
                        "KVStoreDistDefaultStoragePull");
                // keep a copy of the freshly pulled values for later broadcasts
                CopyFromTo(recv_buf, &recv_buf_copy, priority);
            }
            // copy the values from the most recent pull into the output arrays
            comm_->Broadcast(key, comm_buf_copy[key], grouped_vals[i], priority);
            pull_firtst_time += 1;  // counts keys, not iterations
        }
    }
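
Because pull_firtst_time is incremented once per key inside the loop, pull_firtst_time / parameter_count equals the iteration index only if every Pull call processes exactly parameter_count keys. The framework-free simulation below shows that mapping; IterationPerKey and its parameters are hypothetical and only mimic the counter logic of the snippet above.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical simulation (not MXNet code) of the per-key counter.
// keys_per_call: number of keys handled by each successive Pull call.
// Returns the iteration index (counter / parameter_count) seen by each key.
std::vector<std::size_t> IterationPerKey(const std::vector<std::size_t>& keys_per_call,
                                         std::size_t parameter_count) {
  std::size_t counter = 0;  // plays the role of pull_firtst_time
  std::vector<std::size_t> seen;
  for (std::size_t num_keys : keys_per_call) {    // one entry per Pull call
    for (std::size_t k = 0; k < num_keys; ++k) {  // the per-key loop
      seen.push_back(counter / parameter_count);
      counter += 1;                               // pull_firtst_time += 1
    }
  }
  return seen;
}
```

When every call covers all keys, all keys of the c-th call see iteration c. If a call covers fewer keys, the keys of later calls can straddle two iteration indices, so the pull decision would differ between keys of the same call.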
#5

Thanks, Thomas.
I’m quite interested in your suggestion of setting up two trainers, one with a device kvstore and one with a dist kvstore. What would I need to modify? Could you please give me some more details?