Thanks for your suggestion. The script works after relabeling the node IDs to be consecutive. However, the training time per epoch is extremely long (4257 s), which I don't think is normal. I'm training a 3-layer GraphSAGE with fanout=[20, 15, 10] and a batch size of 1000, and I'm not sure what can be improved here.
Here is the script:
import dgl
import pandas as pd
import tqdm
import argparse
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics.functional as MF
from dgl.data import DGLDataset
import dgl.nn as dglnn
from dgl.dataloading import DataLoader, NeighborSampler, MultiLayerFullNeighborSampler
# Dataset directory; not referenced anywhere else in the visible script.
PATH = '../dgl_friendster'
# Expected number of nodes in the graph (rows of the node feature matrix).
NODES = 65608366
# Number of label classes -- presumably; it matches the model output size
# of 32 used by the entry point. TODO confirm against nodes.csv.
NUM_CLASSES = 32
# Width of each node's feature vector.
NUM_FEAT = 256
class FriendsterDataset(DGLDataset):
    """Single-graph dataset built from ``./nodes.csv`` and ``./edges.csv``.

    ``nodes.csv`` must provide ``node_id`` and ``label`` columns and
    ``edges.csv`` must provide ``src_id`` and ``dst_id`` columns.  The graph
    carries all-zero placeholder features, integer-coded labels, and an
    80/10/10 train/validation/test split taken in CSV row order.

    After construction ``train_idx``, ``val_idx`` and ``test_idx`` hold the
    node IDs of each split, and ``dataset[0]`` returns the graph.
    """

    # Nominal dataset dimensions, kept as class attributes for callers that
    # read them.  The real node count is taken from nodes.csv in process().
    NODES = 65608366
    NUM_CLASSES = 32
    NUM_FEAT = 256
    # Split indices; filled in by process().
    train_idx = None
    val_idx = None
    test_idx = None

    def __init__(self):
        super().__init__(name="friendster")

    def process(self):
        """Read the CSV files and build the graph with features, labels, splits."""
        nodes_data = pd.read_csv("./nodes.csv")
        edges_data = pd.read_csv("./edges.csv")
        n_nodes = nodes_data.shape[0]

        edges_src = torch.from_numpy(edges_data["src_id"].to_numpy())
        edges_dst = torch.from_numpy(edges_data["dst_id"].to_numpy())
        self.graph = dgl.graph((edges_src, edges_dst), num_nodes=n_nodes)

        # Placeholder features: every node gets NUM_FEAT zeros.  The matrix is
        # sized from the CSV rather than the hard-coded NODES constant so it
        # always matches the number of nodes the graph was created with.
        self.graph.ndata["feat"] = torch.zeros(n_nodes, self.NUM_FEAT)
        # Labels are the category codes of the "label" column.
        self.graph.ndata["label"] = torch.from_numpy(
            nodes_data["label"].astype("category").cat.codes.to_numpy()
        )

        # 80% train / 10% validation / remainder test, in CSV row order.
        n_train = int(n_nodes * 0.8)
        n_val = int(n_nodes * 0.1)
        val_end = n_train + n_val

        # NOTE(review): the index tensors hold `node_id` values while the
        # masks below are positional; they agree only if row i of nodes.csv
        # has node_id == i (consecutive IDs in order) -- confirm.
        node_ids = nodes_data["node_id"]
        self.train_idx = torch.from_numpy(node_ids[:n_train].to_numpy())
        self.val_idx = torch.from_numpy(node_ids[n_train:val_end].to_numpy())
        self.test_idx = torch.from_numpy(node_ids[val_end:].to_numpy())

        train_mask = torch.zeros(n_nodes, dtype=torch.bool)
        val_mask = torch.zeros(n_nodes, dtype=torch.bool)
        test_mask = torch.zeros(n_nodes, dtype=torch.bool)
        train_mask[:n_train] = True
        val_mask[n_train:val_end] = True
        test_mask[val_end:] = True
        self.graph.ndata["train_mask"] = train_mask
        self.graph.ndata["val_mask"] = val_mask
        self.graph.ndata["test_mask"] = test_mask

    def __getitem__(self, i):
        """Return the single graph regardless of the index."""
        return self.graph

    def __len__(self):
        """The dataset contains exactly one graph."""
        return 1
class SAGE(nn.Module):
    """Three-layer GraphSAGE model using the mean aggregator.

    Args:
        in_size: Width of the input node features.
        hid_size: Width of the two hidden representations.
        out_size: Width of the output (one score per class).
    """

    def __init__(self, in_size, hid_size, out_size):
        super().__init__()
        # three-layer GraphSAGE-mean
        self.layers = nn.ModuleList([
            dglnn.SAGEConv(in_size, hid_size, 'mean'),
            dglnn.SAGEConv(hid_size, hid_size, 'mean'),
            dglnn.SAGEConv(hid_size, out_size, 'mean'),
        ])
        self.dropout = nn.Dropout(0.5)
        self.hid_size = hid_size
        self.out_size = out_size

    def forward(self, blocks, x):
        """Apply one layer per sampled block.

        ReLU and dropout follow every layer except the last one.

        Args:
            blocks: One message-flow block per layer, outermost hop first.
            x: Input features for the source nodes of ``blocks[0]``.

        Returns:
            The output of the final layer.
        """
        h = x
        last = len(self.layers) - 1
        for idx, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if idx != last:
                h = F.relu(h)
                h = self.dropout(h)
        return h

    def inference(self, g, device, batch_size):
        """Conduct layer-wise inference to get all the node embeddings.

        Each layer is evaluated for every node (using all neighbors) before
        the next layer starts; intermediate results live in a CPU buffer.

        Args:
            g: The full graph, with input features in ``g.ndata['feat']``.
            device: Device on which the layers are evaluated.
            batch_size: Number of output nodes per mini-batch.

        Returns:
            A CPU tensor with one row of final-layer outputs per node.
        """
        feat = g.ndata['feat']
        sampler = MultiLayerFullNeighborSampler(1, prefetch_node_feats=['feat'])
        dataloader = DataLoader(
            g, torch.arange(g.num_nodes()).to(g.device), sampler, device=device,
            batch_size=batch_size, shuffle=False, drop_last=False,
            num_workers=0)
        buffer_device = torch.device('cpu')
        pin_memory = (buffer_device != device)
        last = len(self.layers) - 1

        for idx, layer in enumerate(self.layers):
            out_dim = self.hid_size if idx != last else self.out_size
            y = torch.empty(
                g.num_nodes(), out_dim,
                device=buffer_device, pin_memory=pin_memory)
            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
                # Gather rows where the features already live and ship only
                # this batch to `device`.  Copying the whole matrix to the
                # GPU first needs num_nodes * width * 4 bytes of device
                # memory, which does not fit for graphs of this size.
                x = feat[input_nodes.to(feat.device)].to(device)
                h = layer(blocks[0], x)
                if idx != last:
                    h = F.relu(h)
                    h = self.dropout(h)
                # The loader walks arange(num_nodes) unshuffled, so each
                # batch's output nodes form one contiguous ID range.
                y[output_nodes[0]:output_nodes[-1] + 1] = h.to(buffer_device)
            feat = y
        return y
def evaluate(model, graph, dataloader):
    """Compute mini-batch accuracy of ``model`` over ``dataloader``.

    Args:
        model: Model called as ``model(blocks, features)``.
        graph: Unused; kept for signature compatibility.
        dataloader: Yields ``(input_nodes, output_nodes, blocks)`` tuples
            whose blocks carry ``'feat'`` source data and ``'label'``
            destination data.

    Returns:
        The accuracy of the concatenated predictions against the labels.
    """
    model.eval()
    targets = []
    predictions = []
    # Gradients are never needed while scoring.
    with torch.no_grad():
        for _, _, blocks in dataloader:
            feats = blocks[0].srcdata['feat']
            targets.append(blocks[-1].dstdata['label'])
            predictions.append(model(blocks, feats))
    return MF.accuracy(torch.cat(predictions), torch.cat(targets))
def layerwise_infer(device, graph, nid, model, batch_size):
    """Score the nodes ``nid`` using full-graph layer-wise inference.

    Args:
        device: Device on which the model layers are evaluated.
        graph: Graph with ``'label'`` node data.
        nid: IDs of the nodes to score.
        model: Model exposing ``inference(graph, device, batch_size)``.
        batch_size: Mini-batch size forwarded to ``model.inference``.

    Returns:
        The accuracy of the predictions for ``nid``.
    """
    model.eval()
    with torch.no_grad():
        # Predictions for every node, returned on the model's buffer device.
        all_pred = model.inference(graph, device, batch_size)
        selected = all_pred[nid]
        target = graph.ndata['label'][nid].to(selected.device)
        return MF.accuracy(selected, target)
def train(args, device, g, dataset, model, train_idx, val_idx):
    """Train ``model`` for 10 epochs with neighbor sampling.

    Args:
        args: Parsed CLI arguments; ``args.mode == 'mixed'`` enables UVA
            sampling.
        device: Device on which mini-batches are produced and the model runs.
        g: The graph, with ``'feat'`` and ``'label'`` node data.
        dataset: Unused; kept for signature compatibility.
        model: Model called as ``model(blocks, features)``.
        train_idx: Seed node IDs for training.
        val_idx: Seed node IDs for validation.
    """
    sampler = NeighborSampler([20, 15, 10],  # fanout for [layer-0, layer-1, layer-2]
                              prefetch_node_feats=['feat'],
                              prefetch_labels=['label'])
    use_uva = (args.mode == 'mixed')
    # Seed nodes must live on the loader's device for UVA and pure-GPU
    # sampling; in CPU mode this is a no-op.
    train_idx = train_idx.to(device)
    val_idx = val_idx.to(device)
    train_dataloader = DataLoader(g, train_idx, sampler, device=device,
                                  batch_size=1000, shuffle=True,
                                  drop_last=False, num_workers=0,
                                  use_uva=use_uva)
    # Only needed if per-epoch validation via evaluate() is enabled below.
    val_dataloader = DataLoader(g, val_idx, sampler, device=device,
                                batch_size=1000, shuffle=True,
                                drop_last=False, num_workers=0,
                                use_uva=use_uva)
    opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-4)

    for epoch in range(10):
        tic = time.time()
        model.train()
        total_loss = 0
        num_batches = 0
        # The loader is given `device`, so no extra per-batch transfer is
        # done here; hard-coding 'cuda' would also crash in CPU mode.
        for input_nodes, output_nodes, blocks in train_dataloader:
            x = blocks[0].srcdata['feat']
            # cross_entropy requires int64 class indices.
            y = blocks[-1].dstdata['label'].long()
            y_hat = model(blocks, x)
            loss = F.cross_entropy(y_hat, y)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()
            num_batches += 1
        toc = time.time()
        # Validation is disabled; enable with evaluate(model, g, val_dataloader).
        # max(..., 1) keeps the report well-defined if no batch was produced.
        print("Epoch {:05d} | Loss {:.4f} | Time {:.4f} "
              .format(epoch, total_loss / max(num_batches, 1), toc - tic))
if __name__ == "__main__":
    # Entry point: parse the mode, load the graph, train, then test.
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", default='mixed', choices=['cpu', 'mixed', 'puregpu'],
                        help="Training mode. 'cpu' for CPU training, 'mixed' for CPU-GPU mixed training, "
                             "'puregpu' for pure-GPU training.")
    args = parser.parse_args()
    # Fall back to CPU when no GPU is present.
    if not torch.cuda.is_available():
        args.mode = 'cpu'
    print(f'Training in {args.mode} mode.')

    print('Loading data')
    dataset = FriendsterDataset()
    graph = dataset[0]
    # The dataset already attaches the all-zero 'feat' matrix; allocating a
    # second NODES x NUM_FEAT tensor here only doubled peak host memory.
    graph = graph.to('cuda' if args.mode == 'puregpu' else 'cpu')
    device = torch.device('cpu' if args.mode == 'cpu' else 'cuda')

    # create GraphSAGE model
    in_size = graph.ndata['feat'].shape[1]
    out_size = NUM_CLASSES
    model = SAGE(in_size, 256, out_size).to(device)

    # model training
    print('Training...')
    train(args, device, graph, dataset, model, dataset.train_idx, dataset.val_idx)

    # test the model
    print('Testing...')
    acc = layerwise_infer(device, graph, dataset.test_idx, model, batch_size=4096)
    print("Test Accuracy {:.4f}".format(acc.item()))