import os
import os.path as osp
from typing import Callable, Optional, Any
import numpy as np
import pandas as pd
import pandas.api.types as ptypes
import torch
import torch_frame
from torch_frame import TensorFrame, stype
from torch_frame.data.stats import StatType
from torch_geometric.data import Data, InMemoryDataset, download_url, extract_zip
from torch_geometric.transforms import ToUndirected
from torch_geometric.utils import subgraph
def _load_yaml(path: str) -> dict:
    """Parse the YAML file at ``path`` and return its contents.

    Args:
        path: Location of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file; callers in this
        module index the result like a mapping.
    """
    # Local import: PyYAML is only needed when this helper is actually called.
    import yaml  # type: ignore

    # Pin the encoding so parsing does not depend on the platform locale.
    with open(path, encoding='utf-8') as f:
        return yaml.safe_load(f)
class GraphLandTensorFrameDataset(InMemoryDataset):
    r"""GraphLand dataset rewritten to store node attributes in TensorFrame.

    Differences from the original implementation:

    - Graph structure is stored in `Data.edge_index`.
    - Node attributes are stored in `Data.x` (a `torch_frame.TensorFrame`).
    - Masks and targets are still stored in `Data`.

    Notes:
        The original sklearn-based feature preprocessing is intentionally
        removed. In a torch-frame workflow, semantic types are preserved and
        feature encoding/normalization/imputation is usually handled by the
        model-side encoders.

    Args:
        root (str): Root directory. Raw files live in ``<root>/<name>/raw``
            and processed files in ``<root>/<name>/processed/...``.
        name (str): Dataset name; must be a key of
            :attr:`GRAPHLAND_DATASETS`.
        split (str): One of ``'RL'``, ``'RH'``, ``'TH'``, ``'THI'``.
            ``'THI'`` is processed into three snapshots (train, val, test);
            every other split is processed into a single graph carrying
            ``train_mask`` / ``val_mask`` / ``test_mask``.
        to_undirected (bool, optional): If ``True``, ``ToUndirected`` is
            applied to every processed graph. (default: ``False``)
        transform (callable, optional): Forwarded to ``InMemoryDataset``.
        pre_transform (callable, optional): Forwarded to
            ``InMemoryDataset`` and applied to every processed graph before
            it is saved.
        force_reload (bool, optional): Forwarded to ``InMemoryDataset``.
            (default: ``False``)
    """

    _url = 'https://zenodo.org/records/16895532'

    # Supported dataset names mapped to their task type.
    GRAPHLAND_DATASETS = {
        'hm-categories': 'multiclass_classification',
        'pokec-regions': 'multiclass_classification',
        'web-topics': 'multiclass_classification',
        'tolokers-2': 'binary_classification',
        'city-reviews': 'binary_classification',
        'artnet-exp': 'binary_classification',
        'web-fraud': 'binary_classification',
        'hm-prices': 'regression',
        'avazu-ctr': 'regression',
        'city-roads-M': 'regression',
        'city-roads-L': 'regression',
        'twitch-views': 'regression',
        'artnet-views': 'regression',
        'web-traffic': 'regression',
    }

    def __init__(
        self,
        root: str,
        name: str,
        split: str,
        to_undirected: bool = False,
        transform: Optional[Callable] = None,
        pre_transform: Optional[Callable] = None,
        force_reload: bool = False,
    ) -> None:
        assert name in self.GRAPHLAND_DATASETS, f'Unsupported dataset name: {name}'
        assert split in ['RL', 'RH', 'TH', 'THI'], f'Unsupported split name: {split}'
        if split in ['TH', 'THI']:
            assert name not in [
                'city-reviews',
                'city-roads-M',
                'city-roads-L',
                'web-traffic',
            ], ('Temporal split is not available for city-reviews, '
                'city-roads-M, city-roads-L, web-traffic.')

        # Set before calling the parent constructor: `raw_dir`,
        # `processed_dir` and `process()` all read these attributes.
        self.name = name
        self.split = split
        self.task = self.GRAPHLAND_DATASETS[name]
        self._to_undirected = to_undirected

        super().__init__(root, transform, pre_transform, force_reload=force_reload)
        self.load(self.processed_paths[0])

    @property
    def raw_dir(self) -> str:
        """Directory holding the downloaded raw files of this dataset."""
        return osp.join(self.root, self.name, 'raw')

    @property
    def processed_dir(self) -> str:
        """Directory of the processed file for this split / undirected flag."""
        return osp.join(
            self.root,
            self.name,
            'processed',
            f'{self.split}__to_undirected_{str(self._to_undirected).lower()}__tensorframe',
        )

    @property
    def raw_file_names(self) -> str:
        """Name of the raw entry expected inside :attr:`raw_dir`."""
        return self.name

    @property
    def processed_file_names(self) -> str:
        """File name of the processed dataset inside :attr:`processed_dir`."""
        return 'data.pt'

    def download(self) -> None:
        """Download ``<name>.zip``, extract it into :attr:`raw_dir`, and
        delete the archive afterwards.
        """
        zip_url = f"{self._url}/files/{self.name}.zip"
        path = download_url(zip_url, self.raw_dir)
        extract_zip(path, self.raw_dir)
        os.unlink(path)

    def _get_raw_data(self) -> dict[str, Any]:
        """Read all raw files of the dataset from disk.

        Returns:
            A dict with keys ``'info'`` (parsed ``info.yaml``),
            ``'features_df'`` (feature table), ``'targets'`` (values of the
            target column named by ``info['target_name']``), ``'masks'``
            (split name -> boolean array) and ``'edges'`` (values of
            ``edgelist.csv``).
        """
        raw_data_dir = osp.join(self.raw_dir, self.name)
        info = _load_yaml(osp.join(raw_data_dir, 'info.yaml'))

        features_df = pd.read_csv(
            osp.join(raw_data_dir, 'features.csv'),
            index_col=0,
        )
        targets_df = pd.read_csv(
            osp.join(raw_data_dir, 'targets.csv'),
            index_col=0,
        )
        targets = targets_df[info['target_name']].values

        # Only the first two characters of the split select the mask file,
        # so 'THI' reads the same masks as 'TH'.
        masks_df = pd.read_csv(
            osp.join(raw_data_dir, f'split_masks_{self.split[:2]}.csv'),
            index_col=0,
        )
        masks = {
            k: np.array(v, dtype=bool)
            for k, v in masks_df.to_dict('list').items()
        }

        edges = pd.read_csv(osp.join(raw_data_dir, 'edgelist.csv')).values

        return {
            'info': info,
            'features_df': features_df,
            'targets': targets,
            'masks': masks,
            'edges': edges,
        }

    def _build_col_to_stype(self, info: dict) -> dict[str, torch_frame.stype]:
        """Map every feature column listed in ``info`` to its semantic type.

        Fraction and numerical features become ``stype.numerical``;
        categorical features become ``stype.categorical``.
        """
        col_to_stype: dict[str, torch_frame.stype] = {}
        num_cols = list(info['fraction_features_names']) + list(info['numerical_features_names'])
        for col in num_cols:
            col_to_stype[col] = stype.numerical
        # Assigned second, so a name listed in both groups ends up categorical.
        for col in info['categorical_features_names']:
            col_to_stype[col] = stype.categorical
        return col_to_stype

    def _default_stats_for_stype(
        self,
        st: torch_frame.stype,
    ) -> dict[StatType, Any]:
        """Return placeholder statistics for a column without usable values.

        Raises:
            ValueError: If ``st`` is neither numerical nor categorical.
        """
        if st == stype.numerical:
            return {
                StatType.MEAN: np.nan,
                StatType.STD: np.nan,
                StatType.QUANTILES: [np.nan, np.nan, np.nan, np.nan, np.nan],
            }
        if st == stype.categorical:
            return {
                StatType.COUNT: ([], []),
            }
        raise ValueError(f'Unsupported stype: {st}')

    def _compute_col_stats(
        self,
        df: pd.DataFrame,
        col_to_stype: dict[str, torch_frame.stype],
    ) -> dict[str, dict[StatType, Any]]:
        """Compute per-column statistics of ``df``.

        Numerical columns are processed together with vectorized NumPy
        operations; non-finite entries (NaN, +/-inf) are ignored. Categorical
        columns are processed one by one. A column without any usable value
        receives the placeholder from :meth:`_default_stats_for_stype`.

        Args:
            df: Feature table to summarize.
            col_to_stype: Semantic type of every column to summarize.

        Returns:
            Mapping from column name to its statistics: ``MEAN``, ``STD`` and
            ``QUANTILES`` (at 0, 0.25, 0.5, 0.75, 1) for numerical columns,
            ``COUNT`` as ``(categories, counts)`` sorted by descending count
            for categorical columns.

        Raises:
            TypeError: If a numerical column does not have a numeric dtype.
        """
        col_stats: dict[str, dict[StatType, Any]] = {}
        num_cols = [col for col, st in col_to_stype.items() if st == stype.numerical]
        cat_cols = [col for col, st in col_to_stype.items() if st == stype.categorical]

        if num_cols:
            for col in num_cols:
                if not ptypes.is_numeric_dtype(df[col]):
                    raise TypeError(
                        f"Numerical series '{col}' contains invalid entries. "
                        "Please make sure it contains only numerical values or NaNs."
                    )

            arr = df[num_cols].to_numpy(dtype=np.float64, copy=True)
            # Treat +/-inf like missing values.
            arr[~np.isfinite(arr)] = np.nan
            has_value = ~np.isnan(arr).all(axis=0)

            # Reduce only over columns holding at least one finite value;
            # this avoids NumPy's all-NaN / empty-slice warnings for columns
            # whose result would be replaced by the placeholder anyway.
            computed: dict[str, dict[StatType, Any]] = {}
            if has_value.any():
                valid = arr[:, has_value]
                means = np.nanmean(valid, axis=0)
                stds = np.nanstd(valid, axis=0)
                quants = np.nanquantile(
                    valid,
                    q=[0.0, 0.25, 0.5, 0.75, 1.0],
                    axis=0,
                )
                for pos, col_idx in enumerate(np.flatnonzero(has_value)):
                    computed[num_cols[col_idx]] = {
                        StatType.MEAN: float(means[pos]),
                        StatType.STD: float(stds[pos]),
                        StatType.QUANTILES: quants[:, pos].tolist(),
                    }

            for col in num_cols:
                if col in computed:
                    col_stats[col] = computed[col]
                else:
                    col_stats[col] = self._default_stats_for_stype(stype.numerical)

        for col in cat_cols:
            ser = df[col]
            if ser.isnull().all():
                col_stats[col] = self._default_stats_for_stype(stype.categorical)
                continue
            count = ser.dropna().value_counts(ascending=False)
            col_stats[col] = {
                StatType.COUNT: (count.index.tolist(), count.values.tolist())
            }

        return col_stats

    def _encode_categorical_column(
        self,
        ser: pd.Series,
        stats: dict[StatType, Any],
    ) -> torch.Tensor:
        """Encode a categorical column into integer indices.

        A value is encoded as its position in the category list stored under
        ``stats[StatType.COUNT]``. Missing values and categories absent from
        that list are both encoded as -1.

        Returns:
            A 1-D ``int64`` tensor with one code per row of ``ser``.
        """
        categories, _ = stats[StatType.COUNT]
        if len(categories) == 0:
            # No known category: every row is "unknown".
            return torch.full((len(ser),), -1, dtype=torch.long)

        cat_to_idx = {cat: i for i, cat in enumerate(categories)}
        # `Series.map` yields NaN for missing values and for keys not in the
        # dict; both are turned into -1 below.
        codes = ser.map(cat_to_idx).fillna(-1).to_numpy(dtype=np.int64, copy=True)
        return torch.from_numpy(codes)

    def _build_tensor_frame(
        self,
        df: pd.DataFrame,
        col_to_stype: dict[str, torch_frame.stype],
        col_stats: dict[str, dict[StatType, Any]],
        y: Optional[torch.Tensor] = None,
    ) -> TensorFrame:
        """Build a TensorFrame directly from a pandas DataFrame.

        Numerical columns are stored as ``float32`` with +/-inf replaced by
        NaN. Categorical columns are integer-encoded with
        :meth:`_encode_categorical_column` using ``col_stats``.
        """
        feat_dict: dict[torch_frame.stype, torch.Tensor] = {}
        col_names_dict: dict[torch_frame.stype, list[str]] = {}
        num_cols = [col for col, st in col_to_stype.items() if st == stype.numerical]
        cat_cols = [col for col, st in col_to_stype.items() if st == stype.categorical]

        if num_cols:
            num_arr = df[num_cols].to_numpy(dtype=np.float32, copy=True)
            num_arr[np.isinf(num_arr)] = np.nan
            feat_dict[stype.numerical] = torch.from_numpy(num_arr)
            col_names_dict[stype.numerical] = num_cols

        if cat_cols:
            cat_tensors = [
                self._encode_categorical_column(df[col], col_stats[col]).view(-1, 1)
                for col in cat_cols
            ]
            feat_dict[stype.categorical] = torch.cat(cat_tensors, dim=1)
            col_names_dict[stype.categorical] = cat_cols

        return TensorFrame(
            feat_dict=feat_dict,
            col_names_dict=col_names_dict,
            y=y,
            num_rows=len(df),
        )

    def _prepare_targets(
        self,
        raw_data: dict[str, Any],
    ) -> tuple[torch.Tensor, np.ndarray]:
        """Convert the raw targets into a tensor plus a "has label" mask.

        Targets are cast to ``float32`` for every task type, so unlabeled
        nodes keep NaN as their target value.

        Returns:
            ``(y, labeled_mask)`` where ``labeled_mask`` is ``True`` for
            nodes whose raw target is not missing.
        """
        targets = raw_data['targets']
        labeled_mask = ~pd.isna(targets)
        y = torch.from_numpy(np.asarray(targets, dtype=np.float32))
        return y, labeled_mask

    def _get_transductive_data(self) -> list[Data]:
        """Build one graph over all nodes with train/val/test label masks.

        Column statistics (and therefore the categorical encoding) are
        computed from the training nodes only.
        """
        raw_data = self._get_raw_data()
        features_df = raw_data['features_df']
        masks = raw_data['masks']
        col_to_stype = self._build_col_to_stype(raw_data['info'])
        y, labeled_mask = self._prepare_targets(raw_data)

        # Compute column statistics only on training nodes to avoid leakage.
        train_col_stats = self._compute_col_stats(
            features_df.loc[masks['train']],
            col_to_stype,
        )
        tf = self._build_tensor_frame(
            df=features_df,
            col_to_stype=col_to_stype,
            col_stats=train_col_stats,
            y=None,
        )

        # A node counts for a split only if it also has a label.
        train_mask = torch.from_numpy(masks['train'] & labeled_mask).bool()
        val_mask = torch.from_numpy(masks['val'] & labeled_mask).bool()
        test_mask = torch.from_numpy(masks['test'] & labeled_mask).bool()
        edge_index = torch.from_numpy(raw_data['edges']).long().t().contiguous()

        data = Data(
            edge_index=edge_index,
            y=y,
            train_mask=train_mask,
            val_mask=val_mask,
            test_mask=test_mask,
            x=tf,
            tf_col_stats=train_col_stats,
        )
        return [data]

    def _build_snapshot(
        self,
        graph_mask_np: np.ndarray,
        eval_mask_np: np.ndarray,
        labeled_mask: np.ndarray,
        features_df: pd.DataFrame,
        edge_index: torch.Tensor,
        y_all: torch.Tensor,
        col_to_stype: dict[str, torch_frame.stype],
        col_stats: dict[str, dict[StatType, Any]],
    ) -> Data:
        """Build the induced subgraph on ``graph_mask_np`` as a ``Data``.

        Args:
            graph_mask_np: Boolean mask of the nodes kept in the snapshot.
            eval_mask_np: Boolean mask of the nodes of the snapshot's split.
            labeled_mask: Boolean mask of the nodes that have a label.
            features_df: Feature table of all nodes.
            edge_index: Edges of the full graph.
            y_all: Targets of all nodes.
            col_to_stype: Semantic type per feature column.
            col_stats: Column statistics used for the categorical encoding.

        Returns:
            A ``Data`` with relabeled ``edge_index``, the snapshot's ``y`` and
            ``x``, ``mask`` (labeled nodes of the split, in snapshot order),
            ``tf_col_stats`` and ``node_id`` (indices into the full graph).
        """
        graph_mask = torch.from_numpy(graph_mask_np).bool()
        label_mask = torch.from_numpy(eval_mask_np & labeled_mask).bool()
        tf = self._build_tensor_frame(
            df=features_df.loc[graph_mask_np],
            col_to_stype=col_to_stype,
            col_stats=col_stats,
            y=None,
        )
        sub_edge_index, _ = subgraph(
            graph_mask,
            edge_index,
            relabel_nodes=True,
        )
        node_id = torch.from_numpy(np.flatnonzero(graph_mask_np)).long()
        return Data(
            edge_index=sub_edge_index,
            y=y_all[graph_mask],
            mask=label_mask[graph_mask],
            x=tf,
            tf_col_stats=col_stats,
            node_id=node_id,
        )

    def _get_inductive_data(self) -> list[Data]:
        """Build three growing snapshots: train, train+val, train+val+test.

        Each snapshot's ``mask`` marks the labeled nodes of its own split.
        Column statistics come from the training nodes only and are shared
        by all three snapshots.

        Returns:
            ``[train_data, val_data, test_data]``.
        """
        raw_data = self._get_raw_data()
        features_df = raw_data['features_df']
        masks = raw_data['masks']
        col_to_stype = self._build_col_to_stype(raw_data['info'])
        y_all, labeled_mask = self._prepare_targets(raw_data)
        edge_index = torch.from_numpy(raw_data['edges']).long().t().contiguous()

        # Compute statistics only on the training snapshot.
        train_col_stats = self._compute_col_stats(
            features_df.loc[masks['train']],
            col_to_stype,
        )

        # (nodes present in the snapshot, nodes evaluated in the snapshot)
        snapshot_specs = [
            (masks['train'], masks['train']),
            (masks['train'] | masks['val'], masks['val']),
            (masks['train'] | masks['val'] | masks['test'], masks['test']),
        ]
        return [
            self._build_snapshot(
                graph_mask_np=graph_mask_np,
                eval_mask_np=eval_mask_np,
                labeled_mask=labeled_mask,
                features_df=features_df,
                edge_index=edge_index,
                y_all=y_all,
                col_to_stype=col_to_stype,
                col_stats=train_col_stats,
            )
            for graph_mask_np, eval_mask_np in snapshot_specs
        ]

    def process(self) -> None:
        """Build the graph(s) for the configured split and save them.

        ``'RL'``, ``'RH'`` and ``'TH'`` produce one transductive graph; any
        other split produces the three inductive snapshots. ``ToUndirected``
        (if requested) and ``pre_transform`` (if given) are applied to every
        graph before saving.
        """
        data_list = (
            self._get_transductive_data()
            if self.split in ['RL', 'RH', 'TH']
            else self._get_inductive_data()
        )

        if self._to_undirected:
            to_undirected = ToUndirected()
            data_list = [to_undirected(data) for data in data_list]

        # Honor the user-supplied pre_transform instead of dropping it.
        if self.pre_transform is not None:
            data_list = [self.pre_transform(data) for data in data_list]

        self.save(data_list, self.processed_paths[0])

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}(name={self.name}, split={self.split})'
def get_tabular_graphland_dataset(name: str, root: str, split: str = 'TH') -> Data:
    r"""Load HM / Pokec / WebTopic from GraphLandTensorFrameDataset.

    This loader is intentionally separate from the generic get_dataset()
    because these datasets store node attributes as TensorFrame instead of
    dense tensor features. The dataset is always built with
    ``to_undirected=True``.

    Args:
        name (str): Dataset alias, matched case-insensitively.
            Supported: ['HM', 'Pokec', 'WebTopic']
        root (str): Dataset root directory.
        split (str): GraphLand split. Defaults to 'TH'.

    Returns:
        Data: The first graph of the dataset, with ``y`` squeezed. For the
        splits 'RL', 'RH' and 'TH' it has:

        - data.x: torch_frame.TensorFrame
        - data.edge_index: edge list
        - data.y: labels
        - train/val/test masks
        - tf_col_stats: statistics computed from train nodes only

        For split 'THI' the first graph is the training snapshot, which
        carries a single ``mask`` and ``node_id`` instead of the
        train/val/test masks.

    Raises:
        ValueError: If ``name`` is not one of the supported aliases.
    """
    alias_to_name = {
        'hm': 'hm-categories',
        'pokec': 'pokec-regions',
        'webtopic': 'web-topics',
    }
    key = name.lower()
    if key not in alias_to_name:
        raise ValueError(f'Unsupported tabular GraphLand dataset: {name}')

    dataset = GraphLandTensorFrameDataset(
        root=root,
        name=alias_to_name[key],
        split=split,
        to_undirected=True,
    )
    data = dataset[0]
    data.y = data.y.squeeze()
    return data