Source code for esteem.wrappers.physnet

#!/usr/bin/env python
# coding: utf-8

# In[ ]:


"""Defines the PhysNetWrapper Class"""

import numpy as np

[docs]class PhysNetWrapper(): """ Sets up, trains and runs a PhysNet Neural Network Calculator to represent a potential energy surface. """ # Class attribute: default training arguments default_train_args = {'num_features': 128, 'num_basis': 64, 'num_blocks': 5, 'num_residual_atomic': 2, 'num_residual_interaction': 3, 'num_residual_output': 1, 'cutoff': 10.0, 'use_electrostatic': 1, 'use_dispersion': 1, 'grimme_s6': 0.5, 'grimme_s8': 0.2130, 'grimme_a1': 0.0, 'grimme_a2': 6.0519, 'dataset': 'test.npz', 'num_train': 450, 'num_valid': 50, 'seed': 42, 'max_steps': 1000000, 'learning_rate': 0.001, 'max_norm': 1000.0, 'ema_decay': 0.999, 'keep_prob': 1.0, 'l2lambda': 0.0, 'nhlambda': 0.01, 'decay_steps': 10000000, 'decay_rate': 0.1, 'batch_size': 32, 'valid_batch_size': 50, 'force_weight': 52.91772105638412, 'charge_weight': 14.399645351950548, 'dipole_weight': 27.211386024367243, 'summary_interval': 10000, 'validation_interval': 10000, 'save_interval': 50000, 'record_run_metadata': 0} def __init__(self,**kwargs): """Sets up instance attributes for PhysNetWrapper """ from copy import deepcopy self.train_args = deepcopy(self.default_train_args) # Allow overrides for this instance of the class for kw in self.train_args: if kw in kwargs: self.train_args[kw] = kwargs[kw] # Make a set of default loading arguments by copying in training arguments self.load_args = {} # These have different names between training and loading self.load_args['F'] = self.train_args['num_features'] self.load_args['K'] = self.train_args['num_basis'] self.load_args['sr_cut'] = self.train_args['cutoff'] if 'lr_cut' in kwargs: self.load_args['lr_cut'] = kwargs['lr_cut'] else: self.load_args['lr_cut'] = None # no need for an electrostatic cutoff for s in ['grimme_s6','grimme_s8','grimme_a1','grimme_a2']: kw = s.replace('grimme_','') if s in self.train_args: self.load_args[kw] = self.train_args[s] # The rest have the same name for kw in ['num_residual_atomic','num_residual_interaction','num_residual_output', 'use_electrostatic','use_dispersion']: if kw in self.train_args: self.load_args[kw] = self.train_args[kw] self.calc_ext = "" self.log_ext = "" self.calc = None self.atom_e = 0.0 self.atom_energies = {} self.atoms_on_load = None def calc_filename(self,seed,target,prefix='',suffix=''): if target is None or target == 0: calcfn = seed+"_gs_"+suffix else: calcfn = seed+"_es"+str(target)+"_"+suffix calcfn = prefix + calcfn return calcfn
[docs] def load(self,seed,target=None,prefix="",suffix="",atoms=None): """ Loads an existing PhysNet Calculator seed: str target: int suffix: str kwargs: dict other keywords """ if self.calc is not None: return self.calc calcfn = self.calc_filename(seed,target,prefix=prefix,suffix=suffix) import NNCalculator import tensorflow as tf import numpy as np from ase.io import read # Backup for if calculator is loaded before atoms available if atoms is None: atoms = read(f"{prefix}{seed}.xyz") try: atoms = read(f"{prefix}{seed}_gs_opt.xyz") except: try: print(f"{prefix}{seed}.xyz") atoms = read(f"{prefix}{seed}.xyz") except: raise Exception(f"Could not find initial geometry .xyz file for {seed}") # Find checkpoint file for calculator log_dir=f"{calcfn}/best" checkpoint = tf.train.latest_checkpoint(log_dir) print(f'Loading Calculator from: {log_dir}, {checkpoint} with args: {self.load_args}',flush=True) self.calc = NNCalculator.NNCalculator(checkpoint,atoms,**self.load_args) # Load atoms trajectory and calculate initial value of atom energy from ase.io import Trajectory from esteem.trajectories import atom_energy,atom_energies atom_traj = Trajectory(f'{prefix}{seed}_atoms_{suffix}.traj') self.atom_energies = atom_energies(atom_traj) self.atom_e = atom_energy(atoms,self.atom_energies) self.atoms_on_load = atoms.copy() return self.calc
def traj_to_npz(self,seed,trajfile,suffix): from ase.io import Trajectory from esteem import trajectories atom_traj = Trajectory(f'{seed}_atoms_{suffix}.traj') traj = Trajectory(trajfile) trajoutfile = trajfile.replace(".traj","_atsub.traj") trajout = Trajectory(trajoutfile,'w') trajectories.subtract_atom_energies_from_traj(traj,atom_traj,trajout) trajout.close() traj.close() traj = Trajectory(trajoutfile) length = len(traj) max_at = max([len(a) for a in traj]) N = np.zeros((length,), dtype=int) #number of atoms E = np.zeros(length) #energy Q = np.zeros(length) #total charge D = np.zeros((length,3)) #dipole moment vector Z = np.zeros((length,max_at)) #nuclear charge R = np.zeros((length,max_at,3)) #cartesian coordinates F = np.zeros((length,max_at,3)) #forces for i,a in enumerate(traj): e_raw = a.get_potential_energy() charge = a.get_initial_charges() N[i] = len(a) E[i] = e_raw Q[i] = np.sum(charge) D[i] = a.get_dipole_moment() Z[i,0:len(a)] = a.get_atomic_numbers() R[i,0:len(a)] = a.get_positions() F[i,0:len(a)] = a.get_forces() outfilename = trajfile.replace(".traj",".npz") np.savez(outfilename, N=N, E=E, Q=Q, D=D, Z=Z, R=R, F=F) return outfilename, len(traj)
[docs] def train(self,seed,prefix="",suffix="",trajfile="",target=None,restart=False,**kwargs): """ Runs training for PhysNet model using an input trajectory as training points seed: str target: int suffix: str trajfile: str restart: bool kwargs: dict """ label = self.calc_filename(seed,target,prefix=prefix,suffix=suffix) # Sort out optional arguments to see if any overrides to defaults have been supplied from copy import deepcopy import os train_args = deepcopy(self.default_train_args) for kw in train_args: if kw in kwargs: train_args[kw] = kwargs[kw] print(f'Converting trajectory {trajfile} to npz format') npzfile, ntraj = self.traj_to_npz(seed,trajfile,suffix) train_args['dataset'] = npzfile valid_frac = 0.1 # NB: Hardcoded to 10% train_args['num_train'] = int(ntraj*(1.0-valid_frac)) train_args['num_valid'] = int(ntraj*valid_frac) train_args['restart'] = label if restart: if not os.path.exists(label): raise Exception(f'Path {label} not found for calculator restart') print(f'Continuing with run found in {label}') # Write config.txt config_file = 'config.txt' print(f'Writing PhysNet configuration to {config_file}') with open(config_file,"w") as f: for kw in train_args: f.write(f'--{kw}={train_args[kw]}\n') print(f'Training PhysNet model using trajectory {npzfile}') import sys store_argv = sys.argv sys.argv = ['train.py'] import train sys.argv = store_argv
def traj_write(self,atoms,traj): kw = {'dipole': atoms.get_dipole_moment(), 'charges': atoms.get_charges(), 'energy': atoms.get_potential_energy(), 'forces': atoms.get_forces()} traj.write(atoms,**kw)
[docs] def run_mlmd(self,model,mdseed,calc_params,md_steps,md_timestep,superstep,temp, solvent=None,restart=False,readonly=False,constraints=None,dynamics=None, continuation=None): """ Runs a Molecular Dynamics calculation with the PhysNet ASE calculator. model: ASE Atoms seed: str calc_params: dict md_steps: int md_timestep: float superstep: int temp: float target: int or None solvent: str or None restart: bool readonly: bool """ from ase.md.velocitydistribution import MaxwellBoltzmannDistribution, Stationary, ZeroRotation from ase.md import Langevin, npt from ase.io import Trajectory from ase import units calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] # Load the PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model.calc = calc_ml # Initialise velocities if this is first step, otherwise inherit from model if np.all(model.get_momenta() == 0.0): MaxwellBoltzmannDistribution(model,temperature_K=temp) # For each ML superstep, remove C.O.M. translation and rotation #Stationary(model) #ZeroRotation(model) #print(f'constraints: {model.constraints}') if readonly: model = read(mdseed+".xyz") # Read final image model.calc = calc_ml model.get_potential_energy() # Recalculate energy for final image return None else: if hasattr(dynamics,'new_traj'): new_traj = dynamics.new_traj else: new_traj = False if dynamics is None or isinstance(dynamics,str): new_traj = True if dynamics is None or dynamics=="LANG": dynamics = Langevin(model, timestep=md_timestep, temperature_K=temp, friction=0.002) if dynamics=="NPT": ttime=25*units.fs # Bulk modulus for ethanol pfactor = 1.06e9*(units.J/units.m**3)*ttime**2 print(f'pfactor = {pfactor}, ttime={ttime}') dynamics = npt.NPT(model, timestep=md_timestep, temperature_K=temp, externalstress=0, pfactor=pfactor,ttime=ttime) if new_traj: if hasattr(dynamics,'traj'): dynamics.traj.close() dynamics.observers = [] dynamics.traj = Trajectory(mdseed+".traj", 'w', model) dynamics.attach(self.traj_write, interval=1, atoms=model, traj=dynamics.traj) dynamics.new_traj = False dynamics.run(md_steps) return dynamics
# Define an PhysNet geometry optimisation function
[docs] def geom_opt(self,model,seed,calc_params,driver_tol='default', solvent=None,charge=0,spin=0,writeonly=False,readonly=False,continuation=False,cleanup=False, traj=None): """ Runs a geometry optimisation calculation with the PhysNet ASE calculator model: ASE Atoms seed: str calc_params: dict dummy: str driver_tol: target: int or None solvent: str or None readonly: bool """ from ase.io import Trajectory calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] from ase.optimize import BFGS from ase.units import Hartree, Bohr # Load the appropriate PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model.calc = calc_ml # Create instance of BFGS optimizer, run it and return results dyn = BFGS(model,trajectory=traj) # tolerances corresponding to NWChem settings fac=1 if driver_tol=='default': fmax = 0.00045*fac if driver_tol=='loose': fmax = 0.00450*fac if driver_tol=='tight': fmax = 0.000015*fac dyn.run(fmax=fmax) return model.get_potential_energy(), model.get_forces(), model.get_positions()
[docs] def freq(self,model_opt,seed,calc_params,solvent=None,charge=0, temp=300,writeonly=False,readonly=False,continuation=False,cleanup=True): """ Runs a Vibrational Frequency calculation with the PhysNet ASE calculator model_opt: ASE Atoms seed: str suffix: str dummy: str driver_tol: target: int or None solvent: str or None temp: float readonly: bool """ from ase.vibrations import Vibrations, Infrared calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] # Load the appropriate PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model_opt.calc = calc_ml # Create instance of Vibrations class, run it and return results #vib = Vibrations(model_opt,name=self.calc_filename(seed,target,prefix=prefix,suffix=suffix)) #vib.run() #freqs = vib.get_frequencies() #vib.summary() #vib.clean() ir = Infrared(model_opt,name=self.calc_filename(seed,target,prefix=prefix,suffix=suffix)) ir.run() ir.summary() ir.write_spectra(out=ir.name+'_ir_spectrum.dat',start=0,end=4000,width=20) ir.clean()
#print(freqs) #return freqs
[docs] def singlepoint(self,model,seed,calc_params,solvent=None,charge=0,spin=0,readonly=False,cleanup=True): """ Runs a singlepoint calculation with the PhysNet ASE calculator model: ASE Atoms seed: str suffix: str dummy: str target: int or None solvent: str or None readonly: bool """ calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] # Load the appropriate PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model.calc = calc_ml e_calc = model.get_potential_energy() if isinstance(e_calc,np.ndarray): e_calc = np.float64(e_calc[0]) return e_calc + self.atom_e, model.get_forces()