Core Dumped While GNN Inferencing

I am trying to simulate a server accepting requests (node IDs for a graph) for which it performs inference using a trained GNN (Graph Neural Network) model. Every time a request arrives, the server should launch an asynchronous thread to run inferencing for that node. To do so, I have used ThreadPoolExecutor from concurrent.futures. As soon as I run the script for more than 1 worker, I get an Aborted (core dumped) error and the process is killed. I have attached the code for reference. Can anyone point out how to get around this issue and what exactly am I doing wrong?

import argparse
import concurrent.futures
import gc
import logging
import os
import time

import dgl
import torch
from torch.cuda import empty_cache

from models.graphconv import GCN


def inference_function(model, sampler, node_id, submit_time):
    start_time = time.time()
    logger.info(f"Inf invoked for node {node_id[0].item()} at {submit_time}")
    queue_time = start_time - submit_time
    dataloader = dgl.dataloading.DataLoader(
        graph=g,
        indices=node_id,
        graph_sampler=sampler,
        batch_size=node_id.shape[0],
    )
    with torch.no_grad():
        input_nodes, output_nodes, blocks = next(iter(dataloader))
        blocks = [block.to('cuda') for block in blocks]
        x = blocks[0].srcdata['feat']
        label = model(blocks, x)
        del x
        empty_cache()
        # gc.collect()
    label = label[0].argmax()
    end_time = time.time()
    elapsed_time = end_time - start_time
    return label, queue_time, elapsed_time


def run_inference_asynchronously(model, sampler, node_ids, max_threads=4):
    completed = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        submit_times = {executor.submit(inference_function, model, sampler, torch.tensor([node_id]), time.time()): node_id for node_id in node_ids}
        for future in concurrent.futures.as_completed(submit_times):
            node_id = submit_times[future]
            try:
                returned_tuple = future.result()
                result, queue_time, inference_time = returned_tuple
                total_time = queue_time + inference_time
                completed += 1
                logger.info(f"Inference result for node {node_id}: {result}, queue time: {queue_time:.2f} sec, inference time: {inference_time:.2f} sec, total time: {total_time:.2f} sec, completed: {(completed/num_test_nodes) * 100:.2f}%")
            except Exception as exc:
                print(f"Node {node_id} generated an exception: {exc}")
    return results


if __name__ == '__main__':
    args = argparse.ArgumentParser()
    args.add_argument('--dataset', type=str, default='reddit', choices=['reddit', 'ogbn-products', 'ogbn-arxiv', 'ogbn-papers100M'])
    args.add_argument('--n_layers', type=int, default=2)
    args.add_argument('--bs', type=int, default=1)
    args.add_argument('--workers', type=int, default=2)
    args = args.parse_args()
    dataset = args.dataset
    n_layers = args.n_layers
    bs = args.bs

    logging.basicConfig(filename=f'logs/{dataset}-w{args.workers}.log',
                        filemode='a',
                        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S',
                        level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    g = dgl.load_graphs(f'graph.dgl')[0][0]
    test_mask = torch.logical_or(g.ndata['test_mask'], g.ndata['val_mask'])
    test_nodes = test_mask.nonzero().squeeze()
    test_nodes = test_nodes[torch.randperm(test_nodes.shape[0])]
    num_test_nodes = test_nodes.shape[0]
    logger.info(f'Loaded graph with {g.num_nodes()} nodes and {g.num_edges()} edges')

    model = GCN(
        in_feats=g.ndata['feat'].shape[1],
        hidden_feats=128,
        n_classes=max(g.ndata['label']).item() + 1 if dataset != 'ogbn-papers100M' else 172
    )
    try:
        model.load_state_dict(torch.load(f'trained_models/model_{dataset}.pth'))
    except Exception as e:
        logger.error(f'Error loading model: {e}')
    model.load_state_dict(torch.load(f'trained_models/model_{dataset}.pth'))
    model = model.to('cuda')
    model.eval()
    logger.info('Loaded model and moved to GPU')

    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)

    results = run_inference_asynchronously(model, sampler, test_nodes, max_threads=args.workers if args.workers > 0 else os.cpu_count())

What I have tried so far -

  1. I have tried running this for different graph sizes. The script runs for smaller graph sizes (few hundred MBs in memory), but immediately aborts for medium sized graphs (few GBs in memory) which makes me think each thread might be loading the graph into memory. However, I am not sure how this could be happening.
  2. My understanding of ThreadPoolExecutor is that if we submit requests more than the number of available cores, those requests wait until a core is released. Regardless, my approach fails for even two workers.

My system specifications are:
CPU - 2 x AMD EPYC 7532 32-Core Processor,
RAM - 512GB,
GPU - Nvidia RTX A5000,
Running on Python 3.10

please try with ProcessPoolExecutor.

Thanks. Is there a particular reason ThreadPoolExecutor won’t work?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.