DistNodeDataLoader did not include srcdata/dstdata

Hi, I am currently trying to use DGL’s distributed training. First, following the guidance at 7.1 Data Preprocessing — DGL 2.1.0 documentation, I obtained a distributed graph and it can now be read correctly.

# Load data and build graph
g = dgl.distributed.DistGraph(args.graph_name)
pb = g.get_partition_book()

# Model
model = GCL(fea_dict, model_dict)
model = torch.nn.parallel.DistributedDataParallel(model)

# Neighbor random sampling
neighbor_sampler = dgl.dataloading.NeighborSampler(train_dict.get('n_neighbors'), dgl.distributed.sample_neighbors)

# Data sampling
nids = {k: g.get_ntype_id(k) for k in g.ntypes} 
dataloader = dgl.dataloading.DistNodeDataLoader(
    g, nids, neighbor_sampler, batch_size=train_dict.get('batch_size'), shuffle=True, drop_last=False)
# train
for epoch in range(train_dict.get('n_epochs')):
    loss = 0.
    with model.join():
        for it, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
            # forward pass
            h = model(blocks)

However, I encountered some issues when using DistNodeDataLoader.
The returned blocks object has its srcdata and dstdata as an empty defaultdict. I have not checked edata, but I suspect the situation is the same, as in a distributed graph, they are stored separately.
Is there any way to directly distribute the related ndata and edata into srcdata, dstdata, and edata? Thank you!

Seems copy_ndata/edata is not supported in distributed mode?

Here is the code of dgl.sampling.sample_neighbors .
@BarclayII Will it be fixed in the future? :rofl:

    # handle features
    # (TODO) (BarclayII) DGL distributed fails with bus error, freezes, or other
    # incomprehensible errors with lazy feature copy.
    # So in distributed training context, we fall back to old behavior where we
    # only set the edge IDs.
    if not _dist_training:
        if copy_ndata:
            if fused:
                src_node_ids = [F.from_dgl_nd(src) for src in induced_nodes]
                dst_node_ids = [
                    utils.toindex(
                        nodes.get(ntype, []), g._idtype_str
                    ).tousertensor(ctx=F.to_backend_ctx(g._graph.ctx))
                    for ntype in g.ntypes
                ]
                node_frames = utils.extract_node_subframes_for_block(
                    g, src_node_ids, dst_node_ids
                )
                utils.set_new_frames(ret, node_frames=node_frames)
            else:
                node_frames = utils.extract_node_subframes(g, device)
                utils.set_new_frames(ret, node_frames=node_frames)

        if copy_edata:
            if fused:
                edge_ids = [F.from_dgl_nd(eid) for eid in induced_edges]
                edge_frames = utils.extract_edge_subframes(g, edge_ids)
                utils.set_new_frames(ret, edge_frames=edge_frames)
            else:
                edge_frames = utils.extract_edge_subframes(g, induced_edges)
                utils.set_new_frames(ret, edge_frames=edge_frames)

    else:
        for i, etype in enumerate(ret.canonical_etypes):
            ret.edges[etype].data[EID] = induced_edges[i]

    return ret

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.