from __future__ import annotations
from typing import Union as _Union
from typing import List as _List
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ._disease import Disease
from ._demographics import Demographics
from ._demographic import Demographic
from ._parameters import Parameters
from ._wards import Wards
from ._ward import Ward
from ._variableset import VariableSet, VariableSets
from datetime import date
# Only 'run' is exported by a star-import of this module; the
# underscore-prefixed helper functions below are private to it
__all__ = ["run"]
def _write_to_file(obj: object, filename: str, dir: str = ".",
                   bzip: bool = False, dry_run: bool = False) -> str:
    """Write the passed object to a file called 'filename' in
    directory 'dir', returning the relative path to that file

    Parameters
    ----------
    obj
      The object to write. It must provide a
      ``to_json(filename, auto_bzip=...)`` method, as this is what
      is called to do the writing
    filename: str
      The name of the file to write
    dir: str
      The directory in which to write the file
    bzip: bool
      Passed to ``obj.to_json`` as its ``auto_bzip`` argument
    dry_run: bool
      If True then nothing is written, and 'filename' is returned
      unchanged (it is NOT joined with 'dir')

    Returns
    -------
    filename: str
      The value returned by ``obj.to_json`` (presumably the path
      that was actually written - this is not checked here), or
      the unchanged 'filename' for a dry run

    Raises
    ------
    IOError
      If 'obj' does not have a ``to_json`` method
    """
    if dry_run:
        # nothing is written in a dry run, so just echo the name back
        return filename

    if not hasattr(obj, "to_json"):
        raise IOError(f"Cannot convert {obj} to a file!")

    import os

    return obj.to_json(os.path.join(dir, filename), auto_bzip=bzip)
def _rmdir(directory):
    """Recursively remove 'directory' and everything that it contains.

    Function modified from one copied from 'mitch' on stackoverflow
    https://stackoverflow.com/questions/13118029/deleting-folders-in-python-recursively

    Symbolic links are never followed: a link (to a file or to a
    directory) is itself removed, and whatever it points to is left
    untouched.

    Parameters
    ----------
    directory: str or Path
      The directory to remove. Nothing is done if this is None.
      If this is a file or a symbolic link, then only that file or
      link is removed

    Raises
    ------
    FileExistsError
      If 'directory' resolves to the user's home directory, the
      directory containing the home directory, or a filesystem root
    """
    if directory is None:
        return

    from pathlib import Path

    directory = Path(directory)

    # first, check for removing important directories such as $HOME or root.
    # The comparison uses fully resolved paths so that relative spellings
    # (e.g. "." when run from $HOME) or links to these directories
    # are caught as well
    resolved = directory.resolve()
    home = Path.home().resolve()

    if resolved == home:
        raise FileExistsError(f"We WILL NOT remove your "
                              f"home directory {directory}")

    # the anchor is "/" on posix, and the drive root on Windows
    if resolved == Path(resolved.anchor):
        raise FileExistsError(f"We WILL NOT remove the root directory "
                              f"{directory}")

    # get the directory containing '$HOME'
    if resolved == home.parent:
        raise FileExistsError(f"We WILL NOT remove the users/home "
                              f"directory {directory}")

    # is_dir() follows symbolic links, so links must be tested for first,
    # otherwise we would delete the contents of the link's target
    if directory.is_symlink() or not directory.is_dir():
        directory.unlink()
        return

    for item in directory.iterdir():
        if item.is_dir() and not item.is_symlink():
            _rmdir(item)
        else:
            item.unlink()

    directory.rmdir()
def run(help: bool = None,
        version: bool = None,
        dry_run: bool = None,
        silent: bool = False,
        auto_load: bool = False,
        config: str = None,
        input: _Union[str, VariableSet, VariableSets] = None,
        line: int = None,
        repeats: int = None,
        seed: int = None,
        additional: _Union[str, _List[str]] = None,
        output: str = None,
        disease: _Union[str, Disease] = None,
        model: _Union[str, Wards, Ward] = None,
        demographics: _Union[str, Demographics, Demographic] = None,
        start_date: _Union[str, date] = None,
        start_day: int = None,
        parameters: _Union[str, Parameters] = None,
        repository: str = None,
        population: int = None,
        nsteps: int = None,
        user_variables: _Union[str, VariableSet] = None,
        iterator: str = None,
        extractor: str = None,
        mixer: str = None,
        mover: str = None,
        star_as_E: bool = None,
        star_as_R: bool = None,
        disable_star: bool = None,
        UV: float = None,
        debug: bool = None,
        debug_level: int = None,
        outdir_scheme: str = None,
        nthreads: int = None,
        nprocs: int = None,
        hostfile: str = None,
        cores_per_node: int = None,
        auto_bzip: bool = None,
        no_auto_bzip: bool = None,
        force_overwrite_output: bool = None,
        profile: bool = None,
        no_profile: bool = None,
        mpi: bool = None,
        scoop: bool = None) -> _Union[str, 'pandas.DataFrame']:
    """Run a MetaWards simulation

    This builds a command line for the 'metawards' executable that
    sits next to the running python executable, runs it as a
    child process, and echoes its output to the screen.

    Parameters
    ----------
    silent: bool
      Run without printing the output to the screen
    dry_run: bool
      Don't run anything - just print what will be run
    help: bool
      Whether or not to print the full help
    version: bool
      Whether or not to print the metawards version info
    output: str
      The name of the directory in which to write the output. If this
      is not set, then a new, random-named directory will be used.
    force_overwrite_output: bool
      Force overwriting the output directory - this will remove any
      existing directory before running. If this is not set and the
      output directory exists, then you will be prompted for a
      new directory name
    auto_load: bool
      Whether or not to automatically load and return a pandas dataframe
      of the results file. This defaults to False. Pass None to
      load the results only if pandas can be imported
    disease: Disease or str
      The disease to model (or the filename of the json file containing
      the disease, or name of the disease)
    model: Ward, Wards or str
      The network wards to run (or the filename of the json file
      containing the network, or name of the network)
    demographics: Demographic, Demographics or str
      The demographics to use (or the filename containing them)
    input, parameters, user_variables
      Either a filename, or an object that will be written to a
      temporary file. These are passed as '--input', '--parameters'
      and '--user' respectively
    additional: str or list[str]
      Passed as '--additional'. The items of a list are joined
      together using the two characters backslash-n
    start_date: date or str
      Passed as '--start-date' (a date is converted to iso format)

    All of the other arguments are passed through to the metawards
    option of the same name (e.g. 'cores_per_node' becomes
    '--cores-per-node'). There are many more parameters, based on the
    arguments to metawards --help.

    Please set "help" to True to print out a full list of
    help for all of the arguments

    Returns
    -------
    results: str or pandas.DataFrame
      The file containing the output results (results.csv or
      results.csv.bz2 in the output directory), or, if auto_load
      is True, the pandas.DataFrame containing those results.
      None is returned for a dry run, for 'help' or 'version', and
      if the run failed. -1 is returned if the metawards executable
      cannot be found or if no new output directory was chosen when
      prompted, and 0 if the prompt for a new output directory
      was left empty
    """
    import os
    import shlex
    import subprocess
    import sys
    import tempfile

    from .utils._console import Console

    metawards = os.path.join(os.path.dirname(sys.executable), "metawards")

    if not os.path.exists(metawards):
        Console.error(f"Cannot find the metawards executable: {metawards}")
        return -1

    # the arguments are built as a list (one entry per argument) so that
    # values containing spaces or quotes are passed through intact
    args = []
    tmpdir = None

    if help:
        args.append("--help")
        output = None
    elif version:
        args.append("--version")
        output = None
    else:
        if output is None and not dry_run:
            output = tempfile.mkdtemp(prefix="output_", dir=".")
            # mkdtemp has just created the directory, so metawards
            # must be told that it is ok to write into it
            force_overwrite_output = True

        if force_overwrite_output:
            args.append("--force-overwrite-output")
        else:
            if output is None:
                output = "output"

            while os.path.exists(output):
                import metawards as _metawards
                print(f"Output directory {output} exists.")
                output = _metawards.input("Please choose a new directory: ",
                                          default="error")

                if output is None:
                    return 0

                output = output.strip()

                if len(output) == 0:
                    return 0

                if output.lower() == "error":
                    Console.error("You need to delete the directory or set "
                                  "'force_overwrite_output' to TRUE")
                    return -1

    def add_option(flag, value):
        """Append 'flag' and its value as two separate arguments"""
        args.extend([flag, str(value)])

    def as_filename(obj, filename, bzip=False):
        """Return 'obj' if it is already a filename, else write it to
        'filename' in the temporary input directory (created on first
        use) and return the name of the file that was written
        """
        nonlocal tmpdir

        if isinstance(obj, str):
            return obj

        # nothing is written in a dry run, so no directory is needed
        if tmpdir is None and not dry_run:
            tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")

        return _write_to_file(obj, filename, dir=tmpdir or ".",
                              bzip=bzip, dry_run=dry_run)

    return_val = 0

    try:
        try:
            if config is not None:
                add_option("--config", config)

            if input is not None:
                add_option("--input", as_filename(input, "input.dat"))

            if line is not None:
                add_option("--line", int(line))

            if repeats is not None:
                add_option("--repeats", int(repeats))

            if seed is not None:
                add_option("--seed", int(seed))

            if additional is not None:
                if isinstance(additional, list):
                    additional = "\\n".join(additional)
                elif not isinstance(additional, str):
                    additional = str(int(additional))

                add_option("--additional", additional)

            if output is not None:
                add_option("--output", output)

            if disease is not None:
                add_option("--disease",
                           as_filename(disease, "disease.json"))

            if model is not None:
                from ._ward import Ward
                from ._wards import Wards

                if isinstance(model, Ward):
                    # a single ward is run as a network of one ward
                    wards = Wards()
                    wards.add(model)
                    model = wards

                add_option("--model",
                           as_filename(model, "model.json", bzip=True))

            if demographics is not None:
                from ._demographic import Demographic
                from ._demographics import Demographics

                if isinstance(demographics, Demographic):
                    # a single demographic must be wrapped up into
                    # a Demographics before it is written
                    wrapped = Demographics()
                    wrapped.add(demographics)
                    demographics = wrapped

                add_option("--demographics",
                           as_filename(demographics, "demographics.json"))

            if start_date is not None:
                from datetime import date as _date

                if isinstance(start_date, _date):
                    start_date = start_date.isoformat()

                add_option("--start-date", start_date)

            if start_day is not None:
                add_option("--start-day", int(start_day))

            if parameters is not None:
                add_option("--parameters",
                           as_filename(parameters, "parameters.dat"))

            if repository is not None:
                add_option("--repository", repository)

            if population is not None:
                add_option("--population", int(population))

            if nsteps is not None:
                add_option("--nsteps", int(nsteps))

            if user_variables is not None:
                add_option("--user",
                           as_filename(user_variables,
                                       "user_variables.dat"))

            if iterator is not None:
                add_option("--iterator", iterator)

            if extractor is not None:
                add_option("--extractor", extractor)

            if mixer is not None:
                add_option("--mixer", mixer)

            if mover is not None:
                add_option("--mover", mover)

            # only one of the 'star' options is ever passed
            if star_as_E:
                args.append("--star-as-E")
            elif star_as_R:
                args.append("--star-as-R")
            elif disable_star:
                args.append("--disable-star")

            if UV is not None:
                add_option("--UV", UV)

            # the output is captured and echoed line by line, so always
            # use the simple theme with no spinners or progress bars
            add_option("--theme", "simple")
            args.append("--no-spinner")
            args.append("--no-progress")

            if debug:
                args.append("--debug")

            if debug_level is not None:
                add_option("--debug-level", debug_level)

            if outdir_scheme is not None:
                add_option("--outdir-scheme", outdir_scheme)

            if nthreads is not None:
                add_option("--nthreads", int(nthreads))

            if nprocs is not None:
                add_option("--nprocs", int(nprocs))

            if hostfile is not None:
                add_option("--hostfile", hostfile)

            if cores_per_node is not None:
                add_option("--cores-per-node", int(cores_per_node))

            if auto_bzip:
                args.append("--auto-bzip")
            elif no_auto_bzip:
                args.append("--no-auto-bzip")

            if profile:
                args.append("--profile")
            elif no_profile:
                args.append("--no-profile")

            if mpi:
                args.append("--mpi")

            if scoop:
                args.append("--scoop")

        except Exception as e:
            Console.error(f"[ERROR] Error interpreting the arguments\n"
                          f"[ERROR] {e.__class__}: {e}")
            raise

        command = [metawards] + args

        if dry_run:
            # quote each argument so that the printed command could be
            # pasted into a shell
            cmd = " ".join(shlex.quote(part) for part in command)
            Console.info(f"[DRY-RUN] {cmd}")
        else:
            if output is not None:
                Console.info(f"Writing output to directory {output}")

            try:
                # stderr is merged into stdout - reading only one of two
                # separate pipes can deadlock once the other one fills up
                with subprocess.Popen(command, stdin=sys.stdin,
                                      stderr=subprocess.STDOUT,
                                      stdout=subprocess.PIPE, bufsize=1,
                                      universal_newlines=True) as PROC:
                    for output_line in PROC.stdout:
                        if not silent:
                            sys.stdout.write(output_line)
                            sys.stdout.flush()

                    # stdout closing does not mean that the process has
                    # exited, so wait for the real return code
                    return_val = PROC.wait()

            except Exception as e:
                Console.error(f"[ERROR] {e.__class__}: {e}")
                return_val = -1
    finally:
        # always remove the temporary input files, however we leave
        # (this does nothing if no temporary directory was created)
        _rmdir(tmpdir)

    if dry_run or output is None:
        return None

    if return_val != 0:
        Console.error(f"Something went wrong with the run. Please look "
                      f"at {output}/console.log.bz2 for more information")
        return None

    results = os.path.join(output, "results.csv")

    if not os.path.exists(results):
        results += ".bz2"

    if auto_load or auto_load is None:
        try:
            import pandas
        except ImportError as e:
            # only complain if loading was explicitly requested
            if auto_load:
                Console.error(f"Cannot import pandas:\n{e}")

            auto_load = False
        else:
            auto_load = True

    if auto_load:
        return pandas.read_csv(results)

    return results