"""Solvers for the MPFIT charge fitting problem."""
from __future__ import annotations
import abc
from dataclasses import dataclass
import numpy as np
from scipy.spatial.distance import cdist
[docs]
class MPFITSolverError(Exception):
"""An exception raised when an MPFIT solver fails."""
[docs]
class MPFITSolver(abc.ABC):
"""Base class for MPFIT solvers.
MPFIT uses per-site fitting where each atom has its own design matrix (A)
and reference vector (b). The solver receives object arrays containing
these per-site matrices and must solve each site independently.
"""
[docs]
@abc.abstractmethod
def solve(
self,
design_matrix: np.ndarray,
reference_values: np.ndarray,
ancillary_arrays: dict | None = None,
) -> np.ndarray:
"""Solve the MPFIT fitting problem.
Parameters
----------
design_matrix
An object array where each element is a site-specific A matrix.
reference_values
An object array where each element is a site-specific b vector.
ancillary_arrays
Dictionary containing additional arrays needed for the solver.
For SVD solver, must include 'quse_masks' - list of boolean masks
indicating which atoms affect each multipole site.
Returns
-------
The set of charge values with shape=(n_atoms + n_vsites, 1)
"""
raise NotImplementedError
[docs]
class MPFITSVDSolver(MPFITSolver):
"""Solver that uses SVD to find charges that reproduce multipole moments.
This solver processes each multipole site independently using SVD
decomposition, accumulating charge contributions via quse_masks.
"""
def __init__(self, svd_threshold: float = 1.0e-4) -> None:
"""Initialize the SVD solver.
Parameters
----------
svd_threshold
The threshold below which singular values are considered zero.
"""
self._svd_threshold = svd_threshold
[docs]
def solve(
self,
design_matrix: np.ndarray,
reference_values: np.ndarray,
ancillary_arrays: dict | None = None,
) -> np.ndarray:
"""Solve for charges using SVD for each multipole site.
Parameters
----------
design_matrix
An object array where each element is a site-specific A matrix.
reference_values
An object array where each element is a site-specific b vector.
ancillary_arrays
Dictionary containing 'quse_masks' - list of boolean masks
indicating which atoms affect each multipole site.
Returns
-------
Charge values with shape=(n_atoms + n_vsites, 1). When no vsites
are present, this is simply (n_atoms, 1).
"""
is_object_array = hasattr(
design_matrix, "dtype"
) and design_matrix.dtype == np.dtype("O")
if is_object_array and len(design_matrix) == 0:
return np.zeros((0, 1))
if ancillary_arrays is None or "quse_masks" not in ancillary_arrays:
raise MPFITSolverError("SVD solver requires quse_masks in ancillary_arrays")
quse_masks = ancillary_arrays["quse_masks"]
n_atoms = len(design_matrix)
n_vsites = ancillary_arrays.get("n_vsites", 0)
charge_values = np.zeros((n_atoms + n_vsites, 1))
# Solve for each multipole site
for i in range(len(design_matrix)):
site_A = np.asarray(design_matrix[i], dtype=np.float64)
site_b = np.asarray(reference_values[i], dtype=np.float64)
quse_mask = np.asarray(quse_masks[i], dtype=bool)
U, S, Vh = np.linalg.svd(site_A)
S[self._svd_threshold > S] = 0.0
inv_S = np.zeros_like(S)
mask_S = S != 0
inv_S[mask_S] = 1.0 / S[mask_S]
q = (Vh.T * inv_S) @ (U.T @ site_b)
charge_values[quse_mask, 0] += q.flatten()
return charge_values
[docs]
@dataclass(frozen=True)
class ConstrainedMPFITState:
"""Immutable container for a constrained MPFIT problem."""
xyzmult: np.ndarray
xyzcharge: np.ndarray
multipoles: np.ndarray
quse: np.ndarray
atomtype: tuple[str, ...]
rvdw: np.ndarray
lmax: np.ndarray
r1: float
r2: float
maxl: int
atom_counts: tuple[int, ...]
molecule_charges: tuple[float, ...]
[docs]
def build_quse_matrix(
xyzmult: np.ndarray,
xyzcharge: np.ndarray,
rvdw: np.ndarray,
) -> np.ndarray:
"""Build binary mask: ``quse[s, i] = 1`` if atom *i* affects site *s*."""
return (cdist(xyzmult, xyzcharge) < rvdw[:, None]).astype(int)
def _find_twin(atomtype: tuple[str, ...], i: int) -> int | None:
"""Return the index of the first atom with the same type as atom *i*, or None."""
return next((k for k in range(i) if atomtype[k] == atomtype[i]), None)
[docs]
def count_parameters(state: ConstrainedMPFITState) -> int:
"""Count free parameters after applying atom-type equivalence constraints."""
atomtype = state.atomtype
quse = state.quse
n_params = 0
for i in range(len(atomtype)):
n_sites_using = int(np.sum(quse[:, i]))
twin = _find_twin(atomtype, i) if i > 0 else None
if twin is not None:
n_params += n_sites_using - 1
else:
n_params += n_sites_using
return n_params
[docs]
def expandcharge(
p0: np.ndarray,
state: ConstrainedMPFITState,
) -> tuple[np.ndarray, np.ndarray]:
"""Map reduced parameters to full charges with atom-type constraints.
Returns
-------
allcharge
Per-site charge contributions, shape ``(n_sites, n_atoms)``.
qstore
Total charge per atom, shape ``(n_atoms,)``.
"""
atomtype = state.atomtype
quse = state.quse
n_sites = state.xyzmult.shape[0]
n_atoms = len(atomtype)
allcharge = np.zeros((n_sites, n_atoms))
qstore = np.zeros(n_atoms)
count = 0
for i in range(n_atoms):
twin = _find_twin(atomtype, i) if i > 0 else None
charge_sum = 0.0
if twin is not None:
count1 = int(np.sum(quse[:, i]))
count2 = 1
for j in range(n_sites):
if quse[j, i] == 1 and count2 < count1:
allcharge[j, i] = p0[count]
charge_sum += p0[count]
count += 1
count2 += 1
elif quse[j, i] == 1 and count2 == count1:
allcharge[j, i] = qstore[twin] - charge_sum
qstore[i] = qstore[twin]
else:
for j in range(n_sites):
if quse[j, i] == 1:
allcharge[j, i] = p0[count]
charge_sum += p0[count]
count += 1
qstore[i] = charge_sum
return allcharge, qstore
[docs]
def createdkaisq(dparam1: np.ndarray, state: ConstrainedMPFITState) -> np.ndarray:
"""Project full-space gradient to reduced parameter space."""
atomtype = state.atomtype
quse = state.quse
n_atoms = len(atomtype)
n_sites = n_atoms
dparam1 = dparam1.copy()
# Pass 1: redistribute constrained site's gradient to twin
for i in range(1, n_atoms):
twin = _find_twin(atomtype, i)
if twin is not None:
count1 = int(np.sum(quse[:, i]))
count2 = 1
for j in range(n_sites):
if quse[j, i] == 1 and count2 < count1:
count2 += 1
elif quse[j, i] == 1 and count2 == count1:
for k in range(n_sites):
dparam1[twin * n_sites + k] += dparam1[i * n_sites + j]
for k in range(j):
dparam1[i * n_sites + k] -= dparam1[i * n_sites + j]
# Pass 2: extract free-parameter gradient entries
n_params = count_parameters(state)
dkaisq_out = np.zeros(n_params)
count = 0
for i in range(n_atoms):
twin = _find_twin(atomtype, i) if i > 0 else None
if twin is not None:
count1 = int(np.sum(quse[:, i]))
count2 = 1
for j in range(n_sites):
if quse[j, i] == 1 and count2 < count1:
dkaisq_out[count] = dparam1[i * n_sites + j]
count2 += 1
count += 1
else:
for j in range(n_sites):
if quse[j, i] == 1:
dkaisq_out[count] = dparam1[i * n_sites + j]
count += 1
return dkaisq_out
[docs]
class ConstrainedMPFITSolver(abc.ABC):
"""Base class for constrained MPFIT solvers."""
def __init__(
self,
conchg: float = 1.0,
method: str = "L-BFGS-B",
maxiter: int = 10000,
ftol: float = 1e-8,
gtol: float = 1e-6,
) -> None:
self._conchg = conchg
self._method = method
self._maxiter = maxiter
self._ftol = ftol
self._gtol = gtol
[docs]
@classmethod
def loss(
cls,
p0: np.ndarray,
state: ConstrainedMPFITState,
conchg: float,
) -> float:
"""Return the value of the constrained MPFIT objective function.
Parameters
----------
p0
The current reduced parameter vector with shape=(n_params,).
Mapped to full charges via ``expandcharge``.
state
The frozen problem state containing coordinates, multipoles,
equivalence constraints, and per-molecule target charges.
conchg
The Lagrange-like penalty weight enforcing per-molecule charge
conservation.
Returns
-------
The scalar value of the objective function.
"""
from pympfit.mpfit.core import _regular_solid_harmonic
allcharge, qstore = expandcharge(p0, state)
n_sites = state.xyzmult.shape[0]
maxl = state.maxl
sumkai = 0.0
for s in range(n_sites):
q0 = allcharge[s, :]
rmax = state.rvdw[s] + state.r2
rminn = state.rvdw[s] + state.r1
lmax_s = int(state.lmax[s])
W = np.zeros(maxl + 1)
for i in range(lmax_s + 1):
W[i] = (1.0 / (1.0 - 2.0 * i)) * (
rmax ** (1 - 2 * i) - rminn ** (1 - 2 * i)
)
dx = state.xyzcharge[:, 0] - state.xyzmult[s, 0]
dy = state.xyzcharge[:, 1] - state.xyzmult[s, 1]
dz = state.xyzcharge[:, 2] - state.xyzmult[s, 2]
site_sum = 0.0
for l in range(lmax_s + 1):
weight = (4.0 * np.pi) if l == 0 else (4.0 * np.pi / (2.0 * l + 1.0))
weight *= W[l]
for m in range(l + 1):
cs_range = [0] if m == 0 else [0, 1]
for cs in cs_range:
rsh_vals = _regular_solid_harmonic(l, m, cs, dx, dy, dz)
sum1 = np.dot(q0, rsh_vals)
site_sum += weight * (state.multipoles[s, l, m, cs] - sum1) ** 2
sumkai += site_sum
# Per-molecule charge conservation penalty
sumcon = 0.0
offset = 0
for n_i, q_target in zip(
state.atom_counts, state.molecule_charges, strict=False
):
mol_q = np.sum(qstore[offset : offset + n_i])
sumcon += conchg * (mol_q - q_target) ** 2
offset += n_i
return sumkai + sumcon
[docs]
@classmethod
def jacobian(
cls,
p0: np.ndarray,
state: ConstrainedMPFITState,
conchg: float,
) -> np.ndarray:
"""Return the gradient of the objective function w.r.t. reduced parameters.
Computes the full-space gradient of the multipole fitting error and
charge conservation penalty, then projects to the reduced parameter
space via ``createdkaisq``.
This is a pure function — it does not mutate ``state``.
Parameters
----------
p0
The current reduced parameter vector with shape=(n_params,).
Mapped to full charges via ``expandcharge``.
state
The frozen problem state containing coordinates, multipoles,
equivalence constraints, and per-molecule target charges.
conchg
The Lagrange-like penalty weight enforcing per-molecule charge
conservation.
Returns
-------
The gradient in the reduced parameter space with shape=(n_params,).
"""
from pympfit.mpfit.core import _regular_solid_harmonic
allcharge, qstore = expandcharge(p0, state)
n_sites = state.xyzmult.shape[0]
n_atoms = n_sites
maxl = state.maxl
dparam = np.zeros((n_sites, n_atoms))
for s in range(n_sites):
q0 = allcharge[s, :]
rmax = state.rvdw[s] + state.r2
rminn = state.rvdw[s] + state.r1
lmax_s = int(state.lmax[s])
W = np.zeros(maxl + 1)
for i in range(lmax_s + 1):
W[i] = (1.0 / (1.0 - 2.0 * i)) * (
rmax ** (1 - 2 * i) - rminn ** (1 - 2 * i)
)
dx = state.xyzcharge[:, 0] - state.xyzmult[s, 0]
dy = state.xyzcharge[:, 1] - state.xyzmult[s, 1]
dz = state.xyzcharge[:, 2] - state.xyzmult[s, 2]
for l in range(lmax_s + 1):
weight = (4.0 * np.pi) if l == 0 else (4.0 * np.pi / (2.0 * l + 1.0))
weight *= W[l]
for m in range(l + 1):
cs_range = [0] if m == 0 else [0, 1]
for cs in cs_range:
rsh_vals = _regular_solid_harmonic(l, m, cs, dx, dy, dz)
sum1 = np.dot(q0, rsh_vals)
coeff = 2.0 * weight * (state.multipoles[s, l, m, cs] - sum1)
dparam[s, :] -= coeff * rsh_vals
# Flatten to atom-major order: dparam1[atom_i * n_sites + site_s]
dparam1 = dparam.T.flatten()
# Per-molecule charge conservation gradient
offset = 0
for n_i, q_target in zip(
state.atom_counts, state.molecule_charges, strict=False
):
mol_q = np.sum(qstore[offset : offset + n_i])
grad_val = conchg * 2.0 * (mol_q - q_target)
for a in range(offset, offset + n_i):
dparam1[a * n_sites : a * n_sites + n_sites] += grad_val
offset += n_i
return createdkaisq(dparam1, state)
[docs]
def initial_guess(self, state: ConstrainedMPFITState) -> np.ndarray:
"""Return a zero initial guess with shape=(n_params,)."""
return np.zeros(count_parameters(state))
[docs]
def solve(self, state: ConstrainedMPFITState) -> np.ndarray:
"""Solve the constrained MPFIT problem and return total charge per atom.
Parameters
----------
state
The frozen problem state containing coordinates, multipoles,
equivalence constraints, and per-molecule target charges.
Returns
-------
Total charge per atom with shape=(n_atoms,).
"""
p_opt = self._solve(state)
_, qstore = expandcharge(p_opt, state)
return qstore
@abc.abstractmethod
def _solve(self, state: ConstrainedMPFITState) -> np.ndarray:
"""Implement the optimizer-specific solving logic."""
...
[docs]
class ConstrainedSciPySolver(ConstrainedMPFITSolver):
"""Constrained MPFIT solver using ``scipy.optimize.minimize``."""
def _solve(self, state: ConstrainedMPFITState) -> np.ndarray:
from scipy.optimize import minimize
result = minimize(
fun=lambda p: self.loss(p, state, self._conchg),
x0=self.initial_guess(state),
jac=lambda p: self.jacobian(p, state, self._conchg),
method=self._method,
options={
"maxiter": self._maxiter,
"ftol": self._ftol,
"gtol": self._gtol,
},
)
if not result.success:
raise MPFITSolverError(f"Optimization failed: {result.message}")
return result.x