from .._network import Network
from .._population import Population
from .._variableset import VariableSets
from .._outputfiles import OutputFiles
from contextlib import contextmanager as _contextmanager
import os as _os
import sys as _sys
__all__ = ["get_number_of_processes", "run_models", "redirect_output"]


def get_number_of_processes(parallel_scheme: str, nprocs: int = None):
"""This function works out how many processes have been set
by the paralellisation system called 'parallel_scheme'
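
    Examples (illustrative - the mpi4py branch queries
    MPI.COMM_WORLD at call time, so its result depends on how
    the script was launched):

    >>> get_number_of_processes("multiprocessing")
    1
    >>> get_number_of_processes("multiprocessing", nprocs=4)
    4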
"""
if nprocs is None:
if parallel_scheme == "multiprocessing":
return 1
elif parallel_scheme == "mpi4py":
from mpi4py import MPI
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
return nprocs
elif parallel_scheme == "scoop":
            raise ValueError(
                "You must specify the number of processes for "
                "scoop to parallelise over")
else:
            raise ValueError(
                "You must specify the number of processes to "
                f"use for parallel scheme '{parallel_scheme}'")
    if parallel_scheme == "mpi4py":
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        # never use more processes than MPI has made available
        return min(comm.Get_size(), nprocs)
    elif parallel_scheme == "scoop":
        # there is no reliable way to query the scoop pool size from
        # here, so trust the user-supplied value
        return nprocs
elif parallel_scheme == "multiprocessing":
return nprocs
else:
raise ValueError(
f"Unrecognised parallelisation scheme {parallel_scheme}")


@_contextmanager
def redirect_output(outdir):
"""Nice way to redirect stdout and stderr - thanks to
Emil Stenström in
https://stackoverflow.com/questions/6735917/redirecting-stdout-to-nothing-in-python
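
    Example (illustrative)::

        with redirect_output("/path/to/outdir"):
            print("this is written to /path/to/outdir/output.txt")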
"""
    # open both files before touching sys.stdout/sys.stderr, so that
    # a failed open leaves the standard streams untouched
    new_out = open(_os.path.join(outdir, "output.txt"), "w")
    new_err = open(_os.path.join(outdir, "output.err"), "w")
    old_out = _sys.stdout
    old_err = _sys.stderr
    _sys.stdout = new_out
    _sys.stderr = new_err
try:
yield new_out
finally:
_sys.stdout = old_out
_sys.stderr = old_err
new_out.close()
new_err.close()


def run_models(network: Network, variables: VariableSets,
population: Population,
nprocs: int, nthreads: int, seed: int,
nsteps: int, output_dir: OutputFiles,
iterator: str, extractor: str,
profile: bool, parallel_scheme: str):
"""Run all of the models on the passed Network that are described
by the passed VariableSets

    Parameters
----------
network: Network
The network to model
variables: VariableSets
The sets of VariableSet that represent all of the model
runs to perform
population: Population
The initial population for all of the model runs. This also
contains the starting date and day for the model outbreak
nprocs: int
The number of model runs to perform in parallel
nthreads: int
The number of threads to parallelise each model run over
seed: int
Random number seed which is used to generate random seeds
for all model runs
nsteps: int
        The maximum number of steps to perform for each model. If
        this is None, then each model will run until its outbreak
        is over
output_dir: OutputFiles
The OutputFiles that represents the directory in which all
output should be placed
iterator: str
Iterator to load that will be used to iterate the outbreak
extractor: str
Extractor to load that will be used to extract information
profile: bool
Whether or not to profile the model run and print out live
timing (useful for performance debugging)
parallel_scheme: str
Which parallel scheme (multiprocessing, mpi4py or scoop) to use
to run multiple model runs in parallel
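
    Returns
    -------
    results: list
        A list of (VariableSet, trajectory) pairs, one for each
        model run, in the same order as the passed 'variables'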
"""
    # 's' is passed through to every Network.run call, where it is
    # used to pick out some of the additional seeds
    s = -1
if len(variables) == 1:
# no need to do anything complex - just a single run
params = network.params.set_variables(variables[0])
network.update(params, profile=profile)
trajectory = network.run(population=population, seed=seed,
s=s, nsteps=nsteps,
output_dir=output_dir,
iterator=iterator,
extractor=extractor,
profile=profile,
nthreads=nthreads)
return [(variables[0], trajectory)]
# generate the random number seeds for all of the jobs
# (for testing, we will use the same seed so that I can check
# that they are all working)
seeds = []
if seed == 0:
# this is a special mode that a developer can use to force
# all jobs to use the same random number seed (15324) that
# is used for comparing outputs. This should NEVER be used
# for production code
print("** WARNING: Using special mode to fix all random number")
print("** WARNING: seeds to 15324. DO NOT USE IN PRODUCTION!!!")
for i in range(0, len(variables)):
seeds.append(15324)
else:
from ._ran_binomial import seed_ran_binomial, ran_int
rng = seed_ran_binomial(seed)
# seed the rngs used for the sub-processes using this rng
for i in range(0, len(variables)):
seeds.append(ran_int(rng, 10000, 99999999))
# set the output directories for all of the jobs - this is based
# on the fingerprint, so should be unique for each job
outdirs = []
for v in variables:
f = v.fingerprint(include_index=True)
d = _os.path.join(output_dir.get_path(), f)
outdirs.append(d)
outputs = []
print(f"\nRunning {len(variables)} jobs using {nprocs} process(es)")
    if nprocs == 1:
        # no need for a pool - just run each job one after another
        # in this process
for i, variable in enumerate(variables):
seed = seeds[i]
outdir = outdirs[i]
with output_dir.open_subdir(outdir) as subdir:
print(f"\nRunning parameter set {i+1} of {len(variables)} "
f"using seed {seed}")
print(f"All output written to {subdir.get_path()}")
with redirect_output(subdir.get_path()):
print(f"Running variable set {i+1}")
print(f"Variables: {variable}")
print(f"Random seed: {seed}")
print(f"nthreads: {nthreads}")
# no need to do anything complex - just a single run
params = network.params.set_variables(variable)
network.update(params, profile=profile)
output = network.run(population=population, seed=seed,
s=s, nsteps=nsteps,
output_dir=subdir,
iterator=iterator,
extractor=extractor,
profile=profile,
nthreads=nthreads)
outputs.append((variable, output))
print(f"Completed job {i+1} of {len(variables)}")
print(variable)
print(output[-1])
# end of OutputDirs context manager
# end of loop over variable sets
else:
from ._worker import run_worker
# create all of the parameters and options to run
arguments = []
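        # capacity hints so that each worker can allocate a network
        # that is large enough to hold this model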
max_nodes = network.nnodes + 1
max_links = max(network.nlinks, network.plinks) + 1
for i, variable in enumerate(variables):
seed = seeds[i]
outdir = outdirs[i]
arguments.append({
"params": network.params.set_variables(variable),
"options": {"seed": seed,
"output_dir": outdir,
"auto_bzip": output_dir.auto_bzip(),
"population": population,
"s": s,
"nsteps": nsteps,
"iterator": iterator,
"extractor": extractor,
"profile": profile,
"nthreads": nthreads,
"max_nodes": max_nodes,
"max_links": max_links}
})
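
        # each scheme below maps 'run_worker' over the same list of
        # argument dictionaries - only the pool implementation differs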
if parallel_scheme == "multiprocessing":
# run jobs using a multiprocessing pool
print("\nRunning jobs in parallel using a multiprocessing pool...")
from multiprocessing import Pool
with Pool(processes=nprocs) as pool:
results = pool.map(run_worker, arguments)
for i, result in enumerate(results):
print(f"\nCompleted job {i+1} of {len(variables)}")
print(variables[i])
print(result[-1])
outputs.append((variables[i], result))
elif parallel_scheme == "mpi4py":
# run jobs using a mpi4py pool
print("\nRunning jobs in parallel using a mpi4py pool...")
from mpi4py import futures
with futures.MPIPoolExecutor(max_workers=nprocs) as pool:
results = pool.map(run_worker, arguments)
for i, result in enumerate(results):
print(f"\nCompleted job {i+1} of {len(variables)}")
print(variables[i])
print(result[-1])
outputs.append((variables[i], result))
elif parallel_scheme == "scoop":
# run jobs using a scoop pool
print("\nRunning jobs in parallel using a scoop pool...")
from scoop import futures
results = futures.map(run_worker, arguments)
for i, result in enumerate(results):
print(f"\nCompleted job {i+1} of {len(variables)}")
print(variables[i])
print(result[-1])
outputs.append((variables[i], result))
else:
raise ValueError(f"Unrecognised parallelisation scheme "
f"{parallel_scheme}.")
return outputs