Source code for esteem.wrappers.physnet

#!/usr/bin/env python
# coding: utf-8

# In[ ]:


"""Defines the PhysNetWrapper Class"""

import numpy as np

[docs]class PhysNetWrapper(): """ Sets up, trains and runs a PhysNet Neural Network Calculator to represent a potential energy surface. """ # Class attribute: default training arguments default_train_args = {'num_features': 128, 'num_basis': 64, 'num_blocks': 5, 'num_residual_atomic': 2, 'num_residual_interaction': 3, 'num_residual_output': 1, 'cutoff': 10.0, 'use_electrostatic': 1, 'use_dispersion': 1, 'grimme_s6': 1.0, #0.5, 'grimme_s8': 1.2177, #0.2130, 'grimme_a1': 0.4145, #0.0, 'grimme_a2': 4.8593, #6.0519, 'dataset': 'test.npz', 'num_train': 450, 'num_valid': 50, 'seed': 42, 'max_steps': 1000000, 'learning_rate': 0.001, 'max_norm': 1000.0, 'ema_decay': 0.999, 'keep_prob': 1.0, 'l2lambda': 0.0, 'nhlambda': 0.01, 'decay_steps': 10000000, 'decay_rate': 0.1, 'batch_size': 32, 'valid_batch_size': 50, 'force_weight': 52.91772105638412, 'charge_weight': 14.399645351950548, 'dipole_weight': 27.211386024367243, 'summary_interval': 10000, 'validation_interval': 10000, 'save_interval': 50000, 'record_run_metadata': 0} def __init__(self,**kwargs): """Sets up instance attributes for PhysNetWrapper """ from copy import deepcopy self.train_args = deepcopy(self.default_train_args) # Allow overrides for this instance of the class for kw in self.train_args: if kw in kwargs: self.train_args[kw] = kwargs[kw] # Make a set of default loading arguments by copying in training arguments self.load_args = {} # These have different names between training and loading self.load_args['F'] = self.train_args['num_features'] self.load_args['K'] = self.train_args['num_basis'] self.load_args['sr_cut'] = self.train_args['cutoff'] if 'lr_cut' in kwargs: self.load_args['lr_cut'] = kwargs['lr_cut'] else: self.load_args['lr_cut'] = None # no need for an electrostatic cutoff for s in ['grimme_s6','grimme_s8','grimme_a1','grimme_a2']: kw = s.replace('grimme_','') if s in self.train_args: self.load_args[kw] = self.train_args[s] # The rest have the same name for kw in ['num_residual_atomic','num_residual_interaction','num_residual_output', 'use_electrostatic','use_dispersion']: if kw in self.train_args: self.load_args[kw] = self.train_args[kw] for kw in ['use_ewald','ewald_alpha','ewald_kmax','ewald_Nmax','skin']: if kw in kwargs: self.load_args[kw] = kwargs[kw] self.calc = None self.calc_params = None self.atom_e = 0.0 self.atom_energies = {} self.update_atom_e = False def calc_filename(self,seed,target,prefix='',suffix=''): if target is None or target == 0: calcfn = seed+"_gs_"+suffix else: calcfn = seed+"_es"+str(target)+"_"+suffix calcfn = prefix + calcfn return calcfn
[docs] def load(self,seed,target=None,prefix="",suffix="",atoms=None): """ Loads an existing PhysNet Calculator seed: str target: int suffix: str kwargs: dict other keywords """ # Check against or store previous calculator parameters if self.calc_params is not None: if ((self.calc_params['target'] != target) or #(self.calc_params['calc_prefix'] != prefix) or (self.calc_params['calc_suffix'] != suffix) or (self.calc_params['calc_seed'] != seed)): raise Exception('Attempted change of calculator parameters for previously-loaded wrapper. Not supported.') self.calc_params = {'target': target,'calc_prefix': prefix, 'calc_suffix': suffix,'calc_seed': seed} if self.calc is not None: return self.calc calcfn = self.calc_filename(seed,target,prefix=prefix,suffix=suffix) import NNCalculator import tensorflow as tf import numpy as np import os from ase.io import read # Backup for if calculator is loaded before atoms available if atoms is None: atoms = read(f"{prefix}{seed}.xyz") try: atoms = read(f"{prefix}{seed}_gs_opt.xyz") except: try: print(f"# Loading {prefix}{seed}.xyz") atoms = read(f"{prefix}{seed}.xyz") except: raise Exception(f"# Could not find initial geometry .xyz file for {seed}") # Find checkpoint file for calculator log_dir=f"{calcfn}/best" checkpoint = tf.train.latest_checkpoint(log_dir) if checkpoint is None: self.calc_params = None raise Exception(f"Checkpoint for {calcfn} not found in current directory {os.getcwd()}") print(f'# Loading Calculator from: {log_dir}, {checkpoint} with args: {self.load_args}',flush=True) self.calc = NNCalculator.NNCalculator(checkpoint,atoms,**self.load_args) #self.calc2 = NNCalculator.NNCalculator(checkpoint,atoms,nn_in=self.calc.nn,sess_in=self.calc.sess,**self.load_args) # Load atoms trajectory and calculate initial value of atom energy from ase.io import Trajectory from esteem.trajectories import atom_energy,atom_energies atom_traj_name = f'{prefix}{seed}_atoms_{suffix}.traj' try: atom_traj = Trajectory(atom_traj_name) except Exception as e: print(f"Could not load atom_traj file: {atom_traj_name} in {os.getcwd()}") raise e self.atom_energies = atom_energies(atom_traj) self.atom_e = atom_energy(atoms,self.atom_energies) return self.calc
def traj_to_npz(self,seed,trajfile,suffix): from ase.io import Trajectory from esteem import trajectories atom_traj = Trajectory(f'{seed}_atoms_{suffix}.traj') traj = Trajectory(trajfile) trajoutfile = trajfile.replace(".traj","_atsub.traj") trajout = Trajectory(trajoutfile,'w') trajectories.subtract_atom_energies_from_traj(traj,atom_traj,trajout) trajout.close() traj.close() traj = Trajectory(trajoutfile) length = len(traj) max_at = max([len(a) for a in traj]) N = np.zeros((length,), dtype=int) #number of atoms E = np.zeros(length) #energy Q = np.zeros(length) #total charge D = np.zeros((length,3)) #dipole moment vector Z = np.zeros((length,max_at)) #nuclear charge R = np.zeros((length,max_at,3)) #cartesian coordinates F = np.zeros((length,max_at,3)) #forces for i,a in enumerate(traj): try: e_raw = a.get_potential_energy() charge = a.get_initial_charges() N[i] = len(a) E[i] = e_raw Q[i] = np.sum(charge) D[i] = a.get_dipole_moment() Z[i,0:len(a)] = a.get_atomic_numbers() R[i,0:len(a)] = a.get_positions() F[i,0:len(a)] = a.get_forces() except Exception as e: print(f'Error while converting frame {i}:') raise e outfilename = trajfile.replace(".traj",".npz") np.savez(outfilename, N=N, E=E, Q=Q, D=D, Z=Z, R=R, F=F) return outfilename, len(traj)
[docs] def reset_loss(self,seed,prefix="",suffix="",target=None): """ Runs training for PhysNet model using an input trajectory as training points seed: str target: int suffix: str prefix: str """ from os import path # Not sensible to try training in other directory than current, so prefix is # suppressed here but used elsewhere (eg for retrieving trajs) label = self.calc_filename(seed,target,prefix="",suffix=suffix) best_loss_file = f"{label}/best/best_loss.npz" if path.isfile(best_loss_file): loss_file = np.load(best_loss_file) best_loss = loss_file["loss"].item() best_emae = loss_file["emae"].item() best_ermse = loss_file["ermse"].item() best_fmae = loss_file["fmae"].item() best_frmse = loss_file["frmse"].item() best_qmae = loss_file["qmae"].item() best_qrmse = loss_file["qrmse"].item() best_dmae = loss_file["dmae"].item() best_drmse = loss_file["drmse"].item() best_step = loss_file["step"].item() print(f"# {best_loss_file} previously contained") for f in loss_file: print(f"# {f}={loss_file[f]}") print(f"# Re-initializing {best_loss_file} to infinity") best_loss = np.Inf #initialize best loss to infinity best_emae = np.Inf best_ermse = np.Inf best_fmae = np.Inf best_frmse = np.Inf best_qmae = np.Inf best_qrmse = np.Inf best_dmae = np.Inf best_drmse = np.Inf # best_step = 0. # Not changing best_step np.savez(best_loss_file, loss=best_loss, emae=best_emae, ermse=best_ermse, fmae=best_fmae, frmse=best_frmse, qmae=best_qmae, qrmse=best_qrmse, dmae=best_dmae, drmse=best_drmse, step=best_step) else: print("Warning: reset_loss == True but best_loss_file {best_loss_file} was not present")
[docs] def train(self,seed,prefix="",suffix="",trajfile="",target=None,restart=False,**kwargs): """ Runs training for PhysNet model using an input trajectory as training points seed: str target: int suffix: str trajfile: str restart: bool kwargs: dict """ # Not sensible to try training in other directory than current, so prefix is # suppressed here but used elsewhere (eg for retrieving trajs) label = self.calc_filename(seed,target,prefix="",suffix=suffix) # Sort out optional arguments to see if any overrides to defaults have been supplied from copy import deepcopy import os train_args = deepcopy(self.train_args) for kw in train_args: if kw in kwargs: train_args[kw] = kwargs[kw] print(f'Converting trajectory {trajfile} to npz format') npzfile, ntraj = self.traj_to_npz(seed,trajfile,suffix) train_args['dataset'] = npzfile valid_frac = 0.1 # NB: Hardcoded to 10% train_args['num_train'] = int(ntraj*(1.0-valid_frac)) train_args['num_valid'] = int(ntraj*valid_frac) train_args['restart'] = label if restart: if not os.path.exists(label): raise Exception(f'Path {label} not found for calculator restart') print(f'Continuing with run found in {label}') # Write config.txt import sys store_argv = sys.argv #config_file = f'{label}_config.txt' #print(f'Writing PhysNet configuration to {config_file}') #with open(config_file,"w") as f: # for kw in train_args: # f.write(f'--{kw}={train_args[kw]}\n') #sys.argv = ['train.py',config_file] sys.argv = ['train.py'] for kw in train_args: sys.argv.append(f'--{kw}') sys.argv.append(f'{train_args[kw]}') print(f'# Training PhysNet model using trajectory {npzfile} with parameters:') print('#',train_args) import train sys.argv = store_argv
def traj_write(self,atoms,traj): kw = {'dipole': atoms.get_dipole_moment(), 'charges': atoms.get_charges(), 'energy': atoms.get_potential_energy(), 'forces': atoms.get_forces()} traj.write(atoms,**kw) def process_dynamics(self,dynamics,model,mdseed,traj_write,md_timestep,temp): from ase.md import Langevin, npt from ase.io import Trajectory # Check for existing settings, retain them if present if hasattr(dynamics,"new_traj"): new_traj = dynamics.new_traj else: new_traj = True if hasattr(dynamics,"friction"): friction = dynamics.friction else: friction = 0.01 if hasattr(dynamics,"type"): dyn_type = dynamics.type else: dyn_type = "LANG" # Set up new dynamics objects, if required, otherwise retain existing ones if dyn_type=="LANG": if type(dynamics)!=Langevin: dynamics = Langevin(model, timestep=md_timestep, temperature_K=temp, friction=friction) else: # in case they have changed dynamics.set_timestep(md_timestep) dynamics.set_friction(friction) dynamics.set_temperature(temperature_K=temp) if dyn_type=="NPT" and type(dynamics)!=npt.NPT: ttime=25*units.fs pfactor = 1.06e9*(units.J/units.m**3)*ttime**2 # Bulk modulus for ethanol dynamics = npt.NPT(model, timestep=md_timestep, temperature_K=temp, externalstress=0, pfactor=pfactor,ttime=ttime) # Copy in extra info stored in dynamics objects, for later retrieval dynamics.new_traj = new_traj dynamics.friction = friction dynamics.type = dyn_type # Handle attachment of trajectory if new_traj: if hasattr(dynamics,'traj'): dynamics.traj.close() dynamics.observers = [] dynamics.traj = Trajectory(mdseed+".traj", 'w', model) dynamics.attach(traj_write, interval=1, atoms=model, traj=dynamics.traj) dynamics.new_traj = False return dynamics
[docs] def run_md(self,model,mdseed,calc_params,md_steps,md_timestep,superstep,temp, solvent=None,charge=0,restart=False,readonly=False,constraints=None,dynamics=None, continuation=None): """ Runs a Molecular Dynamics calculation with the PhysNet ASE calculator. model: ASE Atoms seed: str calc_params: dict md_steps: int md_timestep: float superstep: int temp: float target: int or None solvent: str or None restart: bool readonly: bool """ from ase.md.velocitydistribution import MaxwellBoltzmannDistribution, Stationary, ZeroRotation from ase import units calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] # Load the PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model.calc = calc_ml # Initialise velocities if this is first step, otherwise inherit from model if np.all(model.get_momenta() == 0.0): MaxwellBoltzmannDistribution(model,temperature_K=temp) # For each ML superstep, remove C.O.M. translation and rotation #Stationary(model) #ZeroRotation(model) #print(f'constraints: {model.constraints}') if readonly: model = read(mdseed+".xyz") # Read final image model.calc = calc_ml model.get_potential_energy() # Recalculate energy for final image return None else: dynamics = self.process_dynamics(dynamics,model,mdseed,self.traj_write,md_timestep,temp) dynamics.run(md_steps) return dynamics
# Define a PhysNet geometry optimisation function
[docs] def geom_opt(self,model,seed,calc_params,driver_tol='default', solvent=None,charge=0,spin=0,writeonly=False,readonly=False,continuation=False,cleanup=False, traj=None): """ Runs a geometry optimisation calculation with the PhysNet ASE calculator model: ASE Atoms seed: str calc_params: dict dummy: str driver_tol: target: int or None solvent: str or None readonly: bool """ from ase.io import Trajectory calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] from ase.optimize import BFGS from ase.units import Hartree, Bohr # Load the appropriate PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model.calc = calc_ml # Create instance of BFGS optimizer, run it and return results dyn = BFGS(model,trajectory=traj) # tolerances corresponding to NWChem settings fac=1 if driver_tol=='default': fmax = 0.001*fac if driver_tol=='loose': fmax = 0.00450*fac if driver_tol=='tight': fmax = 0.000015*fac dyn.log = self.log dyn.run(fmax=fmax,steps=1000) if hasattr(calc_ml,'results'): calc_ml.results['dipole'] = model.get_dipole_moment() e_calc = model.get_potential_energy() if isinstance(e_calc,np.ndarray): e_calc = np.float64(e_calc[0]) return e_calc, model.get_forces(), model.get_positions()
def log(self): pass
[docs] def freq(self,model_opt,seed,calc_params,solvent=None,charge=0, temp=300,writeonly=False,readonly=False,continuation=False, summary=True,cleanup=True): """ Runs a Vibrational Frequency calculation with the PhysNet ASE calculator model_opt: ASE Atoms seed: str suffix: str dummy: str driver_tol: target: int or None solvent: str or None temp: float readonly: bool """ from ase.vibrations import Vibrations, Infrared from ase.constraints import FixAtoms from os import getcwd calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] # Load the appropriate PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model_opt) model_opt.calc = calc_ml indices = list(range(len(model_opt))) if len(model_opt.constraints)>0: constraint_str = "%geom Constraints\n" for constr in model_opt.constraints: if isinstance(constr,FixAtoms): indices = [i for i in indices if i not in constr.index] # Create instance of Vibrations class, run it and return results #vib = Vibrations(model_opt,name=self.calc_filename(seed,target,prefix=prefix,suffix=suffix)) #vib.run() #freqs = vib.get_frequencies() #vib.summary() #vib.clean() calcname = self.calc_filename(seed,target,prefix="",suffix=suffix) #print(getcwd) ir = Infrared(model_opt,indices=indices,name=calcname) ir.run() #if summary: # ir.summary() #ir.write_spectra(out=ir.name+'_ir_spectrum.dat',start=0,end=4000,width=20) #ir.clean() #print(freqs) return ir
[docs] def singlepoint(self,model,seed,calc_params,solvent=None,charge=0,spin=0,forces=False,dipole=True, readonly=False,continuation=False,cleanup=True): """ Runs a singlepoint calculation with the PhysNet ASE calculator model: ASE Atoms seed: str suffix: str dummy: str target: int or None solvent: str or None readonly: bool """ from esteem.trajectories import atom_energy from ase.calculators.singlepoint import SinglePointCalculator calc_seed = calc_params['calc_seed'] target = calc_params['target'] suffix = calc_params['calc_suffix'] prefix = calc_params['calc_prefix'] # Load the appropriate PhysNet Calculator calc_ml = self.load(calc_seed,target,prefix=prefix,suffix=suffix,atoms=model) model.calc = calc_ml e_calc = model.get_potential_energy() if isinstance(e_calc,np.ndarray): e_calc = np.float64(e_calc[0]) if forces: f_calc = model.get_forces() if hasattr(calc_ml,'results'): d_calc = model.get_dipole_moment() calc_ml.results['dipole'] = d_calc if hasattr(self,'store_atom_e'): store_atom_e = self.store_atom_e else: store_atom_e = False if self.update_atom_e or store_atom_e: self.atom_e = atom_energy(model,self.atom_energies) if store_atom_e: calc_ml = SinglePointCalculator(model) calc_ml.results['energy'] = e_calc + self.atom_e if forces: calc_ml.results['forces'] = f_calc calc_ml.results['dipole'] = d_calc #calc_ml.energy = e_calc + self.atom_e if forces: if dipole: return e_calc + self.atom_e, f_calc, d_calc, calc_ml else: return e_calc + self.atom_e, f_calc, calc_ml else: if dipole: return e_calc + self.atom_e, d_calc, calc_ml else: return e_calc + self.atom_e, calc_ml
# In[ ]: