import os
import re
import glob
import shutil
import tempfile
import subprocess

import numpy as np
class AsynchronousEvaluationModel(object):
"""
Evaluate a model in parallel when model instances are invoked by a shell
script.
Parameters
----------
process sample: callable function (default=None)
Function that overwrites the basic implementation
which reads the sample in from a file called params_filename.
This is useful if there are a number of pre-processing steps
needed by the model before shall command is executed.
load_results: callable function (default=None)
Function that overwrites the basic implementation
which reads the results from a files called results_filename
This is useful if there are a number of post-processing steps
needed by the model after the shell command is executed.
If evaluation fails this function must return None
workdir_basename: string (default=None):
The name of the directory to store local copies or soft links
of files. The shell command will be executed in these directories.
If None temporary directories with python generated names
will be created and then deleted once the shell command has been
run.
save_workdirs : string (default=True)
'no' - do not save workdirs
'yes' - save all files in each workdir
'limited' - save only params and results files
If workdir_basename is None this variable is ignored
the tmp directories that are created will always be deleted.
model_name : string (default=None)
A identifier used when printing model evaluation info
saved_data_basename : string (default=None)
The basename of the file used to store the output data from the
model. A new file is created every time __call__ is exceuted.
a unique identifier is created based upon the value of evaluation id
when __call__ is started.
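
    Examples
    --------
    A minimal sketch; ``cat params.in > results.out`` is a stand-in for a
    real model executable that reads the parameter file and writes one
    quantity of interest per line to the results file.

    >>> model = AsynchronousEvaluationModel(
    ...     'cat params.in > results.out', max_eval_concurrency=4)
    >>> samples = np.random.uniform(0., 1., (2, 10))  # (nvars, nsamples)
    >>> values = model(samples)                       # (nsamples, num_qoi)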
"""
    def __init__(self, shell_command, max_eval_concurrency=1,
                 workdir_basename=None, link_filenames=None,
                 params_filename='params.in', results_filename='results.out',
                 params_file_header='', process_sample=None,
                 load_results=None, saved_data_basename=None,
                 save_workdirs='yes', model_name=None):
        self.shell_command = shell_command
        self.max_eval_concurrency = max_eval_concurrency
        self.workdir_basename = workdir_basename
        # avoid a mutable default argument; an empty list means no links
        self.link_filenames = [] if link_filenames is None else link_filenames
        self.params_filename = params_filename
        self.results_filename = results_filename
        self.params_file_header = params_file_header
        # ensure process_sample is not a member function. If it is,
        # this model will not pickle
        self.process_sample = process_sample
        self.load_results = load_results
        self.saved_data_basename = saved_data_basename
        self.save_workdirs = save_workdirs
        if save_workdirs not in ('no', 'yes', 'limited'):
            raise ValueError(
                "save_workdirs must be one of ['no','yes','limited']")
        self.model_name = model_name
        if self.saved_data_basename is not None:
            saved_data_dir = os.path.split(saved_data_basename)[0]
            if saved_data_dir != '' and not os.path.exists(saved_data_dir):
                os.makedirs(saved_data_dir)
        self.running_procs = []
        self.running_workdirs = []
        self.function_eval_id = 0
        self.num_qoi = 0
        self.current_vals = []
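    # A hypothetical sketch of a pickle-safe process_sample: define it at
    # module level (not as a bound method), e.g.
    #
    #     def write_keyword_params(sample):
    #         with open('params.in', 'w') as f:
    #             for ii in range(sample.shape[0]):
    #                 f.write('param%d = %.16e\n' % (ii, sample[ii]))
    #
    # and pass process_sample=write_keyword_params when constructing the
    # model. Note it must still write to params_filename, which is copied
    # to a uniquely named file after each evaluation is launched.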
    def cleanup_threads(self, opts):
verbosity = opts.get("verbosity",0)
finished_proc_indices = []
for i in range(len(self.running_procs)):
proc = self.running_procs[i]
if proc.poll() is not None:
finished_proc_indices.append(i)
curdir = os.getcwd()
for i in range(len(finished_proc_indices)):
workdir = self.running_workdirs[finished_proc_indices[i]]
os.chdir(workdir)
function_eval_id = int(re.findall(
r'[0-9]+',os.path.split(workdir)[1])[-1])
if self.load_results is None:
if not os.path.exists(self.results_filename):
if verbosity>0:
print('Eval %d: %s was not found in directory %s'%(
function_eval_id, self.results_filename,workdir))
vals = None
else:
vals = np.loadtxt(self.results_filename,usecols=[0])
shutil.copy(
self.results_filename,
self.results_filename+'.%d'%function_eval_id)
else:
                try:
                    vals = self.load_results(opts)
                except Exception:
                    vals = None
# load results may not have generated a results file
# so write one here
if vals is not None:
np.savetxt(
self.results_filename+'.%d'%function_eval_id,vals)
sample = np.loadtxt(self.params_filename+'.%d'%function_eval_id)
if ( self.workdir_basename is not None and
self.save_workdirs=='limited'):
filenames_to_delete = glob.glob('*')
if vals is not None:
filenames_to_delete.remove(
self.results_filename+'.%d'%function_eval_id)
filenames_to_delete.remove(
self.params_filename+'.%d'%function_eval_id)
if verbosity>0:
filenames_to_delete.remove('stdout.txt')
for filename in filenames_to_delete:
os.remove(filename)
if verbosity>0:
print('Model %s: completed eval %d'%(
self.model_name,function_eval_id))
self.current_vals.append(vals)
self.current_samples.append(sample)
self.completed_function_eval_ids.append(function_eval_id)
os.chdir(curdir)
if self.workdir_basename is None or self.save_workdirs=='no':
shutil.rmtree(workdir)
        self.running_procs = [
            v for i, v in enumerate(self.running_procs)
            if i not in finished_proc_indices]
        self.running_workdirs = [
            v for i, v in enumerate(self.running_workdirs)
            if i not in finished_proc_indices]
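    # A hypothetical sketch of a user-supplied load_results: it is called
    # with opts inside the work directory, must return a 1D array of QoI
    # values, and must return None if the evaluation failed, e.g.
    #
    #     def load_results(opts):
    #         if not os.path.exists('model_output.csv'):
    #             return None  # signal a failed evaluation
    #         return np.loadtxt('model_output.csv', delimiter=',',
    #                           usecols=[0])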
    def create_work_dir(self):
if self.workdir_basename is None:
tmpdir = tempfile.mkdtemp(suffix='.%d'%self.function_eval_id)
else:
tmpdir = self.workdir_basename+'.%d'%self.function_eval_id
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)
else:
msg = 'work_dir %s already exists. '%(tmpdir)
msg += 'Exiting so as not to overwrite previous results'
raise Exception(msg)
return tmpdir
    def asynchronous_evaluate_using_shell_command(self, sample, opts):
verbosity = opts.get("verbosity",0)
curdir = os.getcwd()
workdir = self.create_work_dir()
os.chdir(workdir)
for filename in self.link_filenames:
if not os.path.exists(os.path.split(filename)[1]):
os.symlink(
filename,os.path.split(filename)[1])
            else:
                msg = '%s already exists in %s; cannot create soft link' % (
                    filename, workdir)
                raise Exception(msg)
        # the default of np.savetxt is to write the header with # at the
        # start of the line; comments='' removes the #
if self.process_sample is not None:
self.process_sample(sample)
else:
np.savetxt(self.params_filename,sample,
header=self.params_file_header,
comments='')
if verbosity>0:
out = open("stdout.txt","wb")
else:
out = open(os.devnull, 'w')
proc = subprocess.Popen(
self.shell_command,shell=True,stdout=out,stderr=out)
out.close()
self.running_procs.append(proc)
self.running_workdirs.append(workdir)
# store a copy of the parameters and return values with
# a unique filename
shutil.copy(
self.params_filename,
self.params_filename+'.%d'%self.function_eval_id)
self.function_eval_id += 1
os.chdir(curdir)
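    # A hypothetical params_file_header example: some executables expect
    # metadata on the first line of the parameter file, e.g.
    #
    #     model = AsynchronousEvaluationModel(
    #         './run_model.sh', params_file_header='2 variables')
    #
    # writes '2 variables' verbatim as the first line of params.in before
    # the sample values.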
    def __call__(self, samples, opts=None):
        # avoid a mutable default argument
        opts = {} if opts is None else opts
        self.current_vals = []
        self.current_samples = []
        self.completed_function_eval_ids = []
nsamples = samples.shape[1]
for i in range(nsamples):
while len(self.running_procs)>=self.max_eval_concurrency:
self.cleanup_threads(opts)
self.asynchronous_evaluate_using_shell_command(
samples[:,i],opts)
while len(self.running_procs)>0:
self.cleanup_threads(opts)
if self.saved_data_basename is not None:
data_filename = self.saved_data_basename+'-%d-%d.npz'%(
self.function_eval_id-nsamples,self.function_eval_id)
else:
data_filename = None
vals = self.prepare_values(
self.current_samples,self.current_vals,
self.completed_function_eval_ids,data_filename)
return vals
    def prepare_values(self, samples, vals, completed_function_eval_ids,
                       data_filename):
        nsamples = len(vals)
        # get the number of QoI from the first successful evaluation
        num_qoi = 0
        for i in range(nsamples):
            if vals[i] is not None:
                num_qoi = vals[i].shape[0]
                break
        if num_qoi == 0:
            raise RuntimeError('All model evaluations failed')
        # return nan for failed function evaluations
        for i in range(nsamples):
            if vals[i] is None:
                vals[i] = np.full((num_qoi,), np.nan)
        # order values and samples by function evaluation id
        I = np.argsort(np.array(completed_function_eval_ids))
        prepared_vals = np.array(vals)[I, :]
        samples = np.asarray(samples).T[:, I]
if data_filename is not None:
np.savez(data_filename, vals=prepared_vals, samples=samples)
return prepared_vals
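
if __name__ == '__main__':
    # A minimal usage sketch; 'cat params.in > results.out' is a stand-in
    # for a real model executable and 'model_data/run' is a hypothetical
    # basename. Failed evaluations appear as rows of NaN in the returned
    # array; each call also saves the samples and values to
    # <saved_data_basename>-<start_id>-<end_id>.npz.
    model = AsynchronousEvaluationModel(
        'cat params.in > results.out', max_eval_concurrency=4,
        saved_data_basename='model_data/run')
    samples = np.random.uniform(0., 1., (2, 10))  # (nvars, nsamples)
    values = model(samples)                       # (nsamples, num_qoi)
    print(values.shape)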