#!/usr/bin/env python
# coding: utf-8
# In[ ]:
"""
Drivers to run ESTEEM tasks in serial or parallel on a range of inputs.
These inputs will consist of solutes, solvents or pairs of solutes and solvent, depending on the task.
"""
# # Top-level Driver
# In[ ]:
import argparse
from esteem import parallel
[docs]def main(all_solutes,all_solvents,all_solutes_tasks={},all_solvate_tasks={},
all_clusters_tasks={},all_spectra_tasks={},all_qmd_tasks={},
all_mltrain_tasks={},all_mltest_tasks={},all_mltraj_tasks={},
make_script=parallel.make_sbatch):
"""
Main driver routine which chooses other tasks to run as appropriate.
Control of what driver is actually called depends on command-line arguments with which the script
was invoked: 'python <seed>.py <task> <seed> <target>'
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
all_solutes_tasks: namespace or class
Argument list for the Solutes task - see solutes_driver documentation below for more detail.
all_solvate_tasks: namespace or class
Argument list for the Solvate task - see solvate_driver documentation below for more detail.
all_clusters_tasks: namespace or class
Argument list for the Clusters task - see clusters_driver documentation below for more detail.
all_spectra_tasks: namespace or class
Argument list for the Spectra task - see spectra_driver documentation below for more detail.
make_script: dict of strings
Routine that can write a job submission script. Usually parallel.make_sbatch.
"""
import sys
# Parse arguments to script
parser = argparse.ArgumentParser(description=f'ESTEEM Main driver script')
parser.add_argument('task')
parser.add_argument('seed',default=None,type=str)
parser.add_argument('target',nargs='?',default=None,type=str)
args = parser.parse_args()
task = args.task
scriptname = sys.argv[0].replace(".py","")
if args.seed is None:
import sys
args.seed = scriptname
seed = args.seed
target = args.target
# Special task: set up scripts
if ('scripts' in task.split()):
# case where many SolutesTask have been defined
if isinstance(all_solutes_tasks,dict):
for targ in all_solutes_tasks:
all_solutes_tasks[targ].script_settings['scriptname'] = scriptname
solutes_script_settings = all_solutes_tasks[targ].script_settings
make_script(seed=seed,task='solutes',target=targ,**solutes_script_settings)
make_script(seed=seed,task='solvents',target=targ,**solutes_script_settings)
else: # just one SolutesTask
all_solutes_tasks.script_settings['scriptname'] = scriptname
solutes_script_settings = all_solutes_tasks.script_settings
make_script(seed=seed,task='solutes',target=target,**solutes_script_settings)
make_script(seed=seed,task='solvents',target=target,**solutes_script_settings)
for tasks in [all_solvate_tasks,all_clusters_tasks,all_qmd_tasks,
all_mltrain_tasks,all_mltest_tasks,all_mltraj_tasks]:
if tasks is None or tasks=={}:
continue
if isinstance(tasks,dict):
for targ in tasks:
tasks[targ].script_settings['scriptname'] = scriptname
make_script(seed=seed,task=tasks[targ].task_command,target=targ,**tasks[targ].script_settings)
else:
tasks.script_settings['scriptname'] = scriptname
make_script(seed=seed,task=tasks.task_command,target=targ,**tasks.script_settings)
return
# Now run actual task
if ('solutes' in task.split()):
solutes_task = get_actual_args(all_solutes_tasks,target,'solutes')
solutes_driver(all_solutes,all_solvents,solutes_task)
if ('solvents' in task.split()):
solutes_task = get_actual_args(all_solutes_tasks,target,'solvents')
solvents_driver(all_solvents,solutes_task)
if ('solvate' in task.split()):
solvate_task = get_actual_args(all_solvate_tasks,target,'solvate')
solvate_task.script_settings['target'] = target
solvate_task.script_settings['scriptname'] = scriptname
solvate_driver(all_solutes,all_solvents,seed,solvate_task,make_script)
if ('clusters' in task.split()) or ('cluster_scripts' in task.split()):
clusters_task = get_actual_args(all_clusters_tasks,target,'clusters')
clusters_task.script_settings['target'] = target
clusters_task.script_settings['scriptname'] = scriptname
clusters_driver(all_solutes,all_solvents,seed,clusters_task,make_script)
if 'atoms' in task.split():
atomen_task = get_actual_args(all_qmd_tasks,target,'qmd')
atomen_task.seed = seed
atomen_task.target = target
atom_energies_driver(atomen_task)
if 'qmd' in task.split():
qmd_task = get_actual_args(all_qmd_tasks,target,'qmd')
qmd_task.seed = seed
qmd_driver(qmd_task)
if ('train' in task.split()) or ('mltrain' in task.split()):
mltrain_task = get_actual_args(all_mltrain_tasks,target,'mltrain')
mltrain_task.seed = seed
mltrain_driver(mltrain_task)
if ('test' in task.split()) or ('mltest' in task.split()):
mltest_task = get_actual_args(all_mltest_tasks,target,'mltest')
mltest_task.seed = seed
mltest_driver(mltest_task)
if ('traj' in task.split()) or ('mltraj' in task.split()):
mltraj_task = get_actual_args(all_mltraj_tasks,target,'mltraj')
mltraj_task.seed = seed
mltraj_driver(mltraj_task)
if ('spectra' in task.split()):
spectra_task = get_actual_args(all_spectra_tasks,target,'spectra')
spectra_driver(all_solutes,all_solvents,spectra_task)
# Helper function that extracts the appropriate element from a dictionary
def get_actual_args(all_args,target,task):
if isinstance(all_args,dict):
try:
args = all_args[target]
except:
raise Exception(f"all_{task}_args[{target}] not found. Possible values: {list(all_args.keys())}")
else:
args = all_args
return args
# # Run calculations on individual Solute molecules
# In[ ]:
"""
Drivers to run ESTEEM tasks in serial or parallel on a range of inputs
These inputs will consist of solutes, solvents or pairs of solutes and solvent, depending on the task
"""
from esteem.tasks import solutes
from esteem import parallel
from os import path,chdir,makedirs
from shutil import copytree
[docs]def solutes_driver(all_solutes,all_solvents,task):
"""
Driver to run a range of DFT/TDDFT calculations on a range of solutes.
If the script is run as an array task, it performs the task for the
requested solute only.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents (implicit only, here). Entries are the full names.
task: SolutesTask class
Argument list for the whole Solutes job - see Solutes module documentation for more detail.
Arguments used only within the driver routine include:
``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes
calculation will be written
"""
if task.directory is not None:
if not path.exists(task.directory):
print(f'Creating directory {task.directory}')
makedirs(task.directory)
if not path.exists(task.directory+'/xyz'):
if path.exists('xyz'):
copytree('xyz', task.directory+'/xyz')
chdir(task.directory)
# Determine if we are in a job array, in which case just run one pair
task_id = parallel.get_array_task_id()
if task_id is None:
solutes_to_run = all_solutes # run everything
else:
solute = list(all_solutes)[task_id]
solutes_to_run = {solute: all_solutes[solute]} # run just one solute
solute_namelist = task.get_xyz_files(solutes_to_run,"xyz")
# Geom opts and TDDFT for solutes, then implicit solvent versions in each solvent
if not all_solvents:
task.run(solute_namelist)
else:
for solvent in all_solvents:
task.solvent = solvent
if task.solvent_settings is not None and task.solvent is not None:
task.solvent = task.solvent_settings
task.solvent['solvent'] = solvent
task.run(solute_namelist)
if task.directory is not None:
chdir("..")
# # Run calculations on individual Solvent molecules
# In[ ]:
from esteem.tasks import solutes
[docs]def solvents_driver(all_solvents,task):
"""
Driver to run a range of DFT/TDDFT calculations on a range of solvent molecules.
See the Solutes module documentation for more info on what tasks are run.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
task: SolutesTask class
Argument list for the whole Solutes job - see Solutes module documentation for more detail.
Arguments used only within the driver routine include:
``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes
calculation will be written
wrapper: namespace or class
Functions that will be used in the Solutes task - see Solutes module documentation for more detail.
"""
if task.directory is not None:
if not path.exists(task.directory):
print(f'Creating directory {task.directory}')
makedirs(task.directory)
if not path.exists(task.directory+'/xyz'):
if path.exists('xyz'):
copytree('xyz', task.directory+'/xyz')
chdir(task.directory)
solvent_namelist = task.get_xyz_files(all_solvents,"xyz")
# Geom opts and TDDFT for solvents (in implicit solvent of self)
for solvent in all_solvents:
task.solvent = solvent
if task.solvent_settings is not None:
task.solvent = task.solvent_settings
task.solvent['solvent'] = solvent
task.run([solvent])
if task.directory is not None:
chdir("..")
# # Calculate Range Separation Parameters
# In[ ]:
# Temporarily here till moved elsewhere
def range_sep_driver(all_solutes,all_solvents,task,wrapper):
import os
from ase.units import Hartree as Ha
if task.directory is not None:
if not path.exists(task.directory):
raise Exception(f"Directory not present: {task.directory} (must run solutes task first)")
chdir(task.directory)
#from esteem.tasks.solutes import find_range_sep
solutes = ["AQS"]
solvent = {'solvent': 'watr', 'cosmo_smd': True, 'cosmo_vem': 0}
func = "LC-PBE0"
basis = "6-311++G**"
charges={"AH2QS": -1, "AQS": -1}
rs_range = [0.05,0.1,0.15,0.2,0.25,0.3]
energy,calc,evals,occs=find_range_sep(solutes,"is_opt_watr","rs_opt",nw,basis=basis,
func=func,solvent=solvent,charges=charges,
rs_range=rs_range,all_readonly=True)
ehomo = {}
elumo = {}
seed = solutes[0]
all_rs = []
all_Ja = []
all_Jb = []
all_J2 = []
for rsf in rs_range:
rs = f'{rsf:.2f}'
if seed in charges:
charge = charges[seed]
else:
charge = 0
for c in [charge+1,charge-1,charge]:
# Assume when c==charge, system is closed-shell
if c != charge:
occfac = 0.99
s = 0.5
else:
occfac = 1.99
s = 0
ih = max(np.argwhere(occs[rs,c]>occfac))[0]
il = min(np.argwhere(occs[rs,c]<occfac))[0]
ehomo[rs,c] = (evals[rs,c])[ih]
elumo[rs,c] = (evals[rs,c])[il]
#except:
# print('fail for rs=',rs)
all_rs.append(float(rs))
IP = energy[rs,charge+1] - energy[rs,charge]
EH = ehomo[rs,charge]
all_Ja.append((EH+IP)**2)
IPm = energy[rs,charge] - energy[rs,charge-1]
EHm = ehomo[rs,charge-1]
print(f'E_H-IP for rs={rs},Q={charge} is {EH:0.3f}+{IP:0.3f} = {EH+IP:0.3f}',
f', Q={charge-1} is {EHm:0.3f}+{IPm:0.3f} = {EHm+IPm:0.3f}')
all_Jb.append((EHm+IPm)**2)
all_J2.append((EHm+IPm)**2+all_Ja[-1])
#print(f'-IP for rs={rs},Q={charge} is {-IP}')
import matplotlib.pyplot as plt
plt.plot(all_rs,all_Ja)
#plt.plot(all_rs,all_Jb)
#plt.plot(all_rs,all_J2)
plt.show()
if task.directory is not None:
chdir("..")
# # Run MD calculations on all pairs of solvent + solute
# In[ ]:
from esteem.tasks import solvate
from shutil import copyfile
import itertools
from os import path,chdir,makedirs
[docs]def solvate_driver(all_solutes,all_solvents,seed,task,make_sbatch=None):
"""
Driver to run set up and run MD on solvated boxes containing a solute molecule and solvent.
If the script is run as an array task, it performs the task for the requested solute/solute pair
only, out of the nsolutes * nsolvents possible combinations.
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
task: SolvateTask class
Argument list for the whole clusters job - see Solvate module documentation for more detail.
Arguments used within this routine include:
``task.md_suffix``: Directory suffix for where the MD runs will take place
``task.md_geom_prefix``: Directory prefix for where the geometries from the Solutes calculation should be obtained.
wrapper: class
Wrapper that will be used in the Solvate task - see Solvate module documentation for more detail.
"""
md_suffix = task.md_suffix
md_geom_prefix = task.md_geom_prefix
# Make list of all combinations of solvent and solute
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
all_paths = [f'{solute}_{solvent}_{md_suffix}' for (solvent,_),(solute,_) in all_pairs]
base_path = path.basename(getcwd())
# Determine if we are in a job array, in which case just run one pair
task_id = parallel.get_array_task_id()
if task_id is not None:
all_pairs = [all_pairs[task_id]]
# Iterate over pairs of solvent and solute
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
if solute in all_solvents and (solute!=solvent):
continue
sol_sol_str = f'{solute} ({fullsolute}) in {solvent} ({fullsolvent})'
if task_id is None:
task_str = f'Task {i} of {len(all_pairs)}'
else:
task_str = f'Task {task_id}'
md_path = f'{solute}_{solvent}_{md_suffix}'
# We are in the parent directory of this project, so setup clusters
# subdirectories if not already present
if base_path not in all_paths:
print(f'{task_str}: Setting up solvate calculation for {sol_sol_str}\n\n')
# Set up files and directories for MD run
if not path.exists(md_path):
print(f'Creating directory {md_path}')
makedirs(md_path)
# Copy in optimised geometry of solute in implicit solvent
try:
infile = f'{md_geom_prefix}_{solvent}/{solute}.xyz'
copyfile(infile,f'{md_path}/{solute}.xyz')
except FileNotFoundError:
print(f'Optimised geometry for {solute} in {solvent} not found')
print(f'Expected: {infile}')
continue
# Copy in optimised geometry of solvent in implicit solvent
try:
infile = f'{md_geom_prefix}_{solvent}/{solvent}.xyz'
copyfile(infile,f'{md_path}/{solvent}.xyz')
except FileNotFoundError:
print(f'Optimised geometry for {solvent} not found')
print(f'Expected: {infile}')
continue
print(f'Changing directory to {md_path}')
chdir(md_path)
# Write job script for submission to HPC cluster
if make_sbatch is not None:
task.script_settings['jobname'] = f'{solute}_{solvent}_{md_suffix}'
task.script_settings['execpath'] = '../'
make_sbatch(seed=seed,task='solvate',**task.script_settings)
# Go back to base directory
print(f'Returning to parent directory')
chdir('..')
# We are in a subdirectory already so run a particular calculation
elif base_path==md_path:
# Now run MD in md_path
task.solute = solute
task.solvent = solvent
task.setup_amber()
task.run()
chdir("..")
# # Run Clusters script on MD snapshots to extract region near solute
# In[ ]:
from shutil import copyfile
import itertools
from os import path,chdir,makedirs,getcwd
[docs]def clusters_driver(all_solutes,all_solvents,seed,task,make_sbatch=None):
"""
Driver to extract isolated clusters from solvated models, for a range of solute/solvent pairs.
Takes MD results from the directory {solute}_{solvent}_{task.md_suffix} and performs excitation
calculation in the directory {solute}_{solvent}_{task.exc_suffix}.
If invoked from the base directory, rather than the excitation directory, it sets up the
excitation directory and writes a job script then exits.
If invoked from the excitation directory, it performs the excitation calculations for all
the extracted clusters. If the script is run as an array task, it performs the task for the
requested cluster only.
*Arguments*
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
seed: str
Overall 'seed' name for the run - used in creation of job scripts for the calculation
task: ClustersTask class
Argument list for the whole clusters job - see Clusters module documentation for more detail.
Arguments used predominantly in the driver rather than the task main routine include:
``task.md_suffix``: Directory where MD outputs are to be found.
``task.exc_suffix``: Directory where results of Cluster excitation calculations will go.
make_sbatch: function
Function that writes a job submission script for the clusters jobs
*Output*:
On first run, from the base directory of the project, the script will create subdirectories
for each solute-solvent pair, with path '{solute}_{solvent}_{exc_suffix}'. The default value
of ``exc_suffix`` is 'exc'.
To each directory, this routine will copy the trajectory file
'{solute}_{solvent}_{md_suffix}/{solute}_{solvent}_solv.traj'
which consists of ``task.nsnaps`` snapshots.
It will then create a job script using ``make_sbatch`` and ``settings`` with the
name '{solute}_{solvent}_{exc_suffix}_sub' and exit.
The user then needs to run the individual job scripts from each subdirectory, probably
as a job array: the array task ID should range from 0 to ``task.nsnaps - 1``
The output of those runs will be the excited state energies of the clusters, which
can be averaged over in the Spectra task.
"""
# Iterate over all pairs of solute and solvent, unless in a specific directory already,
# in which case just run that pair
md_suffix = task.md_suffix
exc_suffix = task.exc_suffix
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
all_paths = [f'{solute}_{solvent}_{exc_suffix}' for (solvent,_),(solute,_) in all_pairs]
base_path = path.basename(getcwd())
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
sol_sol_str = f'{solute} ({fullsolute}) in {solvent} ({fullsolvent})'
exc_path = f'{solute}_{solvent}_{exc_suffix}'
if solute in all_solvents and (solute!=solvent):
continue
# We are in the parent directory of this project, so setup clusters
# subdirectories if not already present
if base_path not in all_paths:
# Check that MD has finished - skip to next file if not
md_path = f'{solute}_{solvent}_{md_suffix}'
if not path.exists(md_path):
print(f'\nSkipping Clusters for: {sol_sol_str} - MD directory not found')
print(f'Expected: {md_path}')
continue
else:
trajfile = f'{md_path}/{solute}_{solvent}_solv.traj'
if not path.exists(trajfile):
print(f'\nSkipping Clusters for: {sol_sol_str} - trajectory file not found')
print(f'Expected: {trajfile}')
continue
print(f'\nTask {i} of {len(all_pairs)}: Setting up clusters calculation for {sol_sol_str}')
if not path.exists(exc_path):
print(f'Creating directory {exc_path}')
makedirs(exc_path)
# Copy in trajectory file to excitations directory
try:
copyfile(trajfile,f'{exc_path}/{solute}_{solvent}_solv.traj')
except FileNotFoundError:
print(f'Trajectory file generated by {sol_sol_str} MD not found')
print(f'Expected: {trajfile}')
continue
# Copy the geometries of the solute and solvent to the excitations directory
# so they can be used in cluster extraction
try:
copyfile(f'{md_path}/{solute}.xyz',f'{exc_path}/{solute}.xyz')
copyfile(f'{md_path}/{solvent}.xyz',f'{exc_path}/{solvent}.xyz')
except FileNotFoundError:
print(f'Geometry of {solute} or {solvent} not found in MD directory')
continue
print(f'Changing directory to {exc_path}')
chdir(exc_path)
# Write job script for submission to HPC cluster
if make_sbatch is not None:
task.script_settings['jobname'] = f'{solute}_{solvent}_{exc_suffix}'
task.script_settings['execpath'] = '../'
make_sbatch(seed=seed,task='clusters',**task.script_settings)
# Go back to base directory
chdir('..')
# We are in a subdirectory already so run a particular calculation
elif base_path==exc_path:
print(f'\nProcessing clusters for {fullsolute} in {fullsolvent}')
# Run Cluster excitation calculation according to value of task_id
task.solute = solute
task.solvent = solvent
if isinstance(task.impsolv,dict):
task.impsolv['solvent'] = solvent
else:
task.impsolv = solvent
task.task_id = parallel.get_array_task_id()
task.run()
# In[ ]:
def atom_energies_driver(atomen):
# Determine if we are in a job array, as this might be necessary for choosing
# a calculator, for ML tasks
task_id = parallel.get_array_task_id()
if task_id is None:
task_id = 0
from ase.io import read, Trajectory
from ase import Atoms
model_opt = read(atomen.seed+".xyz")
wrapper_is_ml = False
#(isinstance(atomen.wrapper,amp.AMPWrapper)
# or isinstance(atomen.wrapper,physnet.PhysNetWrapper))
if wrapper_is_ml:
calc_params = {'calc_seed': atomen.seed,
'calc_suffix': atomen.calc_suffix,
'calc_prefix': '',
'target': atomen.target}
else:
# QM Wrapper
calc_params = {'basis':atomen.basis,'func':atomen.func,'target':None} # TODO
atom_list = set(model_opt.symbols)
atom_energies = {}
atom_traj = Trajectory(f'{atomen.seed}_atoms_{atomen.target}.traj','w')
for atom in atom_list:
atom_model = Atoms(atom)
if atom=='H':
spin=0.5
if atom=='O':
spin=1
if atom=='N':
spin=0.5
if atom=='C':
spin=0
if wrapper_is_ml:
atom_energies[atom],calc = atomen.wrapper.singlepoint(atom_model,
f'{atomen.seed}',calc_params)
else:
atom_energies[atom],calc = atomen.wrapper.singlepoint(atom_model,
f'{atomen.seed}_{atom}',calc_params,spin=spin)
atom_traj.write(atom_model)
print(f'atom_energies[{atom}] = {atom_energies[atom]}')
atom_traj.close()
def qmd_driver(qmdtraj):
import string
# Make list of trajectories
all_trajs = list(string.ascii_uppercase)[0:qmdtraj.ntraj]
# Determine if we are in a job array, in which case just run one trajectory
task_id = parallel.get_array_task_id()
trajs_in_task = qmdtraj.which_trajs
if task_id is not None:
if all_trajs[task_id] in trajs_in_task:
qmdtraj.which_trajs = all_trajs[task_id]
else:
raise Exception(f"Requested trajectory {all_trajs[task_id]} is not part of this task.")
print(qmdtraj)
qmdtraj.run()
def mltrain_driver(mltrain):
# Determine if we are in a job array, in which case find task_id
task_id = parallel.get_array_task_id()
if task_id is None:
task_id = 0
# set random seed based on task_id
mltrain.train_params['random_seed'] = task_id
print(mltrain.train_params)
mltrain.run()
def mltest_driver(mltest):
from esteem.wrappers import amp
# Determine if we are in a job array, in which case just run one trajectory
task_id = parallel.get_array_task_id()
if task_id is None:
task_id = 0
if isinstance(mltest.wrapper,amp.AMPWrapper):
calcfn = mltest.wrapper.calc_filename(mltest.seed,mltest.target,prefix=mltest.prefix,suffix=mltest.calc_suffix)
calc_file = calcfn+ml_wrapper.calc_ext
mltest.calc_suffix = suffix+"_test"
calcfn = ml_wrapper.calc_filename(mltest.seed,mltest.target,prefix=mltest.prefix,suffix=mltest.calc_suffix)
test_calc_file = calcfn+ml_wrapper.calc_ext
test_calc_log = calcfn+ml_wrapper.log_ext
if os.path.isfile(calc_file):
print(f"Copying {calc_file} to {test_calc_file} for testing")
shutil.copyfile(calc_file,test_calc_file)
else:
raise Exception("Calculator file does not exist: ",calc_file)
if mltest.target == 0 or mltest.target is None:
seed_state_str = mltest.seed+"_gs"
else:
seed_state_str = mltest.seed+"_es"+str(mltest.target)
if mltest.calc_seed is None:
mltest.calc_seed = mltest.seed
if isinstance(mltest.which_trajs,dict):
traj_dict = mltest.which_trajs
for t in traj_dict:
mltest.which_trajs = traj_dict[t]
mltest.plotfile = seed_state_str + '_' + mltest.calc_suffix + '_' + str(t) + '.png'
mltest.run()
else: # single trajectory to test
mltest.run()
if isinstance(mltest.wrapper,amp.AMPWrapper):
if os.path.isfile(test_calc_log):
os.remove(test_calc_log)
def mltraj_driver(mltraj):
from esteem.trajectories import get_trajectory_list
# Determine if we are in a job array, in which case just run one trajectory
task_id = parallel.get_array_task_id()
if task_id is not None:
mltraj.which_trajs = get_trajectory_list(mltraj.ntraj)[task_id]
if mltraj.calc_seed is None:
mltraj.calc_seed = mltraj.seed
mltraj.run()
# # Plot Spectra from cluster calculations
# In[ ]:
from esteem.tasks import spectra
import glob
import numpy as np
def add_arrows_and_label(warp_params,arrow1_pos,arrow2_pos,s_ax,task):
s_ax.arrow(arrow1_pos[0],arrow1_pos[1],arrow1_pos[2],arrow1_pos[3],
head_width=0.1, head_length=10, fc='k', ec='k',
length_includes_head = True)
if task.warp_scheme == 'alphabeta' or task.warp_scheme == 'betabeta':
s_ax.arrow(arrow2_pos[0],arrow2_pos[1],arrow2_pos[2],arrow2_pos[3],
head_width=0.1, head_length=10, fc='k', ec='k',
length_includes_head = True)
warp_str = ''
if task.warp_scheme=='alphabeta':
warp_str = warp_str + f'\u03B1 = {warp_params[0]:0.3f}, '
warp_str = warp_str + f'\u03B2 = {warp_params[1]:0.3f}'
elif task.warp_scheme=='betabeta':
warp_str = warp_str + f'\u03B2 1 = {warp_params[0]:0.3f}, '
warp_str = warp_str + f'\u03B2 2 = {warp_params[1]:0.3f}'
else:
warp_str = warp_str + f'\u03B2 = {warp_params[0]:0.3f}'
return warp_str
[docs]def spectra_driver(all_solutes,all_solvents,task):
"""
Driver to plot spectra (and calculate spectral warping parameters) for a range of solute/solvent pairs
all_solutes: dict of strings
Keys are shortnames of the solutes. Entries are the full names.
all_solvents: dict of strings
Keys are shortnames of the solvents. Entries are the full names.
task: SpectraTask class
Argument list for the whole spectra job - see Spectra module documentation for more detail.
Arguments used only in the driver include:
``task.exc_suffix``: Directory in which results of excitation calculations performed
by the Clusters task can be found. The pattern used to find matches is:
'{solute}_{solvent}_{exc_suffix}/{solute}_{solvent}_solv*.out'
``task.warp_origin_ref_peak_range``: Peak range searched when looking for 'reference' peaks.
in the origin spectrum for spectral warping.
``task.warp_dest_ref_peak_range``: Peak range searched when looking for 'reference' peaks.
in the destination spectrum for spectral warping.
``task.warp_broad``: Broadening to be applied to origin and destination spectra.
``task.warp_inputformat``: Format of the files to be loaded for origin and destination spectra.
[TODO: May need to be adjusted to allow separate ``task.warp_ref_inputformat`` and ``task.warp_dest_inputformat``]
``task.warp_files``: File pattern to search for when looking for origin and destination spectra
for spectral warping.
``task.merge_solutes``: Dictionary: each entry should be a list of solute names that will be merged into the
corresponding key
"""
import matplotlib.pyplot as pyplot
# Set up lists of solute-solvent pairs
exc_suffix = task.exc_suffix
all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items()))
# Retrieve spectral warping settings from args
warp_origin_prefix = task.warp_origin_prefix
warp_dest_prefix = task.warp_dest_prefix
warp_inputformat = task.warp_inputformat
warp_files = task.warp_files
warp_params = []
store_wavelength = task.wavelength.copy()
store_broad = task.broad
store_renorm = task.renorm
store_inputformat = task.inputformat
if isinstance(task.warp_origin_ref_peak_range,dict):
all_warp_origin_ref_peak_ranges = task.warp_origin_ref_peak_range
else:
all_warp_origin_ref_peak_ranges = None
if isinstance(task.warp_dest_ref_peak_range,dict):
all_warp_dest_ref_peak_ranges = task.warp_dest_ref_peak_range
else:
all_warp_dest_ref_peak_ranges = None
if task.warp_broad is not None:
task.broad = task.warp_broad
task.renorm = False
# Loop over solvent-solute pairs to find warp params for each pair
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
print('\nSpectral Warping for',solute)
if all_warp_origin_ref_peak_ranges is not None:
if solute in all_warp_origin_ref_peak_ranges:
task.warp_origin_ref_peak_range = all_warp_origin_ref_peak_ranges[solute]
if all_warp_dest_ref_peak_ranges is not None:
if solute in all_warp_dest_ref_peak_ranges:
task.warp_dest_ref_peak_range = all_warp_dest_ref_peak_ranges[solute]
# if peak ranges for spectral warps are not specified, use full wavelength range
task.wavelength = store_wavelength
if task.warp_origin_ref_peak_range is None:
task.warp_origin_ref_peak_range = task.wavelength.copy()
if task.warp_dest_ref_peak_range is None:
task.warp_dest_ref_peak_range = task.wavelength.copy()
# Process Solute spectra
task.warp_params = [0.0]
if task.warp_scheme == 'alphabeta':
task.warp_params = [1.0,0.0]
if task.warp_scheme == 'betabeta':
task.warp_params = [0.0,0.0,0.0,0.0]
arrow1_pos = [0]*4
arrow2_pos = [0]*4
s_fig = None; s_ax = None
# Plot destination spectrum
# Cut down wavelength according to ref_peak_range values
task.wavelength = store_wavelength.copy()
if task.warp_dest_ref_peak_range[0]<task.wavelength[0]:
task.wavelength[0] = task.warp_dest_ref_peak_range[0]
if task.warp_dest_ref_peak_range[1]>task.wavelength[1]:
task.wavelength[1] = task.warp_dest_ref_peak_range[1]
task.files = [f'{warp_dest_prefix}_{solvent}/{solute}/{warp_files}']
task.inputformat = warp_inputformat
dest_spectrum,spec,s_fig,s_ax,trans_orig_dest = task.main(s_fig,s_ax,plotlabel='dest')
pyplot.setp(spec,color='r')
# Plot origin spectrum
task.wavelength = store_wavelength.copy()
if task.warp_origin_ref_peak_range[0]<task.wavelength[0]:
task.wavelength[0] = task.warp_origin_ref_peak_range[0]
if task.warp_origin_ref_peak_range[1]>task.wavelength[1]:
task.wavelength[1] = task.warp_origin_ref_peak_range[1]
task.files = [f'{warp_origin_prefix}_{solvent}/{solute}/{warp_files}']
origin_spectrum,spec,s_fig,s_ax,trans_orig_origin = task.main(s_fig,s_ax,plotlabel='origin')
pyplot.setp(spec,color='b')
wp = task.find_spectral_warp_params(dest_spectrum,origin_spectrum,arrow1_pos,arrow2_pos)
warp_params.append(wp)
task.warp_params = warp_params[-1]
task.wavelength = [min(store_wavelength[0],task.warp_origin_ref_peak_range[0],task.warp_dest_ref_peak_range[0]),
max(store_wavelength[1],task.warp_origin_ref_peak_range[1],task.warp_dest_ref_peak_range[1]),
task.warp_origin_ref_peak_range[2]]
warped_spectrum,spec,s_fig,s_ax,_ = task.main(s_fig,s_ax,plotlabel='warped')
pyplot.setp(spec,color='g')
s_ax.legend()
if False:
warp_str = add_arrows_and_label(wp,arrow1_pos,arrow2_pos,s_ax,task)
s_ax.set_title(f'Spectral warp params for {fullsolute} in {fullsolvent}: {warp_str} eV')
print(f'Transition origins for {fullsolute} in {fullsolvent}:')
max_trans = min(len(trans_orig_dest[0]),len(trans_orig_origin[0]))
for iexc in range(max_trans):
to = trans_orig_origin[0][iexc]
td = trans_orig_dest[0][iexc]
print(f'excitation {iexc+1}: {to[0][1]} eV ({to[0][2]:0.4f}) --> {td[0][1]} eV ({td[0][2]:0.4f})')
to = [s[:2] for s in to[1:] if abs(s[2])>0.3]
td = [s[:2] for s in td[1:] if abs(s[2])>0.3]
print(f'origins {iexc+1}: {to} --> {td}')
s_fig.savefig(f'{solute}_{solvent}_check_warp.png')
c_fig = None; c_ax = None; cluster_spectra = []
# Restore settings that we over-wrote when doing spectral warping
task.renorm = store_renorm
task.wavelength = store_wavelength.copy()
task.broad = store_broad
task.inputformat = store_inputformat
# Now plot explicit solvent spectrum
for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs):
# Find cluster spectra path names
exc_suffix = task.exc_suffix
if isinstance(task.exc_suffix,dict):
if solute in task.exc_suffix:
exc_suffix = task.exc_suffix[solute]
exc_path = f'{solute}_{solvent}_{exc_suffix}'
if not path.exists(exc_path):
print(f'\nSkipping Spectra for: {fullsolute} in {fullsolvent}')
print(f'Directory {exc_path} not found')
continue
chdir(exc_path)
# List files matching pattern:
pattern = f'{solute}_{solvent}_solv*.out'
task.files = glob.glob(pattern)
if store_renorm == -1:
task.renorm = 1.0/float(len(task.files))
# Check if any other solutes are this solute's merge list, and if so add their files
if solute in task.merge_solutes:
for other_solute in task.merge_solutes[solute]:
other_files = glob.glob(f'../{other_solute}_{solvent}_{exc_suffix}/{other_solute}_{solvent}_solv*.out')
task.files = task.files + other_files
# Check if this solute is in any other solute's merge list
found_solute = False
for other_solute in task.merge_solutes:
if solute in task.merge_solutes[other_solute]:
found_solute = True
continue
if found_solute:
chdir('..')
continue
# Retrieve spectral warping parameters
task.warp_params = warp_params[i]
print(f'\nPlotting Spectra for {fullsolute} in {fullsolvent}')
print('Cluster files to process: ',len(task.files),task.files)
rgb = np.array((0,0,1.0))
if isinstance(task.line_colours,list):
rgb = task.line_colours[i]
cluster_spectrum,spec,c_fig,c_ax,_ = task.main(c_fig,c_ax,
plotlabel=f'{fullsolute} in {fullsolvent}',rgb=rgb)
cluster_spectra.append(cluster_spectrum)
chdir('..')
if c_fig is not None:
c_fig.savefig('combined_spectrum.png')
return cluster_spectra,c_fig,c_ax
# # Helper functions
# In[ ]:
# return args objects with default values for each script
from esteem.tasks import solutes, solvate, clusters, spectra
def get_default_args():
parser = solutes.make_parser()
solutes_args = parser.parse_args("")
parser = solvate.make_parser()
solvate_args = parser.parse_args(['--solute','a','--solvent','b'])
parser = clusters.make_parser()
clusters_args = parser.parse_args(['--solute','a','--solvent','b'])
parser = spectra.make_parser()
spectra_args = parser.parse_args("")
return solutes_args, solvate_args, clusters_args, spectra_args