Error upon running ogbn-papers100M dataset

I am trying to run the dataset on multiple GPUs, but I run into the following error:
Expected input batch_size () to match target batch_size ()

I added the following lines to run the code on the ogbn-papers100M dataset:
elif args.dataset == 'ogb-papers':
    g, n_classes = load_ogb('ogbn-papers100M')
Can someone help me with this?
TIA
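
For reference, this message comes from `nn.CrossEntropyLoss` when the prediction and label tensors disagree on their first (batch) dimension. A minimal sketch of that condition with toy tensors (not the actual dataset sizes):

```python
import torch as th
import torch.nn as nn

loss_fcn = nn.CrossEntropyLoss()

# Predictions for 4 nodes over 3 classes, but labels for only 2 nodes.
# The first dimensions (4 vs. 2) disagree, so the loss raises
# "Expected input batch_size (4) to match target batch_size (2)."
batch_pred = th.randn(4, 3)
batch_labels = th.tensor([0, 2])

loss = loss_fcn(batch_pred, batch_labels)  # raises ValueError
```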

Are you using an existing DGL example? What’s the link? Otherwise, can you provide the link to your code?


I used the standard train_sampling_multi_gpu.py code with a few changes. Here is the code:
```python
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from dgl.data import RedditDataset
from torch.nn.parallel import DistributedDataParallel
import tqdm
import traceback

from utils import thread_wrapped_func
from load_graph import load_reddit, load_ogb, inductive_split

class SAGE(nn.Module):
    def __init__(self,
                 in_feats,
                 n_hidden,
                 n_classes,
                 n_layers,
                 activation,
                 dropout):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        self.dropout = nn.Dropout(dropout)
        self.activation = activation

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = self.activation(h)
                h = self.dropout(h)
        return h

    def inference(self, g, x, batch_size, device):
        """
        Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
        g : the entire graph.
        x : the input of the entire node set.
        The inference code is written in a fashion that it could handle any number of nodes and
        layers.
        """
        # During inference with sampling, multi-layer blocks are very inefficient because
        # lots of computations in the first few layers are repeated.
        # Therefore, we compute the representation of all nodes layer by layer.  The nodes
        # on each layer are of course split in batches.
        # TODO: can we standardize this?
        nodes = th.arange(g.number_of_nodes())
        for l, layer in enumerate(self.layers):
            y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)

            sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
            dataloader = dgl.dataloading.NodeDataLoader(
                g,
                th.arange(g.number_of_nodes()),
                sampler,
                batch_size=args.batch_size,
                shuffle=True,
                drop_last=False,
                num_workers=args.num_workers)

            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
                block = blocks[0]

                block = block.int().to(device)
                h = x[input_nodes].to(device)
                h = layer(block, h)
                if l != len(self.layers) - 1:
                    h = self.activation(h)
                    h = self.dropout(h)

                y[output_nodes] = h.cpu()

            x = y
        return y

def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)

def evaluate(model, g, inputs, labels, val_nid, batch_size, device):
    """
    Evaluate the model on the validation set specified by val_nid.
    g : The entire graph.
    inputs : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : A node ID tensor indicating which nodes we actually compute the accuracy for.
    batch_size : Number of nodes to compute at the same time.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        pred = model.inference(g, inputs, batch_size, device)
    model.train()
    return compute_acc(pred[val_nid], labels[val_nid])

def load_subtensor(g, labels, seeds, input_nodes, dev_id):
    """
    Copies features and labels of a set of nodes onto the GPU.
    """
    batch_inputs = g.ndata['features'][input_nodes].to(dev_id)
    batch_labels = labels[seeds].to(dev_id)
    return batch_inputs, batch_labels

# Entry point

def run(proc_id, n_gpus, args, devices, data):
    # Start up distributed training, if enabled.
    dev_id = devices[proc_id]
    if n_gpus > 1:
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        world_size = n_gpus
        th.distributed.init_process_group(backend="nccl",
                                          init_method=dist_init_method,
                                          world_size=world_size,
                                          rank=proc_id)
    th.cuda.set_device(dev_id)

    # Unpack data
    in_feats, n_classes, train_g, val_g, test_g = data
    train_mask = train_g.ndata['train_mask']
    val_mask = val_g.ndata['val_mask']
    test_mask = ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])
    train_nid = train_mask.nonzero().squeeze()
    val_nid = val_mask.nonzero().squeeze()
    test_nid = test_mask.nonzero().squeeze()

    # Split train_nid
    train_nid = th.split(train_nid, math.ceil(len(train_nid) / n_gpus))[proc_id]

    # Create PyTorch DataLoader for constructing blocks
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
    dataloader = dgl.dataloading.NodeDataLoader(
        train_g,
        train_nid,
        sampler,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)

    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(dev_id)
    if n_gpus > 1:
        model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
    loss_fcn = nn.CrossEntropyLoss()
    loss_fcn = loss_fcn.to(dev_id)
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    avg = 0
    iter_tput = []
    for epoch in range(args.num_epochs):
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
        for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
            if proc_id == 0:
                tic_step = time.time()

            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(train_g, train_g.ndata['labels'], seeds, input_nodes, dev_id)
            blocks = [block.int().to(dev_id) for block in blocks]
            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if proc_id == 0:
                iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
            if step % args.log_every == 0 and proc_id == 0:
                acc = compute_acc(batch_pred, batch_labels)
                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))

        if n_gpus > 1:
            th.distributed.barrier()

        toc = time.time()
        if proc_id == 0:
            print('Epoch Time(s): {:.4f}'.format(toc - tic))
            if epoch >= 5:
                avg += toc - tic
            if epoch % args.eval_every == 0 and epoch != 0:
                if n_gpus == 1:
                    eval_acc = evaluate(
                        model, val_g, val_g.ndata['features'], val_g.ndata['labels'], val_nid, args.batch_size, devices[0])
                    test_acc = evaluate(
                        model, test_g, test_g.ndata['features'], test_g.ndata['labels'], test_nid, args.batch_size, devices[0])
                else:
                    eval_acc = evaluate(
                        model.module, val_g, val_g.ndata['features'], val_g.ndata['labels'], val_nid, args.batch_size, devices[0])
                    test_acc = evaluate(
                        model.module, test_g, test_g.ndata['features'], test_g.ndata['labels'], test_nid, args.batch_size, devices[0])
                print('Eval Acc {:.4f}'.format(eval_acc))
                print('Test Acc: {:.4f}'.format(test_acc))


    if n_gpus > 1:
        th.distributed.barrier()
    if proc_id == 0:
        print('Avg epoch time: {}'.format(avg / (epoch - 4)))

if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=str, default='0',
                           help="Comma separated list of GPU device IDs.")
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--dataset', type=str, default='reddit')
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=1000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    argparser.add_argument('--inductive', action='store_true',
                           help="Inductive learning setting")
    args = argparser.parse_args()

    devices = list(map(int, args.gpu.split(',')))
    n_gpus = len(devices)

    if args.dataset == 'reddit':
        g, n_classes = load_reddit()
    elif args.dataset == 'ogb-papers':
        g, n_classes = load_ogb('ogbn-papers100M')
    # Construct graph
    g = dgl.as_heterograph(g)
    in_feats = g.ndata['features'].shape[1]

    if args.inductive:
        train_g, val_g, test_g = inductive_split(g)
    else:
        train_g = val_g = test_g = g

    # Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
    train_g.create_formats_()
    val_g.create_formats_()
    test_g.create_formats_()
    # Pack data
    data = in_feats, n_classes, train_g, val_g, test_g

    if n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
        procs = []
        for proc_id in range(n_gpus):
            p = mp.Process(target=thread_wrapped_func(run),
                           args=(proc_id, n_gpus, args, devices, data))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()

```

Can you be more specific about where the error happens? It’s hard to tell from the code you copied.
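
One way to pin that down (a sketch, not part of the original script; the helper name below is made up) is to log the prediction and label shapes right before the `loss_fcn(batch_pred, batch_labels)` call in the training loop, since that is where a `CrossEntropyLoss` batch-size mismatch would surface:

```python
import torch as th

def check_batch_shapes(batch_pred, batch_labels):
    # Print shapes and dtypes so the mismatch (and which side is off)
    # shows up in the log before CrossEntropyLoss raises.
    print('pred:', tuple(batch_pred.shape), batch_pred.dtype,
          '| labels:', tuple(batch_labels.shape), batch_labels.dtype)

# Toy call; in the script above it would go right before
# `loss = loss_fcn(batch_pred, batch_labels)`.
check_batch_shapes(th.randn(4, 3), th.tensor([0, 2]))
```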

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.