Breaking down local sampling and remote sampling time in DistDGL GNN sampling

DistDGL does both local (for nodes present locally on a machine) and remote (for nodes present on other machines) sampling of nodes and overlaps the two to optimize the total sampling time. Is there a way to break down the sampling time into time taken to sample remote and local vertices, respectively?

@Rhett-Ying @zhengda1936 Is there a way to achieve this? Mentioning you since I see you listed as authors of the collator and distdataloader modules. Thank you.

You could try adding timers around the local_access and remote_access calls to obtain the sampling time for each rank in the code below: dgl/python/dgl/distributed/graph_services.py at 05aebd80017a412138c782ee4096210087f72550 · dmlc/dgl · GitHub
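A minimal sketch of what that could look like (not DGL code; the exact call sites around the local and remote access paths in graph_services.py may differ across versions):

import time
from collections import defaultdict
from contextlib import contextmanager

# Accumulated wall-clock time per phase, on this rank.
SAMPLING_TIMES = defaultdict(float)

@contextmanager
def timed(name):
    # Add the time spent inside the with-block to SAMPLING_TIMES[name].
    start = time.perf_counter()
    try:
        yield
    finally:
        SAMPLING_TIMES[name] += time.perf_counter() - start

# Hypothetical usage inside the distributed sampling path:
#
#   with timed('remote_issue'):
#       ...  # send sampling requests for nodes owned by other machines
#   with timed('local_sample'):
#       ...  # run the local access call for nodes owned by this machine
#   with timed('remote_wait'):
#       ...  # wait for and merge the remote responses
#
# Printing SAMPLING_TIMES together with g.rank() at the end of an epoch
# gives a per-rank breakdown of local vs. remote sampling time.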


Thanks for the prompt reply!

Hi @Rhett-Ying!

Is this a good way to measure the total sampling time for an epoch? This is the standard training script from the DistDGL docs. t_sampling is supposed to record the total sampling time for an epoch, while t_fwd, t_bwd and t_valid record the total forward-pass, backward-pass and validation time. All timings are recorded on each worker machine.

import time
from datetime import datetime

import numpy as np
import sklearn.metrics
import torch as th
import dgl

# g, train_nid, valid_nid, model, loss_fcn and optimizer are defined
# earlier in the training script.
model = th.nn.parallel.DistributedDataParallel(model)
sampler = dgl.dataloading.MultiLayerNeighborSampler([25, 10])
train_dataloader = dgl.dataloading.DistNodeDataLoader(
    g, train_nid, sampler, batch_size=1024,
    shuffle=True, drop_last=False)
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
    g, valid_nid, sampler, batch_size=1024,
    shuffle=False, drop_last=False)

for epoch in range(5):
    t_sampling, t_fwd, t_bwd, t_valid = 0, 0, 0, 0
    t1 = time.time()
    # Loop over the dataloader to sample mini-batches.
    losses = []
    for step, (input_nodes, seeds, blocks) in enumerate(train_dataloader):
        # Load the input features as well as output labels.
        batch_inputs = g.ndata['feat'][input_nodes]
        batch_labels = g.ndata['labels'][seeds].type(th.LongTensor)
        t2 = time.time()
        if not step:
            t_sampling += t2 - t1
        else:
            t_sampling += t2 - t4
        # Compute loss and prediction.
        batch_pred = model(blocks, batch_inputs)
        loss = loss_fcn(batch_pred, batch_labels)
        optimizer.zero_grad()
        t3 = time.time()
        t_fwd += t3 - t2
        loss.backward()
        losses.append(loss.detach().cpu().numpy())
        optimizer.step()
        t4 = time.time()
        t_bwd += t4 - t3

    # Validation.
    predictions = []
    labels = []
    t5 = time.time()
    with th.no_grad():
        for step, (input_nodes, seeds, blocks) in enumerate(valid_dataloader):
            inputs = g.ndata['feat'][input_nodes]
            labels.append(g.ndata['labels'][seeds].numpy())
            predictions.append(model(blocks, inputs).argmax(1).numpy())
        predictions = np.concatenate(predictions)
        labels = np.concatenate(labels)
        accuracy = sklearn.metrics.accuracy_score(labels, predictions)
        print('Time {} - Epoch {}: Validation Accuracy {:.3f}'.format(
            datetime.now().time(), epoch, accuracy))
    t6 = time.time()
    t_valid = t6 - t5
    total = (round(t_sampling, 3), round(t_fwd, 3), round(t_bwd, 3), round(t_valid, 3),
             round((t6 - t1) - (t_sampling + t_fwd + t_bwd + t_valid), 3))
    print(f'Epoch - {epoch}, Host - {g.rank()}, Time - {total}')

t2 in your code should be moved before the feature/label fetching, as sampling happens during dataloader iteration. Please refer to this example, which collects sample/forward/backward times during an epoch: dgl/examples/distributed/graphsage/node_classification.py at 6ba6dd602e7d2edad21a6aad46d314cb3bcf0190 · dmlc/dgl · GitHub

import time
from datetime import datetime

import numpy as np
import sklearn.metrics
import torch as th
import dgl

# g, train_nid, valid_nid, model, loss_fcn and optimizer are defined
# earlier in the training script.
model = th.nn.parallel.DistributedDataParallel(model)
sampler = dgl.dataloading.MultiLayerNeighborSampler([25, 10])
train_dataloader = dgl.dataloading.DistNodeDataLoader(
    g, train_nid, sampler, batch_size=1024,
    shuffle=True, drop_last=False)
valid_dataloader = dgl.dataloading.DistNodeDataLoader(
    g, valid_nid, sampler, batch_size=1024,
    shuffle=False, drop_last=False)

for epoch in range(5):
    t_sampling, t_fwd, t_bwd, t_valid = 0, 0, 0, 0
    t1 = time.time()
    # Loop over the dataloader to sample mini-batches.
    losses = []
    for step, (input_nodes, seeds, blocks) in enumerate(train_dataloader):
        # t2 is recorded right after the dataloader yields, so t_sampling
        # only covers the sampling done during dataloader iteration.
        t2 = time.time()
        if not step:
            t_sampling += t2 - t1
        else:
            t_sampling += t2 - t4
        # Load the input features as well as output labels.
        batch_inputs = g.ndata['feat'][input_nodes]
        batch_labels = g.ndata['labels'][seeds].type(th.LongTensor)
        # Compute loss and prediction.
        batch_pred = model(blocks, batch_inputs)
        loss = loss_fcn(batch_pred, batch_labels)
        optimizer.zero_grad()
        t3 = time.time()
        t_fwd += t3 - t2
        loss.backward()
        losses.append(loss.detach().cpu().numpy())
        optimizer.step()
        t4 = time.time()
        t_bwd += t4 - t3

    # Validation.
    predictions = []
    labels = []
    t5 = time.time()
    with th.no_grad():
        for step, (input_nodes, seeds, blocks) in enumerate(valid_dataloader):
            inputs = g.ndata['feat'][input_nodes]
            labels.append(g.ndata['labels'][seeds].numpy())
            predictions.append(model(blocks, inputs).argmax(1).numpy())
        predictions = np.concatenate(predictions)
        labels = np.concatenate(labels)
        accuracy = sklearn.metrics.accuracy_score(labels, predictions)
        print('Time {} - Epoch {}: Validation Accuracy {:.3f}'.format(
            datetime.now().time(), epoch, accuracy))
    t6 = time.time()
    t_valid = t6 - t5
    total = (round(t_sampling, 3), round(t_fwd, 3), round(t_bwd, 3), round(t_valid, 3),
             round((t6 - t1) - (t_sampling + t_fwd + t_bwd + t_valid), 3))
    print(f'Epoch - {epoch}, Host - {g.rank()}, Time - {total}')

I made the change you suggested (moved t2 up, as shown in the code above), and I noticed a significant drop in t_sampling (from 10 seconds to 2 in the example I am working on). Is it safe to say that the actual sampling time is 2 seconds and the feature and label fetching takes the remaining 8 seconds? Is the feature and label fetching in the lines below the same as reading node data from the KV store, and is that why it takes 8 seconds? Is there a way to break down the local and remote fetch time?

batch_inputs = g.ndata['feat'][input_nodes]
batch_labels = (g.ndata['labels'][seeds]).type(th.LongTensor)

Yes, you could wrap it with a dedicated timer to confirm the time spent on feature/label fetching:

t_fetch_start = time.time()
batch_inputs = g.ndata['feat'][input_nodes]
batch_labels = (g.ndata['labels'][seeds]).type(th.LongTensor)
t_fetch += time.time() - t_fetch_start

with t_fetch reset to 0 at the start of each epoch.

Since the feature data can reside on different machines, fetching it is somewhat time-consuming, but I doubt it takes 8 seconds. Please measure it with a dedicated timer. As for breaking down the local and remote fetch time, please refer to dgl/python/dgl/distributed/kvstore.py at cdf65f4dd6334a53d56c53055b629ee58bbe6a1d · dmlc/dgl · GitHub
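If you want a rough user-level breakdown without editing kvstore.py, one option is to split the input node IDs by owning partition and time the two pulls separately. A minimal sketch, assuming a homogeneous graph and the GraphPartitionBook API (nid2partid, partid); the KVStore overlaps local and remote work internally, so treat the result as an approximation:

import time

def split_fetch_time(g, input_nodes, feat_name='feat'):
    # Which partition stores each requested node.
    pb = g.get_partition_book()
    owner = pb.nid2partid(input_nodes)
    local_mask = owner == pb.partid
    local_ids = input_nodes[local_mask]
    remote_ids = input_nodes[~local_mask]

    # Local IDs are served from shared memory on this machine.
    t0 = time.perf_counter()
    _ = g.ndata[feat_name][local_ids]
    t_local = time.perf_counter() - t0

    # Remote IDs go through the KVStore RPC path.
    t0 = time.perf_counter()
    _ = g.ndata[feat_name][remote_ids]
    t_remote = time.perf_counter() - t0
    return t_local, t_remote

Calling this per mini-batch on input_nodes (and similarly on seeds for the labels) should indicate how much of the fetch time is spent on remote pulls versus shared-memory reads.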

Hi @Rhett-Ying

Thanks for the clarification. I went through the pull function to break down the local and remote fetch times, but I couldn't understand the logic behind the fast pull. It seems that only the fast-pull branch is taken (the if block). Could you tell me the difference between the fast pull and the normal pull here: https://github.com/dmlc/dgl/blob/cdf65f4dd6334a53d56c53055b629ee58bbe6a1d/python/dgl/distributed/kvstore.py#L1439

Fast pull is for when the features happen to be co-located on the same machine and can be read via shared memory. Normal pull goes through RPC to fetch data from remote machines.

I had a second look at the code with @peizhou001. Fast pull also involves remote RPC calls. It's faster because it is implemented in C with more efficient memory operations.

Hey @minjie

Sorry, I completely missed your reply. Is there a way, then, to break down the local and remote fetch times?

You could hack into https://github.com/dmlc/dgl/blob/47d37e919177d0f922ff3987d877de08a8d2e801/src/rpc/rpc.cc#L350 to obtain the remote and local times, respectively.
