# Source code for pyagc.models.sagsc

from typing import Optional

import torch
from torch import Tensor
from torch_geometric.typing import Adj

from pyagc.models.base import BaseModel
from pyagc.models.sgc import SGC


class SAGSC(BaseModel):
    r"""
    The Scalable Attributed-Graph Subspace Clustering (SAGSC) model from the
    `"Scalable Attributed-Graph Subspace Clustering"
    <https://ojs.aaai.org/index.php/AAAI/article/view/25918>`_ paper
    (Fettal et al., AAAI 2023).

    SAGSC consists of two stages:

    **(1) Node Embedding Generation (SGC):**

    .. math::
        H = S^p X

    where :math:`S = \hat{D}^{-1/2} \hat{A} \hat{D}^{-1/2}` is the normalized
    adjacency and :math:`p` is the propagation order.

    **(2) Implicit Subspace Clustering:**

    SAGSC avoids learning a dense coefficient matrix
    :math:`C \in \mathbb{R}^{n \times n}` by imposing a low-rank
    factorization:

    .. math::
        C = U U^\top,\quad U^\top U = I_k.

    :math:`U` is obtained from the top-:math:`k` left singular vectors of
    :math:`H`.

    To construct a **non-negative affinity matrix**, SAGSC uses a
    *quadratic kernel* applied row-wise:

    .. math::
        Q = \Phi(U), \qquad M = Q Q^\top \ge 0.

    With :math:`D = \operatorname{diag}(M \mathbf{1})` the degree matrix of
    :math:`M`, the row-normalized matrix

    .. math::
        \tilde{Q} = D^{-1/2} Q

    is decomposed via SVD, and the rows of the top :math:`k` singular vectors
    (excluding the trivial first one) are clustered using :math:`k`-means.

    SAGSC performs *implicit spectral clustering* without explicitly forming
    the affinity matrix :math:`M \in \mathbb{R}^{n \times n}`.

    Args:
        p (int): Power of the SGC propagation matrix.
        k (int): Number of clusters. Also the rank requested from both
            truncated SVDs and the width of the returned embedding.
        bias (float): Bias term :math:`b` in the quadratic feature map
            :math:`\Phi`. Expected to be non-negative, since
            :math:`\sqrt{2b}` is taken.
    """

    def __init__(
        self,
        p: int,
        k: int,
        bias: float = 2 ** -0.5,  # same as authors' code
    ):
        super().__init__()
        self.p = p
        self.k = k
        self.bias = bias

        # SGC encoder (no parameters, only propagation)
        self.encoder = SGC(K=p, cached=False)

    # ----------------------------------------------------------------------
    # Feature map Phi(U): quadratic kernel (row-wise)
    # ----------------------------------------------------------------------
    def _quad_feature_map(self, U: Tensor) -> Tensor:
        r"""
        Applies the row-wise quadratic polynomial feature map :math:`\Phi`
        used in SAGSC.

        For each row :math:`\mathbf{u} \in \mathbb{R}^k`:

        .. math::
            \Phi(\mathbf{u}) = [
                u_1^2, \ldots, u_k^2,
                \sqrt{2} u_1 u_2, \ldots, \sqrt{2} u_{k-1} u_k,
                \sqrt{2b} u_1, \ldots, \sqrt{2b} u_k,
                b
            ]

        so that :math:`\Phi(\mathbf{u})^\top \Phi(\mathbf{v}) =
        (\mathbf{u}^\top \mathbf{v} + b)^2`, with feature dimensionality
        :math:`m = \frac{(k+2)(k+1)}{2}`.

        Args:
            U (Tensor): Input matrix of shape :obj:`(n, k)`.

        Returns:
            Transformed features of shape :obj:`(n, m)`, on the same device
            and with the same dtype as :obj:`U`.
        """
        n, k = U.size()

        # Squared terms u_i^2.
        squares = U ** 2  # (n, k)

        # Cross terms sqrt(2) * u_i * u_j for i < j. triu_indices enumerates
        # the strict upper triangle row by row, i.e. (0,1), (0,2), ...,
        # (1,2), ..., which is the column order a nested i/j loop produces.
        # For k == 1 the index set is empty and this is an (n, 0) tensor.
        pairs = torch.triu_indices(k, k, offset=1, device=U.device)
        cross = (U[:, pairs[0]] * U[:, pairs[1]]) * (2.0 ** 0.5)

        # Linear terms sqrt(2b) * u_i.
        linear = U * ((2.0 * self.bias) ** 0.5)  # (n, k)

        # Constant term b; new_full keeps the dtype/device of U so the
        # concatenation does not mix precisions.
        bias_col = U.new_full((n, 1), self.bias)

        return torch.cat([squares, cross, linear, bias_col], dim=1)

    # ----------------------------------------------------------------------
    # Core SAGSC embedding
    # ----------------------------------------------------------------------
    @torch.no_grad()
    def embed(self, *args, **kwargs) -> Tensor:
        r"""
        Computes the SAGSC latent representation used for clustering.

        Steps:

        1. :math:`H = S^p X` (SGC propagation)
        2. :math:`U =` top-:math:`k` left singular vectors of :math:`H`
        3. :math:`Q = \Phi(U)`
        4. :math:`D =` row sum vector of :math:`Q Q^\top`
           (implemented implicitly as :math:`Q (Q^\top \mathbf{1})`)
        5. :math:`\tilde{Q} = D^{-1/2} Q`
        6. :math:`Z =` singular vectors 2 to :math:`k+1` of
           :math:`\tilde{Q}`

        All positional and keyword arguments are forwarded unchanged to the
        SGC encoder's :meth:`embed`.

        Args:
            x (Tensor): Node features of shape :obj:`(n, d)`.
            edge_index (Adj): Edge indices.
            edge_weight (Tensor, optional): Edge weights.

        Returns:
            Spectral embedding of shape :obj:`(n, k)`.
        """
        # -------- Step 1: SGC propagation ----------
        H = self.encoder.embed(*args, **kwargs)  # (n, d)

        # -------- Step 2: top-k left singular vectors ----------
        # H = U S V^T -> want U[:, :k]. svd_lowrank is a randomized solver.
        U, _, _ = torch.svd_lowrank(H, q=self.k)  # U: (n, k)

        # -------- Step 3: quadratic kernel feature map ----------
        Q = self._quad_feature_map(U)  # (n, m)

        # -------- Step 4: D = row sums of M = Q Q^T ----------
        # D_i = sum_j Q_i . Q_j = Q_i . (sum_j Q_j), so the n x n matrix M
        # is never materialized. Note this is NOT ||Q_i||^2, which would be
        # only the diagonal of M.
        D = Q @ Q.sum(dim=0) + 1e-10  # (n,)
        D_inv_sqrt = torch.where(D > 0, D.rsqrt(), torch.zeros_like(D))

        # -------- Step 5: row normalization ----------
        Q_tilde = Q * D_inv_sqrt.unsqueeze(1)  # (n, m)

        # -------- Step 6: obtain spectral embedding ----------
        # Singular vectors 2 to k+1: the leading one is the trivial
        # D^{1/2} direction of the normalized affinity and is skipped.
        U2, _, _ = torch.svd_lowrank(Q_tilde, q=self.k + 1)
        Z = U2[:, 1:self.k + 1]  # (n, k)

        return Z

    # ----------------------------------------------------------------------
    # Same as embed()
    # ----------------------------------------------------------------------
    @torch.no_grad()
    def forward(self, x: Tensor, edge_index: Adj,
                edge_weight: Optional[Tensor] = None):
        r"""
        Returns the spectral embedding; thin alias of :meth:`embed`.

        Args:
            x (Tensor): Node features of shape :obj:`(n, d)`.
            edge_index (Adj): Edge indices.
            edge_weight (Tensor, optional): Edge weights.

        Returns:
            Spectral embedding of shape :obj:`(n, k)`.
        """
        return self.embed(x, edge_index, edge_weight)

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}(p={self.p}, k={self.k})'