#!/usr/bin/env python
# coding: utf-8
# In[ ]:
"""Defines the PhysNetWrapper Class"""
import numpy as np
[docs]class PhysNetWrapper():
"""
Sets up, trains and runs a PhysNet Neural Network Calculator to represent a
potential energy surface.
"""
# Class attribute: default training arguments
default_train_args = {'num_features': 128,
'num_basis': 64,
'num_blocks': 5,
'num_residual_atomic': 2,
'num_residual_interaction': 3,
'num_residual_output': 1,
'cutoff': 10.0,
'use_electrostatic': 1,
'use_dispersion': 1,
'grimme_s6': 0.5,
'grimme_s8': 0.2130,
'grimme_a1': 0.0,
'grimme_a2': 6.0519,
'dataset': 'test.npz',
'num_train': 450,
'num_valid': 50,
'seed': 42,
'max_steps': 1000000,
'learning_rate': 0.001,
'max_norm': 1000.0,
'ema_decay': 0.999,
'keep_prob': 1.0,
'l2lambda': 0.0,
'nhlambda': 0.01,
'decay_steps': 10000000,
'decay_rate': 0.1,
'batch_size': 32,
'valid_batch_size': 50,
'force_weight': 52.91772105638412,
'charge_weight': 14.399645351950548,
'dipole_weight': 27.211386024367243,
'summary_interval': 10000,
'validation_interval': 10000,
'save_interval': 50000,
'record_run_metadata': 0}
def __init__(self,**kwargs):
"""Sets up instance attributes for PhysNetWrapper """
from copy import deepcopy
self.train_args = deepcopy(self.default_train_args)
# Allow overrides for this instance of the class
for kw in self.train_args:
if kw in kwargs:
self.train_args[kw] = kwargs[kw]
# Make a set of default loading arguments by copying in training arguments
self.load_args = {}
# These have different names between training and loading
self.load_args['F'] = self.train_args['num_features']
self.load_args['K'] = self.train_args['num_basis']
self.load_args['sr_cut'] = self.train_args['cutoff']
if 'lr_cut' in kwargs:
self.load_args['lr_cut'] = kwargs['lr_cut']
else:
self.load_args['lr_cut'] = None # no need for an electrostatic cutoff
for s in ['grimme_s6','grimme_s8','grimme_a1','grimme_a2']:
kw = s.replace('grimme_','')
if s in self.train_args:
self.load_args[kw] = self.train_args[s]
# The rest have the same name
for kw in ['num_residual_atomic','num_residual_interaction','num_residual_output',
'use_electrostatic','use_dispersion']:
if kw in self.train_args:
self.load_args[kw] = self.train_args[kw]
self.calc_ext = ""
self.log_ext = ""
self.calc = None
self.atom_e = 0.0
self.atom_energies = {}
self.atoms_on_load = None
def calc_filename(self,seed,target,prefix='',suffix=''):
if target is None or target == 0:
calcfn = seed+"_gs_"+suffix
else:
calcfn = seed+"_es"+str(target)+"_"+suffix
calcfn = prefix + calcfn
return calcfn
[docs] def load(self,seed,target=None,prefix="",suffix="",atoms=None):
"""
Loads an existing PhysNet Calculator
seed: str
target: int
suffix: str
kwargs: dict
other keywords
"""
if self.calc is not None:
return self.calc
calcfn = self.calc_filename(seed,target,prefix=prefix,suffix=suffix)
import NNCalculator
import tensorflow as tf
import numpy as np
from ase.io import read
# Backup for if calculator is loaded before atoms available
if atoms is None:
atoms = read(f"{prefix}{seed}.xyz")
try:
atoms = read(f"{prefix}{seed}_gs_opt.xyz")
except:
try:
print(f"{prefix}{seed}.xyz")
atoms = read(f"{prefix}{seed}.xyz")
except:
raise Exception(f"Could not find initial geometry .xyz file for {seed}")
# Find checkpoint file for calculator
log_dir=f"{calcfn}/best"
checkpoint = tf.train.latest_checkpoint(log_dir)
print(f'Loading Calculator from: {log_dir}, {checkpoint} with args: {self.load_args}',flush=True)
self.calc = NNCalculator.NNCalculator(checkpoint,atoms,**self.load_args)
# Load atoms trajectory and calculate initial value of atom energy
from ase.io import Trajectory
from esteem.trajectories import atom_energy,atom_energies
atom_traj = Trajectory(f'{prefix}{seed}_atoms_{suffix}.traj')
self.atom_energies = atom_energies(atom_traj)
self.atom_e = atom_energy(atoms,self.atom_energies)
self.atoms_on_load = atoms.copy()
return self.calc
def traj_to_npz(self,seed,trajfile,suffix):
from ase.io import Trajectory
from esteem import trajectories
atom_traj = Trajectory(f'{seed}_atoms_{suffix}.traj')
traj = Trajectory(trajfile)
trajoutfile = trajfile.replace(".traj","_atsub.traj")
trajout = Trajectory(trajoutfile,'w')
trajectories.subtract_atom_energies_from_traj(traj,atom_traj,trajout)
trajout.close()
traj.close()
traj = Trajectory(trajoutfile)
length = len(traj)
max_at = max([len(a) for a in traj])
N = np.zeros((length,), dtype=int) #number of atoms
E = np.zeros(length) #energy
Q = np.zeros(length) #total charge
D = np.zeros((length,3)) #dipole moment vector
Z = np.zeros((length,max_at)) #nuclear charge
R = np.zeros((length,max_at,3)) #cartesian coordinates
F = np.zeros((length,max_at,3)) #forces
for i,a in enumerate(traj):
e_raw = a.get_potential_energy()
charge = a.get_initial_charges()
N[i] = len(a)
E[i] = e_raw
Q[i] = np.sum(charge)
D[i] = a.get_dipole_moment()
Z[i,0:len(a)] = a.get_atomic_numbers()
R[i,0:len(a)] = a.get_positions()
F[i,0:len(a)] = a.get_forces()
outfilename = trajfile.replace(".traj",".npz")
np.savez(outfilename, N=N, E=E, Q=Q, D=D, Z=Z, R=R, F=F)
return outfilename, len(traj)
[docs] def train(self,seed,prefix="",suffix="",trajfile="",target=None,restart=False,**kwargs):
"""
Runs training for PhysNet model using an input trajectory as training points
seed: str
target: int
suffix: str
trajfile: str
restart: bool
kwargs: dict
"""
label = self.calc_filename(seed,target,prefix=prefix,suffix=suffix)
# Sort out optional arguments to see if any overrides to defaults have been supplied
from copy import deepcopy
import os
train_args = deepcopy(self.default_train_args)
for kw in train_args:
if kw in kwargs:
train_args[kw] = kwargs[kw]
print(f'Converting trajectory {trajfile} to npz format')
npzfile, ntraj = self.traj_to_npz(seed,trajfile,suffix)
train_args['dataset'] = npzfile
valid_frac = 0.1 # NB: Hardcoded to 10%
train_args['num_train'] = int(ntraj*(1.0-valid_frac))
train_args['num_valid'] = int(ntraj*valid_frac)
train_args['restart'] = label
if restart:
if not os.path.exists(label):
raise Exception(f'Path {label} not found for calculator restart')
print(f'Continuing with run found in {label}')
# Write config.txt
config_file = 'config.txt'
print(f'Writing PhysNet configuration to {config_file}')
with open(config_file,"w") as f:
for kw in train_args:
f.write(f'--{kw}={train_args[kw]}\n')
print(f'Training PhysNet model using trajectory {npzfile}')
import sys
store_argv = sys.argv
sys.argv = ['train.py']
import train
sys.argv = store_argv
def traj_write(self,atoms,traj):
kw = {'dipole': atoms.get_dipole_moment(),
'charges': atoms.get_charges(),
'energy': atoms.get_potential_energy(),
'forces': atoms.get_forces()}
traj.write(atoms,**kw)
[docs] def run_mlmd(self,model,mdseed,calc_params,md_steps,md_timestep,superstep,temp,
solvent=None,restart=False,readonly=False,constraints=None,dynamics=None,
continuation=None):
"""
Runs a Molecular Dynamics calculation with the PhysNet ASE calculator.
model: ASE Atoms
seed: str
calc_params: dict
md_steps: int
md_timestep: float
superstep: int
temp: float
target: int or None
solvent: str or None
restart: bool
readonly: bool
"""
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution, Stationary, ZeroRotation
from ase.md import Langevin, npt
from ase.io import Trajectory
from ase import units
calc_seed = calc_params['calc_seed']
target = calc_params['target']
suffix = calc_params['calc_suffix']
prefix = calc_params['calc_prefix']
# Load the PhysNet Calculator
calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model)
model.calc = calc_ml
# Initialise velocities if this is first step, otherwise inherit from model
if np.all(model.get_momenta() == 0.0):
MaxwellBoltzmannDistribution(model,temperature_K=temp)
# For each ML superstep, remove C.O.M. translation and rotation
#Stationary(model)
#ZeroRotation(model)
#print(f'constraints: {model.constraints}')
if readonly:
model = read(mdseed+".xyz") # Read final image
model.calc = calc_ml
model.get_potential_energy() # Recalculate energy for final image
return None
else:
if hasattr(dynamics,'new_traj'):
new_traj = dynamics.new_traj
else:
new_traj = False
if dynamics is None or isinstance(dynamics,str):
new_traj = True
if dynamics is None or dynamics=="LANG":
dynamics = Langevin(model, timestep=md_timestep, temperature_K=temp, friction=0.002)
if dynamics=="NPT":
ttime=25*units.fs
# Bulk modulus for ethanol
pfactor = 1.06e9*(units.J/units.m**3)*ttime**2
print(f'pfactor = {pfactor}, ttime={ttime}')
dynamics = npt.NPT(model, timestep=md_timestep, temperature_K=temp, externalstress=0,
pfactor=pfactor,ttime=ttime)
if new_traj:
if hasattr(dynamics,'traj'):
dynamics.traj.close()
dynamics.observers = []
dynamics.traj = Trajectory(mdseed+".traj", 'w', model)
dynamics.attach(self.traj_write, interval=1, atoms=model, traj=dynamics.traj)
dynamics.new_traj = False
dynamics.run(md_steps)
return dynamics
# Define an PhysNet geometry optimisation function
[docs] def geom_opt(self,model,seed,calc_params,driver_tol='default',
solvent=None,charge=0,spin=0,writeonly=False,readonly=False,continuation=False,cleanup=False,
traj=None):
"""
Runs a geometry optimisation calculation with the PhysNet ASE calculator
model: ASE Atoms
seed: str
calc_params: dict
dummy: str
driver_tol:
target: int or None
solvent: str or None
readonly: bool
"""
from ase.io import Trajectory
calc_seed = calc_params['calc_seed']
target = calc_params['target']
suffix = calc_params['calc_suffix']
prefix = calc_params['calc_prefix']
from ase.optimize import BFGS
from ase.units import Hartree, Bohr
# Load the appropriate PhysNet Calculator
calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model)
model.calc = calc_ml
# Create instance of BFGS optimizer, run it and return results
dyn = BFGS(model,trajectory=traj)
# tolerances corresponding to NWChem settings
fac=1
if driver_tol=='default':
fmax = 0.00045*fac
if driver_tol=='loose':
fmax = 0.00450*fac
if driver_tol=='tight':
fmax = 0.000015*fac
dyn.run(fmax=fmax)
return model.get_potential_energy(), model.get_forces(), model.get_positions()
[docs] def freq(self,model_opt,seed,calc_params,solvent=None,charge=0,
temp=300,writeonly=False,readonly=False,continuation=False,cleanup=True):
"""
Runs a Vibrational Frequency calculation with the PhysNet ASE calculator
model_opt: ASE Atoms
seed: str
suffix: str
dummy: str
driver_tol:
target: int or None
solvent: str or None
temp: float
readonly: bool
"""
from ase.vibrations import Vibrations, Infrared
calc_seed = calc_params['calc_seed']
target = calc_params['target']
suffix = calc_params['calc_suffix']
prefix = calc_params['calc_prefix']
# Load the appropriate PhysNet Calculator
calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model)
model_opt.calc = calc_ml
# Create instance of Vibrations class, run it and return results
#vib = Vibrations(model_opt,name=self.calc_filename(seed,target,prefix=prefix,suffix=suffix))
#vib.run()
#freqs = vib.get_frequencies()
#vib.summary()
#vib.clean()
ir = Infrared(model_opt,name=self.calc_filename(seed,target,prefix=prefix,suffix=suffix))
ir.run()
ir.summary()
ir.write_spectra(out=ir.name+'_ir_spectrum.dat',start=0,end=4000,width=20)
ir.clean()
#print(freqs)
#return freqs
[docs] def singlepoint(self,model,seed,calc_params,solvent=None,charge=0,spin=0,readonly=False,cleanup=True):
"""
Runs a singlepoint calculation with the PhysNet ASE calculator
model: ASE Atoms
seed: str
suffix: str
dummy: str
target: int or None
solvent: str or None
readonly: bool
"""
calc_seed = calc_params['calc_seed']
target = calc_params['target']
suffix = calc_params['calc_suffix']
prefix = calc_params['calc_prefix']
# Load the appropriate PhysNet Calculator
calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model)
model.calc = calc_ml
e_calc = model.get_potential_energy()
if isinstance(e_calc,np.ndarray):
e_calc = np.float64(e_calc[0])
return e_calc + self.atom_e, model.get_forces()