I used the standard train_sampling_multi_gpu.py code, but with a few changes, below is the code:
“”"
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from dgl.data import RedditDataset
from torch.nn.parallel import DistributedDataParallel
import tqdm
import traceback
from utils import thread_wrapped_func
from load_graph import load_reddit, load_ogb , inductive_split
class SAGE(nn.Module):
def init(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout):
super().init()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, ‘mean’))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, ‘mean’))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, ‘mean’))
self.dropout = nn.Dropout(dropout)
self.activation = activation
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h
def inference(self, g, x, batch_size, device):
"""
Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
g : the entire graph.
x : the input of entire node set.
The inference code is written in a fashion that it could handle any number of nodes and
layers.
"""
# During inference with sampling, multi-layer blocks are very inefficient because
# lots of computations in the first few layers are repeated.
# Therefore, we compute the representation of all nodes layer by layer. The nodes
# on each layer are of course splitted in batches.
# TODO: can we standardize this?
nodes = th.arange(g.number_of_nodes())
for l, layer in enumerate(self.layers):
y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
dataloader = dgl.dataloading.NodeDataLoader(
g,
th.arange(g.number_of_nodes()),
sampler,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
block = blocks[0]
block = block.int().to(device)
h = x[input_nodes].to(device)
h = layer(block, h)
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
y[output_nodes] = h.cpu()
x = y
return y
def compute_acc(pred, labels):
“”"
Compute the accuracy of prediction given the labels.
“”"
return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)
def evaluate(model, g, inputs, labels, val_nid, batch_size, device):
“”"
Evaluate the model on the validation set specified by val_nid
.
g : The entire graph.
inputs : The features of all the nodes.
labels : The labels of all the nodes.
val_nid : A node ID tensor indicating which nodes do we actually compute the accuracy for.
batch_size : Number of nodes to compute at the same time.
device : The GPU device to evaluate on.
“”"
model.eval()
with th.no_grad():
pred = model.inference(g, inputs, batch_size, device)
model.train()
return compute_acc(pred[val_nid], labels[val_nid])
def load_subtensor(g, labels, seeds, input_nodes, dev_id):
“”"
Copys features and labels of a set of nodes onto GPU.
“”"
batch_inputs = g.ndata[‘features’][input_nodes].to(dev_id)
batch_labels = labels[seeds].to(dev_id)
return batch_inputs, batch_labels
Entry point
def run(proc_id, n_gpus, args, devices, data):
# Start up distributed training, if enabled.
dev_id = devices[proc_id]
if n_gpus > 1:
dist_init_method = ‘tcp://{master_ip}:{master_port}’.format(
master_ip=‘127.0.0.1’, master_port=‘12345’)
world_size = n_gpus
th.distributed.init_process_group(backend=“nccl”,
init_method=dist_init_method,
world_size=world_size,
rank=proc_id)
th.cuda.set_device(dev_id)
# Unpack data
in_feats, n_classes, train_g, val_g, test_g = data
train_mask = train_g.ndata['train_mask']
val_mask = val_g.ndata['val_mask']
test_mask = ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])
train_nid = train_mask.nonzero().squeeze()
val_nid = val_mask.nonzero().squeeze()
test_nid = test_mask.nonzero().squeeze()
# Split train_nid
train_nid = th.split(train_nid, math.ceil(len(train_nid) / n_gpus))[proc_id]
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in args.fan_out.split(',')])
dataloader = dgl.dataloading.NodeDataLoader(
train_g,
train_nid,
sampler,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
model = model.to(dev_id)
if n_gpus > 1:
model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
loss_fcn = nn.CrossEntropyLoss()
loss_fcn = loss_fcn.to(dev_id)
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
avg = 0
iter_tput = []
for epoch in range(args.num_epochs):
tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
if proc_id == 0:
tic_step = time.time()
# Load the input features as well as output labels
batch_inputs, batch_labels = load_subtensor(train_g, train_g.ndata['labels'], seeds, input_nodes, dev_id)
blocks = [block.int().to(dev_id) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if proc_id == 0:
iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
if step % args.log_every == 0 and proc_id == 0:
acc = compute_acc(batch_pred, batch_labels)
print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))
if n_gpus > 1:
th.distributed.barrier()
toc = time.time()
if proc_id == 0:
print('Epoch Time(s): {:.4f}'.format(toc - tic))
if epoch >= 5:
avg += toc - tic
if epoch % args.eval_every == 0 and epoch != 0:
if n_gpus == 1:
eval_acc = evaluate(
model, val_g, val_g.ndata['features'], val_g.ndata['labels'], val_nid, args.batch_size, devices[0])
test_acc = evaluate(
model, test_g, test_g.ndata['features'], test_g.ndata['labels'], test_nid, args.batch_size, devices[0])
else:
eval_acc = evaluate(
model.module, val_g, val_g.ndata['features'], val_g.ndata['labels'], val_nid, args.batch_size, devices[0])
test_acc = evaluate(
model.module, test_g, test_g.ndata['features'], test_g.ndata['labels'], test_nid, args.batch_size, devices[0])
print('Eval Acc {:.4f}'.format(eval_acc))
print('Test Acc: {:.4f}'.format(test_acc))
if n_gpus > 1:
th.distributed.barrier()
if proc_id == 0:
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
if name == ‘main’:
argparser = argparse.ArgumentParser(“multi-gpu training”)
argparser.add_argument(’–gpu’, type=str, default=‘0’,
help=“Comma separated list of GPU device IDs.”)
argparser.add_argument(’–num-epochs’, type=int, default=20)
argparser.add_argument(’–num-hidden’, type=int, default=16)
argparser.add_argument(’–num-layers’, type=int, default=2)
argparser.add_argument(’–dataset’, type=str, default=‘reddit’)
argparser.add_argument(’–fan-out’, type=str, default=‘10,25’)
argparser.add_argument(’–batch-size’, type=int, default=1000)
argparser.add_argument(’–log-every’, type=int, default=20)
argparser.add_argument(’–eval-every’, type=int, default=5)
argparser.add_argument(’–lr’, type=float, default=0.003)
argparser.add_argument(’–dropout’, type=float, default=0.5)
argparser.add_argument(’–num-workers’, type=int, default=0,
help=“Number of sampling processes. Use 0 for no extra process.”)
argparser.add_argument(’–inductive’, action=‘store_true’,
help=“Inductive learning setting”)
args = argparser.parse_args()
devices = list(map(int, args.gpu.split(',')))
n_gpus = len(devices)
if args.dataset == ‘reddit’:
g, n_classes = load_reddit()
elif args.dataset == ‘ogb-papers’:
g, n_classes = load_ogb(‘ogbn-papers100M’)
# Construct graph
g = dgl.as_heterograph(g)
in_feats = g.ndata['features'].shape[1]
if args.inductive:
train_g, val_g, test_g = inductive_split(g)
else:
train_g = val_g = test_g = g
# Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves momory and CPU.
train_g.create_formats_()
val_g.create_formats_()
test_g.create_formats_()
# Pack data
data = in_feats, n_classes, train_g, val_g, test_g
if n_gpus == 1:
run(0, n_gpus, args, devices, data)
else:
procs = []
for proc_id in range(n_gpus):
p = mp.Process(target=thread_wrapped_func(run),
args=(proc_id, n_gpus, args, devices, data))
p.start()
procs.append(p)
for p in procs:
p.join()
“”"