Source code for esteem.trajectories

#!/usr/bin/env python
# coding: utf-8

# In[ ]:


"""
Functions to help generate and manipulate trajectories
"""


# # Generic MD driver

# In[ ]:


# Generate training (or test) data
from ase.io import read, write
from ase.io.trajectory import Trajectory
from ase.units import Bohr
from ase import units
import numpy as np
from os import path, makedirs, getcwd, chdir, remove
import shutil

def targstr(targ):
    if targ == 0 or targ is None:
        return "gs"
    else:
        return "es"+str(targ)

def output_header(model,prefix):
    if (model.cell!=0.0).all():
        print(f"{prefix}: Step Targ     Potential_Energy          Kinetic_Energy      Stress_[0:2]                   Volume")
    else:
        print(f"{prefix}: Step Targ     Potential_Energy          Kinetic_Energy")

def report_outputs(model,step,targ,prefix):
    pe = model.get_potential_energy()
    if isinstance(pe,list) or isinstance(pe,np.ndarray):
        pe = float(pe[0])
    ke = model.get_kinetic_energy()
    if (model.cell!=0.0).all():
        stress = model.get_stress()[0:3]
        vol = model.cell.volume
        print(f"{prefix}: {step:4} {targ:4} {pe:20.10f} {ke:20.10f} {stress} {vol:20.8f}",flush=True)
    else:
        print(f"{prefix}: {step:4} {targ:4} {pe:20.10f} {ke:20.10f}",flush=True)
    
[docs]def generate_md_trajectory(model,seed,target,traj_label,trajname_suffix,md_func, count_snaps,count_equil,md_steps,md_timestep,temp, calc_params,constraints=None,dynamics=None, singlepoint_func=None): """ Runs equilibration followed by statistics generation MD for a given model. This generic function gets called by both AIMD and by MLMD. model: ASE Atoms Initial geometry for the MD trajectory seed: str String containing name of molecule and excited state index target: int or None Excited state index traj_label: character or str String labelling trajectory (usually A,B,C..) seed: str String containing name of molecule and excited state index trajname_suffix: str String appended to ``seed_state_traj`` to give full trajectory filename md_func: function Wrapper function that runs ``md_steps`` steps of MD on ``model`` count_snaps: int Number of snapshot runs count_equil: int Number of equilibration runs md_steps: int Number of molecular dynamics timesteps in each run above md_timestep: float Molecular dynamics timestep within the runs temp: float Thermostat temperature (NVT ensemble) calc_params: Control-parameters for the wrapper (for QM this is the basis, functional, and target; for ML this is the calculator seed, suffix, prefix and target) constraints: wrapper-dependent type Variables controlling the constraints dynamics: wrapper-dependent type Variables controlling the dynamics """ if isinstance(target,list): all_targets = target else: all_targets = [target] rst_ext = '.qmdrst' equil_dir = f'{trajname_suffix}_equil' snaps_dir = f'{trajname_suffix}_snaps' # Change to equilibration directory if not path.exists(equil_dir): print(f"# Creating output directory {equil_dir} for equilibration MD runs") try: makedirs(equil_dir) except FileExistsError: print(f"# Possible mid-air collision between jobs - directory {equil_dir} exists") origdir = getcwd() chdir(equil_dir) # Run Equilibration MD print("Starting Equilibration MD runs 0 to ",count_equil) output_header(model,'EQ') targ = all_targets[0] seed_state_traj = f"{seed}_{targstr(targ)}_{traj_label}" for step in range(0,count_equil): steplabel, readonly, cont = cycle_step_labels_and_restarts( seed,traj_label,None,equil_dir,snaps_dir, targ,targ,targ,step-1,step,step+1, count_equil,0,rst_ext) dynamics = md_func(model,steplabel,calc_params,md_steps,md_timestep,step, temp,solvent=None,constraints=constraints,dynamics=dynamics, continuation=cont,readonly=readonly) report_outputs(model,step,targ,'EQ') # Return to base directory chdir(origdir) print("Finished Equilibration MD runs") outtraj = {} for targ in all_targets: # Check if output trajectory already exists output_traj = f"{seed}_{targstr(targ)}_{traj_label}_{trajname_suffix}.traj" if path.isfile(output_traj): # Delete it and start afresh remove(output_traj) # Create output trajectory print(f"Opening output trajectory: {output_traj} for writing") outtraj[targ] = Trajectory(output_traj,"w") # Change to snapshots directory if not path.exists(snaps_dir): print(f"# Creating output directory {snaps_dir} for snapshot MD runs") try: makedirs(snaps_dir) except FileExistsError: print(f"# Possible mid-air collision between jobs - directory {snaps_dir} exists") chdir(snaps_dir) # Run Snapshot MD print("Starting Snapshot MD runs 0 to ",count_snaps) output_header(model,'MD') dynamics.new_traj = True for step in range(count_snaps): targ = all_targets[0] calc_params['target'] = targ seed_state_traj = f"{seed}_{targstr(targ)}_{traj_label}" steplabel, readonly, cont = cycle_step_labels_and_restarts( seed,traj_label,equil_dir,snaps_dir,None, targ,targ,targ,step-1,step,step+1, count_snaps,count_equil,rst_ext) dynamics = md_func(model,steplabel,calc_params,md_steps,md_timestep,count_equil+step, temp,solvent=None,constraints=constraints,dynamics=dynamics, continuation=cont,readonly=readonly) report_outputs(model,step,targ,'MD') #if "forces" in model.arrays: # del model.arrays["forces"] #write(f"{steplabel}.xyz",model) # Write snapshot to trajectory outtraj[targ].write(model) # Now do excited states, if applicable prev_targ = targ for i,targ in enumerate(all_targets[1:]): # TODO: calculate next_targ properly to enable skipping over finished frames correctly if targ == all_targets[-1]: next_targ = all_targets[0] next_step = step + 1 else: next_targ = all_targets[i+2] next_step = step steplabel, readonly, cont = cycle_step_labels_and_restarts( seed,traj_label,snaps_dir,snaps_dir,None, prev_targ,targ,next_targ,step,step,next_step, count_snaps,count_equil,rst_ext) readonly=False if path.isfile(steplabel+'.out') or path.isfile(steplabel+'.nwo'): readonly=True energy = None; forces = None; calc_params['target'] = targ energy, forces, _ = singlepoint_func(model,steplabel,calc_params, solvent=None,forces=True,continuation=cont,readonly=readonly) print("SP: ",step,i,targ,model.get_potential_energy(), model.positions[0]) outtraj[targ].write(model) prev_targ = targ # Return to base directory and close trajectories chdir(origdir) for targ in all_targets: outtraj[targ].close() print("Finished Snapshot MD runs")
# Helper functions
[docs]def cycle_step_labels_and_restarts(seed,traj_label,prevdir,currdir,nextdir, prevtarg,currtarg,nexttarg,prevstep,currstep,nextstep, count,prevcount,rst_ext=None): """Move restart files around so that each step continues from the previous one""" db_ext = '.db' # Find name of next restart file if (nextstep==count) and (nextdir is not None): # case for final step of non-final part next_steplabel = f"{seed}_{targstr(nexttarg)}_{traj_label}_{nextdir}{0:04}" next_dir = f"../{nextdir}/{next_steplabel}" else: next_steplabel = f"{seed}_{targstr(nexttarg)}_{traj_label}_{currdir}{nextstep:04}" next_dir = next_steplabel next_rst = f"{next_dir}/{next_steplabel}{rst_ext}" # Find name of previous step restart file if (currstep==0) and (prevdir != currdir): # case for first step of new non-initial part prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{prevdir}{prevcount-1:04}" prev_dir = f"../{prevdir}/{prev_steplabel}" else: prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{currdir}{prevstep:04}" prev_dir = prev_steplabel prev_rst = f"{prev_dir}/{prev_steplabel}{rst_ext}" prev_db = f"{prev_dir}/{prev_steplabel}{db_ext}" # Find name of current step restart file curr_steplabel = f"{seed}_{targstr(currtarg)}_{traj_label}_{currdir}{currstep:04}" curr_dir = curr_steplabel curr_db = f"{curr_dir}/{curr_steplabel}{db_ext}" curr_rst = f"{curr_dir}/{curr_steplabel}{rst_ext}" #if not path.exists(next_dir) and rst_ext is not None: # makedirs(next_dir) # Skip this step if the next one had previously started readonly = False if path.isfile(next_rst): readonly = True # On final overall step, assume if there is an output file for the last step then # we must have finished if ((path.isfile(f"{curr_steplabel}.nwo") or path.isfile(f"{curr_steplabel}.out")) and nextdir==None and currstep==count-1): readonly = True # If this step has not previously started, copy in restart file cont = False prev_rst_present = path.isfile(prev_rst) curr_rst_present = path.isfile(curr_rst) next_rst_present = path.isfile(next_rst) if (path.isfile(prev_rst) and not path.isfile(curr_rst)): if not path.exists(curr_dir) and rst_ext is not None: makedirs(curr_dir) print(f"Copying from {prev_rst} to {curr_rst}") print(f"Copying from {prev_db} to {curr_db}") shutil.copyfile(prev_rst,curr_rst) shutil.copyfile(prev_db,curr_db) cont = True elif (path.isfile(prev_rst) and path.isfile(curr_rst) and not path.isfile(next_rst)): print(f"Resuming from data in {curr_rst} and {curr_db}") cont = True # Debug printing of whether restart files are present if False: print(prevstep,targstr(prevtarg),prev_rst,prev_rst_present,'cont=',cont) print(currstep,targstr(currtarg),curr_rst,curr_rst_present,'read=',readonly) print(nextstep,targstr(nexttarg),next_rst,next_rst_present) #print(step,curr_rst,path.isfile(curr_rst),cont) return curr_steplabel, readonly, cont
# # Obtain a starting geometry # In[ ]:
[docs]def find_initial_geometry(seed,geom_opt_func,calc_params,which_traj=None): """ Obtains a suitable initial geometry for the current seed and state. Optimises it if not present. seed: str String indicating name of molecule, used to find xyz file geom_opt_func: function Wrapper function that runs a geometry optimisation calc_params: Control-parameters for the wrapper (for QM this is the basis, functional, and target; for ML this is the calculator seed, suffix, prefix and target) """ # Construct name for input geometries if 'target' in calc_params: targ = calc_params['target'] seed_state_str = f"{seed}_{targstr(targ)}" else: seed_state_str = seed xyzfile_opt = seed_state_str+'_opt.xyz' model_init = None # Look for trajectory of starting points if which_traj is not None: md_init_traj = f'{seed_state_str}_md_init.traj' try: traj = Trajectory(md_init_traj) print(f'MD starting positions trajectory {md_init_traj} found: length {len(traj)}') import random random.seed(which_traj) i = random.randrange(len(traj)) print(f'MD starting positions taken from snapshot {i} based on random seed {which_traj}') model_init = traj[i] optimised = True except: print(f'MD starting positions trajectory {md_init_traj} not found.') # Look for existing xyz file with optimised geometry if model_init is None: try: model_init = read(xyzfile_opt) print("Optimised geometry found in file: ",xyzfile_opt) optimised = True except: print("Optimised geometry not found, no file: ",xyzfile_opt) xyzfile_unopt = seed+'.xyz' print("Reading geometry from file: ",xyzfile_unopt) model_init = read(xyzfile_unopt) optimised = False # Relax to find optimized geometry for this state if not already done model = model_init.copy() if not optimised: print("Optimising geometry for "+seed) energy, forces, positions = geom_opt_func(model,seed,calc_params,'default') write(xyzfile_opt,model) return model
# # Recalculate energies based on an existing input trajectory # In[ ]:
[docs]def recalculate_trajectory(seed,target,traj_label,trajname_suffix,input_target,input_suffix, singlepoint_func,calc_params,input_traj_range=None,output_traj_offset=0, solvent=None): """ Loads snapshots from a trajectory and recalculates the energy and forces with the current settings seed: str String containing name of molecule and excited state index target: int or None Excited state index traj_label: character or str String labelling trajectory (usually A,B,C..) """ # Open input trajectory input_traj = f"{seed}_{targstr(input_target)}_{traj_label}_{input_suffix}.traj" if not path.isfile(input_traj): raise Exception("Input trajectory not found: ",input_traj) print(f"Reading from input trajectory {input_traj}") intraj = Trajectory(input_traj) if isinstance(target,list): all_targets = target else: all_targets = [target] if input_traj_range is None: input_traj_range = range(0,len(intraj)) outtraj = {} for targ in all_targets: # Check if output trajectory already exists output_traj = f"{seed}_{targstr(targ)}_{traj_label}_{trajname_suffix}.traj" if output_traj_offset>0: print(f"Not writing to output trajectory as range is subset of trajectory.") print(f"Post-process to combine whole trajectory.") outtraj[targ] = None elif path.isfile(output_traj): # Delete it and start afresh print(f"Output trajectory {output_traj} already exists: Overwriting!") remove(output_traj) outtraj[targ] = Trajectory(output_traj,"w") else: # Create new file for output trajectory print(f"Opening output trajectory {output_traj} for writing") outtraj[targ] = Trajectory(output_traj,"w") # Create directory for output files and change to it wdir = trajname_suffix origdir = getcwd() if not path.exists(wdir): makedirs(wdir) chdir(wdir) # Loop over and recalculate each trajectory point for i in input_traj_range: frame = intraj[i].copy() frame.calc = intraj[i].calc iout = i + output_traj_offset try: energy_in, forces_in = (frame.get_potential_energy(),frame.get_forces()) except: # existing trajectory may have no calculator energy_in = 0; forces_in = np.array([[0,0,0]]*len(frame)) cont = False for targ in all_targets: label = f"{seed}_{targstr(targ)}_{traj_label}_{trajname_suffix}{iout:04}" readonly=False if path.isfile(label+'.out') or path.isfile(label+'.nwo'): readonly=True try: energy = None; forces = None; if cont: cycle_restarts(seed,traj_label,trajname_suffix,prevtarg,targ,iout,iout) calc_params['target'] = targ energy, forces, _ = singlepoint_func(frame,label,calc_params, solvent=solvent,forces=True,continuation=cont,readonly=readonly) cont = True # After first excitation, assume subsequently can restart prevtarg = targ print(iout,targ,energy,frame.positions[0],forces[0],frame.calc.results['dipole']) except KeyboardInterrupt: raise Exception('Keyboard Interrupt') # Supply keyword dipole to ensure it gets written or fails if outtraj[targ] is not None: outtraj[targ].write(frame,dipole=frame.calc.results['dipole']) # Return to base directory and close trajectories chdir(origdir) for targ in all_targets: if outtraj[targ] is not None: outtraj[targ].close()
[docs]def cycle_restarts(seed,traj_label,trajname_suffix,prevtarg,currtarg,prevstep,currstep): """Move db files around so that each step continues from the previous one""" db_ext = '.db' # Find name of previous step file prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{trajname_suffix}{prevstep:04}" prev_dir = prev_steplabel prev_db = f"{prev_dir}/{prev_steplabel}{db_ext}" # Find name of current step restart file curr_steplabel = f"{seed}_{targstr(currtarg)}_{traj_label}_{trajname_suffix}{currstep:04}" curr_dir = curr_steplabel curr_db = f"{curr_dir}/{curr_steplabel}{db_ext}" # Copy in db file cont = False prev_db_present = path.isfile(prev_db) curr_db_present = path.isfile(curr_db) if (prev_db_present and not curr_db_present): if not path.exists(curr_dir) and db_ext is not None: makedirs(curr_dir) print(f"Copying from {prev_db} to {curr_db}") shutil.copyfile(prev_db,curr_db)
# # Other tools to manage trajectories # In[ ]: import string
[docs]def get_trajectory_list(ntraj): """ Returns a list of characters to be used as trajectory labels Currently ABCDEDFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz (52) """ return list(string.ascii_uppercase+string.ascii_lowercase)[0:ntraj]
# Merge trajectories (if generated separately)
[docs]def merge_traj(trajnames,trajfile): """ Merges a list of trajectories supplied as a list of filenames, and writes the result to another trajectory supplied as a filename """ fulltraj = Trajectory(trajfile,'w') for tr in trajnames: read_traj = Trajectory(tr) for frames in read_traj: fulltraj.write(frames) print("# Merged ",len(fulltraj)," frames: trajectory written to ",trajfile)
[docs]def split_traj(input_traj_file,output_trajs=None,ntrajs=None,randomise=False,start=0,end=-1): """ Takes in a trajectory filename, and splits it into sections, randomly or sequentially """ import numpy as np if output_trajs is not None: if ntrajs is not None: if ntrajs!=len(output_trajs): raise Exception('Contradictory inputs to split_traj') else: ntrajs = len(output_trajs) if ntrajs is None and output_trajs is None: raise Exception('No outputs specified') if output_trajs is None: output_trajs = [input_traj_file.replace('.traj','_')+str(i)+'.traj' for i in range(ntrajs)] intraj = Trajectory(input_traj_file) outtraj = [Trajectory(output_trajs[i],"w") for i in range(ntrajs)] total = len(intraj) if end > total: raise Exception(f'In split_traj, end was {end}, but trajectory was only size {total}') if end<0: end = max(total - end - 1,0) start = max(start,0) total = end-start if randomise: order = np.random.permutation(total) for j in range(start,end): k = int((j-start)*ntrajs/total) jp = j if randomise: jp = order[j] print(end,start,j,k) outtraj[k].write(intraj[j]) for i in range(ntrajs): outtraj[i].close()
def atom_energies(atom_traj): e_at = {} for t in atom_traj: if (len(t)>1): raise Exception(f'This is not a trajectory of single-atom frames: len(t)={len(t)}') at = t[0] j = at.symbol e_at[j] = t.get_potential_energy() return e_at def atom_energy(atoms,e_at): e = 0 for at in atoms.get_chemical_symbols(): e = e + e_at[at] return e
[docs]def fit_atom_energies_to_traj(traj,atom_traj_file): """ Uses linear regression to best fit atom energies to trajectory with varying numbers of atoms of each species """
[docs]def subtract_atom_energies_from_traj(traj,atom_traj,trajout): """ Subtracts the energies associated with isolated atoms from the total energies in a trajectory """ e_at = atom_energies(atom_traj) for t in traj: e_raw = t.get_potential_energy() t.calc.results["energy"] = e_raw - atom_energy(t,e_at) trajout.write(t)
# Compare two trajectories and plot the difference
[docs]def compare_traj_to_traj(trajx,trajy,plot_file=None,xlabel=None,ylabel=None): """ Compares two trajectories to each other and calculates statistics for how much they differ in energy and force trajx: ASE Trajectory Trajectory whose energy is plotted along x-axis trajy: ASE Trajectory Trajectory whose energy is plotted along y-axis plot_file: Filename to write plot image to. xlabel, ylabel: str Axis labels for plots. """ import numpy as np e_x = [] e_y = [] f_x = [] f_y = [] rms_fd = [] max_fd = [] for i,framex in enumerate(trajx): # Read in total energy and forces from trajectories e_x.append(framex.get_potential_energy()) f_x.append(framex.get_forces()) framey = trajy[i] e_y.append(framey.get_potential_energy()) f_y.append(framey.get_forces()) # Calculate RMS and Max force errors rms_fd.append(np.sqrt(np.mean((f_x[-1]-f_y[-1])**2))) max_fd.append(np.max(np.sqrt((f_x[-1]-f_y[-1])**2))) # Calculate Root Mean Square energy error diff = np.array(e_x)-np.array(e_y) rms_e_err = np.sqrt((1./len(diff))*np.sum(diff**2)) max_e_err = np.max(np.abs(diff)) print(f"# RMS_dE,MAX_dE {rms_e_err:20.12f} {max_e_err:20.12f}") # Calculate Root Mean Square of of RMS force difference rms_fd = np.array(rms_fd) rms_fd_err = np.sqrt((1./len(rms_fd))*np.sum(rms_fd**2)) max_fd_err = np.max(np.abs(rms_fd)) print(f"# RMS_dF,MAX_dF {rms_fd_err:20.12f} {max_fd_err:20.12f}") stats = {"RMS dE (eV) ": rms_e_err, "RMS dF (eV/A)": rms_fd_err, "MAX dE (eV) ": max_e_err, "MAX dF (eV/A)": max_fd_err} clabel = 'RMS Force component deviation (eV/Ang)' plot_diff(e_x,e_y,rms_fd,xlabel,ylabel,clabel,stats,plot_file) return rms_e_err,max_e_err,rms_fd_err,max_fd_err,e_x,e_y,rms_fd
# Scatter Plot from matplotlib import pyplot def plot_diff(e_x,e_y,rms_fd,xlabel=None,ylabel=None,clabel=None,stats={},plot_file='show',align_axes=True): # Optionally, plot to file or screen if plot_file is None: return # Set up fig, ax = pyplot.subplots() # Plot data im = ax.scatter(e_x,e_y,c=rms_fd) cb = fig.colorbar(im) cb.set_label(clabel) # Plot y=x for comparison if align_axes: lims = [np.min([ax.get_xlim(), ax.get_ylim()]), np.max([ax.get_xlim(), ax.get_ylim()])] ax.plot(lims, lims, 'k-', alpha=0.75, zorder=0) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) stat_str="" for s in stats: stat_str += f'{s}: {stats[s]:10.4f}\n' ax.text(0.02,0.98,stat_str,transform=ax.transAxes, horizontalalignment='left',verticalalignment='top',) if plot_file=='show': fig.show() else: fig.savefig(plot_file) # In[ ]: # #TEMP: testing for subtract_atom_energies_from_traj # if True: # import os # from ase.io import Trajectory # os.chdir("/home/theory/phspvr/cate_qmd/") # atom_traj = Trajectory('cate_atoms.traj') # traj = Trajectory('cate_gs_B_training.traj') # print(len(traj)) # trajout = Trajectory('cate_gs_B_training_atsub.traj','w') # subtract_atom_energies_from_traj(traj,atom_traj,trajout) # trajout.close() # In[ ]: # In[ ]: