Hi,
I am having trouble building a link prediction model for a heterogeneous graph with GraphBolt on an on-disk dataset. I followed the DGL OnDiskDataset tutorial for heterogeneous graphs and added a data loader function on top of it. However, whenever I iterate over the data loader to read the data, I get an error like this:
`KeyError: "item\nThis exception is thrown by __iter__ of FeatureFetcher(datapipe=MultiprocessingWrapper, edge_feature_keys=None, feature_store=TorchBasedFeatureStore(\n {(<OnDiskFeatureDataDomain.NODE: 'node'>, 'user', 'feat_0'):…`
Could you please help me resolve this? This issue is blocking my progress and any help is highly appreciated.
Please note that I run the code below on a GPU-enabled cluster and use DGL version 2.1.0.
The code is given below for your reference; it is essentially the same as the heterogeneous OnDiskDataset tutorial, with a simple data loader added at the end.
import os

import numpy as np
import pandas as pd
import torch
# Environment variables are set before DGL is imported below.
os.environ["TORCH"] = torch.__version__
os.environ["DGLBACKEND"] = "pytorch"

# Target device for training.
# NOTE(review): the data loader further down hard-codes "cuda:0" instead of
# reading this variable; keep the two in sync when changing devices.
device = torch.device("cuda")

# Import DGL and GraphBolt, remembering whether the import succeeded so the
# script can report a missing installation instead of failing right here.
try:
    import dgl
    import dgl.graphbolt as gb

    installed = True
except ImportError as error:
    installed = False
    print(error)
print("DGL installed!" if installed else "DGL not found!")
# Directory that holds every file of the on-disk dataset. It must exist
# before the first file is written into it.
base_dir = "./ondisk_dataset_heterograph"
os.makedirs(base_dir, exist_ok=True)

# For simplicity, we create a heterogeneous graph with
# 2 node types: `user`, `item`
# 2 edge types: `user:like:item`, `user:follow:user`
# And each node/edge type has the same number of nodes/edges.
num_nodes = 1000
num_edges = 10 * num_nodes

# Edge type: "user:like:item"
# Each row is one (source, destination) pair of random node IDs drawn from
# [0, num_nodes); the pairs are written as a header-less, index-less CSV.
like_edges_path = os.path.join(base_dir, "like-edges.csv")
like_edges = np.random.randint(0, num_nodes, size=(num_edges, 2))
print(f"Part of [user:like:item] edges: {like_edges[:5, :]}\n")
df = pd.DataFrame(like_edges)
df.to_csv(like_edges_path, index=False, header=False)
print(f"[user:like:item] edges are saved into {like_edges_path}\n")

# Edge type: "user:follow:user"
follow_edges_path = os.path.join(base_dir, "follow-edges.csv")
follow_edges = np.random.randint(0, num_nodes, size=(num_edges, 2))
print(f"Part of [user:follow:user] edges: {follow_edges[:5, :]}\n")
df = pd.DataFrame(follow_edges)
df.to_csv(follow_edges_path, index=False, header=False)
print(f"[user:follow:user] edges are saved into {follow_edges_path}\n")
def _persist_feature(owner, feat_name, file_name, values):
    """Preview, save and report one random feature matrix.

    ``owner`` is the lower-case label used in the messages (for example
    ``"node[user]"``), ``feat_name`` the feature name, ``file_name`` the file
    created inside ``base_dir`` and ``values`` the data to store. NumPy arrays
    are written with ``np.save``; anything else goes through ``torch.save``.
    Returns the full path of the written file.
    """
    target = os.path.join(base_dir, file_name)
    print(f"Part of {owner} feature [{feat_name}]: {values[:3, :]}")
    if isinstance(values, np.ndarray):
        np.save(target, values)
    else:
        torch.save(values, target)
    # The confirmation line starts with the owner capitalised ("Node[...]").
    print(
        f"{owner[:1].upper()}{owner[1:]} feature [{feat_name}] "
        f"is saved to {target}\n"
    )
    return target


# Every node/edge type gets two 5-column features: `feat_0` as a NumPy array
# (.npy) and `feat_1` as a torch tensor (.pt).

# Node type: user
node_user_feat_0 = np.random.rand(num_nodes, 5)
node_user_feat_0_path = _persist_feature(
    "node[user]", "feat_0", "node-user-feat-0.npy", node_user_feat_0
)
node_user_feat_1 = torch.rand(num_nodes, 5)
node_user_feat_1_path = _persist_feature(
    "node[user]", "feat_1", "node-user-feat-1.pt", node_user_feat_1
)

# Node type: item
node_item_feat_0 = np.random.rand(num_nodes, 5)
node_item_feat_0_path = _persist_feature(
    "node[item]", "feat_0", "node-item-feat-0.npy", node_item_feat_0
)
node_item_feat_1 = torch.rand(num_nodes, 5)
node_item_feat_1_path = _persist_feature(
    "node[item]", "feat_1", "node-item-feat-1.pt", node_item_feat_1
)

# Edge type: user:like:item
edge_like_feat_0 = np.random.rand(num_edges, 5)
edge_like_feat_0_path = _persist_feature(
    "edge[user:like:item]", "feat_0", "edge-like-feat-0.npy", edge_like_feat_0
)
edge_like_feat_1 = torch.rand(num_edges, 5)
edge_like_feat_1_path = _persist_feature(
    "edge[user:like:item]", "feat_1", "edge-like-feat-1.pt", edge_like_feat_1
)

# Edge type: user:follow:user
edge_follow_feat_0 = np.random.rand(num_edges, 5)
edge_follow_feat_0_path = _persist_feature(
    "edge[user:follow:user]",
    "feat_0",
    "edge-follow-feat-0.npy",
    edge_follow_feat_0,
)
edge_follow_feat_1 = torch.rand(num_edges, 5)
edge_follow_feat_1_path = _persist_feature(
    "edge[user:follow:user]",
    "feat_1",
    "edge-follow-feat-1.pt",
    edge_follow_feat_1,
)
# For illustration, let's generate item sets for each edge type.
# Each edge array is split by position into train / validation / test:
# the first `num_trains` rows, the next `num_vals` rows, and the remaining
# `num_tests` rows (60% / 20% / 20% of `num_edges`).
num_trains = int(num_edges * 0.6)
num_vals = int(num_edges * 0.2)
num_tests = num_edges - num_trains - num_vals

# Train node pairs for user:like:item.
lp_train_like_node_pairs_path = os.path.join(base_dir, "lp-train-like-node-pairs.npy")
lp_train_like_node_pairs = like_edges[:num_trains, :]
print(f"Part of train node pairs[user:like:item] for link prediction: {lp_train_like_node_pairs[:3]}")
np.save(lp_train_like_node_pairs_path, lp_train_like_node_pairs)
print(f"LP train node pairs[user:like:item] are saved to {lp_train_like_node_pairs_path}\n")

# Train node pairs for user:follow:user.
lp_train_follow_node_pairs_path = os.path.join(base_dir, "lp-train-follow-node-pairs.npy")
lp_train_follow_node_pairs = follow_edges[:num_trains, :]
print(f"Part of train node pairs[user:follow:user] for link prediction: {lp_train_follow_node_pairs[:3]}")
np.save(lp_train_follow_node_pairs_path, lp_train_follow_node_pairs)
print(f"LP train node pairs[user:follow:user] are saved to {lp_train_follow_node_pairs_path}\n")

# Val node pairs for user:like:item.
lp_val_like_node_pairs_path = os.path.join(base_dir, "lp-val-like-node-pairs.npy")
lp_val_like_node_pairs = like_edges[num_trains:num_trains+num_vals, :]
print(f"Part of val node pairs[user:like:item] for link prediction: {lp_val_like_node_pairs[:3]}")
np.save(lp_val_like_node_pairs_path, lp_val_like_node_pairs)
print(f"LP val node pairs[user:like:item] are saved to {lp_val_like_node_pairs_path}\n")

# Val negative dsts for user:like:item (10 random candidates per pair).
lp_val_like_neg_dsts_path = os.path.join(base_dir, "lp-val-like-neg-dsts.pt")
lp_val_like_neg_dsts = torch.randint(0, num_nodes, (num_vals, 10))
print(f"Part of val negative dsts[user:like:item] for link prediction: {lp_val_like_neg_dsts[:3]}")
torch.save(lp_val_like_neg_dsts, lp_val_like_neg_dsts_path)
print(f"LP val negative dsts[user:like:item] are saved to {lp_val_like_neg_dsts_path}\n")

# Val node pairs for user:follow:user.
lp_val_follow_node_pairs_path = os.path.join(base_dir, "lp-val-follow-node-pairs.npy")
lp_val_follow_node_pairs = follow_edges[num_trains:num_trains+num_vals, :]
print(f"Part of val node pairs[user:follow:user] for link prediction: {lp_val_follow_node_pairs[:3]}")
np.save(lp_val_follow_node_pairs_path, lp_val_follow_node_pairs)
print(f"LP val node pairs[user:follow:user] are saved to {lp_val_follow_node_pairs_path}\n")

# Val negative dsts for user:follow:user.
lp_val_follow_neg_dsts_path = os.path.join(base_dir, "lp-val-follow-neg-dsts.pt")
lp_val_follow_neg_dsts = torch.randint(0, num_nodes, (num_vals, 10))
print(f"Part of val negative dsts[user:follow:user] for link prediction: {lp_val_follow_neg_dsts[:3]}")
torch.save(lp_val_follow_neg_dsts, lp_val_follow_neg_dsts_path)
print(f"LP val negative dsts[user:follow:user] are saved to {lp_val_follow_neg_dsts_path}\n")

# Test node pairs for user:like:item.
# This must be a slice of the remaining rows, not a single index: indexing
# with `-num_tests` would select one row of shape (2,), which no longer
# lines up with the (num_tests, 10) negative dsts below.
lp_test_like_node_pairs_path = os.path.join(base_dir, "lp-test-like-node-pairs.npy")
lp_test_like_node_pairs = like_edges[num_trains+num_vals:, :]
print(f"Part of test node pairs[user:like:item] for link prediction: {lp_test_like_node_pairs[:3]}")
np.save(lp_test_like_node_pairs_path, lp_test_like_node_pairs)
print(f"LP test node pairs[user:like:item] are saved to {lp_test_like_node_pairs_path}\n")

# Test negative dsts for user:like:item.
lp_test_like_neg_dsts_path = os.path.join(base_dir, "lp-test-like-neg-dsts.pt")
lp_test_like_neg_dsts = torch.randint(0, num_nodes, (num_tests, 10))
print(f"Part of test negative dsts[user:like:item] for link prediction: {lp_test_like_neg_dsts[:3]}")
torch.save(lp_test_like_neg_dsts, lp_test_like_neg_dsts_path)
print(f"LP test negative dsts[user:like:item] are saved to {lp_test_like_neg_dsts_path}\n")

# Test node pairs for user:follow:user (same slicing as above).
lp_test_follow_node_pairs_path = os.path.join(base_dir, "lp-test-follow-node-pairs.npy")
lp_test_follow_node_pairs = follow_edges[num_trains+num_vals:, :]
print(f"Part of test node pairs[user:follow:user] for link prediction: {lp_test_follow_node_pairs[:3]}")
np.save(lp_test_follow_node_pairs_path, lp_test_follow_node_pairs)
print(f"LP test node pairs[user:follow:user] are saved to {lp_test_follow_node_pairs_path}\n")

# Test negative dsts for user:follow:user.
lp_test_follow_neg_dsts_path = os.path.join(base_dir, "lp-test-follow-neg-dsts.pt")
lp_test_follow_neg_dsts = torch.randint(0, num_nodes, (num_tests, 10))
print(f"Part of test negative dsts[user:follow:user] for link prediction: {lp_test_follow_neg_dsts[:3]}")
torch.save(lp_test_follow_neg_dsts, lp_test_follow_neg_dsts_path)
print(f"LP test negative dsts[user:follow:user] are saved to {lp_test_follow_neg_dsts_path}\n")
# Metadata describing the on-disk dataset: graph topology, feature files and
# the link prediction task with its train/validation/test item sets.
# YAML nesting is expressed through indentation, so the layout below is part
# of the data. All paths are base names of files generated in `base_dir`.
yaml_content = f"""
dataset_name: heterogeneous_graph_nc_lp
graph:
  nodes:
    - type: user
      num: {num_nodes}
    - type: item
      num: {num_nodes}
  edges:
    - type: "user:like:item"
      format: csv
      path: {os.path.basename(like_edges_path)}
    - type: "user:follow:user"
      format: csv
      path: {os.path.basename(follow_edges_path)}
feature_data:
  - domain: node
    type: user
    name: feat_0
    format: numpy
    path: {os.path.basename(node_user_feat_0_path)}
  - domain: node
    type: user
    name: feat_1
    format: torch
    path: {os.path.basename(node_user_feat_1_path)}
  - domain: node
    type: item
    name: feat_0
    format: numpy
    path: {os.path.basename(node_item_feat_0_path)}
  - domain: node
    type: item
    name: feat_1
    format: torch
    path: {os.path.basename(node_item_feat_1_path)}
  - domain: edge
    type: "user:like:item"
    name: feat_0
    format: numpy
    path: {os.path.basename(edge_like_feat_0_path)}
  - domain: edge
    type: "user:like:item"
    name: feat_1
    format: torch
    path: {os.path.basename(edge_like_feat_1_path)}
  - domain: edge
    type: "user:follow:user"
    name: feat_0
    format: numpy
    path: {os.path.basename(edge_follow_feat_0_path)}
  - domain: edge
    type: "user:follow:user"
    name: feat_1
    format: torch
    path: {os.path.basename(edge_follow_feat_1_path)}
tasks:
  - name: link_prediction
    num_classes: 10
    train_set:
      - type: "user:like:item"
        data:
          - name: node_pairs
            format: numpy
            path: {os.path.basename(lp_train_like_node_pairs_path)}
      - type: "user:follow:user"
        data:
          - name: node_pairs
            format: numpy
            path: {os.path.basename(lp_train_follow_node_pairs_path)}
    validation_set:
      - type: "user:like:item"
        data:
          - name: node_pairs
            format: numpy
            path: {os.path.basename(lp_val_like_node_pairs_path)}
          - name: negative_dsts
            format: torch
            path: {os.path.basename(lp_val_like_neg_dsts_path)}
      - type: "user:follow:user"
        data:
          - name: node_pairs
            format: numpy
            path: {os.path.basename(lp_val_follow_node_pairs_path)}
          - name: negative_dsts
            format: torch
            path: {os.path.basename(lp_val_follow_neg_dsts_path)}
    test_set:
      - type: "user:like:item"
        data:
          - name: node_pairs
            format: numpy
            path: {os.path.basename(lp_test_like_node_pairs_path)}
          - name: negative_dsts
            format: torch
            path: {os.path.basename(lp_test_like_neg_dsts_path)}
      - type: "user:follow:user"
        data:
          - name: node_pairs
            format: numpy
            path: {os.path.basename(lp_test_follow_node_pairs_path)}
          - name: negative_dsts
            format: torch
            path: {os.path.basename(lp_test_follow_neg_dsts_path)}
"""

# Write the metadata next to the data files it refers to.
metadata_path = os.path.join(base_dir, "metadata.yaml")
with open(metadata_path, "w") as f:
    f.write(yaml_content)
# Load the on-disk dataset stored under `base_dir`.
dataset = gb.OnDiskDataset(base_dir).load()

graph = dataset.graph
print(f"Loaded graph: {graph}\n")

# Pin the feature store; the assert checks that the object kept as `feature`
# reports itself as pinned.
feature = dataset.feature.pin_memory_()
assert feature.is_pinned()
print(f"Loaded feature store: {feature}\n")

# The link prediction task is the first (and only) task in the metadata.
tasks = dataset.tasks
lp_task = tasks[0]
print(f"Loaded link prediction task: {lp_task}\n")
train_set = dataset.tasks[0].train_set

# Features requested per node type when fetching mini-batch features.
node_feature_keys = {
    ntype: ["feat_0", "feat_1"] for ntype in ("user", "item")
}

# Two sampling layers with a fanout of 2 each.
# NOTE(review): a one-element fanout tensor presumably applies the same
# fanout to every edge type -- confirm against the GraphBolt docs.
fanouts = [torch.full((1,), 2) for _ in range(2)]

from functools import partial
def _backfill_input_nodes(minibatch):
    """Make sure every node type in ``node_feature_keys`` is in ``input_nodes``.

    The reported ``KeyError: 'item'`` is raised while the feature fetcher
    looks up the IDs of a requested node type in the mini-batch.
    NOTE(review): with DGL 2.1.0 the sampled ``input_nodes`` dict appears to
    hold only node types that occur as the *source* of some edge type; in this
    graph ``item`` is only ever a destination, so it is missing -- confirm
    against the installed GraphBolt version.

    Missing types are filled with the destination IDs of the outermost sampled
    layer when available, otherwise with an empty ID tensor, so the lookup
    succeeds. Types that are already present are left untouched.
    """
    input_nodes = minibatch.input_nodes
    if not isinstance(input_nodes, dict):
        # Homogeneous or not-yet-sampled mini-batch: nothing to repair.
        return minibatch

    layers = minibatch.sampled_subgraphs
    outer_dst = layers[0].original_column_node_ids if layers else None
    if not isinstance(outer_dst, dict):
        outer_dst = {}

    # An existing ID tensor provides dtype and device for empty placeholders.
    template = next(iter(input_nodes.values()), None)
    for ntype in node_feature_keys:
        if ntype in input_nodes:
            continue
        ids = outer_dst.get(ntype)
        if ids is None:
            if template is not None:
                ids = template.new_empty(0)
            else:
                ids = torch.empty(0, dtype=torch.int64)
        input_nodes[ntype] = ids
    return minibatch


def create_train_dataloader():
    """Build the GraphBolt training data loader for link prediction.

    Pipeline: batches of 8 shuffled training items -> 5 uniform negative
    samples per positive -> neighbor sampling with ``fanouts`` -> copy to
    ``cuda:0`` (including ``input_nodes``) -> backfill of missing node types
    -> node feature fetch from ``feature`` using ``node_feature_keys``.

    Reads the module-level ``train_set``, ``graph``, ``fanouts``, ``feature``
    and ``node_feature_keys``.

    Returns:
        A ``gb.DataLoader`` wrapping the assembled datapipe.
    """
    datapipe = gb.ItemSampler(train_set, batch_size=8, shuffle=True)
    datapipe = datapipe.sample_uniform_negative(graph, 5)
    datapipe = datapipe.sample_neighbor(graph, fanouts)
    # Optional: drop the seed edges from the sampled subgraphs.
    #datapipe = datapipe.transform(partial(gb.exclude_seed_edges, include_reverse_edges=True))
    datapipe = datapipe.copy_to("cuda:0", extra_attrs=["input_nodes"])
    # Must run before fetch_feature so that every requested node type has an
    # entry in `input_nodes`; otherwise the fetch fails with KeyError.
    datapipe = datapipe.transform(_backfill_input_nodes)
    datapipe = datapipe.fetch_feature(feature, node_feature_keys)
    return gb.DataLoader(datapipe)
# Smoke test: build the loader and pull a single mini-batch from it.
dataloader = create_train_dataloader()
batch_iter = iter(dataloader)
data = next(batch_iter)
print(f"MiniBatch: {data}")
The exception I get is:
KeyError: "item\nThis exception is thrown by iter of FeatureFetcher(datapipe=MultiprocessingWrapper, edge_feature_keys=None, feature_store=TorchBasedFeatureStore(\n {(<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘user’, ‘feat_0’): TorchBasedFeature(\n feature=tensor([[0.9986, 0.7045, 0.0039, 0.4038, 0.4468],\n [0.8129, 0.3602, 0.4191, 0.9191, 0.2073],\n [0.0283, 0.6551, 0.4888, 0.5076, 0.7966],\n …,\n [0.9567, 0.0821, 0.8925, 0.9883, 0.6165],\n [0.0355, 0.3422, 0.8111, 0.4431, 0.1680],\n [0.9665, 0.4463, 0.7957, 0.6613, 0.3209]], dtype=torch.float64),\n metadata={},\n ), (<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘user’, ‘feat_1’): TorchBasedFeature(\n feature=tensor([[0.3994, 0.9391, 0.7988, 0.1557, 0.4400],\n [0.2232, 0.3145, 0.0427, 0.2259, 0.2576],\n [0.7617, 0.8606, 0.2726, 0.1272, 0.8139],\n …,\n [0.8321, 0.5778, 0.2791, 0.4884, 0.5149],\n [0.6667, 0.0950, 0.2158, 0.7686, 0.7559],\n [0.4329, 0.2479, 0.6514, 0.4223, 0.9992]]),\n metadata={},\n ), (<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘item’, ‘feat_0’):…