#!/usr/bin/env python
# coding: utf-8
# In[ ]:
"""
Functions to help generate and manipulate trajectories
"""
# # Generic MD driver
# In[ ]:
# Generate training (or test) data
from ase.io import read, write
from ase.io.trajectory import Trajectory
from ase.units import Bohr
from ase import units
import numpy as np
from os import path, makedirs, getcwd, chdir, remove
import shutil
def targstr(targ):
if targ == 0 or targ is None:
return "gs"
else:
return "es"+str(targ)
def output_header(model,prefix):
if (model.cell!=0.0).all():
print(f"{prefix}: Step Targ Potential_Energy Kinetic_Energy Stress_[0:2] Volume")
else:
print(f"{prefix}: Step Targ Potential_Energy Kinetic_Energy")
def report_outputs(model,step,targ,prefix):
pe = model.get_potential_energy()
if isinstance(pe,list) or isinstance(pe,np.ndarray):
pe = float(pe[0])
ke = model.get_kinetic_energy()
if (model.cell!=0.0).all():
stress = model.get_stress()[0:3]
vol = model.cell.volume
print(f"{prefix}: {step:4} {targ:4} {pe:20.10f} {ke:20.10f} {stress} {vol:20.8f}",flush=True)
else:
print(f"{prefix}: {step:4} {targ:4} {pe:20.10f} {ke:20.10f}",flush=True)
[docs]def generate_md_trajectory(model,seed,target,traj_label,trajname_suffix,md_func,
count_snaps,count_equil,md_steps,md_timestep,temp,
calc_params,constraints=None,dynamics=None,
singlepoint_func=None):
"""
Runs equilibration followed by statistics generation MD for a given model.
This generic function gets called by both AIMD and by MLMD.
model: ASE Atoms
Initial geometry for the MD trajectory
seed: str
String containing name of molecule and excited state index
target: int or None
Excited state index
traj_label: character or str
String labelling trajectory (usually A,B,C..)
seed: str
String containing name of molecule and excited state index
trajname_suffix: str
String appended to ``seed_state_traj`` to give full trajectory filename
md_func: function
Wrapper function that runs ``md_steps`` steps of MD on ``model``
count_snaps: int
Number of snapshot runs
count_equil: int
Number of equilibration runs
md_steps: int
Number of molecular dynamics timesteps in each run above
md_timestep: float
Molecular dynamics timestep within the runs
temp: float
Thermostat temperature (NVT ensemble)
calc_params:
Control-parameters for the wrapper (for QM this is the basis, functional, and target;
for ML this is the calculator seed, suffix, prefix and target)
constraints: wrapper-dependent type
Variables controlling the constraints
dynamics: wrapper-dependent type
Variables controlling the dynamics
"""
if isinstance(target,list):
all_targets = target
else:
all_targets = [target]
rst_ext = '.qmdrst'
equil_dir = f'{trajname_suffix}_equil'
snaps_dir = f'{trajname_suffix}_snaps'
# Change to equilibration directory
if not path.exists(equil_dir):
print(f"# Creating output directory {equil_dir} for equilibration MD runs")
try:
makedirs(equil_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {equil_dir} exists")
origdir = getcwd()
chdir(equil_dir)
# Run Equilibration MD
print("Starting Equilibration MD runs 0 to ",count_equil)
output_header(model,'EQ')
targ = all_targets[0]
seed_state_traj = f"{seed}_{targstr(targ)}_{traj_label}"
for step in range(0,count_equil):
steplabel, readonly, cont = cycle_step_labels_and_restarts(
seed,traj_label,None,equil_dir,snaps_dir,
targ,targ,targ,step-1,step,step+1,
count_equil,0,rst_ext)
dynamics = md_func(model,steplabel,calc_params,md_steps,md_timestep,step,
temp,solvent=None,constraints=constraints,dynamics=dynamics,
continuation=cont,readonly=readonly)
report_outputs(model,step,targ,'EQ')
# Return to base directory
chdir(origdir)
print("Finished Equilibration MD runs")
outtraj = {}
for targ in all_targets:
# Check if output trajectory already exists
output_traj = f"{seed}_{targstr(targ)}_{traj_label}_{trajname_suffix}.traj"
if path.isfile(output_traj):
# Delete it and start afresh
remove(output_traj)
# Create output trajectory
print(f"Opening output trajectory: {output_traj} for writing")
outtraj[targ] = Trajectory(output_traj,"w")
# Change to snapshots directory
if not path.exists(snaps_dir):
print(f"# Creating output directory {snaps_dir} for snapshot MD runs")
try:
makedirs(snaps_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {snaps_dir} exists")
chdir(snaps_dir)
# Run Snapshot MD
print("Starting Snapshot MD runs 0 to ",count_snaps)
output_header(model,'MD')
dynamics.new_traj = True
for step in range(count_snaps):
targ = all_targets[0]
calc_params['target'] = targ
seed_state_traj = f"{seed}_{targstr(targ)}_{traj_label}"
steplabel, readonly, cont = cycle_step_labels_and_restarts(
seed,traj_label,equil_dir,snaps_dir,None,
targ,targ,targ,step-1,step,step+1,
count_snaps,count_equil,rst_ext)
dynamics = md_func(model,steplabel,calc_params,md_steps,md_timestep,count_equil+step,
temp,solvent=None,constraints=constraints,dynamics=dynamics,
continuation=cont,readonly=readonly)
report_outputs(model,step,targ,'MD')
#if "forces" in model.arrays:
# del model.arrays["forces"]
#write(f"{steplabel}.xyz",model)
# Write snapshot to trajectory
outtraj[targ].write(model)
# Now do excited states, if applicable
prev_targ = targ
for i,targ in enumerate(all_targets[1:]):
# TODO: calculate next_targ properly to enable skipping over finished frames correctly
if targ == all_targets[-1]:
next_targ = all_targets[0]
next_step = step + 1
else:
next_targ = all_targets[i+2]
next_step = step
steplabel, readonly, cont = cycle_step_labels_and_restarts(
seed,traj_label,snaps_dir,snaps_dir,None,
prev_targ,targ,next_targ,step,step,next_step,
count_snaps,count_equil,rst_ext)
readonly=False
if path.isfile(steplabel+'.out') or path.isfile(steplabel+'.nwo'):
readonly=True
energy = None; forces = None;
calc_params['target'] = targ
energy, forces, _ = singlepoint_func(model,steplabel,calc_params,
solvent=None,forces=True,continuation=cont,readonly=readonly)
print("SP: ",step,i,targ,model.get_potential_energy(), model.positions[0])
outtraj[targ].write(model)
prev_targ = targ
# Return to base directory and close trajectories
chdir(origdir)
for targ in all_targets:
outtraj[targ].close()
print("Finished Snapshot MD runs")
# Helper functions
[docs]def cycle_step_labels_and_restarts(seed,traj_label,prevdir,currdir,nextdir,
prevtarg,currtarg,nexttarg,prevstep,currstep,nextstep,
count,prevcount,rst_ext=None):
"""Move restart files around so that each step continues from the previous one"""
db_ext = '.db'
# Find name of next restart file
if (nextstep==count) and (nextdir is not None): # case for final step of non-final part
next_steplabel = f"{seed}_{targstr(nexttarg)}_{traj_label}_{nextdir}{0:04}"
next_dir = f"../{nextdir}/{next_steplabel}"
else:
next_steplabel = f"{seed}_{targstr(nexttarg)}_{traj_label}_{currdir}{nextstep:04}"
next_dir = next_steplabel
next_rst = f"{next_dir}/{next_steplabel}{rst_ext}"
# Find name of previous step restart file
if (currstep==0) and (prevdir != currdir): # case for first step of new non-initial part
prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{prevdir}{prevcount-1:04}"
prev_dir = f"../{prevdir}/{prev_steplabel}"
else:
prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{currdir}{prevstep:04}"
prev_dir = prev_steplabel
prev_rst = f"{prev_dir}/{prev_steplabel}{rst_ext}"
prev_db = f"{prev_dir}/{prev_steplabel}{db_ext}"
# Find name of current step restart file
curr_steplabel = f"{seed}_{targstr(currtarg)}_{traj_label}_{currdir}{currstep:04}"
curr_dir = curr_steplabel
curr_db = f"{curr_dir}/{curr_steplabel}{db_ext}"
curr_rst = f"{curr_dir}/{curr_steplabel}{rst_ext}"
#if not path.exists(next_dir) and rst_ext is not None:
# makedirs(next_dir)
# Skip this step if the next one had previously started
readonly = False
if path.isfile(next_rst):
readonly = True
# On final overall step, assume if there is an output file for the last step then
# we must have finished
if ((path.isfile(f"{curr_steplabel}.nwo") or path.isfile(f"{curr_steplabel}.out")) and
nextdir==None and currstep==count-1):
readonly = True
# If this step has not previously started, copy in restart file
cont = False
prev_rst_present = path.isfile(prev_rst)
curr_rst_present = path.isfile(curr_rst)
next_rst_present = path.isfile(next_rst)
if (path.isfile(prev_rst) and not path.isfile(curr_rst)):
if not path.exists(curr_dir) and rst_ext is not None:
makedirs(curr_dir)
print(f"Copying from {prev_rst} to {curr_rst}")
print(f"Copying from {prev_db} to {curr_db}")
shutil.copyfile(prev_rst,curr_rst)
shutil.copyfile(prev_db,curr_db)
cont = True
elif (path.isfile(prev_rst) and path.isfile(curr_rst) and not path.isfile(next_rst)):
print(f"Resuming from data in {curr_rst} and {curr_db}")
cont = True
# Debug printing of whether restart files are present
if False:
print(prevstep,targstr(prevtarg),prev_rst,prev_rst_present,'cont=',cont)
print(currstep,targstr(currtarg),curr_rst,curr_rst_present,'read=',readonly)
print(nextstep,targstr(nexttarg),next_rst,next_rst_present)
#print(step,curr_rst,path.isfile(curr_rst),cont)
return curr_steplabel, readonly, cont
# # Obtain a starting geometry
# In[ ]:
[docs]def find_initial_geometry(seed,geom_opt_func,calc_params,which_traj=None):
"""
Obtains a suitable initial geometry for the current seed and state.
Optimises it if not present.
seed: str
String indicating name of molecule, used to find xyz file
geom_opt_func: function
Wrapper function that runs a geometry optimisation
calc_params:
Control-parameters for the wrapper (for QM this is the basis, functional, and target;
for ML this is the calculator seed, suffix, prefix and target)
"""
# Construct name for input geometries
if 'target' in calc_params:
targ = calc_params['target']
seed_state_str = f"{seed}_{targstr(targ)}"
else:
seed_state_str = seed
xyzfile_opt = seed_state_str+'_opt.xyz'
model_init = None
# Look for trajectory of starting points
if which_traj is not None:
md_init_traj = f'{seed_state_str}_md_init.traj'
try:
traj = Trajectory(md_init_traj)
print(f'MD starting positions trajectory {md_init_traj} found: length {len(traj)}')
import random
random.seed(which_traj)
i = random.randrange(len(traj))
print(f'MD starting positions taken from snapshot {i} based on random seed {which_traj}')
model_init = traj[i]
optimised = True
except:
print(f'MD starting positions trajectory {md_init_traj} not found.')
# Look for existing xyz file with optimised geometry
if model_init is None:
try:
model_init = read(xyzfile_opt)
print("Optimised geometry found in file: ",xyzfile_opt)
optimised = True
except:
print("Optimised geometry not found, no file: ",xyzfile_opt)
xyzfile_unopt = seed+'.xyz'
print("Reading geometry from file: ",xyzfile_unopt)
model_init = read(xyzfile_unopt)
optimised = False
# Relax to find optimized geometry for this state if not already done
model = model_init.copy()
if not optimised:
print("Optimising geometry for "+seed)
energy, forces, positions = geom_opt_func(model,seed,calc_params,'default')
write(xyzfile_opt,model)
return model
# # Recalculate energies based on an existing input trajectory
# In[ ]:
[docs]def recalculate_trajectory(seed,target,traj_label,trajname_suffix,input_target,input_suffix,
singlepoint_func,calc_params,input_traj_range=None,output_traj_offset=0,
solvent=None):
"""
Loads snapshots from a trajectory and recalculates the energy and forces with the current settings
seed: str
String containing name of molecule and excited state index
target: int or None
Excited state index
traj_label: character or str
String labelling trajectory (usually A,B,C..)
"""
# Open input trajectory
input_traj = f"{seed}_{targstr(input_target)}_{traj_label}_{input_suffix}.traj"
if not path.isfile(input_traj):
raise Exception("Input trajectory not found: ",input_traj)
print(f"Reading from input trajectory {input_traj}")
intraj = Trajectory(input_traj)
if isinstance(target,list):
all_targets = target
else:
all_targets = [target]
if input_traj_range is None:
input_traj_range = range(0,len(intraj))
outtraj = {}
for targ in all_targets:
# Check if output trajectory already exists
output_traj = f"{seed}_{targstr(targ)}_{traj_label}_{trajname_suffix}.traj"
if output_traj_offset>0:
print(f"Not writing to output trajectory as range is subset of trajectory.")
print(f"Post-process to combine whole trajectory.")
outtraj[targ] = None
elif path.isfile(output_traj):
# Delete it and start afresh
print(f"Output trajectory {output_traj} already exists: Overwriting!")
remove(output_traj)
outtraj[targ] = Trajectory(output_traj,"w")
else:
# Create new file for output trajectory
print(f"Opening output trajectory {output_traj} for writing")
outtraj[targ] = Trajectory(output_traj,"w")
# Create directory for output files and change to it
wdir = trajname_suffix
origdir = getcwd()
if not path.exists(wdir):
makedirs(wdir)
chdir(wdir)
# Loop over and recalculate each trajectory point
for i in input_traj_range:
frame = intraj[i].copy()
frame.calc = intraj[i].calc
iout = i + output_traj_offset
try:
energy_in, forces_in = (frame.get_potential_energy(),frame.get_forces())
except:
# existing trajectory may have no calculator
energy_in = 0; forces_in = np.array([[0,0,0]]*len(frame))
cont = False
for targ in all_targets:
label = f"{seed}_{targstr(targ)}_{traj_label}_{trajname_suffix}{iout:04}"
readonly=False
if path.isfile(label+'.out') or path.isfile(label+'.nwo'):
readonly=True
try:
energy = None; forces = None;
if cont:
cycle_restarts(seed,traj_label,trajname_suffix,prevtarg,targ,iout,iout)
calc_params['target'] = targ
energy, forces, _ = singlepoint_func(frame,label,calc_params,
solvent=solvent,forces=True,continuation=cont,readonly=readonly)
cont = True # After first excitation, assume subsequently can restart
prevtarg = targ
print(iout,targ,energy,frame.positions[0],forces[0],frame.calc.results['dipole'])
except KeyboardInterrupt:
raise Exception('Keyboard Interrupt')
# Supply keyword dipole to ensure it gets written or fails
if outtraj[targ] is not None:
outtraj[targ].write(frame,dipole=frame.calc.results['dipole'])
# Return to base directory and close trajectories
chdir(origdir)
for targ in all_targets:
if outtraj[targ] is not None:
outtraj[targ].close()
[docs]def cycle_restarts(seed,traj_label,trajname_suffix,prevtarg,currtarg,prevstep,currstep):
"""Move db files around so that each step continues from the previous one"""
db_ext = '.db'
# Find name of previous step file
prev_steplabel = f"{seed}_{targstr(prevtarg)}_{traj_label}_{trajname_suffix}{prevstep:04}"
prev_dir = prev_steplabel
prev_db = f"{prev_dir}/{prev_steplabel}{db_ext}"
# Find name of current step restart file
curr_steplabel = f"{seed}_{targstr(currtarg)}_{traj_label}_{trajname_suffix}{currstep:04}"
curr_dir = curr_steplabel
curr_db = f"{curr_dir}/{curr_steplabel}{db_ext}"
# Copy in db file
cont = False
prev_db_present = path.isfile(prev_db)
curr_db_present = path.isfile(curr_db)
if (prev_db_present and not curr_db_present):
if not path.exists(curr_dir) and db_ext is not None:
makedirs(curr_dir)
print(f"Copying from {prev_db} to {curr_db}")
shutil.copyfile(prev_db,curr_db)
# # Other tools to manage trajectories
# In[ ]:
import string
[docs]def get_trajectory_list(ntraj):
"""
Returns a list of characters to be used as trajectory labels
Currently ABCDEDFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz (52)
"""
return list(string.ascii_uppercase+string.ascii_lowercase)[0:ntraj]
# Merge trajectories (if generated separately)
[docs]def merge_traj(trajnames,trajfile):
"""
Merges a list of trajectories supplied as a list of filenames,
and writes the result to another trajectory supplied as a filename
"""
fulltraj = Trajectory(trajfile,'w')
for tr in trajnames:
read_traj = Trajectory(tr)
for frames in read_traj:
fulltraj.write(frames)
print("# Merged ",len(fulltraj)," frames: trajectory written to ",trajfile)
[docs]def split_traj(input_traj_file,output_trajs=None,ntrajs=None,randomise=False,start=0,end=-1):
"""
Takes in a trajectory filename, and splits it into sections, randomly or sequentially
"""
import numpy as np
if output_trajs is not None:
if ntrajs is not None:
if ntrajs!=len(output_trajs):
raise Exception('Contradictory inputs to split_traj')
else:
ntrajs = len(output_trajs)
if ntrajs is None and output_trajs is None:
raise Exception('No outputs specified')
if output_trajs is None:
output_trajs = [input_traj_file.replace('.traj','_')+str(i)+'.traj' for i in range(ntrajs)]
intraj = Trajectory(input_traj_file)
outtraj = [Trajectory(output_trajs[i],"w") for i in range(ntrajs)]
total = len(intraj)
if end > total:
raise Exception(f'In split_traj, end was {end}, but trajectory was only size {total}')
if end<0:
end = max(total - end - 1,0)
start = max(start,0)
total = end-start
if randomise:
order = np.random.permutation(total)
for j in range(start,end):
k = int((j-start)*ntrajs/total)
jp = j
if randomise:
jp = order[j]
print(end,start,j,k)
outtraj[k].write(intraj[j])
for i in range(ntrajs):
outtraj[i].close()
def atom_energies(atom_traj):
e_at = {}
for t in atom_traj:
if (len(t)>1):
raise Exception(f'This is not a trajectory of single-atom frames: len(t)={len(t)}')
at = t[0]
j = at.symbol
e_at[j] = t.get_potential_energy()
return e_at
def atom_energy(atoms,e_at):
e = 0
for at in atoms.get_chemical_symbols():
e = e + e_at[at]
return e
[docs]def fit_atom_energies_to_traj(traj,atom_traj_file):
"""
Uses linear regression to best fit atom energies to trajectory with varying numbers
of atoms of each species
"""
[docs]def subtract_atom_energies_from_traj(traj,atom_traj,trajout):
"""
Subtracts the energies associated with isolated atoms from the total energies
in a trajectory
"""
e_at = atom_energies(atom_traj)
for t in traj:
e_raw = t.get_potential_energy()
t.calc.results["energy"] = e_raw - atom_energy(t,e_at)
trajout.write(t)
# Compare two trajectories and plot the difference
[docs]def compare_traj_to_traj(trajx,trajy,plot_file=None,xlabel=None,ylabel=None):
"""
Compares two trajectories to each other and calculates statistics for how much they differ in energy and force
trajx: ASE Trajectory
Trajectory whose energy is plotted along x-axis
trajy: ASE Trajectory
Trajectory whose energy is plotted along y-axis
plot_file:
Filename to write plot image to.
xlabel, ylabel: str
Axis labels for plots.
"""
import numpy as np
e_x = []
e_y = []
f_x = []
f_y = []
rms_fd = []
max_fd = []
for i,framex in enumerate(trajx):
# Read in total energy and forces from trajectories
e_x.append(framex.get_potential_energy())
f_x.append(framex.get_forces())
framey = trajy[i]
e_y.append(framey.get_potential_energy())
f_y.append(framey.get_forces())
# Calculate RMS and Max force errors
rms_fd.append(np.sqrt(np.mean((f_x[-1]-f_y[-1])**2)))
max_fd.append(np.max(np.sqrt((f_x[-1]-f_y[-1])**2)))
# Calculate Root Mean Square energy error
diff = np.array(e_x)-np.array(e_y)
rms_e_err = np.sqrt((1./len(diff))*np.sum(diff**2))
max_e_err = np.max(np.abs(diff))
print(f"# RMS_dE,MAX_dE {rms_e_err:20.12f} {max_e_err:20.12f}")
# Calculate Root Mean Square of of RMS force difference
rms_fd = np.array(rms_fd)
rms_fd_err = np.sqrt((1./len(rms_fd))*np.sum(rms_fd**2))
max_fd_err = np.max(np.abs(rms_fd))
print(f"# RMS_dF,MAX_dF {rms_fd_err:20.12f} {max_fd_err:20.12f}")
stats = {"RMS dE (eV) ": rms_e_err,
"RMS dF (eV/A)": rms_fd_err,
"MAX dE (eV) ": max_e_err,
"MAX dF (eV/A)": max_fd_err}
clabel = 'RMS Force component deviation (eV/Ang)'
plot_diff(e_x,e_y,rms_fd,xlabel,ylabel,clabel,stats,plot_file)
return rms_e_err,max_e_err,rms_fd_err,max_fd_err,e_x,e_y,rms_fd
# Scatter Plot
from matplotlib import pyplot
def plot_diff(e_x,e_y,rms_fd,xlabel=None,ylabel=None,clabel=None,stats={},plot_file='show',align_axes=True):
# Optionally, plot to file or screen
if plot_file is None:
return
# Set up
fig, ax = pyplot.subplots()
# Plot data
im = ax.scatter(e_x,e_y,c=rms_fd)
cb = fig.colorbar(im)
cb.set_label(clabel)
# Plot y=x for comparison
if align_axes:
lims = [np.min([ax.get_xlim(), ax.get_ylim()]),
np.max([ax.get_xlim(), ax.get_ylim()])]
ax.plot(lims, lims, 'k-', alpha=0.75, zorder=0)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
stat_str=""
for s in stats:
stat_str += f'{s}: {stats[s]:10.4f}\n'
ax.text(0.02,0.98,stat_str,transform=ax.transAxes,
horizontalalignment='left',verticalalignment='top',)
if plot_file=='show':
fig.show()
else:
fig.savefig(plot_file)
# In[ ]:
# #TEMP: testing for subtract_atom_energies_from_traj
# if True:
# import os
# from ase.io import Trajectory
# os.chdir("/home/theory/phspvr/cate_qmd/")
# atom_traj = Trajectory('cate_atoms.traj')
# traj = Trajectory('cate_gs_B_training.traj')
# print(len(traj))
# trajout = Trajectory('cate_gs_B_training_atsub.traj','w')
# subtract_atom_energies_from_traj(traj,atom_traj,trajout)
# trajout.close()
# In[ ]:
# In[ ]: