from typing import Optional
import torch
from torch import Tensor
from torch_geometric.utils import to_undirected, add_remaining_self_loops, degree
from pyagc.models.base import BaseModel
def compute_normalized_matrices(
x: Tensor,
edge_index: Tensor,
num_nodes: int
) -> tuple[Tensor, Tensor]:
r"""
Computes normalized adjacency matrix and normalized feature matrix.
Args:
x (Tensor): Node features of shape :obj:`(num_nodes, num_features)`.
edge_index (Tensor): Edge indices of shape :obj:`(2, num_edges)`.
num_nodes (int): Number of nodes in the graph.
Returns:
Tuple of (normalized transition matrix, normalized features).
"""
# Ensure self-loops and undirected
edge_index = add_remaining_self_loops(edge_index, num_nodes=num_nodes)[0]
edge_index = to_undirected(edge_index)
# Compute degree
row, col = edge_index
deg = degree(row, num_nodes, dtype=x.dtype)
# Normalized adjacency: P_hat = D^{-1} A
deg_inv = deg.pow(-1)
deg_inv[torch.isinf(deg_inv)] = 0
# Build normalized transition matrix (sparse)
edge_weight = deg_inv[row]
P_hat = torch.sparse_coo_tensor(
edge_index,
edge_weight,
size=(num_nodes, num_nodes)
).coalesce().to(x.device)
# Normalize P_hat row-wise
row_sums = torch.sparse.sum(P_hat, dim=1).to_dense()
row_sums[row_sums == 0] = 1.0
# Create normalized P_hat
edge_weight_normalized = edge_weight / row_sums[row]
P_hat_normalized = torch.sparse_coo_tensor(
edge_index,
edge_weight_normalized,
size=(num_nodes, num_nodes)
).to(x.device)
# Normalize features: X_hat_i = X_i / sqrt(sum_j X_i · X_j^T)
# Avoid computing x @ x.T by computing row-wise norms efficiently
# ||X_i||_2 for each row
x_row_norms = torch.norm(x, p=2, dim=1, keepdim=True)
x_row_norms[x_row_norms == 0] = 1.0
x_normalized = x / x_row_norms
return P_hat_normalized, x_normalized
def power_iteration(
P_hat: Tensor,
x_hat: Tensor,
T: int,
alpha: float
) -> Tensor:
r"""
Computes Normalized Smoothed Representations (NSR) via power iteration.
.. math::
Z = \sum_{t=0}^{T} \frac{(1-\alpha)\alpha^t}{1-\alpha^{T+1}} \hat{P}^t \hat{X}
Args:
P_hat (Tensor): Normalized transition matrix (sparse).
x_hat (Tensor): Normalized feature matrix.
T (int): Number of propagation steps.
alpha (float): Decay factor.
Returns:
Node representations Z of shape :obj:`(num_nodes, num_features)`.
"""
factor = (1 - alpha) / (1 - alpha ** (T + 1))
Z = factor * x_hat
for t in range(1, T + 1):
x_hat = torch.sparse.mm(P_hat, x_hat)
Z = Z + alpha ** t * factor * x_hat
return Z
[docs]class S2CAG(BaseModel):
r"""
The S²CAG (Spectral Subspace Clustering for Attributed Graphs) model from the
`"Spectral Subspace Clustering for Attributed Graphs"
<https://arxiv.org/abs/2411.11074>`_ paper (Lin et al., KDD 2025).
S²CAG performs unsupervised graph clustering by computing Normalized Smoothed
Representations (NSR) via graph Laplacian smoothing, then extracting cluster
structures through truncated SVD and spectral rounding.
The model optimizes the following objective:
.. math::
\min_{S} \|Z - SZ\|_F^2 + \|S\|_* + \|S^T S - I\|_F^2
where :math:`Z` is the NSR matrix, :math:`S` is the self-expressive matrix,
and :math:`\|\cdot\|_*` denotes the nuclear norm.
Theoretically, S²CAG is equivalent to minimizing the total conductance of
clusters on an affinity graph constructed from NSR.
Args:
n_clusters (int): Number of clusters.
T (int, optional): Number of propagation steps. (default: :obj:`10`)
alpha (float, optional): Decay factor for smoothing. (default: :obj:`0.9`)
tau (int, optional): Number of iterations for SVD approximation. (default: :obj:`7`)
oversampling (int, optional): Oversampling parameter for randomized SVD.
(default: :obj:`10`)
Example:
>>> from pyagc.models import S2CAG
>>> from pyagc.data import get_dataset
>>>
>>> # Load data
>>> x, edge_index, y = get_dataset('Cora', root='./data')
>>> n_clusters = int(y.max()) + 1
>>>
>>> # Create model
>>> model = S2CAG(n_clusters=n_clusters, T=20, alpha=0.9)
>>>
>>> # Get cluster assignments
>>> clusters = model.embed(x, edge_index)
"""
def __init__(
self,
n_clusters: int,
T: int = 10,
alpha: float = 0.9,
tau: int = 7,
oversampling: int = 10
):
super().__init__()
self.n_clusters = n_clusters
self.T = T
self.alpha = alpha
self.tau = tau
self.oversampling = oversampling
def _compute_nsr(
self,
x: Tensor,
edge_index: Tensor,
num_nodes: Optional[int] = None
) -> Tensor:
r"""Computes Normalized Smoothed Representations (NSR)."""
if num_nodes is None:
num_nodes = x.size(0)
P_hat, x_hat = compute_normalized_matrices(x, edge_index, num_nodes)
Z = power_iteration(P_hat, x_hat, self.T, self.alpha)
return Z
def _randomized_svd(
self,
Z: Tensor,
k: int,
n_iter: Optional[int] = None
) -> Tensor:
r"""
Computes top-k left singular vectors via randomized SVD.
Args:
Z (Tensor): Input matrix of shape :obj:`(n, d)`.
k (int): Number of singular vectors to compute.
n_iter (int, optional): Number of power iterations.
Returns:
Top-k left singular vectors of shape :obj:`(n, k)`.
"""
if n_iter is None:
n_iter = self.tau
n, d = Z.shape
o = self.oversampling
k_total = min(k + o, min(n, d))
# Random initialization
Q = torch.randn(d, k_total, device=Z.device, dtype=Z.dtype)
# Power iterations
for _ in range(n_iter):
Q = Z @ Q
Q = Z.T @ Q
Q = Z @ Q
# QR decomposition
Q, _ = torch.linalg.qr(Q)
# Project and compute SVD
B = (Z.T @ Q).T
U_hat, _, _ = torch.linalg.svd(B, full_matrices=False)
# Compute final left singular vectors
U = Q @ U_hat
# Skip first trivial vector and return top-k
return U[:, 1:k + 1]
[docs] def embed(self, x: Tensor, edge_index: Tensor, **kwargs) -> Tensor:
r"""
Computes cluster assignments via S²CAG.
Args:
x (Tensor): Node features of shape :obj:`(num_nodes, num_features)`.
edge_index (Tensor): Edge indices of shape :obj:`(2, num_edges)`.
Returns:
Node embeddings of shape :obj:`(num_nodes, n_clusters)`.
"""
num_nodes = x.size(0)
# Compute NSR
Z = self._compute_nsr(x, edge_index, num_nodes)
# Compute top-k left singular vectors
Y = self._randomized_svd(Z, self.n_clusters)
return Y
[docs] def forward(self, x: Tensor, edge_index: Tensor, **kwargs) -> Tensor:
r"""Alias for :meth:`embed`."""
return self.embed(x, edge_index, **kwargs)
def __repr__(self) -> str:
return (f'{self.__class__.__name__}('
f'n_clusters={self.n_clusters}, '
f'T={self.T}, '
f'alpha={self.alpha})')
class MS2CAG(BaseModel):
r"""
The M-S²CAG (Modularity-based S²CAG) model from the
`"Spectral Subspace Clustering for Attributed Graphs"
<https://arxiv.org/abs/2411.11074>`_ paper (Lin et al., KDD 2025).
M-S²CAG extends S²CAG by incorporating modularity maximization into the
subspace clustering framework. It optimizes:
.. math::
\max_C \text{trace}(C^T(\hat{Z}\hat{Z}^T - \gamma \frac{\omega\omega^T}{\omega^T 1})C)
where :math:`\hat{Z}` is normalized NSR, :math:`\omega` represents degree-like
weights, and :math:`\gamma` controls the balance between intra-cluster and
inter-cluster connectivity.
Args:
n_clusters (int): Number of clusters.
T (int, optional): Number of propagation steps. (default: :obj:`10`)
alpha (float, optional): Decay factor for smoothing. (default: :obj:`0.9`)
gamma (float, optional): Modularity parameter. (default: :obj:`0.9`)
tau (int, optional): Number of subspace iterations. (default: :obj:`50`)
Example:
>>> from pyagc.models import MS2CAG
>>> from pyagc.data import get_dataset
>>>
>>> # Load data
>>> x, edge_index, y = get_dataset('Cora', root='./data')
>>> n_clusters = int(y.max()) + 1
>>>
>>> # Create model
>>> model = MS2CAG(n_clusters=n_clusters, T=20, alpha=0.9, gamma=1.0)
>>>
>>> # Get cluster assignments
>>> clusters = model.embed(x, edge_index)
"""
def __init__(
self,
n_clusters: int,
T: int = 10,
alpha: float = 0.9,
gamma: float = 0.9,
tau: int = 50
):
super().__init__()
self.n_clusters = n_clusters
self.T = T
self.alpha = alpha
self.gamma = gamma
self.tau = tau
def _compute_nsr(
self,
x: Tensor,
edge_index: Tensor,
num_nodes: Optional[int] = None
) -> Tensor:
r"""Computes Normalized Smoothed Representations (NSR)."""
if num_nodes is None:
num_nodes = x.size(0)
P_hat, x_hat = compute_normalized_matrices(x, edge_index, num_nodes)
Z = power_iteration(P_hat, x_hat, self.T, self.alpha)
return Z
def embed(self, x: Tensor, edge_index: Tensor, **kwargs) -> Tensor:
r"""
Computes cluster assignments via M-S²CAG.
Args:
x (Tensor): Node features of shape :obj:`(num_nodes, num_features)`.
edge_index (Tensor): Edge indices of shape :obj:`(2, num_edges)`.
Returns:
Cluster assignments of shape :obj:`(num_nodes,)`.
"""
num_nodes = x.size(0)
# Compute NSR
Z = self._compute_nsr(x, edge_index, num_nodes)
# Normalize Z: avoid computing Z @ Z.T explicitly
# Use row-wise L2 normalization instead
Z_norms = torch.norm(Z, p=2, dim=1, keepdim=True)
Z_norms[Z_norms == 0] = 1.0
Z_hat = Z / Z_norms
# # Compute omega (degree-like weights): row sums of Z_hat @ Z_hat.T
# omega = (Z_hat @ Z_hat.T).sum(dim=1, keepdim=True)
# omega_sum = omega.sum()
# Since Z_hat @ Z_hat.T is symmetric and row sums equal column sums:
# omega_i = sum_j (Z_hat_i · Z_hat_j) = Z_hat @ (Z_hat.sum(dim=0))
Z_hat_col_sum = Z_hat.sum(dim=0) # (d,)
omega = Z_hat @ Z_hat_col_sum # (n,)
omega = omega.unsqueeze(1) # (n, 1)
omega_sum = omega.sum()
# Initialize Q with random orthonormal matrix
Q, _ = torch.linalg.qr(torch.randn(num_nodes, self.n_clusters, device=x.device))
# Subspace iterations
for _ in range(self.tau):
# H = Z_hat @ Z_hat^T @ Q - gamma * omega @ (omega^T @ Q) / omega_sum
H = Z_hat @ (Z_hat.T @ Q)
H = H - self.gamma * omega @ (omega.T @ Q) / omega_sum
# Orthonormalize
Q, _ = torch.linalg.qr(H)
return Q
def forward(self, x: Tensor, edge_index: Tensor, **kwargs) -> Tensor:
r"""Alias for :meth:`embed`."""
return self.embed(x, edge_index, **kwargs)
def __repr__(self) -> str:
return (f'{self.__class__.__name__}('
f'n_clusters={self.n_clusters}, '
f'T={self.T}, '
f'alpha={self.alpha}, '
f'gamma={self.gamma})')
def snem_rounding(vectors: Tensor, n_clusters: int, T: int = 100) -> Tensor:
r"""
SNEM (Spectral Network Embedding) rounding algorithm for clustering.
This algorithm iteratively refines cluster assignments by alternating between
soft assignment based on similarity to cluster prototypes and re-normalization.
Args:
vectors (Tensor): Node embeddings of shape :obj:`(num_nodes, n_features)`.
n_clusters (int): Number of clusters.
T (int, optional): Number of iterations. (default: :obj:`100`)
Returns:
Cluster assignments of shape :obj:`(num_nodes,)`.
Reference:
From S²CAG paper and SNEM algorithm.
"""
device = vectors.device
n_samples = vectors.size(0)
# Initialize with hard assignment
labels = vectors.argmax(dim=1)
# Create one-hot encoding
C = torch.zeros(n_samples, n_clusters, device=device, dtype=vectors.dtype)
C[torch.arange(n_samples, device=device), labels] = 1.0
# Normalize columns
col_sums = C.sum(dim=0).sqrt()
col_sums[col_sums == 0] = 1.0
C = C / col_sums.unsqueeze(0)
# Iterative refinement
for _ in range(T):
# Compute prototype matrix Q = V^T @ C
Q = vectors.T @ C # (n_features, n_clusters)
# Soft assignment: V @ Q
soft_assign = vectors @ Q # (n_samples, n_clusters)
# Hard assignment
labels = soft_assign.argmax(dim=1)
# Create one-hot encoding
C = torch.zeros(n_samples, n_clusters, device=device, dtype=vectors.dtype)
C[torch.arange(n_samples, device=device), labels] = 1.0
# Normalize columns
col_sums = C.sum(dim=0).sqrt()
col_sums[col_sums == 0] = 1.0
C = C / col_sums.unsqueeze(0)
return labels
# from scipy.sparse import csc_matrix
# import numpy as np
#
# def snem_rounding_numpy(vectors: Tensor, n_clusters: int, T: int = 100) -> Tensor:
# r"""
# SNEM (Spectral Network Embedding) rounding algorithm for clustering.
#
# This algorithm iteratively refines cluster assignments by alternating between
# soft assignment based on similarity to cluster prototypes and re-normalization.
#
# Args:
# vectors (Tensor): Node embeddings of shape :obj:`(num_nodes, n_features)`.
# n_clusters (int): Number of clusters.
# T (int, optional): Number of iterations. (default: :obj:`100`)
#
# Returns:
# Cluster assignments of shape :obj:`(num_nodes,)`.
#
# Reference:
# From S²CAG paper and SNEM algorithm.
# """
# vectors = vectors.detach().cpu().numpy().astype(np.float64)
# n_samples, n_feats = vectors.shape
#
# # Initialize with hard assignment
# labels = vectors.argmax(axis=1)
# vectors_discrete = csc_matrix(
# (np.ones(len(labels)), (np.arange(n_samples), labels)),
# shape=(n_samples, n_clusters)
# )
#
# # Normalize columns (convert to array for element-wise operations)
# vectors_discrete_array = vectors_discrete.toarray()
# vectors_sum = np.sqrt(vectors_discrete_array.sum(axis=0))
# vectors_sum[vectors_sum == 0] = 1.0
# vectors_discrete_array = vectors_discrete_array / vectors_sum[np.newaxis, :]
#
# # Iterative refinement
# for _ in range(T):
# # Compute prototype matrix Q
# Q = vectors.T @ vectors_discrete_array
# Q = np.asarray(Q)
#
# # Soft assignment
# vectors_discrete_new = vectors @ Q
#
# # Hard assignment
# labels = vectors_discrete_new.argmax(axis=1)
# vectors_discrete_array = np.zeros((n_samples, n_clusters))
# vectors_discrete_array[np.arange(n_samples), labels] = 1.0
#
# # Normalize columns
# vectors_sum = np.sqrt(vectors_discrete_array.sum(axis=0))
# vectors_sum[vectors_sum == 0] = 1.0
# vectors_discrete_array = vectors_discrete_array / vectors_sum[np.newaxis, :]
#
# return torch.tensor(labels, dtype=torch.long)