Source code for esapp.utils.network

"""
Network Matrix Utilities
========================

Provides network topology analysis including incidence matrices,
graph Laplacians with various weighting schemes, and branch parameter
calculations for power system analysis.

Classes
-------
Network
    Network matrix construction and branch weight calculations.
BranchType
    Enumeration of supported branch weight types for Laplacian construction.

Example
-------
Basic network matrix operations::

    >>> from esapp import PowerWorld
    >>> pw = PowerWorld("case.pwb")
    >>> A = pw.network.incidence()  # Incidence matrix
    >>> L = pw.network.laplacian(BranchType.LENGTH)  # Length-weighted Laplacian

See Also
--------
esapp.utils.gic : GIC analysis with network topology.
esapp.saw.matrices : Matrix retrieval from PowerWorld.
"""

from __future__ import annotations

from enum import Enum

import numpy as np
from pandas import DataFrame, Series, concat
from scipy.sparse import diags, coo_matrix, csc_matrix

from ..components import Branch, Bus, DCTransmissionLine, Substation

__all__ = ['Network', 'BranchType']


[docs] class BranchType(Enum): """ Branch weighting schemes for Laplacian construction. Attributes ---------- LENGTH : int Weight by inverse squared physical length (km^-2). RES_DIST : int Weight by inverse impedance magnitude (resistance distance). DELAY : int Weight by inverse squared propagation delay (s^-2). """ LENGTH = 1 RES_DIST = 2 DELAY = 3
[docs] class Network: """ Network matrix construction and analysis. Builds sparse network matrices (incidence, Laplacian) and computes branch electrical parameters. AC branches and HVDC transmission lines are always included when present in the case. This class is accessed via ``PowerWorld.network`` and delegates all data access to its parent PowerWorld instance. Notes ----- Matrix dimensions follow PowerWorld bus ordering. Use busmap() to translate between bus numbers and matrix indices. """ def __init__(self, pw=None): self._pw = pw self._A = None def _dc_lines(self) -> DataFrame | None: """Return DC transmission line data, or None if unavailable.""" try: df = self._pw[DCTransmissionLine] return df if df is not None and len(df) > 0 else None except Exception: return None
[docs] def busmap(self) -> Series: """ Create mapping from bus numbers to matrix indices. Returns ------- pd.Series Series indexed by BusNum with positional values. """ buses = self._pw[Bus] return Series(buses.index, buses["BusNum"])
[docs] def incidence(self, remake: bool = True) -> csc_matrix: """ Construct the sparse arc-incidence matrix. Each row represents a branch with +1 at the to-bus and -1 at the from-bus. HVDC lines are appended after AC branches when present. Parameters ---------- remake : bool, default True If True, recomputes even if cached. Returns ------- scipy.sparse.csc_matrix Sparse incidence matrix (branches x buses). """ if self._A is not None and not remake: return self._A fields = ["BusNum", "BusNum:1"] branches = self._pw[Branch][fields] dc = self._dc_lines() if dc is not None: branches = concat([branches, dc[fields]], ignore_index=True) bmap = self.busmap() fr = branches["BusNum"].map(bmap).to_numpy() to = branches["BusNum:1"].map(bmap).to_numpy() nb, nbus = len(branches), len(bmap) idx = np.arange(nb) self._A = coo_matrix( (np.concatenate([-np.ones(nb), np.ones(nb)]), (np.concatenate([idx, idx]), np.concatenate([fr, to]))), shape=(nb, nbus), ).tocsc() return self._A
[docs] def laplacian( self, weights: BranchType | np.ndarray, longer_xfmr_lens: bool = True, len_thresh: float = 0.01, ) -> csc_matrix: """ Construct weighted graph Laplacian: L = A.T @ W @ A. Parameters ---------- weights : BranchType or np.ndarray Weighting scheme or custom weight vector. longer_xfmr_lens : bool, default True Use impedance-based pseudo-lengths for transformers. len_thresh : float, default 0.01 Threshold (km) below which branches are treated as transformers. Returns ------- scipy.sparse.csc_matrix Sparse weighted Laplacian matrix (buses x buses). """ if isinstance(weights, BranchType): if weights is BranchType.LENGTH: W = 1 / self.lengths(longer_xfmr_lens, len_thresh) ** 2 elif weights is BranchType.RES_DIST: W = 1 / self.zmag() else: W = 1 / self.delay() ** 2 else: W = weights A = self.incidence() return (A.T @ diags(W) @ A).tocsc()
[docs] def lengths( self, longer_xfmr_lens: bool = False, length_thresh_km: float = 0.01, ) -> Series: """ Get branch lengths in kilometers. Parameters ---------- longer_xfmr_lens : bool, default False Calculate pseudo-lengths for transformers based on their impedance relative to average line impedance per km. length_thresh_km : float, default 0.01 Branches shorter than this are treated as transformers. Returns ------- pd.Series Branch lengths in kilometers. """ fields = ["LineLengthByParameters", "LineLengthByParameters:2", "LineR:2", "LineX:2"] data = self._pw[Branch, fields][fields] # Prefer user-specified length over calculated user = data["LineLengthByParameters"] data.loc[user > 0, "LineLengthByParameters:2"] = user[user > 0] ell = data["LineLengthByParameters:2"] dc = self._dc_lines() if dc is not None: dc_ell = self._pw[DCTransmissionLine, "LineLengthByParameters"]["LineLengthByParameters"] ell = concat([ell, dc_ell], ignore_index=True) if longer_xfmr_lens: is_line = ell > length_thresh_km z = np.abs(data["LineR:2"] + 1j * data["LineX:2"]) z_per_km = (z[is_line] / ell[is_line]).mean() ell.loc[~is_line] = (z[~is_line] / z_per_km).to_numpy() else: ell.loc[ell == 0] = 0.01 return ell
[docs] def zmag(self) -> Series: """ Get branch impedance magnitudes |Z|. Returns ------- pd.Series Impedance magnitude for each branch. """ return 1 / np.abs(self.ybranch())
[docs] def ybranch(self, asZ: bool = False) -> Series: """ Get branch admittance (or impedance) in complex form. Parameters ---------- asZ : bool, default False If True, return impedance Z = R + jX. Returns ------- pd.Series Complex admittance Y = 1/(R + jX) or impedance Z. """ branches = self._pw[Branch, ["LineR:2", "LineX:2"]] Z = branches["LineR:2"] + 1j * branches["LineX:2"] dc = self._dc_lines() if dc is not None: Z = concat([Z, Series(np.full(len(dc), 0.001 + 0j))], ignore_index=True) return Z if asZ else 1 / Z
[docs] def yshunt(self) -> Series: """ Get branch shunt admittance in complex form. Returns ------- pd.Series Complex shunt admittance Y = G + jB. """ branches = self._pw[Branch, ["LineG", "LineC"]] return branches["LineG"] + 1j * branches["LineC"]
[docs] def gamma(self) -> Series: """ Compute propagation constants for each branch. Returns ------- pd.Series Complex propagation constant gamma = sqrt(Z * Y). """ ell = self.lengths() Z = self.ybranch(asZ=True).copy() Y = self.yshunt().copy() Z[Z == 0] = 0.000446 + 0.002878j Y[Y == 0] = 0.000463j return np.sqrt((Y / ell) * (Z / ell))
[docs] def delay(self, min_delay: float = 10e-4) -> np.ndarray: r""" Compute effective propagation delay for network branches. Parameters ---------- min_delay : float, default 10e-4 Minimum delay value to prevent numerical overflow when computing 1/delay^2 in the Laplacian. Returns ------- np.ndarray Effective propagation parameter beta for each branch. Notes ----- - Branch inductance: omega * L_ij = Im(Z^br_ij) - Effective capacitance: C_ij = (C_i + C_j) / 2 - Propagation delay: omega * tau_ij = Im(sqrt(Z_ij * Y_ij)) = beta_ij """ Z = self.ybranch(asZ=True) Ybus = self._pw.esa.get_ybus() AVG = np.abs(self.incidence()) / 2 Y = AVG @ Ybus @ np.ones(Ybus.shape[0]) return np.maximum(np.imag(np.sqrt(Z * Y)), min_delay)
[docs] def buscoords(self, astuple: bool = True): """ Retrieve bus latitude and longitude from substation data. Parameters ---------- astuple : bool, default True If True, return (Longitude, Latitude) Series tuple. If False, return merged DataFrame. Returns ------- tuple of pd.Series or pd.DataFrame """ A = self._pw[Bus, "SubNum"] S = self._pw[Substation, ["SubNum", "Longitude", "Latitude"]] LL = A.merge(S, on="SubNum") if astuple: return LL["Longitude"], LL["Latitude"] return LL