Issue while reading data from a GraphBolt data loader built on an OnDiskDataset graph

Hi,

I am having trouble building a link prediction model for a heterogeneous graph with GraphBolt on an on-disk dataset. I followed DGL's OnDiskDataset tutorial for heterogeneous graphs and added a data loader function on top of it. However, whenever I iterate over the data loader object to read the data, I get an error like this:

‘KeyError: "item\nThis exception is thrown by iter of FeatureFetcher(datapipe=MultiprocessingWrapper, edge_feature_keys=None, feature_store=TorchBasedFeatureStore(\n {(<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘user’, ‘feat_0’):…’

Could you please help me resolve this? This issue is blocking my progress and any help is highly appreciated.

Please note that I run the code below on a GPU-enabled cluster, using DGL version 2.1.0.

I have included the code below for your reference. It is essentially the same as the heterogeneous OnDiskDataset tutorial, with a simple data loader added at the end.

import os

import numpy as np
import pandas as pd
import torch
# Set the TORCH and DGLBACKEND environment variables before `dgl` is
# imported below.
os.environ['TORCH'] = torch.__version__
os.environ['DGLBACKEND'] = "pytorch"

# Target the CUDA device (the post states the code runs on a GPU-enabled cluster).
device = torch.device("cuda")

# Import DGL and GraphBolt; a missing installation is reported by printing the
# ImportError instead of letting it propagate.
try:
    import dgl
    import dgl.graphbolt as gb
    installed = True
except ImportError as error:
    installed = False
    print(error)
print("DGL installed!" if installed else "DGL not found!")

# For simplicity, we create a heterogeneous graph with
# 2 node types: `user`, `item`
# 2 edge types: `user:like:item`, `user:follow:user`
# And each node/edge type has the same number of nodes/edges.
num_nodes = 1000
num_edges = 10 * num_nodes

# Directory that receives every generated file (edge lists, features, item
# sets and metadata.yaml). It has to be defined and has to exist before the
# first file is written; the rest of the script joins all paths onto it.
base_dir = "./ondisk_dataset_heterograph"
os.makedirs(base_dir, exist_ok=True)

# Edge type: "user:like:item"
# Each row is a random pair of node IDs drawn from [0, num_nodes).
like_edges_path = os.path.join(base_dir, "like-edges.csv")
like_edges = np.random.randint(0, num_nodes, size=(num_edges, 2))
print(f"Part of [user:like:item] edges: {like_edges[:5, :]}\n")

# Written without index or header, so the CSV holds only the two ID columns.
df = pd.DataFrame(like_edges)
df.to_csv(like_edges_path, index=False, header=False)
print(f"[user:like:item] edges are saved into {like_edges_path}\n")

# Edge type: "user:follow:user"
# Same layout as above: one random pair of node IDs per row.
follow_edges_path = os.path.join(base_dir, "follow-edges.csv")
follow_edges = np.random.randint(0, num_nodes, size=(num_edges, 2))
print(f"Part of [user:follow:user] edges: {follow_edges[:5, :]}\n")

df = pd.DataFrame(follow_edges)
df.to_csv(follow_edges_path, index=False, header=False)
print(f"[user:follow:user] edges are saved into {follow_edges_path}\n")

# Generate node[user] feature `feat_0` as a (num_nodes, 5) numpy array of
# random floats and save it in .npy format.
node_user_feat_0_path = os.path.join(base_dir, "node-user-feat-0.npy")
node_user_feat_0 = np.random.rand(num_nodes, 5)
print(f"Part of node[user] feature [feat_0]: {node_user_feat_0[:3, :]}")
np.save(node_user_feat_0_path, node_user_feat_0)
print(f"Node[user] feature [feat_0] is saved to {node_user_feat_0_path}\n")

# Generate node[user] feature `feat_1` as a (num_nodes, 5) torch tensor of
# random floats and save it with torch.save.
node_user_feat_1_path = os.path.join(base_dir, "node-user-feat-1.pt")
node_user_feat_1 = torch.rand(num_nodes, 5)
print(f"Part of node[user] feature [feat_1]: {node_user_feat_1[:3, :]}")
torch.save(node_user_feat_1, node_user_feat_1_path)
print(f"Node[user] feature [feat_1] is saved to {node_user_feat_1_path}\n")

# Generate node[item] feature `feat_0` as a (num_nodes, 5) numpy array of
# random floats and save it in .npy format.
node_item_feat_0_path = os.path.join(base_dir, "node-item-feat-0.npy")
node_item_feat_0 = np.random.rand(num_nodes, 5)
print(f"Part of node[item] feature [feat_0]: {node_item_feat_0[:3, :]}")
np.save(node_item_feat_0_path, node_item_feat_0)
print(f"Node[item] feature [feat_0] is saved to {node_item_feat_0_path}\n")

# Generate node[item] feature `feat_1` as a (num_nodes, 5) torch tensor of
# random floats and save it with torch.save.
node_item_feat_1_path = os.path.join(base_dir, "node-item-feat-1.pt")
node_item_feat_1 = torch.rand(num_nodes, 5)
print(f"Part of node[item] feature [feat_1]: {node_item_feat_1[:3, :]}")
torch.save(node_item_feat_1, node_item_feat_1_path)
print(f"Node[item] feature [feat_1] is saved to {node_item_feat_1_path}\n")

# Generate edge[user:like:item] feature `feat_0` as a (num_edges, 5) numpy
# array of random floats and save it in .npy format.
edge_like_feat_0_path = os.path.join(base_dir, "edge-like-feat-0.npy")
edge_like_feat_0 = np.random.rand(num_edges, 5)
print(f"Part of edge[user:like:item] feature [feat_0]: {edge_like_feat_0[:3, :]}")
np.save(edge_like_feat_0_path, edge_like_feat_0)
print(f"Edge[user:like:item] feature [feat_0] is saved to {edge_like_feat_0_path}\n")

# Generate edge[user:like:item] feature `feat_1` as a (num_edges, 5) torch
# tensor of random floats and save it with torch.save.
edge_like_feat_1_path = os.path.join(base_dir, "edge-like-feat-1.pt")
edge_like_feat_1 = torch.rand(num_edges, 5)
print(f"Part of edge[user:like:item] feature [feat_1]: {edge_like_feat_1[:3, :]}")
torch.save(edge_like_feat_1, edge_like_feat_1_path)
print(f"Edge[user:like:item] feature [feat_1] is saved to {edge_like_feat_1_path}\n")

# Generate edge[user:follow:user] feature `feat_0` as a (num_edges, 5) numpy
# array of random floats and save it in .npy format.
edge_follow_feat_0_path = os.path.join(base_dir, "edge-follow-feat-0.npy")
edge_follow_feat_0 = np.random.rand(num_edges, 5)
print(f"Part of edge[user:follow:user] feature [feat_0]: {edge_follow_feat_0[:3, :]}")
np.save(edge_follow_feat_0_path, edge_follow_feat_0)
print(f"Edge[user:follow:user] feature [feat_0] is saved to {edge_follow_feat_0_path}\n")

# Generate edge[user:follow:user] feature `feat_1` as a (num_edges, 5) torch
# tensor of random floats and save it with torch.save.
edge_follow_feat_1_path = os.path.join(base_dir, "edge-follow-feat-1.pt")
edge_follow_feat_1 = torch.rand(num_edges, 5)
print(f"Part of edge[user:follow:user] feature [feat_1]: {edge_follow_feat_1[:3, :]}")
torch.save(edge_follow_feat_1, edge_follow_feat_1_path)
print(f"Edge[user:follow:user] feature [feat_1] is saved to {edge_follow_feat_1_path}\n")

# For illustration, let's generate item sets for each edge type.
# Split sizes: 60% of the edges for training, 20% for validation and the
# remainder for testing.
num_trains = int(num_edges * 0.6)
num_vals = int(num_edges * 0.2)
num_tests = num_edges - num_trains - num_vals

# Train node pairs for user:like:item: the first num_trains rows of like_edges.
lp_train_like_node_pairs_path = os.path.join(base_dir, "lp-train-like-node-pairs.npy")
lp_train_like_node_pairs = like_edges[:num_trains, :]
print(f"Part of train node pairs[user:like:item] for link prediction: {lp_train_like_node_pairs[:3]}")
np.save(lp_train_like_node_pairs_path, lp_train_like_node_pairs)
print(f"LP train node pairs[user:like:item] are saved to {lp_train_like_node_pairs_path}\n")

# Train node pairs for user:follow:user: the first num_trains rows of follow_edges.
lp_train_follow_node_pairs_path = os.path.join(base_dir, "lp-train-follow-node-pairs.npy")
lp_train_follow_node_pairs = follow_edges[:num_trains, :]
print(f"Part of train node pairs[user:follow:user] for link prediction: {lp_train_follow_node_pairs[:3]}")
np.save(lp_train_follow_node_pairs_path, lp_train_follow_node_pairs)
print(f"LP train node pairs[user:follow:user] are saved to {lp_train_follow_node_pairs_path}\n")

# Val node pairs for user:like:item: the num_vals rows that follow the
# training rows of like_edges.
lp_val_like_node_pairs_path = os.path.join(base_dir, "lp-val-like-node-pairs.npy")
lp_val_like_node_pairs = like_edges[num_trains:num_trains+num_vals, :]
print(f"Part of val node pairs[user:like:item] for link prediction: {lp_val_like_node_pairs[:3]}")
np.save(lp_val_like_node_pairs_path, lp_val_like_node_pairs)
print(f"LP val node pairs[user:like:item] are saved to {lp_val_like_node_pairs_path}\n")

# Val negative dsts for user:like:item: 10 random node IDs per validation pair.
lp_val_like_neg_dsts_path = os.path.join(base_dir, "lp-val-like-neg-dsts.pt")
lp_val_like_neg_dsts = torch.randint(0, num_nodes, (num_vals, 10))
print(f"Part of val negative dsts[user:like:item] for link prediction: {lp_val_like_neg_dsts[:3]}")
torch.save(lp_val_like_neg_dsts, lp_val_like_neg_dsts_path)
print(f"LP val negative dsts[user:like:item] are saved to {lp_val_like_neg_dsts_path}\n")

# Val node pairs for user:follow:user: the num_vals rows that follow the
# training rows of follow_edges.
lp_val_follow_node_pairs_path = os.path.join(base_dir, "lp-val-follow-node-pairs.npy")
lp_val_follow_node_pairs = follow_edges[num_trains:num_trains+num_vals, :]
print(f"Part of val node pairs[user:follow:user] for link prediction: {lp_val_follow_node_pairs[:3]}")
np.save(lp_val_follow_node_pairs_path, lp_val_follow_node_pairs)
print(f"LP val node pairs[user:follow:user] are saved to {lp_val_follow_node_pairs_path}\n")

# Val negative dsts for user:follow:user: 10 random node IDs per validation pair.
lp_val_follow_neg_dsts_path = os.path.join(base_dir, "lp-val-follow-neg-dsts.pt")
lp_val_follow_neg_dsts = torch.randint(0, num_nodes, (num_vals, 10))
print(f"Part of val negative dsts[user:follow:user] for link prediction: {lp_val_follow_neg_dsts[:3]}")
torch.save(lp_val_follow_neg_dsts, lp_val_follow_neg_dsts_path)
print(f"LP val negative dsts[user:follow:user] are saved to {lp_val_follow_neg_dsts_path}\n")

# Test node pairs for user:like:item: the last num_tests rows of like_edges.
# The slice `-num_tests:` is required here; a plain integer index
# (`-num_tests`) would select a single row of shape (2,) instead of a
# (num_tests, 2) array and would not line up with the negative dsts below.
lp_test_like_node_pairs_path = os.path.join(base_dir, "lp-test-like-node-pairs.npy")
lp_test_like_node_pairs = like_edges[-num_tests:, :]
print(f"Part of test node pairs[user:like:item] for link prediction: {lp_test_like_node_pairs[:3]}")
np.save(lp_test_like_node_pairs_path, lp_test_like_node_pairs)
print(f"LP test node pairs[user:like:item] are saved to {lp_test_like_node_pairs_path}\n")

# Test negative dsts for user:like:item: 10 random node IDs per test pair.
lp_test_like_neg_dsts_path = os.path.join(base_dir, "lp-test-like-neg-dsts.pt")
lp_test_like_neg_dsts = torch.randint(0, num_nodes, (num_tests, 10))
print(f"Part of test negative dsts[user:like:item] for link prediction: {lp_test_like_neg_dsts[:3]}")
torch.save(lp_test_like_neg_dsts, lp_test_like_neg_dsts_path)
print(f"LP test negative dsts[user:like:item] are saved to {lp_test_like_neg_dsts_path}\n")

# Test node pairs for user:follow:user: the last num_tests rows of
# follow_edges (slice, not integer index, for the same reason as above).
lp_test_follow_node_pairs_path = os.path.join(base_dir, "lp-test-follow-node-pairs.npy")
lp_test_follow_node_pairs = follow_edges[-num_tests:, :]
print(f"Part of test node pairs[user:follow:user] for link prediction: {lp_test_follow_node_pairs[:3]}")
np.save(lp_test_follow_node_pairs_path, lp_test_follow_node_pairs)
print(f"LP test node pairs[user:follow:user] are saved to {lp_test_follow_node_pairs_path}\n")

# Test negative dsts for user:follow:user: 10 random node IDs per test pair.
lp_test_follow_neg_dsts_path = os.path.join(base_dir, "lp-test-follow-neg-dsts.pt")
lp_test_follow_neg_dsts = torch.randint(0, num_nodes, (num_tests, 10))
print(f"Part of test negative dsts[user:follow:user] for link prediction: {lp_test_follow_neg_dsts[:3]}")
torch.save(lp_test_follow_neg_dsts, lp_test_follow_neg_dsts_path)
print(f"LP test negative dsts[user:follow:user] are saved to {lp_test_follow_neg_dsts_path}\n")

# Metadata for the on-disk dataset: the graph (node counts and edge CSVs),
# the node/edge feature files, and one link_prediction task with its
# train/validation/test item sets. Every path is written as a bare file name
# (os.path.basename) -- presumably resolved relative to base_dir when the
# dataset is loaded; confirm against the OnDiskDataset documentation.
yaml_content = f"""
    dataset_name: heterogeneous_graph_nc_lp
    graph:
      nodes:
        - type: user
          num: {num_nodes}
        - type: item
          num: {num_nodes}
      edges:
        - type: "user:like:item"
          format: csv
          path: {os.path.basename(like_edges_path)}
        - type: "user:follow:user"
          format: csv
          path: {os.path.basename(follow_edges_path)}
    feature_data:
      - domain: node
        type: user
        name: feat_0
        format: numpy
        path: {os.path.basename(node_user_feat_0_path)}
      - domain: node
        type: user
        name: feat_1
        format: torch
        path: {os.path.basename(node_user_feat_1_path)}
      - domain: node
        type: item
        name: feat_0
        format: numpy
        path: {os.path.basename(node_item_feat_0_path)}
      - domain: node
        type: item
        name: feat_1
        format: torch
        path: {os.path.basename(node_item_feat_1_path)}
      - domain: edge
        type: "user:like:item"
        name: feat_0
        format: numpy
        path: {os.path.basename(edge_like_feat_0_path)}
      - domain: edge
        type: "user:like:item"
        name: feat_1
        format: torch
        path: {os.path.basename(edge_like_feat_1_path)}
      - domain: edge
        type: "user:follow:user"
        name: feat_0
        format: numpy
        path: {os.path.basename(edge_follow_feat_0_path)}
      - domain: edge
        type: "user:follow:user"
        name: feat_1
        format: torch
        path: {os.path.basename(edge_follow_feat_1_path)}
    tasks:
      - name: link_prediction
        num_classes: 10
        train_set:
          - type: "user:like:item"
            data:
              - name: node_pairs
                format: numpy
                path: {os.path.basename(lp_train_like_node_pairs_path)}
          - type: "user:follow:user"
            data:
              - name: node_pairs
                format: numpy
                path: {os.path.basename(lp_train_follow_node_pairs_path)}
        validation_set:
          - type: "user:like:item"
            data:
              - name: node_pairs
                format: numpy
                path: {os.path.basename(lp_val_like_node_pairs_path)}
              - name: negative_dsts
                format: torch
                path: {os.path.basename(lp_val_like_neg_dsts_path)}
          - type: "user:follow:user"
            data:
              - name: node_pairs
                format: numpy
                path: {os.path.basename(lp_val_follow_node_pairs_path)}
              - name: negative_dsts
                format: torch
                path: {os.path.basename(lp_val_follow_neg_dsts_path)}
        test_set:
          - type: "user:like:item"
            data:
              - name: node_pairs
                format: numpy
                path: {os.path.basename(lp_test_like_node_pairs_path)}
              - name: negative_dsts
                format: torch
                path: {os.path.basename(lp_test_like_neg_dsts_path)}
          - type: "user:follow:user"
            data:
              - name: node_pairs
                format: numpy
                path: {os.path.basename(lp_test_follow_node_pairs_path)}
              - name: negative_dsts
                format: torch
                path: {os.path.basename(lp_test_follow_neg_dsts_path)}
"""
# Store the YAML description as metadata.yaml inside base_dir.
metadata_path = os.path.join(base_dir, "metadata.yaml")
with open(metadata_path, "w") as metadata_file:
    metadata_file.write(yaml_content)

# Load the on-disk dataset stored under base_dir and take its graph.
dataset = gb.OnDiskDataset(base_dir).load()
graph = dataset.graph
print(f"Loaded graph: {graph}\n")

# Pin the feature store and check that pinning took effect.
feature = dataset.feature.pin_memory_() # pin the feature
assert feature.is_pinned()

#feature = dataset.feature.pin_memory_()
print(f"Loaded feature store: {feature}\n")

# The metadata declares a single task (link_prediction), read here as tasks[0].
tasks = dataset.tasks
lp_task = tasks[0]
print(f"Loaded link prediction task: {lp_task}\n")

# Training item set of that same task.
train_set = dataset.tasks[0].train_set

# Feature names to fetch for each node type.
# NOTE(review): per the reply later in this thread, requesting "item" features
# is what raises the reported KeyError: no edge type has `item` as its source
# node type, so no item nodes are sampled. The suggested workaround is
# {"user": ["feat_0", "feat_1"]}.
node_feature_keys = {"user": ["feat_0", "feat_1"], "item":["feat_0", "feat_1"]}

# Two entries, each a 1-element tensor holding the value 2 -- presumably one
# fanout per sampling layer; confirm against the sample_neighbor documentation.
fanouts=[torch.full((1,), 2), torch.full((1,), 2)]

from functools import partial
def create_train_dataloader():
    """Build the GraphBolt data loader used for training.

    The pipeline reads the module-level ``train_set``, ``graph``, ``fanouts``,
    ``feature`` and ``node_feature_keys`` and chains, in order: an
    ``ItemSampler`` (shuffled batches of 8), ``sample_uniform_negative``
    (called with 5), ``sample_neighbor``, ``copy_to`` on ``"cuda:0"`` (with
    ``input_nodes`` as an extra attribute) and ``fetch_feature``.

    Returns:
        A ``gb.DataLoader`` wrapping the assembled datapipe.
    """
    # The exclude_seed_edges transform is commented out in the original
    # pipeline between sample_neighbor and copy_to; it stays disabled here.
    pipeline = (
        gb.ItemSampler(train_set, batch_size=8, shuffle=True)
        .sample_uniform_negative(graph, 5)
        .sample_neighbor(graph, fanouts)
        .copy_to("cuda:0", extra_attrs=["input_nodes"])
        .fetch_feature(feature, node_feature_keys)
    )
    return gb.DataLoader(pipeline)


# Build the training data loader and pull a single mini-batch from it.
dataloader = create_train_dataloader()
batch_iter = iter(dataloader)
data = next(batch_iter)
print(f"MiniBatch: {data}")

Exception that I obtain is:

KeyError: "item\nThis exception is thrown by iter of FeatureFetcher(datapipe=MultiprocessingWrapper, edge_feature_keys=None, feature_store=TorchBasedFeatureStore(\n {(<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘user’, ‘feat_0’): TorchBasedFeature(\n feature=tensor([[0.9986, 0.7045, 0.0039, 0.4038, 0.4468],\n [0.8129, 0.3602, 0.4191, 0.9191, 0.2073],\n [0.0283, 0.6551, 0.4888, 0.5076, 0.7966],\n …,\n [0.9567, 0.0821, 0.8925, 0.9883, 0.6165],\n [0.0355, 0.3422, 0.8111, 0.4431, 0.1680],\n [0.9665, 0.4463, 0.7957, 0.6613, 0.3209]], dtype=torch.float64),\n metadata={},\n ), (<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘user’, ‘feat_1’): TorchBasedFeature(\n feature=tensor([[0.3994, 0.9391, 0.7988, 0.1557, 0.4400],\n [0.2232, 0.3145, 0.0427, 0.2259, 0.2576],\n [0.7617, 0.8606, 0.2726, 0.1272, 0.8139],\n …,\n [0.8321, 0.5778, 0.2791, 0.4884, 0.5149],\n [0.6667, 0.0950, 0.2158, 0.7686, 0.7559],\n [0.4329, 0.2479, 0.6514, 0.4223, 0.9992]]),\n metadata={},\n ), (<OnDiskFeatureDataDomain.NODE: ‘node’>, ‘item’, ‘feat_0’):…

The KeyError is raised because no `item` nodes are sampled into the subgraph's input nodes. The graph has only two edge types, `user:like:item` and `user:follow:user`, so no edge in the original graph has `item` as its source node type, and `input_nodes` therefore has no `item` entry for the feature fetcher to look up. In this case, remove `item` from `node_feature_keys`, as shown below:

node_feature_keys = {"user": ["feat_0", "feat_1"]}

@VVasanth FYI, I have created a request to bypass this exception in FeatureFetcher: [GraphBolt] FeatureFetcher throws exception if `input_nodes` does not contain target feature · Issue #7385 · dmlc/dgl · GitHub

Thanks @Rhett-Ying, but when I remove `item` from `node_feature_keys` as you suggested, I get the error below:

KeyError: ‘item’

KeyError Traceback (most recent call last)
File :2
1 data = next(iter(dataloader))
----> 2 print(f"MiniBatch: {data}")

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-656589cd-8bd7-4b7b-a185-e526f6d03a79/lib/python3.9/site-packages/dgl/graphbolt/minibatch.py:181, in MiniBatch.__repr__(self)
180 def __repr__(self) -> str:
--> 181 return _minibatch_str(self)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-656589cd-8bd7-4b7b-a185-e526f6d03a79/lib/python3.9/site-packages/dgl/graphbolt/minibatch.py:587, in _minibatch_str(minibatch)
585 final_str = ""
586 # Get all attributes in the class except methods.
--> 587 attributes = get_attributes(minibatch)
588 attributes.reverse()
589 # Insert key with its value into the string.

Could you please help me resolve this issue?

This is probably a bug and I’ve created an issue ticket on github to track this.

1 Like