Source code for esteem.drivers

#!/usr/bin/env python
# coding: utf-8

# In[ ]:


"""
Drivers to run ESTEEM tasks in serial or parallel on a range of inputs.

These inputs will consist of solutes, solvents or pairs of solutes and solvent, depending on the task.
"""


# # Top-level Driver

# In[ ]:


import argparse
from esteem import parallel

[docs]def main(all_solutes,all_solvents,all_solutes_tasks={},all_solvate_tasks={}, all_clusters_tasks={},all_spectra_tasks={},all_qmd_tasks={}, all_mltrain_tasks={},all_mltest_tasks={},all_mltraj_tasks={}, make_script=parallel.make_sbatch): """ Main driver routine which chooses other tasks to run as appropriate. Control of what driver is actually called depends on command-line arguments with which the script was invoked: 'python <seed>.py <task> <seed> <target>' all_solutes: dict of strings Keys are shortnames of the solutes. Entries are the full names. all_solvents: dict of strings Keys are shortnames of the solvents. Entries are the full names. all_solutes_tasks: namespace or class Argument list for the Solutes task - see solutes_driver documentation below for more detail. all_solvate_tasks: namespace or class Argument list for the Solvate task - see solvate_driver documentation below for more detail. all_clusters_tasks: namespace or class Argument list for the Clusters task - see clusters_driver documentation below for more detail. all_spectra_tasks: namespace or class Argument list for the Spectra task - see spectra_driver documentation below for more detail. make_script: dict of strings Routine that can write a job submission script. Usually parallel.make_sbatch. """ import sys # Parse arguments to script parser = argparse.ArgumentParser(description=f'ESTEEM Main driver script') parser.add_argument('task') parser.add_argument('seed',default=None,type=str) parser.add_argument('target',nargs='?',default=None,type=str) args = parser.parse_args() task = args.task scriptname = sys.argv[0].replace(".py","") if args.seed is None: import sys args.seed = scriptname seed = args.seed target = args.target # Special task: set up scripts if ('scripts' in task.split()): # case where many SolutesTask have been defined if isinstance(all_solutes_tasks,dict): for targ in all_solutes_tasks: all_solutes_tasks[targ].script_settings['scriptname'] = scriptname solutes_script_settings = all_solutes_tasks[targ].script_settings make_script(seed=seed,task='solutes',target=targ,**solutes_script_settings) make_script(seed=seed,task='solvents',target=targ,**solutes_script_settings) else: # just one SolutesTask all_solutes_tasks.script_settings['scriptname'] = scriptname solutes_script_settings = all_solutes_tasks.script_settings make_script(seed=seed,task='solutes',target=target,**solutes_script_settings) make_script(seed=seed,task='solvents',target=target,**solutes_script_settings) for tasks in [all_solvate_tasks,all_clusters_tasks,all_qmd_tasks, all_mltrain_tasks,all_mltest_tasks,all_mltraj_tasks]: if tasks is None or tasks=={}: continue if isinstance(tasks,dict): for targ in tasks: tasks[targ].script_settings['scriptname'] = scriptname make_script(seed=seed,task=tasks[targ].task_command,target=targ,**tasks[targ].script_settings) else: tasks.script_settings['scriptname'] = scriptname make_script(seed=seed,task=tasks.task_command,target=targ,**tasks.script_settings) return # Now run actual task if ('solutes' in task.split()): solutes_task = get_actual_args(all_solutes_tasks,target,'solutes') solutes_driver(all_solutes,all_solvents,solutes_task) if ('solvents' in task.split()): solutes_task = get_actual_args(all_solutes_tasks,target,'solvents') solvents_driver(all_solvents,solutes_task) if ('solvate' in task.split()): solvate_task = get_actual_args(all_solvate_tasks,target,'solvate') solvate_task.script_settings['target'] = target solvate_task.script_settings['scriptname'] = scriptname solvate_driver(all_solutes,all_solvents,seed,solvate_task,make_script) if ('clusters' in task.split()) or ('cluster_scripts' in task.split()): clusters_task = get_actual_args(all_clusters_tasks,target,'clusters') clusters_task.script_settings['target'] = target clusters_task.script_settings['scriptname'] = scriptname clusters_driver(all_solutes,all_solvents,seed,clusters_task,make_script) if 'atoms' in task.split(): atomen_task = get_actual_args(all_qmd_tasks,target,'qmd') atomen_task.seed = seed atomen_task.target = target atom_energies_driver(atomen_task) if 'qmd' in task.split(): qmd_task = get_actual_args(all_qmd_tasks,target,'qmd') qmd_task.seed = seed qmd_driver(qmd_task) if ('train' in task.split()) or ('mltrain' in task.split()): mltrain_task = get_actual_args(all_mltrain_tasks,target,'mltrain') mltrain_task.seed = seed mltrain_driver(mltrain_task) if ('test' in task.split()) or ('mltest' in task.split()): mltest_task = get_actual_args(all_mltest_tasks,target,'mltest') mltest_task.seed = seed mltest_driver(mltest_task) if ('traj' in task.split()) or ('mltraj' in task.split()): mltraj_task = get_actual_args(all_mltraj_tasks,target,'mltraj') mltraj_task.seed = seed mltraj_driver(mltraj_task) if ('spectra' in task.split()): spectra_task = get_actual_args(all_spectra_tasks,target,'spectra') spectra_driver(all_solutes,all_solvents,spectra_task)
# Helper function that extracts the appropriate element from a dictionary def get_actual_args(all_args,target,task): if isinstance(all_args,dict): try: args = all_args[target] except: raise Exception(f"all_{task}_args[{target}] not found. Possible values: {list(all_args.keys())}") else: args = all_args return args # # Run calculations on individual Solute molecules # In[ ]: """ Drivers to run ESTEEM tasks in serial or parallel on a range of inputs These inputs will consist of solutes, solvents or pairs of solutes and solvent, depending on the task """ from esteem.tasks import solutes from esteem import parallel from os import path,chdir,makedirs from shutil import copytree
[docs]def solutes_driver(all_solutes,all_solvents,task): """ Driver to run a range of DFT/TDDFT calculations on a range of solutes. If the script is run as an array task, it performs the task for the requested solute only. all_solutes: dict of strings Keys are shortnames of the solutes. Entries are the full names. all_solvents: dict of strings Keys are shortnames of the solvents (implicit only, here). Entries are the full names. task: SolutesTask class Argument list for the whole Solutes job - see Solutes module documentation for more detail. Arguments used only within the driver routine include: ``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes calculation will be written """ if task.directory is not None: if not path.exists(task.directory): print(f'Creating directory {task.directory}') makedirs(task.directory) if not path.exists(task.directory+'/xyz'): if path.exists('xyz'): copytree('xyz', task.directory+'/xyz') chdir(task.directory) # Determine if we are in a job array, in which case just run one pair task_id = parallel.get_array_task_id() if task_id is None: solutes_to_run = all_solutes # run everything else: solute = list(all_solutes)[task_id] solutes_to_run = {solute: all_solutes[solute]} # run just one solute solute_namelist = task.get_xyz_files(solutes_to_run,"xyz") # Geom opts and TDDFT for solutes, then implicit solvent versions in each solvent if not all_solvents: task.run(solute_namelist) else: for solvent in all_solvents: task.solvent = solvent if task.solvent_settings is not None and task.solvent is not None: task.solvent = task.solvent_settings task.solvent['solvent'] = solvent task.run(solute_namelist) if task.directory is not None: chdir("..")
# # Run calculations on individual Solvent molecules # In[ ]: from esteem.tasks import solutes
[docs]def solvents_driver(all_solvents,task): """ Driver to run a range of DFT/TDDFT calculations on a range of solvent molecules. See the Solutes module documentation for more info on what tasks are run. all_solutes: dict of strings Keys are shortnames of the solutes. Entries are the full names. all_solvents: dict of strings Keys are shortnames of the solvents. Entries are the full names. task: SolutesTask class Argument list for the whole Solutes job - see Solutes module documentation for more detail. Arguments used only within the driver routine include: ``task.directory``: Directory prefix for where output of this particular 'target' of the Solutes calculation will be written wrapper: namespace or class Functions that will be used in the Solutes task - see Solutes module documentation for more detail. """ if task.directory is not None: if not path.exists(task.directory): print(f'Creating directory {task.directory}') makedirs(task.directory) if not path.exists(task.directory+'/xyz'): if path.exists('xyz'): copytree('xyz', task.directory+'/xyz') chdir(task.directory) solvent_namelist = task.get_xyz_files(all_solvents,"xyz") # Geom opts and TDDFT for solvents (in implicit solvent of self) for solvent in all_solvents: task.solvent = solvent if task.solvent_settings is not None: task.solvent = task.solvent_settings task.solvent['solvent'] = solvent task.run([solvent]) if task.directory is not None: chdir("..")
# # Calculate Range Separation Parameters # In[ ]: # Temporarily here till moved elsewhere def range_sep_driver(all_solutes,all_solvents,task,wrapper): import os from ase.units import Hartree as Ha if task.directory is not None: if not path.exists(task.directory): raise Exception(f"Directory not present: {task.directory} (must run solutes task first)") chdir(task.directory) #from esteem.tasks.solutes import find_range_sep solutes = ["AQS"] solvent = {'solvent': 'watr', 'cosmo_smd': True, 'cosmo_vem': 0} func = "LC-PBE0" basis = "6-311++G**" charges={"AH2QS": -1, "AQS": -1} rs_range = [0.05,0.1,0.15,0.2,0.25,0.3] energy,calc,evals,occs=find_range_sep(solutes,"is_opt_watr","rs_opt",nw,basis=basis, func=func,solvent=solvent,charges=charges, rs_range=rs_range,all_readonly=True) ehomo = {} elumo = {} seed = solutes[0] all_rs = [] all_Ja = [] all_Jb = [] all_J2 = [] for rsf in rs_range: rs = f'{rsf:.2f}' if seed in charges: charge = charges[seed] else: charge = 0 for c in [charge+1,charge-1,charge]: # Assume when c==charge, system is closed-shell if c != charge: occfac = 0.99 s = 0.5 else: occfac = 1.99 s = 0 ih = max(np.argwhere(occs[rs,c]>occfac))[0] il = min(np.argwhere(occs[rs,c]<occfac))[0] ehomo[rs,c] = (evals[rs,c])[ih] elumo[rs,c] = (evals[rs,c])[il] #except: # print('fail for rs=',rs) all_rs.append(float(rs)) IP = energy[rs,charge+1] - energy[rs,charge] EH = ehomo[rs,charge] all_Ja.append((EH+IP)**2) IPm = energy[rs,charge] - energy[rs,charge-1] EHm = ehomo[rs,charge-1] print(f'E_H-IP for rs={rs},Q={charge} is {EH:0.3f}+{IP:0.3f} = {EH+IP:0.3f}', f', Q={charge-1} is {EHm:0.3f}+{IPm:0.3f} = {EHm+IPm:0.3f}') all_Jb.append((EHm+IPm)**2) all_J2.append((EHm+IPm)**2+all_Ja[-1]) #print(f'-IP for rs={rs},Q={charge} is {-IP}') import matplotlib.pyplot as plt plt.plot(all_rs,all_Ja) #plt.plot(all_rs,all_Jb) #plt.plot(all_rs,all_J2) plt.show() if task.directory is not None: chdir("..") # # Run MD calculations on all pairs of solvent + solute # In[ ]: from esteem.tasks import solvate from shutil import copyfile import itertools from os import path,chdir,makedirs
[docs]def solvate_driver(all_solutes,all_solvents,seed,task,make_sbatch=None): """ Driver to run set up and run MD on solvated boxes containing a solute molecule and solvent. If the script is run as an array task, it performs the task for the requested solute/solute pair only, out of the nsolutes * nsolvents possible combinations. all_solutes: dict of strings Keys are shortnames of the solutes. Entries are the full names. all_solvents: dict of strings Keys are shortnames of the solvents. Entries are the full names. task: SolvateTask class Argument list for the whole clusters job - see Solvate module documentation for more detail. Arguments used within this routine include: ``task.md_suffix``: Directory suffix for where the MD runs will take place ``task.md_geom_prefix``: Directory prefix for where the geometries from the Solutes calculation should be obtained. wrapper: class Wrapper that will be used in the Solvate task - see Solvate module documentation for more detail. """ md_suffix = task.md_suffix md_geom_prefix = task.md_geom_prefix # Make list of all combinations of solvent and solute all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items())) all_paths = [f'{solute}_{solvent}_{md_suffix}' for (solvent,_),(solute,_) in all_pairs] base_path = path.basename(getcwd()) # Determine if we are in a job array, in which case just run one pair task_id = parallel.get_array_task_id() if task_id is not None: all_pairs = [all_pairs[task_id]] # Iterate over pairs of solvent and solute for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs): if solute in all_solvents and (solute!=solvent): continue sol_sol_str = f'{solute} ({fullsolute}) in {solvent} ({fullsolvent})' if task_id is None: task_str = f'Task {i} of {len(all_pairs)}' else: task_str = f'Task {task_id}' md_path = f'{solute}_{solvent}_{md_suffix}' # We are in the parent directory of this project, so setup clusters # subdirectories if not already present if base_path not in all_paths: print(f'{task_str}: Setting up solvate calculation for {sol_sol_str}\n\n') # Set up files and directories for MD run if not path.exists(md_path): print(f'Creating directory {md_path}') makedirs(md_path) # Copy in optimised geometry of solute in implicit solvent try: infile = f'{md_geom_prefix}_{solvent}/{solute}.xyz' copyfile(infile,f'{md_path}/{solute}.xyz') except FileNotFoundError: print(f'Optimised geometry for {solute} in {solvent} not found') print(f'Expected: {infile}') continue # Copy in optimised geometry of solvent in implicit solvent try: infile = f'{md_geom_prefix}_{solvent}/{solvent}.xyz' copyfile(infile,f'{md_path}/{solvent}.xyz') except FileNotFoundError: print(f'Optimised geometry for {solvent} not found') print(f'Expected: {infile}') continue print(f'Changing directory to {md_path}') chdir(md_path) # Write job script for submission to HPC cluster if make_sbatch is not None: task.script_settings['jobname'] = f'{solute}_{solvent}_{md_suffix}' task.script_settings['execpath'] = '../' make_sbatch(seed=seed,task='solvate',**task.script_settings) # Go back to base directory print(f'Returning to parent directory') chdir('..') # We are in a subdirectory already so run a particular calculation elif base_path==md_path: # Now run MD in md_path task.solute = solute task.solvent = solvent task.setup_amber() task.run() chdir("..")
# # Run Clusters script on MD snapshots to extract region near solute # In[ ]: from shutil import copyfile import itertools from os import path,chdir,makedirs,getcwd
[docs]def clusters_driver(all_solutes,all_solvents,seed,task,make_sbatch=None): """ Driver to extract isolated clusters from solvated models, for a range of solute/solvent pairs. Takes MD results from the directory {solute}_{solvent}_{task.md_suffix} and performs excitation calculation in the directory {solute}_{solvent}_{task.exc_suffix}. If invoked from the base directory, rather than the excitation directory, it sets up the excitation directory and writes a job script then exits. If invoked from the excitation directory, it performs the excitation calculations for all the extracted clusters. If the script is run as an array task, it performs the task for the requested cluster only. *Arguments* all_solutes: dict of strings Keys are shortnames of the solutes. Entries are the full names. all_solvents: dict of strings Keys are shortnames of the solvents. Entries are the full names. seed: str Overall 'seed' name for the run - used in creation of job scripts for the calculation task: ClustersTask class Argument list for the whole clusters job - see Clusters module documentation for more detail. Arguments used predominantly in the driver rather than the task main routine include: ``task.md_suffix``: Directory where MD outputs are to be found. ``task.exc_suffix``: Directory where results of Cluster excitation calculations will go. make_sbatch: function Function that writes a job submission script for the clusters jobs *Output*: On first run, from the base directory of the project, the script will create subdirectories for each solute-solvent pair, with path '{solute}_{solvent}_{exc_suffix}'. The default value of ``exc_suffix`` is 'exc'. To each directory, this routine will copy the trajectory file '{solute}_{solvent}_{md_suffix}/{solute}_{solvent}_solv.traj' which consists of ``task.nsnaps`` snapshots. It will then create a job script using ``make_sbatch`` and ``settings`` with the name '{solute}_{solvent}_{exc_suffix}_sub' and exit. The user then needs to run the individual job scripts from each subdirectory, probably as a job array: the array task ID should range from 0 to ``task.nsnaps - 1`` The output of those runs will be the excited state energies of the clusters, which can be averaged over in the Spectra task. """ # Iterate over all pairs of solute and solvent, unless in a specific directory already, # in which case just run that pair md_suffix = task.md_suffix exc_suffix = task.exc_suffix all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items())) all_paths = [f'{solute}_{solvent}_{exc_suffix}' for (solvent,_),(solute,_) in all_pairs] base_path = path.basename(getcwd()) for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs): sol_sol_str = f'{solute} ({fullsolute}) in {solvent} ({fullsolvent})' exc_path = f'{solute}_{solvent}_{exc_suffix}' if solute in all_solvents and (solute!=solvent): continue # We are in the parent directory of this project, so setup clusters # subdirectories if not already present if base_path not in all_paths: # Check that MD has finished - skip to next file if not md_path = f'{solute}_{solvent}_{md_suffix}' if not path.exists(md_path): print(f'\nSkipping Clusters for: {sol_sol_str} - MD directory not found') print(f'Expected: {md_path}') continue else: trajfile = f'{md_path}/{solute}_{solvent}_solv.traj' if not path.exists(trajfile): print(f'\nSkipping Clusters for: {sol_sol_str} - trajectory file not found') print(f'Expected: {trajfile}') continue print(f'\nTask {i} of {len(all_pairs)}: Setting up clusters calculation for {sol_sol_str}') if not path.exists(exc_path): print(f'Creating directory {exc_path}') makedirs(exc_path) # Copy in trajectory file to excitations directory try: copyfile(trajfile,f'{exc_path}/{solute}_{solvent}_solv.traj') except FileNotFoundError: print(f'Trajectory file generated by {sol_sol_str} MD not found') print(f'Expected: {trajfile}') continue # Copy the geometries of the solute and solvent to the excitations directory # so they can be used in cluster extraction try: copyfile(f'{md_path}/{solute}.xyz',f'{exc_path}/{solute}.xyz') copyfile(f'{md_path}/{solvent}.xyz',f'{exc_path}/{solvent}.xyz') except FileNotFoundError: print(f'Geometry of {solute} or {solvent} not found in MD directory') continue print(f'Changing directory to {exc_path}') chdir(exc_path) # Write job script for submission to HPC cluster if make_sbatch is not None: task.script_settings['jobname'] = f'{solute}_{solvent}_{exc_suffix}' task.script_settings['execpath'] = '../' make_sbatch(seed=seed,task='clusters',**task.script_settings) # Go back to base directory chdir('..') # We are in a subdirectory already so run a particular calculation elif base_path==exc_path: print(f'\nProcessing clusters for {fullsolute} in {fullsolvent}') # Run Cluster excitation calculation according to value of task_id task.solute = solute task.solvent = solvent if isinstance(task.impsolv,dict): task.impsolv['solvent'] = solvent else: task.impsolv = solvent task.task_id = parallel.get_array_task_id() task.run()
# In[ ]: def atom_energies_driver(atomen): # Determine if we are in a job array, as this might be necessary for choosing # a calculator, for ML tasks task_id = parallel.get_array_task_id() if task_id is None: task_id = 0 from ase.io import read, Trajectory from ase import Atoms model_opt = read(atomen.seed+".xyz") wrapper_is_ml = False #(isinstance(atomen.wrapper,amp.AMPWrapper) # or isinstance(atomen.wrapper,physnet.PhysNetWrapper)) if wrapper_is_ml: calc_params = {'calc_seed': atomen.seed, 'calc_suffix': atomen.calc_suffix, 'calc_prefix': '', 'target': atomen.target} else: # QM Wrapper calc_params = {'basis':atomen.basis,'func':atomen.func,'target':None} # TODO atom_list = set(model_opt.symbols) atom_energies = {} atom_traj = Trajectory(f'{atomen.seed}_atoms_{atomen.target}.traj','w') for atom in atom_list: atom_model = Atoms(atom) if atom=='H': spin=0.5 if atom=='O': spin=1 if atom=='N': spin=0.5 if atom=='C': spin=0 if wrapper_is_ml: atom_energies[atom],calc = atomen.wrapper.singlepoint(atom_model, f'{atomen.seed}',calc_params) else: atom_energies[atom],calc = atomen.wrapper.singlepoint(atom_model, f'{atomen.seed}_{atom}',calc_params,spin=spin) atom_traj.write(atom_model) print(f'atom_energies[{atom}] = {atom_energies[atom]}') atom_traj.close() def qmd_driver(qmdtraj): import string # Make list of trajectories all_trajs = list(string.ascii_uppercase)[0:qmdtraj.ntraj] # Determine if we are in a job array, in which case just run one trajectory task_id = parallel.get_array_task_id() trajs_in_task = qmdtraj.which_trajs if task_id is not None: if all_trajs[task_id] in trajs_in_task: qmdtraj.which_trajs = all_trajs[task_id] else: raise Exception(f"Requested trajectory {all_trajs[task_id]} is not part of this task.") print(qmdtraj) qmdtraj.run() def mltrain_driver(mltrain): # Determine if we are in a job array, in which case find task_id task_id = parallel.get_array_task_id() if task_id is None: task_id = 0 # set random seed based on task_id mltrain.train_params['random_seed'] = task_id print(mltrain.train_params) mltrain.run() def mltest_driver(mltest): from esteem.wrappers import amp # Determine if we are in a job array, in which case just run one trajectory task_id = parallel.get_array_task_id() if task_id is None: task_id = 0 if isinstance(mltest.wrapper,amp.AMPWrapper): calcfn = mltest.wrapper.calc_filename(mltest.seed,mltest.target,prefix=mltest.prefix,suffix=mltest.calc_suffix) calc_file = calcfn+ml_wrapper.calc_ext mltest.calc_suffix = suffix+"_test" calcfn = ml_wrapper.calc_filename(mltest.seed,mltest.target,prefix=mltest.prefix,suffix=mltest.calc_suffix) test_calc_file = calcfn+ml_wrapper.calc_ext test_calc_log = calcfn+ml_wrapper.log_ext if os.path.isfile(calc_file): print(f"Copying {calc_file} to {test_calc_file} for testing") shutil.copyfile(calc_file,test_calc_file) else: raise Exception("Calculator file does not exist: ",calc_file) if mltest.target == 0 or mltest.target is None: seed_state_str = mltest.seed+"_gs" else: seed_state_str = mltest.seed+"_es"+str(mltest.target) if mltest.calc_seed is None: mltest.calc_seed = mltest.seed if isinstance(mltest.which_trajs,dict): traj_dict = mltest.which_trajs for t in traj_dict: mltest.which_trajs = traj_dict[t] mltest.plotfile = seed_state_str + '_' + mltest.calc_suffix + '_' + str(t) + '.png' mltest.run() else: # single trajectory to test mltest.run() if isinstance(mltest.wrapper,amp.AMPWrapper): if os.path.isfile(test_calc_log): os.remove(test_calc_log) def mltraj_driver(mltraj): from esteem.trajectories import get_trajectory_list # Determine if we are in a job array, in which case just run one trajectory task_id = parallel.get_array_task_id() if task_id is not None: mltraj.which_trajs = get_trajectory_list(mltraj.ntraj)[task_id] if mltraj.calc_seed is None: mltraj.calc_seed = mltraj.seed mltraj.run() # # Plot Spectra from cluster calculations # In[ ]: from esteem.tasks import spectra import glob import numpy as np def add_arrows_and_label(warp_params,arrow1_pos,arrow2_pos,s_ax,task): s_ax.arrow(arrow1_pos[0],arrow1_pos[1],arrow1_pos[2],arrow1_pos[3], head_width=0.1, head_length=10, fc='k', ec='k', length_includes_head = True) if task.warp_scheme == 'alphabeta' or task.warp_scheme == 'betabeta': s_ax.arrow(arrow2_pos[0],arrow2_pos[1],arrow2_pos[2],arrow2_pos[3], head_width=0.1, head_length=10, fc='k', ec='k', length_includes_head = True) warp_str = '' if task.warp_scheme=='alphabeta': warp_str = warp_str + f'\u03B1 = {warp_params[0]:0.3f}, ' warp_str = warp_str + f'\u03B2 = {warp_params[1]:0.3f}' elif task.warp_scheme=='betabeta': warp_str = warp_str + f'\u03B2 1 = {warp_params[0]:0.3f}, ' warp_str = warp_str + f'\u03B2 2 = {warp_params[1]:0.3f}' else: warp_str = warp_str + f'\u03B2 = {warp_params[0]:0.3f}' return warp_str
[docs]def spectra_driver(all_solutes,all_solvents,task): """ Driver to plot spectra (and calculate spectral warping parameters) for a range of solute/solvent pairs all_solutes: dict of strings Keys are shortnames of the solutes. Entries are the full names. all_solvents: dict of strings Keys are shortnames of the solvents. Entries are the full names. task: SpectraTask class Argument list for the whole spectra job - see Spectra module documentation for more detail. Arguments used only in the driver include: ``task.exc_suffix``: Directory in which results of excitation calculations performed by the Clusters task can be found. The pattern used to find matches is: '{solute}_{solvent}_{exc_suffix}/{solute}_{solvent}_solv*.out' ``task.warp_origin_ref_peak_range``: Peak range searched when looking for 'reference' peaks. in the origin spectrum for spectral warping. ``task.warp_dest_ref_peak_range``: Peak range searched when looking for 'reference' peaks. in the destination spectrum for spectral warping. ``task.warp_broad``: Broadening to be applied to origin and destination spectra. ``task.warp_inputformat``: Format of the files to be loaded for origin and destination spectra. [TODO: May need to be adjusted to allow separate ``task.warp_ref_inputformat`` and ``task.warp_dest_inputformat``] ``task.warp_files``: File pattern to search for when looking for origin and destination spectra for spectral warping. ``task.merge_solutes``: Dictionary: each entry should be a list of solute names that will be merged into the corresponding key """ import matplotlib.pyplot as pyplot # Set up lists of solute-solvent pairs exc_suffix = task.exc_suffix all_pairs = list(itertools.product(all_solvents.items(),all_solutes.items())) # Retrieve spectral warping settings from args warp_origin_prefix = task.warp_origin_prefix warp_dest_prefix = task.warp_dest_prefix warp_inputformat = task.warp_inputformat warp_files = task.warp_files warp_params = [] store_wavelength = task.wavelength.copy() store_broad = task.broad store_renorm = task.renorm store_inputformat = task.inputformat if isinstance(task.warp_origin_ref_peak_range,dict): all_warp_origin_ref_peak_ranges = task.warp_origin_ref_peak_range else: all_warp_origin_ref_peak_ranges = None if isinstance(task.warp_dest_ref_peak_range,dict): all_warp_dest_ref_peak_ranges = task.warp_dest_ref_peak_range else: all_warp_dest_ref_peak_ranges = None if task.warp_broad is not None: task.broad = task.warp_broad task.renorm = False # Loop over solvent-solute pairs to find warp params for each pair for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs): print('\nSpectral Warping for',solute) if all_warp_origin_ref_peak_ranges is not None: if solute in all_warp_origin_ref_peak_ranges: task.warp_origin_ref_peak_range = all_warp_origin_ref_peak_ranges[solute] if all_warp_dest_ref_peak_ranges is not None: if solute in all_warp_dest_ref_peak_ranges: task.warp_dest_ref_peak_range = all_warp_dest_ref_peak_ranges[solute] # if peak ranges for spectral warps are not specified, use full wavelength range task.wavelength = store_wavelength if task.warp_origin_ref_peak_range is None: task.warp_origin_ref_peak_range = task.wavelength.copy() if task.warp_dest_ref_peak_range is None: task.warp_dest_ref_peak_range = task.wavelength.copy() # Process Solute spectra task.warp_params = [0.0] if task.warp_scheme == 'alphabeta': task.warp_params = [1.0,0.0] if task.warp_scheme == 'betabeta': task.warp_params = [0.0,0.0,0.0,0.0] arrow1_pos = [0]*4 arrow2_pos = [0]*4 s_fig = None; s_ax = None # Plot destination spectrum # Cut down wavelength according to ref_peak_range values task.wavelength = store_wavelength.copy() if task.warp_dest_ref_peak_range[0]<task.wavelength[0]: task.wavelength[0] = task.warp_dest_ref_peak_range[0] if task.warp_dest_ref_peak_range[1]>task.wavelength[1]: task.wavelength[1] = task.warp_dest_ref_peak_range[1] task.files = [f'{warp_dest_prefix}_{solvent}/{solute}/{warp_files}'] task.inputformat = warp_inputformat dest_spectrum,spec,s_fig,s_ax,trans_orig_dest = task.main(s_fig,s_ax,plotlabel='dest') pyplot.setp(spec,color='r') # Plot origin spectrum task.wavelength = store_wavelength.copy() if task.warp_origin_ref_peak_range[0]<task.wavelength[0]: task.wavelength[0] = task.warp_origin_ref_peak_range[0] if task.warp_origin_ref_peak_range[1]>task.wavelength[1]: task.wavelength[1] = task.warp_origin_ref_peak_range[1] task.files = [f'{warp_origin_prefix}_{solvent}/{solute}/{warp_files}'] origin_spectrum,spec,s_fig,s_ax,trans_orig_origin = task.main(s_fig,s_ax,plotlabel='origin') pyplot.setp(spec,color='b') wp = task.find_spectral_warp_params(dest_spectrum,origin_spectrum,arrow1_pos,arrow2_pos) warp_params.append(wp) task.warp_params = warp_params[-1] task.wavelength = [min(store_wavelength[0],task.warp_origin_ref_peak_range[0],task.warp_dest_ref_peak_range[0]), max(store_wavelength[1],task.warp_origin_ref_peak_range[1],task.warp_dest_ref_peak_range[1]), task.warp_origin_ref_peak_range[2]] warped_spectrum,spec,s_fig,s_ax,_ = task.main(s_fig,s_ax,plotlabel='warped') pyplot.setp(spec,color='g') s_ax.legend() if False: warp_str = add_arrows_and_label(wp,arrow1_pos,arrow2_pos,s_ax,task) s_ax.set_title(f'Spectral warp params for {fullsolute} in {fullsolvent}: {warp_str} eV') print(f'Transition origins for {fullsolute} in {fullsolvent}:') max_trans = min(len(trans_orig_dest[0]),len(trans_orig_origin[0])) for iexc in range(max_trans): to = trans_orig_origin[0][iexc] td = trans_orig_dest[0][iexc] print(f'excitation {iexc+1}: {to[0][1]} eV ({to[0][2]:0.4f}) --> {td[0][1]} eV ({td[0][2]:0.4f})') to = [s[:2] for s in to[1:] if abs(s[2])>0.3] td = [s[:2] for s in td[1:] if abs(s[2])>0.3] print(f'origins {iexc+1}: {to} --> {td}') s_fig.savefig(f'{solute}_{solvent}_check_warp.png') c_fig = None; c_ax = None; cluster_spectra = [] # Restore settings that we over-wrote when doing spectral warping task.renorm = store_renorm task.wavelength = store_wavelength.copy() task.broad = store_broad task.inputformat = store_inputformat # Now plot explicit solvent spectrum for i,((solvent,fullsolvent),(solute,fullsolute)) in enumerate(all_pairs): # Find cluster spectra path names exc_suffix = task.exc_suffix if isinstance(task.exc_suffix,dict): if solute in task.exc_suffix: exc_suffix = task.exc_suffix[solute] exc_path = f'{solute}_{solvent}_{exc_suffix}' if not path.exists(exc_path): print(f'\nSkipping Spectra for: {fullsolute} in {fullsolvent}') print(f'Directory {exc_path} not found') continue chdir(exc_path) # List files matching pattern: pattern = f'{solute}_{solvent}_solv*.out' task.files = glob.glob(pattern) if store_renorm == -1: task.renorm = 1.0/float(len(task.files)) # Check if any other solutes are this solute's merge list, and if so add their files if solute in task.merge_solutes: for other_solute in task.merge_solutes[solute]: other_files = glob.glob(f'../{other_solute}_{solvent}_{exc_suffix}/{other_solute}_{solvent}_solv*.out') task.files = task.files + other_files # Check if this solute is in any other solute's merge list found_solute = False for other_solute in task.merge_solutes: if solute in task.merge_solutes[other_solute]: found_solute = True continue if found_solute: chdir('..') continue # Retrieve spectral warping parameters task.warp_params = warp_params[i] print(f'\nPlotting Spectra for {fullsolute} in {fullsolvent}') print('Cluster files to process: ',len(task.files),task.files) rgb = np.array((0,0,1.0)) if isinstance(task.line_colours,list): rgb = task.line_colours[i] cluster_spectrum,spec,c_fig,c_ax,_ = task.main(c_fig,c_ax, plotlabel=f'{fullsolute} in {fullsolvent}',rgb=rgb) cluster_spectra.append(cluster_spectrum) chdir('..') if c_fig is not None: c_fig.savefig('combined_spectrum.png') return cluster_spectra,c_fig,c_ax
# # Helper functions # In[ ]: # return args objects with default values for each script from esteem.tasks import solutes, solvate, clusters, spectra def get_default_args(): parser = solutes.make_parser() solutes_args = parser.parse_args("") parser = solvate.make_parser() solvate_args = parser.parse_args(['--solute','a','--solvent','b']) parser = clusters.make_parser() clusters_args = parser.parse_args(['--solute','a','--solvent','b']) parser = spectra.make_parser() spectra_args = parser.parse_args("") return solutes_args, solvate_args, clusters_args, spectra_args