Joining multiple mx.recordio.MXIndexedRecordIO files

Dear all,

Assuming I have a set of record files with their indices, e.g. train1.rec/train1.idx and train2.rec/train2.idx, what is the most efficient way to join them into a single train.rec/train.idx pair?

I have a large set of large raster files, roughly 300 x (3 x 10k x 10k), and I want to process them (extract training chips for segmentation) in a distributed environment so as to quickly create a single rec file. I can parallelize the chip extraction within a single raster (with locks), but I am then bound to writing all 300 images serially, one by one, into the single rec file (I am trying to hack around this, but am not there yet). Any ideas?
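To make the per-raster step concrete, here is roughly what I mean as a sketch: each raster is written to its own rec/idx pair, which is trivially parallel across rasters. Note that extract_chips and the serialization via pickle are just placeholders for my actual extraction code:

import pickle
import mxnet as mx

def raster_to_rec(raster_path, rec_path, idx_path):
    # One rec/idx pair per raster; these can be produced fully in parallel.
    record = mx.recordio.MXIndexedRecordIO(idx_path=idx_path, uri=rec_path, flag='w')
    for i, (chip, mask) in enumerate(extract_chips(raster_path)):  # placeholder extraction routine
        header = mx.recordio.IRHeader(flag=0, label=0, id=i, id2=0)
        # serialize each (chip, mask) pair; pickle keeps the numpy arrays intact
        record.write_idx(i, mx.recordio.pack(header, pickle.dumps((chip, mask))))
    record.close()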

Thank you very much for your time,
Foivos

My question relates to question 3141.

Actually, opening multiple rec files and joining them is a viable option time-wise, and much faster than creating a single file in one pass (~1 minute for 4 large rasters). Something like the following (still needs polishing):

from threading import Lock
from pathos.pools import ThreadPool as pp
import mxnet as mx
import glob

class JoinRecIO(object):
    def __init__(self, prefix_read=r'../temp/'):
        # Collect all per-raster rec/idx pairs found under prefix_read.
        flname_recs = sorted(glob.glob(prefix_read + r'*/*.rec'))
        flname_idxs = sorted(glob.glob(prefix_read + r'*/*.idx'))
        self.nameTuples = list(zip(flname_idxs, flname_recs))

        # Single output file that all threads append to.
        self.record_target = mx.recordio.MXIndexedRecordIO(
            idx_path=prefix_read + 'global.idx',
            uri=prefix_read + 'global.rec',
            flag='w')

        # Lock serializes writes to the shared output and the running index.
        self.lock = Lock()
        self.global_idx = 0

    def thread_write(self, nthreads=8):
        # One thread per source file: reading happens in parallel, writing is locked.
        pool = pp(nthreads)
        pool.map(self.write_rec, self.nameTuples)

        self.record_target.close()

    def write_rec(self, names_idx_rec):
        name_idx, name_rec = names_idx_rec
        record_read = mx.recordio.MXIndexedRecordIO(idx_path=name_idx, uri=name_rec, flag='r')

        for key in record_read.keys:
            read_in = record_read.read_idx(key)
            # Writes must be serialized: each record gets a fresh global index.
            self.lock.acquire()
            self.record_target.write_idx(self.global_idx, read_in)
            self.global_idx += 1
            self.lock.release()

        record_read.close()

Usage:

myJoin = JoinRecIO()
myJoin.thread_write()
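To sanity-check the joined file, a record can be read back by its new global index, e.g. (assuming the source records were packed with mx.recordio.pack):

import mxnet as mx

record = mx.recordio.MXIndexedRecordIO(idx_path='../temp/global.idx', uri='../temp/global.rec', flag='r')
header, payload = mx.recordio.unpack(record.read_idx(0))
print(header.id, len(payload))
record.close()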