Source code for esteem.trajectories

#!/usr/bin/env python
# coding: utf-8

# In[1]:


"""
Functions to help generate and manipulate trajectories
"""


# # Generic MD driver

# In[ ]:


# Generate training (or test) data
from ase.io import read, write
from ase.io.trajectory import Trajectory
from ase.units import Bohr
from ase import units
import numpy as np
from os import path, makedirs, getcwd, chdir, remove
import shutil

def targstr(targ):
    if targ == 0 or targ is None:
        return "gs"
    else:
        return "es"+str(targ)

def output_header(model,prefix):
    if (model.cell!=0.0).all():
        print(f"{prefix}: Step Targ     Potential_Energy      Kinetic_Energy       Temperature_K     Stress_[0:2]                   Volume")
    else:
        print(f"{prefix}: Step Targ     Potential_Energy      Kinetic_Energy       Temperature_K")

def report_outputs(model,step,targ,prefix):
    from ase.units import kB
    pe = model.get_potential_energy()
    if isinstance(pe,list) or isinstance(pe,np.ndarray):
        pe = float(pe[0])
    ke = model.get_kinetic_energy()
    temp = 2*ke/(3*len(model)*kB)
    if (model.cell!=0.0).all():
        stress = model.get_stress()[0:3]
        vol = model.cell.volume
        print(f"{prefix}: {step:4} {targ:4} {pe:20.10f} {ke:20.10f} {temp:20.10f} {stress} {vol:20.8f}",flush=True)
    else:
        print(f"{prefix}: {step:4} {targ:4} {pe:20.10f} {ke:20.10f} {temp:20.10f}",flush=True)

def get_param(md_param,label):

    if isinstance(md_param,dict):
        if label in md_param:
            param = md_param[label]
        else:
            raise Exception(f"# Error: dictionary provided has no entry for key '{label}'")
    else:
        param = md_param

    return param

[docs]def generate_md_trajectory(model,seed,target,traj_label,traj_suffix,wrapper, count_snaps,count_equil,md_steps,md_timestep,md_friction,temp, calc_params,charge=0,solvent=None,constraints=None,dynamics=None, snap_wrapper=None,snap_calc_params=None, continuation=False,store_full_traj=False,debugger=None): """ Runs equilibration followed by statistics generation MD for a given model. This generic function gets called by both AIMD and by MLMD. model: ASE Atoms Initial geometry for the MD trajectory seed: str String containing name of molecule and excited state index target: int or None Excited state index traj_label: character or str String labelling trajectory (usually A,B,C..) seed: str String containing name of molecule and excited state index traj_suffix: str String appended to seed and target state to give full trajectory filename wrapper: ESTEEM wrapper class Wrapper containing run_md and singlepoint functions count_snaps: int Number of snapshot runs count_equil: int Number of equilibration runs md_steps: int Number of molecular dynamics timesteps in each run above md_timestep: float Molecular dynamics timestep within the runs store_full_traj: bool Determines whether the full step-by-step trajectory is written, or just the snapshots each md_steps temp: float Thermostat temperature (NVT ensemble) calc_params: Control-parameters for the wrapper (for QM this is the basis, functional, and target; for ML this is the calculator seed, suffix, prefix and target) solvent: Implicit solvent description constraints: wrapper-dependent type Variables controlling the constraints dynamics: wrapper-dependent type Variables controlling the dynamics """ from ase.units import fs if isinstance(target,list): all_targets = target else: all_targets = [target] # Setup file extensions for restart files exts = [] snap_exts = [] if snap_wrapper is None: snap_wrapper = wrapper if snap_calc_params is None: snap_calc_params = calc_params if hasattr(wrapper,'get_restart_file_exts'): exts = wrapper.get_restart_file_exts() if hasattr(snap_wrapper,'get_restart_file_exts'): snap_exts = snap_wrapper.get_restart_file_exts() equil_dir = f'{traj_suffix}_equil' md_dir = f'{traj_suffix}_md' snaps_dir = f'{traj_suffix}_snaps' # Create equilibration directory if not path.exists(equil_dir): print(f"# Creating output directory {equil_dir} for equilibration MD runs") try: makedirs(equil_dir) except FileExistsError: print(f"# Possible mid-air collision between jobs - directory {equil_dir} exists") # Create MD directory if not path.exists(md_dir): print(f"# Creating output directory {md_dir} for MD runs") try: makedirs(md_dir) except FileExistsError: print(f"# Possible mid-air collision between jobs - directory {md_dir} exists") # Create snapshots directory if not path.exists(snaps_dir): print(f"# Creating output directory {snaps_dir} for snapshots") try: makedirs(snaps_dir) except FileExistsError: print(f"# Possible mid-air collision between jobs - directory {snaps_dir} exists") origdir = getcwd() # Ensure the system has a cell if we are doing NPT dynamics if hasattr(dynamics,'type'): if dynamics.type=="NPT" and (model.cell==0.0).all(): print('# Setting 10A padding') model.center(10) # Run Equilibration MD if not continuation: chdir(equil_dir) timestep = get_param(md_timestep,'EQ') friction = get_param(md_friction,'EQ') output_header(model,'EQ') if dynamics is None: from types import SimpleNamespace dynamics = SimpleNamespace() dynamics.type = "LANG" dynamics.friction = friction dynamics.new_traj = store_full_traj targ = all_targets[0] print(f"# Starting EQ runs 0 to {count_equil} (each {md_steps} steps of {dynamics.type} dynamics with timestep {timestep/fs} fs)") for step in range(0,count_equil): steplabel, readonly, cont = cycle_step_labels_and_restarts( seed,traj_label, None,equil_dir,md_dir, targ,targ,targ,step-1,step,step+1, count_equil,0,exts) dynamics = wrapper.run_md(model,steplabel,calc_params,md_steps,timestep,step, temp,solvent=solvent,charge=charge,constraints=constraints,dynamics=dynamics, continuation=cont,readonly=readonly) if debugger is not None: debugger.check(model) report_outputs(model,step,targ,'EQ') # Return to base directory chdir(origdir) print("# Finished Equilibration MD runs") else: print("# Skipping Equilibration MD runs") outtraj = {} snaps_done = 0 for targ in all_targets: # Check if output trajectory already exists output_traj = f"{seed}_{targstr(targ)}_{traj_label}_{traj_suffix}.traj" if path.isfile(output_traj) and not continuation: # Delete it and start afresh try: remove(output_traj) except: print('# Unable to remove {output_traj} - mid-air collision between jobs? Continuing') # Create output trajectory if continuation: print(f"# Opening output trajectory: {output_traj} for appending") outtraj[targ] = Trajectory(output_traj) snaps_done = len(outtraj[targ]) outtraj[targ].close() outtraj[targ] = Trajectory(output_traj,"a") else: print(f"# Opening output trajectory: {output_traj} for writing") outtraj[targ] = Trajectory(output_traj,"w") # Setup main MD timestep = get_param(md_timestep,'MD') friction = get_param(md_friction,'MD') if dynamics is None: from types import SimpleNamespace dynamics = SimpleNamespace() dynamics.type = "LANG" dynamics.friction = friction dynamics.new_traj = False if snaps_done<count_snaps: print(f"# Starting Snapshot MD runs {snaps_done} to {count_snaps} (each {md_steps} steps of {dynamics.type} dynamics with timestep {timestep/fs} fs)") output_header(model,'MD') else: print(f'# Snapshot MD runs already completed - {count_snaps} snapshots found') if (count_equil > 0 or continuation) and store_full_traj: if continuation: print(f"# Warning - continuation functionality for store_full_traj not finished - overwriting") dynamics.new_traj = True for step in range(snaps_done,count_snaps): chdir(md_dir) targ = all_targets[0] calc_params['target'] = targ # Run main MD between snapshots steplabel, readonly, cont = cycle_step_labels_and_restarts( seed,traj_label, equil_dir,md_dir,None, targ,targ,targ,step-1,step,step+1, count_snaps,count_equil,exts) dynamics = wrapper.run_md(model,steplabel,calc_params,md_steps,timestep,count_equil+step, temp,solvent=solvent,charge=charge,constraints=constraints,dynamics=dynamics, continuation=cont,readonly=readonly) report_outputs(model,step,targ,'MD') # Now do singlepoints for (ground and) excited state snapshots, if applicable chdir(origdir) chdir(snaps_dir) prev_targ = targ for i,targ in enumerate(all_targets[0:]): if targ == all_targets[-1]: next_targ = all_targets[0] next_step = step + 1 else: next_targ = all_targets[i+1] next_step = step steplabel, readonly, cont = cycle_step_labels_and_restarts( seed,traj_label, md_dir,snaps_dir,None, prev_targ,targ,next_targ,step,step,next_step, count_snaps,count_equil,snap_exts) readonly=False if path.isfile(steplabel+'.out') or path.isfile(steplabel+'.nwo'): readonly=True energy = None; forces = None; calc_params['target'] = targ calc_forces = True calc_dipole = True results = snap_wrapper.singlepoint(model,steplabel, snap_calc_params, solvent=solvent,charge=charge, forces=calc_forces,dipole=calc_dipole, continuation=cont,readonly=readonly) if calc_forces and calc_dipole: energy, forces, dipole, calc = results elif calc_forces and not calc_dipole: energy, forces, calc = results else: energy, calc = results energy = model.get_potential_energy() if isinstance(energy,list) or isinstance(energy,np.ndarray): energy = energy[0] if len(all_targets[0:])>1: print("SP: ",step,i,targ,energy,model.positions[0]) outtraj[targ].write(model) prev_targ = targ chdir(origdir) # Return to base directory and close trajectories chdir(origdir) for targ in all_targets: outtraj[targ].close() print("# Finished Snapshot MD runs")
# Helper functions
[docs]def cycle_step_labels_and_restarts(seed,traj_label, prevdir,currdir,nextdir, prevtarg,currtarg,nexttarg, prevstep,currstep,nextstep, count,prevcount,exts,use_subdirs=False): """ Move restart files around so that each step continues from the previous one """ # Find name of next restart files if (nextstep==count) and (nextdir is not None): # case for final step of non-final part next_steplabel = f"{seed}_{targstr(nexttarg)}_{traj_label}_{nextdir}{0:04}" next_dir = f"../{nextdir}/" if use_subdirs: next_dir += "{next_steplabel}/" else: next_steplabel = f"{seed}_{targstr(nexttarg)}_{traj_label}_{currdir}{nextstep:04}" next_dir = f"" if use_subdirs: next_dir += "{next_steplabel}/" next_file = [] for ext in exts: next_file.append(f"{next_dir}{next_steplabel}{ext}") # Find name of previous step restart files if (currstep==0) and (prevdir != currdir): # case for first step of new non-initial part prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{prevdir}{prevcount-1:04}" prev_dir = f"../{prevdir}/" if use_subdirs: prev_dir += "{prev_steplabel}/" else: prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{currdir}{prevstep:04}" prev_dir = f"" if use_subdirs: prev_dir += "{prev_steplabel}/" prev_file = [] for ext in exts: prev_file.append(f"{prev_dir}{prev_steplabel}{ext}") # Find name of current step restart file curr_steplabel = f"{seed}_{targstr(currtarg)}_{traj_label}_{currdir}{currstep:04}" curr_dir = f"" if use_subdirs: curr_dir += "{curr_steplabel}/" curr_file = [] for ext in exts: curr_file.append(f"{curr_dir}{curr_steplabel}{ext}") # Skip this step if the next one had previously started readonly = False if len(next_file)==0: return curr_steplabel, False, False if path.isfile(next_file[0]): readonly = True # On final overall step, assume if there is an output file for the last step then # we must have finished if ((path.isfile(f"{curr_steplabel}.nwo") or path.isfile(f"{curr_steplabel}.out")) and nextdir==None and currstep==count-1): readonly = True # If this step has not previously started, copy in restart file cont = False prev_rst_present = path.isfile(prev_file[0]) curr_rst_present = path.isfile(curr_file[0]) next_rst_present = path.isfile(next_file[0]) if (prev_rst_present and not curr_rst_present): if not path.exists(curr_dir) and len(exts)>0 and curr_dir!="": makedirs(curr_dir) for i,ext in enumerate(exts): print(f"Copying from {prev_file[i]} to {curr_file[i]}") shutil.copyfile(prev_file[i],curr_file[i]) cont = True elif (prev_rst_present and curr_rst_present and not next_rst_present): print(f"Resuming from data in {curr_file[0]}") cont = True # Debug printing of whether restart files are present if False: print(prevstep,targstr(prevtarg),prev_file[0],prev_rst_present,'cont=',cont) print(currstep,targstr(currtarg),curr_file[0],curr_rst_present,'read=',readonly) print(nextstep,targstr(nexttarg),next_file[0],next_rst_present) #print(step,curr_rst,path.isfile(curr_rst),cont) return curr_steplabel, readonly, cont
# # Obtain a starting geometry # In[ ]:
[docs]def find_initial_geometry(seed,geom_opt_func=None,calc_params={},which_traj=None,ntraj=-1): """ Obtains a suitable initial geometry for the current seed and state. Optimises it if not present. seed: str String indicating name of molecule, used to find xyz file geom_opt_func: function Wrapper function that runs a geometry optimisation calc_params: Control-parameters for the wrapper (for QM this is the basis, functional, and target; for ML this is the calculator seed, suffix, prefix and target) """ # Construct name for input geometries if 'target' in calc_params: targ = calc_params['target'] seed_state_str = f"{seed}_{targstr(targ)}" else: seed_state_str = seed xyzfile_opt = seed_state_str+'.xyz' model_init = None curr_dir = getcwd() # Look for trajectory of starting points if which_traj is not None: md_init_traj = f'{seed_state_str}_md_init.traj' try: traj = Trajectory(md_init_traj) print(f'# MD starting positions trajectory {md_init_traj} found: length {len(traj)}') import random if ntraj==len(traj): traj_list = get_trajectory_list(ntraj) i = traj_list.index(which_traj) print(f'# MD starting positions taken from snapshot {i} based on traj index {i} of {ntraj}==length') else: random.seed(which_traj) i = random.randrange(len(traj)) print(f'# MD starting positions taken from snapshot {i} based on random seed {which_traj}') model_init = traj[i] optimised = True except: print(f'# MD starting positions trajectory {md_init_traj} not found.') # Look for existing xyz file with optimised geometry if model_init is None: try: model_init = read(xyzfile_opt) print(f"# Optimised geometry found in file: {curr_dir}/{xyzfile_opt}") optimised = True except: print(f"# Optimised geometry not found, no file: {curr_dir}/{xyzfile_opt}") xyzfile_unopt = seed+'.xyz' print(f"# Reading geometry from file: {curr_dir}/{xyzfile_unopt}") model_init = read(xyzfile_unopt) optimised = False print(f"# Loaded {len(model_init)} atoms") # Relax to find optimized geometry for this state if not already done model = model_init.copy() if not optimised and geom_opt_func is not None: print("# Optimising geometry for "+seed) energy, forces, positions = geom_opt_func(model,seed,calc_params,'default') write(xyzfile_opt,model) return model
# # Recalculate energies based on an existing input trajectory # In[ ]:
[docs]def recalculate_trajectory(seed,target,traj_label,traj_suffix,input_target,input_suffix, wrapper,calc_params,input_traj_range=None,input_traj_label=None, output_traj_offset=0,charge=0,solvent=None, calc_forces=True,calc_dipole=True, geom_opt_kernel=False,vibfreq_kernel=False): """ Loads snapshots from a trajectory and recalculates the energy and forces with the current settings seed: str String containing name of molecule and excited state index target: int or None Excited state index traj_label: character or str String labelling trajectory (usually A,B,C..) """ from glob import glob if False: db_ext = '.db' else: db_ext = '.gbw' # Open input trajectory if input_traj_label is None: input_traj_label = traj_label input_traj = f"{seed}_{targstr(input_target)}_{input_traj_label}_{input_suffix}.traj" if not path.isfile(input_traj): raise Exception("Input trajectory not found: ",input_traj) print(f"# Reading from input trajectory {input_traj}") intraj = Trajectory(input_traj) if isinstance(target,list): all_targets = target else: all_targets = [target] if input_traj_range is None: input_traj_range = range(0,len(intraj)) outtraj = {} for targ in all_targets: # Check if output trajectory already exists output_traj = f"{seed}_{targstr(targ)}_{traj_label}_{traj_suffix}.traj" if output_traj_offset>0 or (len(input_traj_range)==1 and output_traj_offset==0): print(f"# Not writing to output trajectory as range is subset of trajectory.") print(f"# Post-process to combine whole trajectory.") outtraj[targ] = None elif path.isfile(output_traj): # Delete it and start afresh print(f"# Output trajectory {output_traj} already exists: Overwriting!") try: remove(output_traj) except: print('# Unable to remove {output_traj} - mid-air collision between jobs? Continuing') outtraj[targ] = Trajectory(output_traj,"w") else: # Create new file for output trajectory print(f"# Opening output trajectory {output_traj} for writing") outtraj[targ] = Trajectory(output_traj,"w") # Create directory for output files and change to it wdir = traj_suffix origdir = getcwd() if not path.exists(wdir): try: makedirs(wdir) except FileExistsError: print(f"# Possible mid-air collision between jobs - directory {wdir} exists") chdir(wdir) # Loop over and recalculate each trajectory point for i in input_traj_range: frame = intraj[i].copy() frame.calc = intraj[i].calc iout = i + output_traj_offset try: energy_in, forces_in = (frame.get_potential_energy(),frame.get_forces()) except: # existing trajectory may have no calculator energy_in = 0; forces_in = np.array([[0,0,0]]*len(frame)) cont = False for targ in all_targets: label = f"{seed}_{targstr(targ)}_{traj_label}_{traj_suffix}{iout:04}" # Default to readonly for multiple-frame trajectories readonly = len(input_traj_range)!=1 # If output files already exist, read them if hasattr(wrapper,'get_completed_file_exts'): completed_files = [path.isfile(label+f) for f in wrapper.get_completed_file_exts()] else: completed_files = [False] if all(completed_files): readonly = True elif readonly and any(completed_files): print(f'# Some files not present for {label} - cleanup may be required') main_outfile = f'{label}{wrapper.get_completed_file_exts()[0]}' try: print(f'# Final 30 lines of {main_outfile}:') final_lines = open(main_outfile, "r").readlines()[-30:] for line in final_lines: print(line, end="") except: print(f'# Not found: {main_outfile}') all_files = glob(f"{label}*") print(f'# Removing {label}*: {all_files}') for f in all_files: remove(f) energy = None; forces = None; if cont and not readonly: cycle_restarts(seed,traj_label,traj_suffix,prevtarg,targ,iout,iout,db_ext) calc_params['target'] = targ if geom_opt_kernel or vibfreq_kernel: from ase.constraints import FixAtoms c = FixAtoms(mask=[atom.tag!=1 for atom in frame]) frame.set_constraint(c) if geom_opt_kernel: results = wrapper.geom_opt(frame,label,calc_params, solvent=solvent,charge=charge, continuation=cont, readonly=readonly) energy, forces, pos = results dip = frame.calc.results['dipole'] if vibfreq_kernel: ir = wrapper.freq(frame,label,calc_params, solvent=solvent,charge=charge, continuation=cont,readonly=readonly,summary=True) #print('getting freqs') freqs = ir.get_frequencies() modes = ir.modes intensities = ir.intensities ir.clean() # Try to run or read the energy, forces and dipole for this frame success = False try: results = wrapper.singlepoint(frame,label,calc_params, solvent=solvent,charge=charge, forces=calc_forces,dipole=calc_dipole, continuation=cont,readonly=readonly) if calc_forces and calc_dipole: energy, forces, dipole, calc = results elif calc_forces and not calc_dipole: energy, forces, calc = results else: energy, calc = results if len(frame) != len(forces) and not(hasattr(energy,"__len__")): print(f'# ERROR: length of frame {i} ({len(frame)}) does not match length of forces array ({len(forces)})') raise Exception('Length matching failure') frame.calc = calc success = True except KeyboardInterrupt: # Always exit if Ctrl-C pressed raise Exception('Keyboard Interrupt') except Exception as e: # Any other error should be logged as a FAIL and zeros logged. energy = 0.0 frame.calc.results['dipole'] = np.zeros(3) dipole = np.zeros(3) forces = np.zeros([len(frame),3]) print(e) print('FAIL: ',label+'.out',iout,targ,len(frame)) #raise Exception(e) finally: chdir(origdir) chdir(wdir) cont = True # After first excitation, assume subsequently can restart prevtarg = targ # Prepare formatted strings for positions, forces and dipole pos = frame.positions pos_str = f'[ {pos[0,0]:10.6f} {pos[0,1]:10.6f} {pos[0,2]:10.6f} ]' if calc_forces: if forces.ndim==3: forces = np.mean(forces,axis=0) force_str = f'[ {forces[0,0]:12.9f} {forces[0,1]:12.9f} {forces[0,2]:12.9f} ]' else: force_str = '' if calc_dipole: if dipole.ndim==3: dipole = np.mean(dipole,axis=0)[0] elif dipole.ndim==2 and dipole.shape[-1]==3: dipole = np.mean(dipole,axis=0) dip_str = f'[ {dipole[0]:10.6f} {dipole[1]:10.6f} {dipole[2]:10.6f} ]' if isinstance(energy,np.ndarray): energy = np.mean(energy) freq_str = '' # Write line to stdout print(f'{iout:04} {targ:2} {energy:16.8f} {len(frame):5} {pos_str} {force_str} {dip_str}') # Supply keyword dipole explicitly to ensure it gets written or fails if outtraj[targ] is not None: if vibfreq_kernel: frame.info = {} frame.info['freqs'] = freqs frame.info['modes'] = modes frame.info['intensities'] = intensities if isinstance(frame.calc,list): frame.calc = frame.calc[-1] if True: #if success: outtraj[targ].write(frame) # Return to base directory and close trajectories chdir(origdir) for targ in all_targets: if outtraj[targ] is not None: outtraj[targ].close()
[docs]def cycle_restarts(seed,traj_label,traj_suffix,prevtarg,currtarg,prevstep,currstep,db_ext): """ Move db files around so that each step continues from the previous one """ # Find name of previous step file prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{traj_suffix}{prevstep:04}" prev_dir = f"{prev_steplabel}/" if False else "" prev_db = f"{prev_dir}{prev_steplabel}{db_ext}" # Find name of current step restart file curr_steplabel = f"{seed}_{targstr(currtarg)}_{traj_label}_{traj_suffix}{currstep:04}" curr_dir = f"{curr_steplabel}/" if False else "" curr_db = f"{curr_dir}{curr_steplabel}{db_ext}" # Copy in db file prev_db_present = path.isfile(prev_db) curr_db_present = path.isfile(curr_db) if (prev_db_present and not curr_db_present): if curr_dir!="" and not path.exists(curr_dir) and db_ext is not None: makedirs(curr_dir) print(f"# Copying from {prev_db} to {curr_db}") shutil.copyfile(prev_db,curr_db)
# # Other tools to manage trajectories # In[ ]:
[docs]def get_trajectory_list(ntraj): """ Returns a list of characters to be used as trajectory labels Currently ABCDEDFGHIJKLMNOPQRSTUVWXYZ (26) then uses AA,AB,...,ZZ (26^2) """ import itertools, string extras = [] if ntraj>26*27: raise Exception('get_trajectory_list cannot yet handle more than 26*27 trajectories') if ntraj>26: extras = [''.join(i) for i in list(itertools.product(string.ascii_uppercase,string.ascii_uppercase))] return (list(string.ascii_uppercase)+extras)[0:ntraj]
# Merge trajectories (if generated separately)
[docs]def merge_traj(trajnames,trajfile): """ Merges a list of trajectories supplied as a list of filenames, and writes the result to another trajectory supplied as a filename """ fulltraj = Trajectory(trajfile,'w') for tr in trajnames: read_traj = Trajectory(tr) for frames in read_traj: fulltraj.write(frames) print("# Merged ",len(fulltraj)," frames: trajectory written to ",trajfile)
# Difference of two trajectories (generated separately)
[docs]def diff_traj(itrajfile,jtrajfile,outtrajfile): """ Takes two trajectory filenames and finds the energy and force difference between them, outputting the result to a trajectory name outtrajfile """ outtraj = Trajectory(outtrajfile,'w') itraj = Trajectory(itrajfile) jtraj = Trajectory(jtrajfile) assert(len(itraj)==len(jtraj)) for i in range(len(itraj)): iframe = itraj[i].copy() jframe = itraj[j].copy() iframe.results["energy"] -= jframe.results["energy"] iframe.results["forces"] -= jframe.results["forces"] outtraj.write(iframe) print("# Took difference of energy and forces. ",len(outtraj)," frames: trajectory written to ",outtrajfile) outtraj.close()
[docs]def split_traj(input_traj_file,output_trajs=None,ntrajs=None,randomise=False,start=0,end=-1): """ Takes in a trajectory filename, and splits it into sections, randomly or sequentially """ import numpy as np if output_trajs is not None: if ntrajs is not None: if ntrajs!=len(output_trajs): raise Exception('Contradictory inputs to split_traj') else: ntrajs = len(output_trajs) if ntrajs is None and output_trajs is None: raise Exception('No outputs specified') if output_trajs is None: output_trajs = [input_traj_file.replace('.traj','_')+str(i)+'.traj' for i in range(ntrajs)] intraj = Trajectory(input_traj_file) outtraj = [Trajectory(output_trajs[i],"w") for i in range(ntrajs)] total = len(intraj) if end > total: raise Exception(f'In split_traj, end was {end}, but trajectory was only size {total}') if end<0: end = max(total - end - 1,0) start = max(start,0) total = end-start if randomise: order = np.random.permutation(total) for j in range(start,end): k = int((j-start)*ntrajs/total) jp = j if randomise: jp = order[j] print(end,start,j,k) outtraj[k].write(intraj[j]) for i in range(ntrajs): outtraj[i].close()
def atom_energies(atom_traj): e_at = {} for t in atom_traj: if (len(t)>1): raise Exception(f'This is not a trajectory of single-atom frames: len(t)={len(t)}') at = t[0] j = at.symbol e_at[j] = t.get_potential_energy() return e_at def atom_energy(atoms,e_at): e = 0 for at in atoms.get_chemical_symbols(): e = e + e_at[at] return e
[docs]def subtract_atom_energies_from_traj(traj,atom_traj,trajout): """ Subtracts the energies associated with isolated atoms from the total energies in a trajectory """ e_at = atom_energies(atom_traj) for i,t in enumerate(traj): try: e_raw = t.get_potential_energy() except Exception as e: print(f'Error while subtracting atoms energies for frame {i}:') raise e t.calc.results["energy"] = e_raw - atom_energy(t,e_at) trajout.write(t)
# Scatter Plot def plot_diff(e_x,e_y,rms_fd,xlabel=None,ylabel=None,clabel=None,stats={},plot_file='show',align_axes=True): from matplotlib import pyplot # Optionally, plot to file or screen if plot_file is None: return # Set up fig, ax = pyplot.subplots() # Plot data im = ax.scatter(e_x,e_y,c=rms_fd) cb = fig.colorbar(im) cb.set_label(clabel) # Plot y=x for comparison if align_axes: lims = [np.min([ax.get_xlim(), ax.get_ylim()]), np.max([ax.get_xlim(), ax.get_ylim()])] ax.plot(lims, lims, 'k-', alpha=0.75, zorder=0) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) stat_str="" for s in stats: stat_str += f'{s}: {stats[s]:10.4f}\n' ax.text(0.02,0.98,stat_str,transform=ax.transAxes, horizontalalignment='left',verticalalignment='top',) if plot_file=='show': fig.show() else: fig.savefig(plot_file) # Compare two trajectories and plot the difference
[docs]def compare_traj_to_traj(trajx,trajy,plot_func=plot_diff,plot_file=None,xlabel=None,ylabel=None,clabel=None): """ Compares two trajectories to each other and calculates statistics for how much they differ in energy and force trajx: ASE Trajectory Trajectory whose energy is plotted along x-axis trajy: ASE Trajectory Trajectory whose energy is plotted along y-axis plot_file: Filename to write plot image to. xlabel, ylabel: str Axis labels for plots. """ import numpy as np e_x = [] e_y = [] f_x = [] f_y = [] d_x = [] d_y = [] rms_fd = [] mae_fd = [] max_fd = [] nat = [] if plot_func is None: plot_func = plot_diff for i,framex in enumerate(trajx): # Read in total energy and forces from trajectories e_x.append(framex.get_potential_energy()) f_x.append(framex.get_forces()) d_x.append(framex.get_dipole_moment()) framey = trajy[i] e_y.append(framey.get_potential_energy()) f_y.append(framey.get_forces()) new_d = framey.get_dipole_moment() if new_d.shape==(1,3): new_d = new_d[0] d_y.append(new_d) nat.append(len(framex)) # Calculate RMS and Max force errors rms_fd.append(np.sqrt(np.mean((f_x[-1]-f_y[-1])**2))) mae_fd.append(np.mean(np.abs(f_x[-1]-f_y[-1]))) max_fd.append(np.max(np.sqrt((f_x[-1]-f_y[-1])**2))) # Calculate Root Mean Square energy error e_x = np.array(e_x); e_y = np.array(e_y); nat = np.array(nat) if e_y.ndim==2: mean_e_y = np.mean(e_y,axis=1) std_e_y = np.std(e_y,axis=1) else: mean_e_y = e_y std_e_y = np.zeros_like(e_y) diff = e_x - mean_e_y rms_e_err = np.sqrt((1./len(diff))*np.sum(diff**2)) mae_e_err = np.mean(np.abs(diff)) max_e_err = np.max(np.abs(diff)) print(f"# MAE_dE,RMS_dE,MAX_dE {mae_e_err:16.8f} {rms_e_err:16.8f} {max_e_err:16.8f}") # Calculate Root Mean Square of RMS force difference rms_fd = np.array(rms_fd) rms_fd_err = np.sqrt((1./len(rms_fd))*np.sum(rms_fd**2)) mae_fd_err = np.mean(mae_fd) max_fd_err = np.max(np.abs(rms_fd)) print(f"# MAE_dF,RMS_dF,MAX_dF {mae_fd_err:16.8f} {rms_fd_err:16.8f} {max_fd_err:16.8f}") # Calculate Root Mean Square of and MAE of dipole difference diff = np.array(d_x)-np.array(d_y) rms_d_err = np.sqrt(np.sum((1./len(diff))*(np.sum(diff**2,axis=1)))) mae_d_err = np.mean(np.sqrt(np.sum(diff**2,axis=1))) max_d_err = np.max(np.sqrt(np.sum(diff**2,axis=1))) print(f"# MAE_dd,RMS_dd,MAX_dd {mae_d_err:16.8f} {rms_d_err:16.8f} {max_d_err:16.8f}") # Calculate Mean number of atoms mean_nat = np.mean(nat) print(f"# Mean Number of atoms: {mean_nat:10.4f}") stats = {"MAE dE (eV) ": mae_e_err, "MAE dF (eV/A)": mae_fd_err, "MAE dd (e.A) ": mae_d_err, "RMS dE (eV) ": rms_e_err, "RMS dF (eV/A)": rms_fd_err, "RMS dd (e.A) ": rms_d_err, "MAX dE (eV) ": max_e_err, "MAX dF (eV/A)": max_fd_err, "MAX dd (e.A) ": max_d_err, "Mean No.atoms": mean_nat} plot_func(e_x,mean_e_y,rms_fd,xlabel,ylabel,clabel,stats,plot_file) return rms_e_err,max_e_err,rms_fd_err,max_fd_err,e_x,e_y,rms_fd