#!/usr/bin/env python
# coding: utf-8
# In[ ]:
"""
Drivers to run ESTEEM tasks in serial or parallel on a range of inputs.
These inputs will consist of solutes, solvents or pairs of solutes and solvent, depending on the task.
"""
# # Top-level Driver
# In[ ]:
import argparse
from esteem import parallel
from esteem.trajectories import get_trajectory_list
[docs]def main(all_solutes,all_solvents,all_solutes_tasks={},all_solvate_tasks={},
all_clusters_tasks={},all_spectra_tasks={},all_qmd_tasks={},
all_mltrain_tasks={},all_mltest_tasks={},all_mltraj_tasks={},
make_script=parallel.make_sbatch):
"""
Main driver routine which chooses other tasks to run as appropriate.
Control of what driver is actually called depends on command-line arguments with which the script
was invoked: 'python <seed>.py <task> <seed> <target>'
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
all_solutes_tasks: namespace or class
Argument list for the Solutes task - see solutes_driver documentation below for more detail.
all_solvate_tasks: namespace or class
Argument list for the Solvate task - see solvate_driver documentation below for more detail.
all_clusters_tasks: namespace or class
Argument list for the Clusters task - see clusters_driver documentation below for more detail.
all_spectra_tasks: namespace or class
Argument list for the Spectra task - see spectra_driver documentation below for more detail.
make_script: dict of strings
Routine that can write a job submission script. Usually parallel.make_sbatch.
"""
import sys
import time
from datetime import datetime
time_stamp = time.time()
date_time = datetime.fromtimestamp(time_stamp)
str_date_time = date_time.strftime("%d-%m-%Y, %H:%M:%S")
print(f'# ESTEEM version 1.0.0 started at {str_date_time}\n')
# Parse arguments to script
parser = argparse.ArgumentParser(description=f'ESTEEM Main driver script')
parser.add_argument('task')
parser.add_argument('seed',default=None,type=str)
parser.add_argument('target',nargs='?',default=None,type=str)
args = parser.parse_args()
task = args.task
scriptname = sys.argv[0].replace(".py","")
if args.seed is None:
import sys
args.seed = scriptname
seed = args.seed
target = args.target
# Special task: set up scripts
if ('scripts' in task.split()):
# case where many SolutesTask have been defined
if isinstance(all_solutes_tasks,dict):
for targ in all_solutes_tasks:
if target is not None and targ!=target:
continue
all_solutes_tasks[targ].script_settings['scriptname'] = scriptname
solutes_script_settings = all_solutes_tasks[targ].script_settings
make_script(seed=seed,task='solutes',target=targ,**solutes_script_settings)
make_script(seed=seed,task='solvents',target=targ,**solutes_script_settings)
else: # just one SolutesTask
all_solutes_tasks.script_settings['scriptname'] = scriptname
solutes_script_settings = all_solutes_tasks.script_settings
make_script(seed=seed,task='solutes',target=target,**solutes_script_settings)
make_script(seed=seed,task='solvents',target=target,**solutes_script_settings)
for tasks in [all_solvate_tasks,all_clusters_tasks,all_qmd_tasks,all_spectra_tasks,
all_mltrain_tasks,all_mltest_tasks,all_mltraj_tasks]:
if tasks is None or tasks=={}:
continue
if isinstance(tasks,dict):
for targ in tasks:
if target is not None and targ!=target:
continue
print(tasks[targ].script_settings)
tasks[targ].script_settings['scriptname'] = scriptname
make_script(seed=seed,task=tasks[targ].task_command,target=targ,**tasks[targ].script_settings)
else:
tasks.script_settings['scriptname'] = scriptname
make_script(seed=seed,task=tasks.task_command,target=target,**tasks.script_settings)
return
# Now run actual task
if ('solutes' in task.split()):
solutes_task = get_actual_args(all_solutes_tasks,target,'solutes')
solutes_driver(all_solutes,all_solvents,solutes_task)
if ('solvents' in task.split()):
solutes_task = get_actual_args(all_solutes_tasks,target,'solvents')
solvents_driver(all_solvents,solutes_task)
if ('solvate' in task.split()):
solvate_task = get_actual_args(all_solvate_tasks,target,'solvate')
solvate_task.script_settings['target'] = target
solvate_task.script_settings['scriptname'] = scriptname
solvate_driver(all_solutes,all_solvents,seed,solvate_task,make_script)
if (any('clusters' in t for t in task.split()) or any('mlclus' in t for t in task.split())):
clusters_task = get_actual_args(all_clusters_tasks,target,'clusters')
clusters_task.script_settings['target'] = target
clusters_task.script_settings['scriptname'] = scriptname
if any('setup' in t for t in task.split()):
dryrun = True
else:
dryrun = False
clusters_driver(all_solutes,all_solvents,seed,clusters_task,make_script,dryrun=dryrun)
if 'atoms' in task.split():
atomen_task = get_actual_args(all_qmd_tasks,target,'qmd')
atomen_task.seed = seed
atomen_task.target = target
atom_energies_driver(atomen_task)
if 'qmd' in task.split():
qmd_task = get_actual_args(all_qmd_tasks,target,'qmd')
qmd_task.seed = seed
qmd_driver(qmd_task,all_solutes,all_solvents)
if ('train' in task.split()) or ('mltrain' in task.split()):
mltrain_task = get_actual_args(all_mltrain_tasks,target,'mltrain')
mltrain_task.seed = seed
mltrain_driver(mltrain_task,all_solutes,all_solvents)
if ('test' in task.split()) or ('mltest' in task.split()):
mltest_task = get_actual_args(all_mltest_tasks,target,'mltest')
mltest_task.seed = seed
mltest_driver(mltest_task,all_solutes,all_solvents)
if ('traj' in task):
cleanup_only = True if 'cleanup' in task else False
mltraj_task = get_actual_args(all_mltraj_tasks,target,'mltraj')
mltraj_task.seed = seed
mltraj_driver(mltraj_task,all_solutes,all_solvents,cleanup_only=cleanup_only)
if ('spectra' in task.split()):
spectra_task = get_actual_args(all_spectra_tasks,target,'spectra')
#warp_params = spectral_warp_driver(all_solutes,all_solvents,spectra_task)
spectra_task.seed = seed
spectra_driver(all_solutes,all_solvents,spectra_task,warp_params=None)
time_stamp_end = time.time()
date_time = datetime.fromtimestamp(time_stamp_end)
str_date_time = date_time.strftime("%d-%m-%Y, %H:%M:%S")
print(f'\n# ESTEEM finished at {str_date_time} ({time_stamp_end-time_stamp:12.3f} seconds elapsed)')
# Helper function that extracts the appropriate element from a dictionary
def get_actual_args(all_args,target,task):
if isinstance(all_args,dict):
try:
args = all_args[target]
except KeyError as e:
print(f"all_{task}_args[{target}] not found. Possible values:")
all_keys = list(all_args.keys())
all_keys.sort()
# First take "which_traj" off the end of they key, if it is present
many_trajs = get_trajectory_list(26*27)
prev_stem = ""
key_suffix = ""
for key in all_keys:
key_parts = key.split('_')
print(key,key_parts)
prev_key_suffix = key_suffix
if key_parts[-1] in many_trajs:
key_stem = '_'.join(key_parts[0:-1])
key_suffix = key_parts[-1]
else:
key_stem = key
key_suffix = ""
if key_stem!=prev_stem:
if prev_key_suffix!="":
print("], ",end="")
print(key_stem,end="")
if key_suffix!="":
print("_[",end="")
prev_stem = key_stem
print(key_suffix+",",end="")
if prev_key_suffix!="":
print("]\n")
else:
print("\n")
raise e
else:
args = all_args
return args
# In[ ]:
# # Run calculations on individual Solute molecules
# In[ ]:
"""
Drivers to run ESTEEM tasks in serial or parallel on a range of inputs
These inputs will consist of solutes, solvents or pairs of solutes and solvent, depending on the task
"""
from esteem.tasks import solutes
from esteem import parallel
from os import path,chdir,makedirs
from shutil import copytree
[docs]def solutes_driver(all_solutes,all_solvents,task):
"""
Driver to run a range of DFT/TDDFT calculations on a range of solutes.
If the script is run as an array task, it performs the task for the
requested solute only.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents (implicit only, here). Entries are the full names.
task: SolutesTask class
Argument list for the whole Solutes job - see Solutes module documentation for more detail.
Arguments used only within the driver routine include:
``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes
calculation will be written
"""
base_path = path.basename(getcwd())
if task.directory is not None and base_path != task.directory:
if not path.exists(task.directory):
print(f'Creating directory {task.directory}')
try:
makedirs(task.directory)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {task.directory} exists")
if not path.exists(task.directory+'/xyz'):
if path.exists('xyz'):
copytree('xyz', task.directory+'/xyz')
chdir(task.directory)
# Determine if we are in a job array, in which case just run one pair
task_id = parallel.get_array_task_id()
if task_id is None:
solutes_to_run = all_solutes.copy() # run everything
else:
solute = list(all_solutes)[task_id]
solutes_to_run = {solute: all_solutes[solute]} # run just one solute
# Geom opts and TDDFT for solutes, then implicit solvent versions in each solvent
if not all_solvents:
all_solvents_to_run = [None]
else:
all_solvents_to_run = all_solvents.copy()
for solvent in all_solvents_to_run:
task.solvent = solvent
if task.solvent_settings is not None and task.solvent is not None:
task.solvent = task.solvent_settings
task.solvent['solvent'] = solvent
# if the solute is also a solvent, do not run this solute in any other solvents
solute_namelist = task.get_xyz_files(solutes_to_run,"xyz")
solute_namelist = [solu2 for solu2 in solute_namelist if (solu2==solvent or solu2 not in all_solvents)]
task.run(solute_namelist)
if task.directory is not None:
chdir("..")
# # Run calculations on individual Solvent molecules
# In[ ]:
from esteem.tasks import solutes
[docs]def solvents_driver(all_solvents,task):
"""
Driver to run a range of DFT/TDDFT calculations on a range of solvent molecules.
See the Solutes module documentation for more info on what tasks are run.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
task: SolutesTask class
Argument list for the whole Solutes job - see Solutes module documentation for more detail.
Arguments used only within the driver routine include:
``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes
calculation will be written
wrapper: namespace or class
Functions that will be used in the Solutes task - see Solutes module documentation for more detail.
"""
if task.directory is not None:
if not path.exists(task.directory):
print(f'# Creating directory {task.directory}')
try:
makedirs(task.directory)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {task.directory} exists")
if not path.exists(task.directory+'/xyz'):
print(f'# Copying xyz directory from current path to {task.directory}')
if path.exists('xyz'):
copytree('xyz', task.directory+'/xyz')
print(f'# Changing directory to {task.directory}')
chdir(task.directory)
solvent_namelist = task.get_xyz_files(all_solvents,"xyz")
# Geom opts and TDDFT for solvents (in implicit solvent of self)
for solvent in all_solvents:
task.solvent = solvent
if task.solvent_settings is not None:
task.solvent = task.solvent_settings
task.solvent['solvent'] = solvent
task.run([solvent])
if task.directory is not None:
print(f'# Returning to parent directory')
chdir("..")
# # Calculate Range Separation Parameters
# In[ ]:
# Temporarily here till moved elsewhere
def range_sep_driver(all_solutes,all_solvents,task,wrapper):
import os
from ase.units import Hartree as Ha
if task.directory is not None:
if not path.exists(task.directory):
raise Exception(f"# Directory not present: {task.directory} (must run solutes task first)")
chdir(task.directory)
#from esteem.tasks.solutes import find_range_sep
solutes = ["AQS"]
solvent = {'solvent': 'watr', 'cosmo_smd': True, 'cosmo_vem': 0}
func = "LC-PBE0"
basis = "6-311++G**"
charges={"AH2QS": -1, "AQS": -1}
rs_range = [0.05,0.1,0.15,0.2,0.25,0.3]
energy,calc,evals,occs=find_range_sep(solutes,"is_opt_watr","rs_opt",nw,basis=basis,
func=func,solvent=solvent,charges=charges,
rs_range=rs_range,all_readonly=True)
ehomo = {}
elumo = {}
seed = solutes[0]
all_rs = []
all_Ja = []
all_Jb = []
all_J2 = []
for rsf in rs_range:
rs = f'{rsf:.2f}'
if seed in charges:
charge = charges[seed]
else:
charge = 0
for c in [charge+1,charge-1,charge]:
# Assume when c==charge, system is closed-shell
if c != charge:
occfac = 0.99
s = 0.5
else:
occfac = 1.99
s = 0
ih = max(np.argwhere(occs[rs,c]>occfac))[0]
il = min(np.argwhere(occs[rs,c]<occfac))[0]
ehomo[rs,c] = (evals[rs,c])[ih]
elumo[rs,c] = (evals[rs,c])[il]
#except:
# print('fail for rs=',rs)
all_rs.append(float(rs))
IP = energy[rs,charge+1] - energy[rs,charge]
EH = ehomo[rs,charge]
all_Ja.append((EH+IP)**2)
IPm = energy[rs,charge] - energy[rs,charge-1]
EHm = ehomo[rs,charge-1]
print(f'E_H-IP for rs={rs},Q={charge} is {EH:0.3f}+{IP:0.3f} = {EH+IP:0.3f}',
f', Q={charge-1} is {EHm:0.3f}+{IPm:0.3f} = {EHm+IPm:0.3f}')
all_Jb.append((EHm+IPm)**2)
all_J2.append((EHm+IPm)**2+all_Ja[-1])
#print(f'-IP for rs={rs},Q={charge} is {-IP}')
import matplotlib.pyplot as plt
plt.plot(all_rs,all_Ja)
#plt.plot(all_rs,all_Jb)
#plt.plot(all_rs,all_J2)
plt.show()
if task.directory is not None:
chdir("..")
# # Run MD calculations on all pairs of solvent + solute
# In[ ]:
from esteem.tasks import solvate
from shutil import copyfile
import itertools
from os import path,chdir,makedirs
[docs]def solvate_driver(all_solutes,all_solvents,seed,task,make_sbatch=None):
"""
Driver to run set up and run MD on solvated boxes containing a solute molecule and solvent.
If the script is run as an array task, it performs the task for the requested solute/solute pair
only, out of the nsolutes * nsolvents possible combinations.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
task: SolvateTask class
Argument list for the whole clusters job - see Solvate module documentation for more detail.
Arguments used within this routine include:
``task.md_suffix``: Directory suffix for where the MD runs will take place
``task.md_geom_prefix``: Directory prefix for where the geometries from the Solutes calculation should be obtained.
wrapper: class
Wrapper that will be used in the Solvate task - see Solvate module documentation for more detail.
"""
md_suffix = task.md_suffix
# Make list of all combinations of solvent and solute
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
if solute in all_solvents and (solute!=solvent):
del all_pairs[i]
all_paths = [f'{solute}_{solvent}_{md_suffix}' for (solvent,_),(solute,_) in all_pairs]
base_path = path.basename(getcwd())
# Determine if we are in a job array, in which case just run one pair
task_id = parallel.get_array_task_id()
if task_id is not None:
all_pairs = [all_pairs[task_id]]
# Iterate over pairs of solvent and solute
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
sol_sol_str = f'{solute} ({fullsolute}) in {solvent} ({fullsolvent})'
md_path = f'{solute}_{solvent}_{md_suffix}'
if solute in all_solvents and (solute!=solvent):
continue
# If we are in the parent directory of this project, or a subdirectory with
# appropriate symlinks, setup clusters subdirectories if not already present
if base_path not in all_paths:
task_str = f'# Task {i} of {len(all_pairs)}'
print(f'{task_str}: Setting up solvate calculation for {sol_sol_str}\n')
# Set up files and directories for MD run
if not path.exists(md_path):
print(f'# Creating directory {md_path}')
try:
makedirs(md_path)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {md_path} exists")
# See if a geometry file for a "complex" exists, or the solvent and/or solute geometries
md_geom_prefix = task.md_geom_prefix
if solvent is not None:
md_geom_prefix = md_geom_prefix + '/is_opt_{solv}'
else:
md_geom_prefix = md_geom_prefix + '/opt'
md_geom_prefix = sub_solu_solv_names(md_geom_prefix,f'{solute}_{solvent}',
all_solutes,all_solvents)
# Copy in optimised geometry of solute in implicit solvent
try:
infile = f'{md_geom_prefix}/{solute}.xyz'
outfile = f'{md_path}/{solute}.xyz'
copyfile(infile,outfile)
except FileNotFoundError:
print(f'# Optimised geometry for {solute} in {solvent} not found')
print(f'# Expected: {infile}')
continue
# Copy in optimised geometry of solvent in implicit solvent
try:
infile = f'{md_geom_prefix}/{solvent}.xyz'
outfile = f'{md_path}/{solvent}.xyz'
copyfile(infile,outfile)
except FileNotFoundError:
print(f'# Optimised geometry for {solvent} not found')
print(f'# Expected: {infile}')
continue
print(f'# Changing directory to {md_path}')
chdir(md_path)
# Write job script for submission to HPC cluster
if make_sbatch is not None:
task.script_settings['jobname'] = f'{solute}_{solvent}_{md_suffix}'
task.script_settings['execpath'] = '../'
make_sbatch(seed=seed,task='solvate',**task.script_settings)
# Go back to base directory
print(f'# Returning to parent directory\n\n')
chdir('..')
# We are in a subdirectory already so run a particular calculation
elif base_path==md_path:
# Now run MD in md_path
task.solute = solute
task.solvent = solvent
# If we provided a dictionary for the boxsize, find the specific entry that we need
if isinstance(task.boxsize,dict):
if solvent in task.boxsize:
task.boxsize = task.boxsize[solvent]
else:
raise Exception(f"task.boxsize is a dictionary but contains no entry for solvent '{solvent}'")
task.setup_amber()
task.run()
# # Run Clusters script on MD snapshots to extract region near solute
# In[ ]:
from shutil import copyfile
import itertools
from os import path,chdir,makedirs,getcwd,symlink
[docs]def clusters_driver(all_solutes,all_solvents,seed,task,make_sbatch=None,dryrun=False):
"""
Driver to extract isolated clusters from solvated models, for a range of solute/solvent pairs.
Takes MD results from the directory {task.md_prefix} and performs excitation
calculation in the directory {solute}_{solvent}_{task.exc_suffix}.
If invoked from the base directory, rather than the excitation directory, it sets up the
excitation directory and writes a job script then exits.
If invoked from the excitation directory, it performs the excitation calculations for all
the extracted clusters. If the script is run as an array task, it performs the task for the
requested cluster only.
*Arguments*
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
seed: str
Overall 'seed' name for the run - used in creation of job scripts for the calculation
task: ClustersTask class
Argument list for the whole clusters job - see Clusters module documentation for more detail.
Arguments used predominantly in the driver rather than the task main routine include:
``task.md_prefix``: Directory where MD outputs are to be found.
``task.md_suffix``: Suffix of MD output trajectories.
``task.exc_suffix``: Directory where results of Cluster excitation calculations will go.
make_sbatch: function
Function that writes a job submission script for the clusters jobs
*Output*:
On first run, from the base directory of the project, the script will create subdirectories
for each solute-solvent pair, with path '{solute}_{solvent}_{exc_suffix}'. The default value
of ``exc_suffix`` is 'exc'.
To each directory, this routine will copy the trajectory file
'{md_prefix}/{solute}_{solvent}_{md_suffix}.traj'
which consists of ``task.nsnaps`` snapshots.
It will then create a job script using ``make_sbatch`` and ``settings`` with the
name '{solute}_{solvent}_{exc_suffix}_sub' and exit.
The user then needs to run the individual job scripts from each subdirectory, probably
as a job array: the array task ID should range from 0 to ``task.nsnaps - 1``
The output of those runs will be the excited state energies of the clusters, which
can be averaged over in the Spectra task.
"""
# Iterate over all pairs of solute and solvent, unless in a specific directory already,
# in which case just run that pair
# First take "which_traj" off the end of exc_suffix, if it is present
if hasattr(task,'exc_dir_suffix'):
exc_dir_suffix = task.exc_dir_suffix
else:
exc_dir_suffix = task.exc_suffix
exc_parts = exc_dir_suffix.split('_')
if exc_parts[-1] == task.which_traj:
exc_dir_suffix = '_'.join(exc_parts[0:-1])
else:
exc_dir_suffix = exc_dir_suffix
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
all_paths = [f'{solute}_{solvent}_{exc_dir_suffix}' for (solvent,_),(solute,_) in all_pairs]
from re import sub
all_paths = [sub("_None","",p) for p in all_paths]
base_path = path.basename(getcwd())
orig_path = getcwd()
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
sol_sol_str = f'{solute} ({fullsolute}) in {solvent} ({fullsolvent})'
if solvent is None:
sol_sol_str = f'{solute} ({fullsolute})'
clus_path = f'{solute}_{solvent}_{exc_dir_suffix}' if solvent is not None else f'{solute}_{exc_dir_suffix}'
md_path = sub_solu_solv_names(task.md_prefix,f'{solute}_{solvent}',all_solutes,all_solvents)
if solute in all_solvents and (solute!=solvent):
continue
# We are in the parent directory of this project, so setup clusters
# subdirectories if not already present
if base_path not in all_paths:
# Check that MD has finished - skip to next file if not
if not isinstance(task.md_suffix,list):
md_suffix = [task.md_suffix]
else:
md_suffix = task.md_suffix
trajfile = {}
if not path.exists(md_path):
print(f'\n# Skipping Clusters for: {sol_sol_str} - MD directory not found')
print(f'# Expected: {md_path}')
continue
else:
for mds in md_suffix:
trajfile[mds] = f'{md_path}/{solute}_{solvent}_{mds}.traj'
if solvent is None:
trajfile[mds] = f'{md_path}/{solute}_{mds}.traj'
if not all([path.exists(trajfile[mds]) for mds in trajfile]):
print(f'\n# Skipping Clusters for: {sol_sol_str} - trajectory file(s) not found')
for trajf in trajfile:
if not path.exists(trajfile[trajf]):
print(f'Not found: {trajfile[trajf]}')
continue
print(f'\nTask {i} of {len(all_pairs)}: Setting up clusters calculation for {sol_sol_str}')
if not path.exists(clus_path):
print(f'# Creating directory {clus_path}')
try:
makedirs(clus_path)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {clus_path} exists")
# Create symlinks to trajectory files in excitations directory
for mds in md_suffix:
trajlink = f'{solute}_{solvent}_{mds}.traj'
if solvent is None:
trajlink = f'{solute}_{mds}.traj'
if not (path.isfile(f'{clus_path}/{trajlink}') or path.islink(f'{clus_path}/{trajlink}')):
chdir(clus_path)
print(f'# Creating link from ../{trajfile[mds]} to {trajlink}')
symlink('../'+trajfile[mds],trajlink)
chdir(orig_path)
# Copy the geometries of the solute and solvent to the excitations directory
# so they can be used in cluster extraction
try:
copyfile(f'{md_path}/{solute}.xyz',f'{clus_path}/{solute}.xyz')
except FileNotFoundError:
print(f'# Geometry file {md_path}/{solute}.xyz not found')
except PermissionError:
print(f'# Could not write to {clus_path}/{solute}.xyz (PermissionError)')
if solvent is not None:
try:
copyfile(f'{md_path}/{solvent}.xyz',f'{clus_path}/{solvent}.xyz')
except FileNotFoundError:
print(f'# Geometry file {md_path}/{solvent}.xyz not found')
except PermissionError:
print(f'# Could not write to {clus_path}/{solvent}.xyz (PermissionError)')
print(f'# Changing directory to {clus_path}')
chdir(clus_path)
# Write job script for submission to HPC cluster
if make_sbatch is not None:
whichstr = f'_{task.which_traj}' if task.which_traj is not None else ''
task.script_settings['jobname'] = f'{solute}_{solvent}_{task.exc_suffix}{whichstr}'
if solvent is None:
task.script_settings['jobname'] = f'{solute}_{task.exc_suffix}{whichstr}'
task.script_settings['execpath'] = '../'
seed = f'{solute}_{solvent}' if solvent is not None else solute
make_sbatch(seed=seed,task='clusters',**task.script_settings)
# Go back to base directory
chdir(orig_path)
# We are in a subdirectory already so run a particular calculation
elif clus_path==base_path:
whichstr = f' for trajectory {task.which_traj}' if task.which_traj is not None else ''
print(f'\n# Processing clusters for {sol_sol_str}{whichstr}')
# Prepare Cluster excitation calculation
task.solute = solute
task.solvent = solvent
if task.solute == task.solvent:
reset_roots = False
if type(task.target)!=list:
if task.nroots > task.target:
reset_roots = True
else:
if task.nroots > min(task.target):
task.target = min(task.target)
reset_roots = True
if reset_roots:
print(f'\n# Warning: solute==solvent, so assuming excitations are not required. Setting nroots=0')
task.nroots = 0
if isinstance(task.impsolv,dict):
task.impsolv['solvent'] = solvent
else:
task.impsolv = solvent
# If we provided a dictionary for the radius, find the specific entry that we need
if isinstance(task.radius,dict):
if solvent in task.radius:
task.radius = task.radius[solvent]
else:
raise Exception(f"task.radius is a dictionary but contains no entry for solvent '{solvent}'")
if hasattr(task,'calc_seed'):
if task.calc_seed is not None:
seed = f'{solute}_{solvent}' if solvent is not None else solute
task.calc_seed = sub_solu_solv_names(task.calc_seed,seed,all_solutes,all_solvents)
task.task_id = parallel.get_array_task_id()
# Finally, actually run the task
task.run(dryrun=dryrun)
# In[1]:
def get_solu_solv_names(seed):
# Guess solute and solvent names from seed name
solu = seed.split("_")[0]
try:
solv = seed.split("_")[1]
except:
solv = "NO_SOLVENT_FOUND"
return solu,solv
def sub_solu_solv_names(string,seed,all_solutes,all_solvents):
solu,solv = get_solu_solv_names(seed)
string_sub = string
if '{seed}' in string_sub:
string_sub = string_sub.replace("{seed}",seed)
if '{solu}' in string_sub:
if solu in all_solutes:
string_sub = string_sub.replace("{solu}",solu)
if '{solv}' in string_sub:
if solv in all_solvents:
string_sub = string_sub.replace("{solv}",solv)
return string_sub
def merge_xyzs(solu,solv,seed_xyz,merge_path='.'):
from os import path
from ase.io import read,write
solu_xyz = f'{merge_path}/{solu}.xyz'
solv_xyz = f'{merge_path}/{solv}.xyz'
print(f'# Merging {solu_xyz} and {solv_xyz}, writing to {seed_xyz}')
if ((path.isfile(solu_xyz) or path.islink(solu_xyz)) and
(path.isfile(solv_xyz) or path.islink(solv_xyz))):
solu_atoms = read(solu_xyz)
solv_atoms = read(solv_xyz)
solv_atoms.translate([20.0,0.0,0.0])
solu_solv_atoms = solu_atoms + solv_atoms
write(seed_xyz,solu_solv_atoms)
return
else:
print(f'# Source files {solu}.xyz and {solv}.xyz not found in {merge_path}')
return
def atom_energies_driver(atomen):
from ase.io import read, Trajectory
from ase import Atoms
from esteem.tasks.clusters import get_ref_mol_energy
# Determine if we are in a job array
task_id = parallel.get_array_task_id()
if task_id is None:
task_id = 0
# Guess solute and solvent names from seed name
solu,solv = get_solu_solv_names(atomen.seed)
base_path = path.basename(getcwd())
wrapper_is_ml = False
if wrapper_is_ml:
calc_params = {'calc_seed': atomen.seed,
'calc_suffix': atomen.calc_suffix,
'calc_dir_suffix': atomen.calc_dir_suffix,
'calc_prefix': '',
'target': atomen.target}
else:
# QM Wrapper
calc_params = {'basis':atomen.basis,'func':atomen.func,'target':None,'disp':atomen.disp} # TODO
orig_dir = getcwd()
if solv=="NO_SOLVENT_FOUND":
solv = None
ref_mol = solv if solv is not None else solu
if hasattr(atomen,'ref_mol_dir'):
ref_mol_dir = atomen.ref_mol_dir
else:
ref_mol_dir = 'PBE0'
if hasattr(atomen,'ref_mol_xyz'):
ref_mol_xyz = f'{ref_mol_dir}/{atomen.ref_mol_xyz}'
else:
if solv is not None:
ref_mol_xyz = f'{ref_mol_dir}/is_opt_{solv}/{ref_mol}.xyz'
else:
ref_mol_xyz = f'{ref_mol_dir}/opt/{ref_mol}.xyz'
ref_mol_energy, ref_mol_model = get_ref_mol_energy(atomen.wrapper,ref_mol,
solv,calc_params,ref_mol_xyz,ref_mol_dir,
silent=False)
print(f'# Reference molecule energy = {ref_mol_energy}')
# Prepare directory for calculation
atoms_dir = f'{atomen.seed}_atoms_{atomen.target}'
atoms_xyz = f'{atomen.seed}.xyz'
if base_path != atoms_dir:
if not path.exists(atoms_dir):
print(f'# Creating directory {atoms_dir}')
try:
makedirs(atoms_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {atoms_dir} exists")
if not path.exists('atoms_dir/{solu}.xyz'):
if solv is not None:
copyfile(f'{ref_mol_dir}/is_opt_{solv}/{solu}.xyz',f'{atoms_dir}/{solu}.xyz')
else:
copyfile(f'{ref_mol_dir}/opt/{solu}.xyz',f'{atoms_dir}/{solu}.xyz')
if not path.exists('atoms_dir/{solv}.xyz'):
if solv is not None:
copyfile(f'{ref_mol_dir}/is_opt_{solv}/{solv}.xyz',f'{atoms_dir}/{solv}.xyz')
if not path.isfile(f'{atoms_dir}/{atoms_xyz}') and not path.islink(f'{atoms_dir}/{atoms_xyz}'):
print(f'# Source file {atoms_xyz} not found - trying to combine {solu} and {solv} xyzs')
merge_xyzs(solu,solv,f'{atoms_dir}/{atoms_xyz}',atoms_dir)
if not path.isfile(f'{atoms_dir}/{atoms_xyz}'):
try:
copyfile(atoms_xyz,f'{atoms_dir}/{atoms_xyz}')
print(f'# Source file for atoms {atoms_xyz} copied to {atoms_dir}')
except PermissionError:
print(f'# Could not copy source file for atoms {atoms_xyz} to {atoms_dir} - permission error')
chdir(atoms_dir)
model_opt = read(atoms_xyz)
atom_list = set(model_opt.symbols)
atom_energies = {}
atom_traj_file = f'../{atomen.seed}_atoms_{atomen.target}.traj'
atom_traj = Trajectory(atom_traj_file,'w')
atom_model = {}
for atom in atom_list:
atom_model[atom] = Atoms(atom)
spin=0
if atom=='H':
spin=0.5
if atom=='O':
spin=1
if atom=='N':
spin=0.5
if atom=='C':
spin=0
if wrapper_is_ml:
atom_energies[atom],forces,dipole,calc = atomen.wrapper.singlepoint(atom_model[atom],
f'{atomen.seed}',calc_params,solvent=solv,dipole=True,forces=True,)
else:
atom_energies[atom],forces,dipole,calc = atomen.wrapper.singlepoint(atom_model[atom],
f'{atomen.seed}_{atom}',calc_params,solvent=solv,dipole=True,forces=True,spin=spin)
# Rescale atom energies
sum_ni_Ei = 0
for i in atom_energies:
ni = ref_mol_model.symbols.count(i)
sum_ni_Ei += atom_energies[i]*ni
E_mol = ref_mol_energy
alpha = E_mol/(sum_ni_Ei)
print(f'# Sum of atomic energies for reference model: {sum_ni_Ei} \nRescaling by: {alpha}')
for atom in atom_list:
atom_energies[atom] *= alpha
atom_model[atom].calc.results['energy'] = atom_energies[atom]
atom_traj.write(atom_model[atom])
print(f'atom_energies[{atom}] = {atom_energies[atom]}')
atom_traj.close()
[docs]def qmd_driver(qmdtraj,all_solutes,all_solvents):
"""
Driver to run calculations to generate a range of Quantum Molecular
Dynamics trajectories.
If the script is run as an array task, it performs the task for the
requested trajectory only.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents (implicit only, here). Entries are the full names.
task: QMDTrajTask class
Argument list for the whole QMD_Trajectories job - see QMD_Trajectories module documentation for more detail.
Arguments used only within the driver routine include:
``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes
calculation will be written
Arguments for which the driver routine will perform solute & solvent name substitution:
``task.solvent``:
"""
import string
from esteem.trajectories import get_trajectory_list, targstr
# Substitute names into geom_prefix if required
qmdtraj.geom_prefix = sub_solu_solv_names(qmdtraj.geom_prefix,qmdtraj.seed,
all_solutes,all_solvents)
base_path = path.basename(getcwd())
qmd_dir = f"{qmdtraj.seed}_qmd"
seed_xyz = f'{qmdtraj.seed}.xyz'
if base_path != qmd_dir:
if not path.exists(qmd_dir):
print(f'# Creating directory {qmd_dir}')
try:
makedirs(qmd_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {qmd_dir} exists")
if not path.isfile(f'{qmd_dir}/{seed_xyz}'):
copyfile(f'{qmdtraj.geom_prefix}/{seed_xyz}',f'{qmd_dir}/{seed_xyz}')
print(f'# Source file {seed_xyz} copied to {qmd_dir}')
chdir(qmd_dir)
# Substitute names into solvent if required
if qmdtraj.solvent is not None:
qmdtraj.solvent = sub_solu_solv_names(qmdtraj.solvent,qmdtraj.seed,
all_solutes,all_solvents)
# Make list of trajectories
all_trajs = get_trajectory_list(qmdtraj.ntraj)
# Determine if we are in a job array, in which case just run one trajectory
task_id = parallel.get_array_task_id()
if isinstance(qmdtraj.which_trajs,str):
raise Exception("# which_trajs must be a list, not a string")
if qmdtraj.which_trajs is None:
qmdtraj.which_trajs = all_trajs
trajs_in_task = qmdtraj.which_trajs
if task_id is not None:
if all_trajs[task_id] in trajs_in_task:
qmdtraj.which_trajs = [all_trajs[task_id]]
else:
raise Exception(f"# Requested trajectory {all_trajs[task_id]} is not part of this task.")
qmdtraj.run()
def make_traj_links(mltrain_task,traj_links,train_dir,prefix,all_solutes,all_solvents):
from ase.io import Trajectory
origdir = getcwd()
# switch to the training directory, so links are made there
chdir(train_dir)
# obtain a list of the trajectories to be used in the training
which_trajs,trajnames = mltrain_task.get_trajnames(prefix)
trajnames = dict(zip(which_trajs,trajnames))
if prefix == "":
print(f'# Creating symlinks to training trajectories')
elif prefix == "valid":
print(f'# Creating symlinks to validation trajectories')
elif prefix == "test":
print(f'# Creating symlinks to testing trajectories')
for traj in which_trajs:
if traj not in traj_links:
raise Exception(f'# Invalid trajectory in traj_links: {traj}. Expected: {traj_links}')
# If there are multiple links that must be set up to assemble this trajectory,
# the user may supply a list of links. Otherwise, we make a list with just
# the single entry provided
if isinstance(traj_links[traj],list):
traj_links_list = traj_links[traj]
else:
traj_links_list = [traj_links[traj]]
# Loop over the list made above
for traj_link in traj_links_list:
# Check if the traj_link string contains "{solu}" or "{solv}", in
# which case replace these with the appropriate solvent or solute string
traj_link = sub_solu_solv_names(traj_link,mltrain_task.seed,all_solutes,all_solvents)
# Find the length of the trajectory and print it along with the proposed link
try:
t = Trajectory('../'+traj_link); l = len(t); t.close()
except:
l = 0
print('#',traj_link,'<->',trajnames[traj],' len=',l)
# Check the link destination exists, and if so make the link
if not path.isfile('../'+traj_link) and not path.islink('../'+traj_link):
raise Exception(f'# File to link to not found for trajectory {traj}: {traj_link}')
if not path.isfile('../'+trajnames[traj]) and not path.islink('../'+trajnames[traj]):
symlink('../'+traj_link,'../'+trajnames[traj])
chdir(origdir)
[docs]def mltrain_driver(mltrain_task,all_solutes={},all_solvents={}):
"""
Driver to train a Neural Network (or other machine-learning approach) for
a dataset. Key arguments are the wrapper, which supplies the interface
to the ML model, and the specification of the trajectory data.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents (implicit only, here). Entries are the full names.
task: MLTrainTask class
Argument list for the whole MLTrainTask job - see ML_Training module documentation for more detail.
Arguments used only within the driver routine include:
``mltrain_task.traj_links``: Expressed as a dictionary containing trajectory
labels and full paths of trajectory data files, relative to the path
from which the script was invokes
Arguments for which the driver routine will perform solute & solvent name substitution:
``mltrain_task.seed``:
"""
from os import symlink, path
from ase.io import Trajectory, read, write
# Determine if we are in a job array, in which case find task_id
task_id = parallel.get_array_task_id()
if task_id is None:
task_id = 0
# Determine train_dir and solute/solvent names
if mltrain_task.calc_dir_suffix is None:
mltrain_task.calc_dir_suffix = mltrain_task.calc_suffix
train_dir = mltrain_task.wrapper.calc_filename(mltrain_task.seed,
target=mltrain_task.target,
prefix=mltrain_task.calc_prefix,
suffix=mltrain_task.calc_dir_suffix)
solu,solv = get_solu_solv_names(mltrain_task.seed)
# Determine location of xyz files
if mltrain_task.geom_prefix is not None:
geom_prefix = sub_solu_solv_names(mltrain_task.geom_prefix,mltrain_task.seed,all_solutes,all_solvents)
else:
geom_prefix = '.'
if solu=='all':
solu_checked = list(all_solutes)[0]
else:
solu_checked = solu
if solv is not None:
train_seed_xyz = f'{train_dir}/{solu_checked}_{solv}.xyz'
else:
train_seed_xyz = f'{train_dir}/{solu_checked}.xyz'
# Check we are in parent directory and train_dir exists
base_path = path.basename(getcwd())
if base_path != train_dir:
if not path.exists(train_dir):
print(f'# Creating directory {train_dir}')
try:
makedirs(train_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {train_dir} exists")
else:
chdir('..')
# Check that the xyz file exists, create it if it does not
if not path.isfile(train_seed_xyz):
if solv is not None:
source_seed_xyz = f'{geom_prefix}/{solu_checked}_{solv}.xyz'
else:
source_seed_xyz = f'{geom_prefix}/{solu_checked}.xyz'
if not path.isfile(source_seed_xyz):
print(f'# Source file {source_seed_xyz} not found - trying to combine {solu_checked} and {solv} xyzs')
merge_xyzs(solu_checked,solv,train_seed_xyz)
else:
copyfile(source_seed_xyz,f'{train_seed_xyz}')
print(f'# Source file {source_seed_xyz} copied to {train_dir}')
if mltrain_task.calc_prefix is None or mltrain_task.calc_prefix == "":
mltrain_task.calc_prefix = train_dir+"/"
# Process any traj_links present in the input task
make_traj_links(mltrain_task,mltrain_task.traj_links,train_dir,'',all_solutes,all_solvents)
if mltrain_task.traj_links_valid is not None:
make_traj_links(mltrain_task,mltrain_task.traj_links_valid,train_dir,'valid',all_solutes,all_solvents)
if mltrain_task.traj_links_test is not None:
make_traj_links(mltrain_task,mltrain_task.traj_links_test,train_dir,'test',all_solutes,all_solvents)
mltrain_task.run()
[docs]def mltest_driver(mltest,all_solutes,all_solvents):
"""
Driver to run tests of a Neural Network (or other machine-learning approach)
by comparing it to results from a dataset, which may have been
calculated already using eg a Clusters task.
Key arguments are the wrapper, which supplies the interface
to the ML model, and the specification of the test data.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents (implicit only, here). Entries are the full names.
mltest_task: MLTestTask class
Argument list for the whole MLTestTask job - see :mod:`~esteem.tasks.ml_testing` module documentation for more detail.
Arguments used only within the driver routine include:
``mltest_task.traj_links``: Expressed as a dictionary containing trajectory
labels and full paths of trajectory data files, relative to the path
from which the script was invokes
Arguments for which the driver routine will perform solute & solvent name substitution:
``mltest_task.calc_seed``:
``mltest_task.traj_prefix``:
"""
from ase.io import Trajectory
from esteem.wrappers import amp
from esteem.trajectories import targstr
from os.path import commonprefix
from os import getcwd, readlink, remove
base_path = path.basename(getcwd())
# Determine if we are in a job array
task_id = parallel.get_array_task_id()
if task_id is None:
task_id = 0
# Check if solute or solute names need substituting into calculator seed name
mltest.calc_seed = sub_solu_solv_names(mltest.calc_seed,mltest.seed,all_solutes,all_solvents)
seed_state_str = f'{mltest.calc_seed}_{targstr(mltest.target)}'
if mltest.calc_dir_suffix is None:
mltest.calc_dir_suffix = mltest.calc_suffix
test_dir = f'{seed_state_str}_{mltest.calc_dir_suffix}_test'
if base_path != test_dir:
if not path.exists(test_dir):
print(f'# Creating directory {test_dir}')
try:
makedirs(test_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {test_dir} exists")
chdir(test_dir)
print(f'# Moved to {getcwd()}')
if path.exists(f"../{mltest.seed}.xyz"):
try:
copyfile(f"../{mltest.seed}.xyz",f"{mltest.seed}.xyz")
except PermissionError:
print(f"# Could not copy from ../{mltest.seed}.xyz to {mltest.seed}.xyz - permission error")
if isinstance(mltest.wrapper,amp.AMPWrapper):
calcfn = mltest.wrapper.calc_filename(mltest.seed,mltest.target,prefix=mltest.calc_prefix,suffix=mltest.calc_suffix)
calc_file = calcfn+ml_wrapper.calc_ext
mltest.calc_suffix = suffix+"_test"
calcfn = ml_wrapper.calc_filename(mltest.seed,mltest.target,prefix=mltest.calc_prefix,suffix=mltest.calc_suffix)
test_calc_file = calcfn+ml_wrapper.calc_ext
test_calc_log = calcfn+ml_wrapper.log_ext
if os.path.isfile(calc_file):
print(f"# Copying {calc_file} to {test_calc_file} for testing")
shutil.copyfile(calc_file,test_calc_file)
else:
raise Exception("# Calculator file does not exist: ",calc_file)
if mltest.calc_seed is None:
mltest.calc_seed = mltest.seed
# Check if the traj_prefix string contains "{solu}" or "{solv}", in
# which case replace these with the appropriate solvent or solute string
mltest.traj_prefix = sub_solu_solv_names(mltest.traj_prefix,mltest.calc_seed,all_solutes,all_solvents)
# Process any traj_links present in the input task
if hasattr(mltest,'traj_links'):
if mltest.traj_links is not None:
origdir = getcwd()
# switch to the appropriate directory, so links are made there
if mltest.traj_prefix != "" and mltest.traj_prefix is not None:
chdir("../"+mltest.traj_prefix)
print(f'# Moved to {getcwd()}')
# obtain a list of the trajectories to be used in the training
which_trajs,trajnames = mltest.get_trajnames()
trajnames = dict(zip(which_trajs,trajnames))
print(f'# Creating symlinks to trajectories')
for traj in which_trajs:
if traj not in mltest.traj_links:
continue
# If there are multiple links that must be set up to assemble this trajectory,
# the user may supply a list of links. Otherwise, we make a list with just
# the single entry provided
if isinstance(mltest.traj_links[traj],list):
traj_links = mltest.traj_links[traj]
else:
traj_links = [mltest.traj_links[traj]]
# Loop over the list made above
for traj_link in traj_links:
# Check if the traj_link string contains "{solu}" or "{solv}", in
# which case replace these with the appropriate solvent or solute string
traj_link = sub_solu_solv_names(traj_link,mltest.seed,all_solutes,all_solvents)
# Find the length of the trajectory and print it along with the proposed link
try:
t = Trajectory('../'+traj_link); l = len(t); t.close()
except Exception as e:
print(e)
l = 0
print('#',traj_link,'<->',trajnames[traj],' len=',l)
# Check the link destination exists, and if so make the link
if not path.isfile('../'+traj_link) and not path.islink('../'+traj_link):
raise Exception(f'# File to link to not found for trajectory {traj}: ../{traj_link} from {getcwd()}')
if path.islink(trajnames[traj]):
if (readlink(trajnames[traj])!='../'+traj_link):
print(f'# Removing pre-existing link to {readlink(trajnames[traj])}')
remove(trajnames[traj])
if not path.isfile(trajnames[traj]) and not path.islink(trajnames[traj]):
print(f'# Making link to: ../{traj_link} called {trajnames[traj]} in {getcwd()}')
symlink('../'+traj_link,trajnames[traj])
# switch back to starting directory
if mltest.traj_prefix != "" and mltest.traj_prefix is not None:
chdir(origdir)
if isinstance(mltest.which_trajs,dict):
traj_dict = mltest.which_trajs
for t in traj_dict:
if t in mltest.ref_mol_seed_dict:
ref_mol_seed = mltest.ref_mol_seed_dict[t]
mltest.seed = sub_solu_solv_names(ref_mol_seed,mltest.calc_seed,all_solutes,all_solvents)
print(f'# Reference molecule seed set to {mltest.seed}')
mltest.which_trajs = traj_dict[t]
plotfile = seed_state_str + '_' + mltest.calc_suffix + '_' + str(t) + '.png'
mltest.plotfile = sub_solu_solv_names(plotfile,mltest.calc_seed,all_solutes,all_solvents)
print(f"# Plotfile set to {mltest.plotfile}")
try:
mltest.run()
except Exception as e:
print(f'# Error when testing trajectory, continuing')
print(e)
else: # single trajectory to test
if mltest.plotfile is not None:
mltest.plotfile = sub_solu_solv_names(mltest.plotfile,mltest.calc_seed,all_solutes,all_solvents)
print(f"# Plotfile set to {mltest.plotfile}")
mltest.run()
if isinstance(mltest.wrapper,amp.AMPWrapper):
if os.path.isfile(test_calc_log):
os.remove(test_calc_log)
def mltraj_driver(mltraj,all_solutes,all_solvents,cleanup_only=False):
from os import symlink, path, remove
from esteem.trajectories import get_trajectory_list,targstr,merge_traj
# Determine if we are in a job array, in which case just run one trajectory
task_id = parallel.get_array_task_id()
if task_id is not None:
try:
mltraj.which_trajs = [get_trajectory_list(mltraj.ntraj)[task_id]]
except:
print(f'# Could not find an entry for task_id {task_id} in trajectory list {get_trajectory_list(mltraj.ntraj)}')
return
# Check if solute or solute names need substituting into calculator seed name
mltraj.calc_seed = sub_solu_solv_names(mltraj.calc_seed,mltraj.seed,all_solutes,all_solvents)
# Same for snap_calc_params if present
if hasattr(mltraj,'snap_calc_params'):
if mltraj.snap_calc_params is not None:
if 'calc_seed' in mltraj.snap_calc_params:
mltraj.snap_calc_params['calc_seed'] = sub_solu_solv_names(
mltraj.snap_calc_params['calc_seed'],mltraj.seed,all_solutes,all_solvents)
# Check if we are in the right directory, create it if it does not exists
base_path = path.basename(getcwd())
seed_state_str = f'{mltraj.seed}_{targstr(mltraj.target)}'
if mltraj.calc_dir_suffix is not None:
traj_dir = f'{seed_state_str}_{mltraj.calc_dir_suffix}_mldyn'
else:
traj_dir = f'{seed_state_str}_{mltraj.calc_suffix}_mldyn'
if base_path != traj_dir:
if not path.exists(traj_dir):
print(f'# Creating directory {traj_dir}')
try:
makedirs(traj_dir)
except FileExistsError:
print(f"# Possible mid-air collision between jobs - directory {traj_dir} exists")
chdir(traj_dir)
# Check if we need to create a symlink to or merge the initial trajectories file
if mltraj.md_init_traj_link is not None:
init_traj = seed_state_str+"_md_init.traj"
if not isinstance(mltraj.md_init_traj_link,list):
traj_link = sub_solu_solv_names(mltraj.md_init_traj_link,mltraj.seed,all_solutes,all_solvents)
if not path.isfile(init_traj) and not path.islink(init_traj):
print(f"# Creating symlink: {init_traj} -> ../{traj_link} ")
symlink('../'+traj_link,init_traj)
else:
all_traj_links = ['../'+sub_solu_solv_names(traj_link,mltraj.seed,all_solutes,all_solvents) for traj_link in mltraj.md_init_traj_link]
if task_id is not None:
if not path.isfile(init_traj):
print(f'# File not found: {init_traj}')
raise Exception(f"# Please merge trajectories by running with no task_id before running individual task_id's")
else:
if path.islink(init_traj):
print(f'# Merged initial trajectories file {init_traj} is a link but should be a file')
raise Exception("# Please remove the link before merging the trajectories")
merge_traj(all_traj_links,init_traj)
# check if any of the initial geometry files are present in the parent directory, and if so copy
# them to the current directory
solu, solv = get_solu_solv_names(mltraj.seed)
for suffix in [".xyz"]:
# See if a geometry file for a "complex" exists, or the solvent and/or solute geometries
geom_prefix = mltraj.geom_prefix
if solv is not None:
geom_prefix = '../' + geom_prefix + '/is_opt_{solv}'
else:
geom_prefix = '../' + geom_prefix + '/opt'
geom_prefix = sub_solu_solv_names(geom_prefix,mltraj.seed,all_solutes,all_solvents)
for geomfile in [seed_state_str+suffix,solu+suffix,solv+suffix]:
geomfile_in = geom_prefix+'/'+geomfile
if path.isfile(geomfile_in) or path.islink(geomfile_in):
if not path.isfile(geomfile):
print(f'# Copying from {geomfile_in} to {geomfile}')
copyfile(geomfile_in,geomfile)
else:
print(f'# Geometry {geomfile} already present')
else:
print(f'# Geometry {geomfile_in} not found to copy')
# Set default value of calc_seed if not otherwise set
if mltraj.calc_seed is None:
mltraj.calc_seed = mltraj.seed
# Run the MLTraj task
if not cleanup_only:
mltraj.run()
# If requested, carve spheres centered on solute from trajectory, then delete full trajectory
if mltraj.carve_trajectory_radius is not None:
# If we provided a dictionary for the radius, find the specific entry that we need
if isinstance(mltraj.carve_trajectory_radius,dict):
if solv in mltraj.carve_trajectory_radius:
mltraj.carve_trajectory_radius = mltraj.carve_trajectory_radius[solv]
else:
raise Exception(f"mltraj.carve_trajectory_radius is a dictionary but contains no entry for solvent '{solv}'")
mltraj_cleanup(mltraj)
def mltraj_cleanup(mltraj):
from esteem.tasks.clusters import ClustersTask
from os import symlink, path, remove
from esteem.trajectories import get_trajectory_list,targstr
ct = ClustersTask()
ct.solute,ct.solvent = get_solu_solv_names(mltraj.seed)
ct.which_target = mltraj.target
for ct.which_traj in mltraj.which_trajs:
ct.min_snapshots = 0;
ct.max_snapshots = mltraj.nsnap
ct.carved_suffix = f"{mltraj.traj_suffix}_carved"
ct.md_suffix = f"{targstr(ct.which_target)}_{ct.which_traj}_{mltraj.traj_suffix}"
solvstr = f'_{ct.solvent}' if ct.solvent is not None else ''
traj_carved_file = f'{ct.solute}{solvstr}_{targstr(ct.which_target)}_{ct.which_traj}_{ct.carved_suffix}.traj'
if path.exists(traj_carved_file) and path.getsize(traj_carved_file)>0:
print(f'# Skipping carving spheres for postprocessing - {traj_carved_file} already present')
else:
ct.carve_spheres(solvent_radius=mltraj.carve_trajectory_radius)
#remove(f'{ct.solute}_{ct.solvent}_{ct.md_suffix}.traj')
if mltraj.recalculate_carved_traj:
ct.wrapper = mltraj.snap_wrapper
ct.output = f"{mltraj.traj_suffix}_recalc"
ct.calc_params = mltraj.snap_calc_params
ct.target = mltraj.snap_calc_params['target']
ct.nroots = mltraj.target
traj_recalc_file = f'{ct.solute}{solvstr}_{targstr(ct.which_target)}_{ct.which_traj}_{ct.output}.traj'
if path.exists(traj_recalc_file) and path.getsize(traj_recalc_file)>0:
print(f'# Skipping recalculating clusters in postprocessing - {traj_recalc_file} already present')
else:
ct.run()
if mltraj.store_full_traj:
# Remove equilibration trajectory data
eq_dir = f"{mltraj.traj_suffix}_equil"
ct.md_suffix = f"{targstr(ct.which_target)}_{ct.which_traj}_{mltraj.traj_suffix}_equil0000"
if mltraj.nequil>0:
eq_file = f'{eq_dir}/{ct.solute}_{ct.solvent}_{ct.md_suffix}.traj'
if path.exists(eq_file):
remove(eq_file)
# Move to MD directory
md_dir = f"{mltraj.traj_suffix}_md"
chdir(md_dir)
# Copy geometry files to current directory
copyfile(f"../{ct.solute}.xyz",f"{ct.solute}.xyz")
copyfile(f"../{ct.solvent}.xyz",f"{ct.solvent}.xyz")
# Carve main md trajectory data
ct.md_suffix = f"{targstr(ct.which_target)}_{ct.which_traj}_{mltraj.traj_suffix}_md0000"
if mltraj.nsnap>0:
md_file = f'{ct.solute}_{ct.solvent}_{ct.md_suffix}.traj'
if not path.exists(md_file):
print(f"# No MD trajectory file found: {md_file} - continuing")
continue
ct.carved_suffix = f"{mltraj.traj_suffix}_md0000_carved"
ct.max_snapshots = mltraj.nsnap * mltraj.md_steps
ct.carve_spheres(solvent_radius=mltraj.carve_trajectory_radius)
# Remove full md trajectory data
remove(f'{ct.solute}_{ct.solvent}_{ct.md_suffix}.traj')
chdir('..')
# # Plot Spectra from cluster calculations
# In[ ]:
import glob
import numpy as np
def add_arrows_and_label(warp_params,arrow1_pos,arrow2_pos,s_ax,task):
s_ax.arrow(arrow1_pos[0],arrow1_pos[1],arrow1_pos[2],arrow1_pos[3],
head_width=0.1, head_length=10, fc='k', ec='k',
length_includes_head = True)
if task.warp_scheme == 'alphabeta' or task.warp_scheme == 'betabeta':
s_ax.arrow(arrow2_pos[0],arrow2_pos[1],arrow2_pos[2],arrow2_pos[3],
head_width=0.1, head_length=10, fc='k', ec='k',
length_includes_head = True)
warp_str = ''
if task.warp_scheme=='alphabeta':
warp_str = warp_str + f'\u03B1 = {warp_params[0]:0.3f}, '
warp_str = warp_str + f'\u03B2 = {warp_params[1]:0.3f}'
elif task.warp_scheme=='betabeta':
warp_str = warp_str + f'\u03B2 1 = {warp_params[0]:0.3f}, '
warp_str = warp_str + f'\u03B2 2 = {warp_params[1]:0.3f}'
else:
warp_str = warp_str + f'\u03B2 = {warp_params[0]:0.3f}'
return warp_str
[docs]def spectral_warp_driver(all_solutes,all_solvents,task,annotation=None):
"""
Driver to calculate spectral warping parameters for a range of solute/solvent pairs
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
task: SpectraTask class
Argument list for the whole spectra job - see Spectra module documentation for more detail.
Arguments used only in the driver include:
``task.exc_suffix``: Directory in which results of excitation calculations performed
by the Clusters task can be found. The pattern used to find matches is:
'{solute}_{solvent}_{exc_suffix}/{solute}_{solvent}_solv*.out'
``task.warp_origin_ref_peak_range``: Peak range searched when looking for 'reference' peaks.
in the origin spectrum for spectral warping.
``task.warp_dest_ref_peak_range``: Peak range searched when looking for 'reference' peaks.
in the destination spectrum for spectral warping.
``task.warp_broad``: Broadening to be applied to origin and destination spectra.
``task.warp_inputformat``: Format of the files to be loaded for origin and destination spectra.
[TODO: May need to be adjusted to allow separate ``task.warp_ref_inputformat`` and ``task.warp_dest_inputformat``]
``task.warp_files``: File pattern to search for when looking for origin and destination spectra
for spectral warping.
``task.merge_solutes``: Dictionary: each entry should be a list of solute names that will be merged into the
corresponding key
"""
from copy import deepcopy
import matplotlib.pyplot as pyplot
import glob
# Set up lists of solute-solvent pairs
exc_suffix = task.exc_suffix
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
# Retrieve spectral warping settings from args
#warp_origin_prefix = task.warp_origin_prefix
#warp_dest_prefix = task.warp_dest_prefix
warp_inputformat = task.warp_inputformat
#warp_dest_files = task.warp_files
warp_params = []
store_wavelength = deepcopy(task.wavelength)
store_broad = task.broad
store_renorm = task.renorm
store_inputformat = task.inputformat
if isinstance(task.warp_origin_ref_peak_range,dict):
all_warp_origin_ref_peak_ranges = task.warp_origin_ref_peak_range
else:
all_warp_origin_ref_peak_ranges = None
if isinstance(task.warp_dest_ref_peak_range,dict):
all_warp_dest_ref_peak_ranges = task.warp_dest_ref_peak_range
else:
all_warp_dest_ref_peak_ranges = None
if task.warp_broad is not None:
task.broad = task.warp_broad
task.renorm = False
# Loop over solvent-solute pairs to find warp params for each pair
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
print('\nSpectral Warping for',solute)
if all_warp_origin_ref_peak_ranges is not None:
if solute in all_warp_origin_ref_peak_ranges:
task.warp_origin_ref_peak_range = all_warp_origin_ref_peak_ranges[solute]
if all_warp_dest_ref_peak_ranges is not None:
if solute in all_warp_dest_ref_peak_ranges:
task.warp_dest_ref_peak_range = all_warp_dest_ref_peak_ranges[solute]
# if peak ranges for spectral warps are not specified, use full wavelength range
task.wavelength = store_wavelength
if task.warp_origin_ref_peak_range is None:
task.warp_origin_ref_peak_range = deepcopy(task.wavelength)
if task.warp_dest_ref_peak_range is None:
task.warp_dest_ref_peak_range = deepcopy(task.wavelength)
# Process Solute spectra
task.warp_params = [0.0]
if task.warp_scheme == 'alphabeta':
task.warp_params = [1.0,0.0]
if task.warp_scheme == 'betabeta':
task.warp_params = [0.0,0.0,0.0,0.0]
arrow1_pos = [0]*4
arrow2_pos = [0]*4
s_fig = None; s_ax = None
# Plot destination spectrum
# Cut down wavelength according to ref_peak_range values
task.wavelength = deepcopy(store_wavelength)
if task.warp_dest_ref_peak_range[0]<task.wavelength[0]:
task.wavelength[0] = task.warp_dest_ref_peak_range[0]
if task.warp_dest_ref_peak_range[1]>task.wavelength[1]:
task.wavelength[1] = task.warp_dest_ref_peak_range[1]
pre_glob_files = sub_solu_solv_names(task.warp_dest_files,f'{solute}_{solvent}',all_solutes,all_solvents)
task.files = glob.glob(pre_glob_files)
if task.files==[]:
raise Exception(f"Error in spectral_warp_driver: No files were found matching destination task.files: {pre_glob_files}")
task.inputformat = warp_inputformat
dest_spectrum,spec,s_fig,s_ax,trans_orig_dest,_ = task.run(s_fig,s_ax,plotlabel='dest')
if dest_spectrum is None:
raise Exception(f"Error in spectral_warp_driver: No spectrum was plotted for task.files: {pre_glob_files} {task.files}")
if annotation is not None:
s_ax.text(task.wavelength[0],0.95,annotation)
pyplot.setp(spec,color='r')
# Plot origin spectrum
task.wavelength = store_wavelength.copy()
if task.warp_origin_ref_peak_range[0]<task.wavelength[0]:
task.wavelength[0] = task.warp_origin_ref_peak_range[0]
if task.warp_origin_ref_peak_range[1]>task.wavelength[1]:
task.wavelength[1] = task.warp_origin_ref_peak_range[1]
#print(task.warp_origin_files)
pre_glob_files = sub_solu_solv_names(task.warp_origin_files,f'{solute}_{solvent}',all_solutes,all_solvents)
task.files = glob.glob(pre_glob_files)
if task.files==[]:
raise Exception(f"Error in spectral_warp_driver: No files were found matching origin task.files: {pre_glob_files}")
origin_spectrum,spec,s_fig,s_ax,trans_orig_origin,_ = task.run(s_fig,s_ax,plotlabel='origin')
pyplot.setp(spec,color='b')
wp = task.find_spectral_warp_params(dest_spectrum,origin_spectrum,arrow1_pos,arrow2_pos)
warp_params.append(wp)
task.warp_params = warp_params[-1]
task.wavelength = [min(store_wavelength[0],task.warp_origin_ref_peak_range[0],task.warp_dest_ref_peak_range[0]),
max(store_wavelength[1],task.warp_origin_ref_peak_range[1],task.warp_dest_ref_peak_range[1]),
task.warp_origin_ref_peak_range[2]]
warped_spectrum,spec,s_fig,s_ax,_,_ = task.run(s_fig,s_ax,plotlabel='warped')
pyplot.setp(spec,color='g')
s_ax.legend()
if True:
warp_str = add_arrows_and_label(wp,arrow1_pos,arrow2_pos,s_ax,task)
s_ax.set_title(f'Spectral warp params for {fullsolute} in {fullsolvent}: {warp_str} eV')
#print(f'Transition origins for {fullsolute} in {fullsolvent}:')
max_trans = min(len(trans_orig_dest[0]),len(trans_orig_origin[0]))
max_trans = 0
for iexc in range(max_trans):
to = trans_orig_origin[0][iexc]
td = trans_orig_dest[0][iexc]
print(f'excitation {iexc+1}: {to[0][1]} eV ({to[0][2]:0.4f}) --> {td[0][1]} eV ({td[0][2]:0.4f})')
to = [s[:2] for s in to[1:] if abs(s[2])>0.3]
td = [s[:2] for s in td[1:] if abs(s[2])>0.3]
print(f'origins {iexc+1}: {to} --> {td}')
s_fig.savefig(f'{solute}_{solvent}_check_warp.png')
# Restore settings that we over-wrote when doing spectral warping
task.renorm = store_renorm
task.wavelength = deepcopy(store_wavelength)
task.broad = store_broad
task.inputformat = store_inputformat
task.warp_origin_ref_peak_range = all_warp_origin_ref_peak_ranges
task.warp_dest_ref_peak_range = all_warp_dest_ref_peak_ranges
return warp_params
def spectra_driver(all_solutes,all_solvents,task,warp_params=None,cluster_spectra=[],c_fig=None,c_ax=None):
import glob
# Set up lists of solute-solvent pairs
exc_suffix = task.exc_suffix
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
all_paths = []
for (solvent,_),(solute,_) in all_pairs:
if isinstance(task.exc_suffix,dict):
if solvent is not None:
exc_path = f'{solute}_{solvent}_{exc_suffix[solute]}'
else:
exc_path = f'{solute}_{exc_suffix[solute]}'
else:
if solvent is not None:
exc_path = f'{solute}_{solvent}_{exc_suffix}'
else:
exc_path = f'{solute}_{exc_suffix}'
all_paths.append(exc_path)
base_path = path.basename(getcwd())
orig_output = task.output
# Now plot explicit solvent spectrum
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
# Find cluster spectra path names
exc_suffix = task.exc_suffix
if isinstance(exc_suffix,dict):
if solute in exc_suffix:
exc_suffix = task.exc_suffix[solute]
if solvent is not None:
exc_path = f'{solute}_{solvent}_{exc_suffix}'
else:
exc_path = f'{solute}_{exc_suffix}'
solusolvstr = f'{fullsolute} in {fullsolvent}' if solvent is not None else fullsolute
if base_path in all_paths and base_path!=exc_path:
continue
if base_path not in all_paths:
if not path.exists(exc_path):
print(f'\nSkipping Spectra for: {solusolvstr}')
print(f'Directory {exc_path} not found')
continue
else:
chdir(exc_path)
print(f'\nPlotting Spectra for {solusolvstr}')
# List files matching pattern:
if task.files is not None:
task.files = sub_solu_solv_names(task.files,f'{solute}_{solvent}',all_solutes,all_solvents)
if '*' in task.files:
task.files = glob.glob(task.files)
if task.renorm == -1:
task.renorm = 1.0/float(len(task.files))
# Check if any other solutes are this solute's merge list, and if so add their files
if solute in task.merge_solutes:
for other_solute in task.merge_solutes[solute]:
other_files = glob.glob(f'../{other_solute}_{solvent}_{exc_suffix}/{other_solute}_{solvent}_solv*.out')
task.files = task.files + other_files
# Check if this solute is in any other solute's merge list
found_solute = False
for other_solute in task.merge_solutes:
if solute in task.merge_solutes[other_solute]:
found_solute = True
continue
if found_solute:
chdir('..')
continue
print('Cluster files to process: ',len(task.files),task.files)
if task.trajectory is not None:
for i in range(len(task.trajectory)):
for j in range(len(task.trajectory[i])):
t = task.trajectory[i][j]
task.trajectory[i][j] = sub_solu_solv_names(t,f'{solute}_{solvent}',all_solutes,all_solvents)
if task.correction_trajectory is not None:
t = task.correction_trajectory[i][j]
task.correction_trajectory[i][j] = sub_solu_solv_names(t,f'{solute}_{solvent}',all_solutes,all_solvents)
if task.vibration_trajectory is not None:
t = task.vibration_trajectory[i][j]
task.vibration_trajectory[i][j] = sub_solu_solv_names(t,f'{solute}_{solvent}',all_solutes,all_solvents)
if hasattr(task.wrapper,'rootname'):
task.wrapper.rootname = sub_solu_solv_names(task.wrapper.rootname,
f'{solute}_{solvent}',all_solutes,all_solvents)
task.wrapper.input_filename = sub_solu_solv_names(task.wrapper.input_filename,
f'{solute}_{solvent}',all_solutes,all_solvents)
# Retrieve spectral warping parameters
if warp_params is not None:
task.warp_params = warp_params[i]
else:
task.warp_params = None
rgb = np.array((0,0,1.0))
if isinstance(task.line_colours,list):
rgb = task.line_colours[i]
task.output = sub_solu_solv_names(orig_output,f'{solute}_{solvent}',all_solutes,all_solvents)
cluster_spectrum,spec,c_fig,c_ax,_,_ = task.run(c_fig,c_ax,
plotlabel=f'{fullsolute} in {fullsolvent}',rgb=rgb)
cluster_spectra.append(cluster_spectrum)
# Return to parent directory if we are looping over dirs
if base_path not in all_paths:
chdir('..')
return cluster_spectra,c_fig,c_ax
# # Helper functions
# In[ ]:
# return args objects with default values for each script
from esteem.tasks import solutes, solvate, clusters, spectra
def get_default_args():
parser = solutes.make_parser()
solutes_args = parser.parse_args("")
parser = solvate.make_parser()
solvate_args = parser.parse_args(['--solute','a','--solvent','b'])
parser = clusters.make_parser()
clusters_args = parser.parse_args(['--solute','a','--solvent','b'])
parser = spectra.make_parser()
spectra_args = parser.parse_args("")
return solutes_args, solvate_args, clusters_args, spectra_args