Issue loading HeteroGraphs in parallel with joblib

I wanted to load graphs in parallel to speed things up :slight_smile:

I tried the code snippet below:

import joblib
from functools import partial
import dgl

def load_graphs_chunk(graphs_idxs, bin_path):
    graphs, labels_dict = dgl.load_graphs(bin_path, graphs_idxs)
    return graphs

# Constants
graphs_count = 411
graphs_load_chunks_size = 5
graph_dataset_path = "/some/path/to/file.bin"

# Extract relevant idxs chunks
graphs_idx_chunks = [list(range(idx, idx + graphs_load_chunks_size)) for idx in range(0, graphs_count, graphs_load_chunks_size)]

# Prepare functions for parallel work
run_func = partial(load_graphs_chunk, bin_path=graph_dataset_path)
jobs = (joblib.delayed(run_func)(graphs_idxs) for graphs_idxs in graphs_idx_chunks)

# Execute parallel jobs
res = joblib.Parallel(n_jobs=20, verbose=10)(jobs)

The file at graph_dataset_path is a .bin file created with dgl.save_graphs.
Unfortunately, I get an error :frowning: I tried several different .bin files with different content and got the same error.
My graphs are heterographs, which seems relevant given the stack trace.
It also seems that the functions did execute and finish, because I see joblib's progress messages:

Using backend: pytorch
[Parallel(n_jobs=20)]: Using backend LokyBackend with 20 concurrent workers.
[Parallel(n_jobs=20)]: Done   1 tasks      | elapsed:    1.0s
[Parallel(n_jobs=20)]: Done  10 tasks      | elapsed:    1.3s
[Parallel(n_jobs=20)]: Done  21 tasks      | elapsed:    1.7s
[Parallel(n_jobs=20)]: Done  32 tasks      | elapsed:    2.0s
[Parallel(n_jobs=20)]: Done  53 out of  83 | elapsed:    2.7s remaining:    1.5s
[Parallel(n_jobs=20)]: Done  62 out of  83 | elapsed:    2.9s remaining:    1.0s
[Parallel(n_jobs=20)]: Done  71 out of  83 | elapsed:    3.3s remaining:    0.6s
[Parallel(n_jobs=20)]: Done  80 out of  83 | elapsed:    3.7s remaining:    0.1s

And after that, I get an exception.

For context, I'm using DGL 0.6.0post1 without CUDA.

I'd appreciate your help :pray:

Here is the thrown exception:

_RemoteTraceback                          Traceback (most recent call last)
Traceback (most recent call last):
  File "anonymized/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 431, in _process_worker
    r = call_item()
  File "anonymized/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 285, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "anonymized/python3.8/site-packages/joblib/_parallel_backends.py", line 595, in __call__
    return self.func(*args, **kwargs)
  File "anonymized/python3.8/site-packages/joblib/parallel.py", line 262, in __call__
    return [func(*args, **kwargs)
  File "anonymized/python3.8/site-packages/joblib/parallel.py", line 262, in <listcomp>
    return [func(*args, **kwargs)
  File "<ipython-input-2-d4e22f518813>", line 6, in load_graphs_chunk
  File "anonymized/python3.8/site-packages/dgl/data/graph_serialize.py", line 182, in load_graphs
    return load_graph_v2(filename, idx_list)
  File "anonymized/python3.8/site-packages/dgl/data/graph_serialize.py", line 192, in load_graph_v2
    heterograph_list = _CAPI_LoadGraphFiles_V2(filename, idx_list)
  File "dgl/_ffi/_cython/./function.pxi", line 287, in dgl._ffi._cy3.core.FunctionBase.__call__
  File "dgl/_ffi/_cython/./function.pxi", line 222, in dgl._ffi._cy3.core.FuncCall
  File "dgl/_ffi/_cython/./function.pxi", line 211, in dgl._ffi._cy3.core.FuncCall3
  File "dgl/_ffi/_cython/./base.pxi", line 155, in dgl._ffi._cy3.core.CALL
dgl._ffi.base.DGLError: [21:01:07] /opt/dgl/src/graph/heterograph.cc:509: Check failed: magicNum == kDGLSerialize_HeteroGraph (3458765063554631570 vs. 15949673719616654015) : Invalid HeteroGraph Data
Stack trace:
  [bt] (0) anonymized/python3.8/site-packages/dgl/libdgl.so(dmlc::LogMessageFatal::~LogMessageFatal()+0x4f) [0x7fa8d00a513f]
  [bt] (1) anonymized/python3.8/site-packages/dgl/libdgl.so(dgl::HeteroGraph::Load(dmlc::Stream*)+0x151) [0x7fa8d07e79c1]
  [bt] (2) anonymized/python3.8/site-packages/dgl/libdgl.so(dgl::serialize::LoadHeteroGraphs(std::string const&, std::vector<unsigned long, std::allocator<unsigned long> >)+0x7ea) [0x7fa8d086d56a]
  [bt] (3) anonymized/python3.8/site-packages/dgl/libdgl.so(+0x9d842f) [0x7fa8d086642f]
  [bt] (4) anonymized/python3.8/site-packages/dgl/libdgl.so(DGLFuncCall+0x48) [0x7fa8d0781d38]
  [bt] (5) anonymized/python3.8/site-packages/dgl/_ffi/_cy3/core.cpython-38-x86_64-linux-gnu.so(+0x15f89) [0x7fa8cf7c7f89]
  [bt] (6) anonymized/python3.8/site-packages/dgl/_ffi/_cy3/core.cpython-38-x86_64-linux-gnu.so(+0x1626b) [0x7fa8cf7c826b]
  [bt] (7) anonymized/python(_PyObject_MakeTpCall+0x22f) [0x55d927b7e85f]
  [bt] (8) anonymized/python(_PyEval_EvalFrameDefault+0x475) [0x55d927c01e35]


The above exception was the direct cause of the following exception:

DGLError                                  Traceback (most recent call last)
<ipython-input-2-d4e22f518813> in <module>
     21 # Execute parallel jobs
---> 22 res = joblib.Parallel(n_jobs=20, verbose=10)(jobs)

anonymized/python3.8/site-packages/joblib/parallel.py in __call__(self, iterable)
   1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
   1055             # Make sure that we get a last message telling us we are done
   1056             elapsed_time = time.time() - self._start_time

anonymized/python3.8/site-packages/joblib/parallel.py in retrieve(self)
    931             try:
    932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
    934                 else:
    935                     self._output.extend(job.get())

anonymized/python3.8/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

anonymized/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    434             self._condition.wait(timeout)

anonymized/python3.8/concurrent/futures/_base.py in __get_result(self)


Are you using multi-threading or multiprocessing with joblib?

Hey @VoVAllen,

I tried each of the loky, threading, and multiprocessing backends and got the same result :\

@VoVAllen Any ideas what I should try or check?

The error suggests the file is corrupted. Can you load it directly, without joblib?
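Running the chunks one at a time in a plain loop, without joblib, keeps the failure in the main process and shows exactly which chunk breaks. A minimal sketch, assuming the per-chunk loader is passed in as a callable (in practice something like `partial(load_graphs_chunk, bin_path=graph_dataset_path)` from the question). `first_failing_chunk` and `fake_loader` are hypothetical helpers; the stub only imitates a file holding 411 graphs so the sketch runs without DGL:

```python
from typing import Callable, List, Optional, Sequence, Tuple


def first_failing_chunk(
    chunks: Sequence[List[int]],
    load_fn: Callable[[List[int]], object],
) -> Optional[Tuple[List[int], Exception]]:
    """Call load_fn on each chunk sequentially (no joblib) and return the
    first chunk that raises, together with its exception, or None."""
    for chunk in chunks:
        try:
            load_fn(chunk)
        except Exception as exc:  # with real DGL this would be a DGLError
            return chunk, exc
    return None


# Hypothetical stand-in for load_graphs_chunk: rejects indices that a file
# with graphs_count graphs does not contain.
def fake_loader(idxs: List[int], graphs_count: int = 411) -> List[int]:
    bad = [i for i in idxs if i >= graphs_count]
    if bad:
        raise ValueError(f"indices out of range: {bad}")
    return idxs


# Same chunking as in the question.
chunks = [list(range(i, i + 5)) for i in range(0, 411, 5)]
result = first_failing_chunk(chunks, fake_loader)
if result is not None:
    chunk, exc = result
    print("first failing chunk:", chunk, "->", exc)
```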

OK, my bad, I found the issue.

When I loaded the whole file directly, as you suggested, it worked.

Then I ran the same chunked loading as in the joblib version, but sequentially in a for loop instead of in parallel, and discovered a bug in how I build graphs_idx_chunks: it tries to load graph indices larger than the number of graphs in the dataset.
The exception message confused me.
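With 411 graphs and a chunk size of 5, `range(0, 411, 5)` ends at 410, so the original comprehension builds a last chunk of `[410, 411, 412, 413, 414]`, while the valid indices are only 0 to 410. A minimal sketch of a fix, keeping the question's variable names, clamps the end of each range to `graphs_count`:

```python
# Fixed chunking: clamp each chunk's end to graphs_count so the last chunk
# never contains indices past the end of the dataset.
graphs_count = 411
graphs_load_chunks_size = 5

graphs_idx_chunks = [
    list(range(idx, min(idx + graphs_load_chunks_size, graphs_count)))
    for idx in range(0, graphs_count, graphs_load_chunks_size)
]

print(len(graphs_idx_chunks))  # 83 chunks, still one joblib task each
print(graphs_idx_chunks[-1])   # [410] instead of [410, 411, 412, 413, 414]
```

The number of tasks stays at 83, matching the `out of 83` in the joblib log; only the last chunk shrinks.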
Thank you for the help!
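Because an out-of-range index surfaced as "Invalid HeteroGraph Data" rather than as an index error, a cheap check before calling `dgl.load_graphs` can make this kind of bug obvious. A minimal sketch, assuming the total number of graphs is known up front (the question hard-codes it as 411); `check_chunk_indices` is a hypothetical helper, not part of DGL:

```python
from typing import List, Sequence


def check_chunk_indices(chunks: Sequence[List[int]], graphs_count: int) -> None:
    """Raise a descriptive IndexError if any chunk holds an index outside
    [0, graphs_count). Meant to run before any graphs are loaded."""
    for chunk_no, chunk in enumerate(chunks):
        bad = [i for i in chunk if not 0 <= i < graphs_count]
        if bad:
            raise IndexError(
                f"chunk {chunk_no} has indices {bad} outside "
                f"[0, {graphs_count}) for a file with {graphs_count} graphs"
            )


# The buggy chunking from the question fails fast with a readable message.
buggy = [list(range(i, i + 5)) for i in range(0, 411, 5)]
try:
    check_chunk_indices(buggy, 411)
except IndexError as err:
    print(err)
```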