from __future__ import annotations
from typing import Union as _Union
from typing import List as _List
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ._disease import Disease
from ._demographics import Demographics
from ._demographic import Demographic
from ._parameters import Parameters
from ._wards import Wards
from ._ward import Ward
from ._variableset import VariableSet, VariableSets
from datetime import date
__all__ = ["run", "find_mw_exe", "find_mw_include", "find_mw_lib",
"get_reticulate_command"]
def _write_to_file(obj: any, filename: str, dir: str = ".", bzip: bool = False,
dry_run: bool = False) -> str:
"""Write the passed object to a file called 'filename' in
directory 'dir', returning the
relative path to that file
"""
import os
if dry_run:
return filename
filename = os.path.join(dir, filename)
if hasattr(obj, "to_json"):
return obj.to_json(filename, auto_bzip=bzip)
else:
raise IOError(f"Cannot convert {obj} to a file!")
return filename
def _rmdir(directory):
"""Function modified from one copied from 'mitch' on stackoverflow
https://stackoverflow.com/questions/13118029/deleting-folders-in-python-recursively
"""
if directory is None:
return
from pathlib import Path
directory = Path(directory)
# first, check for removing important directories such as $HOME or root
if directory == Path.home():
raise FileExistsError(f"We WILL NOT remove your "
f"home directory ${directory}")
if directory == Path("/"):
raise FileExistsError(f"We WILL NOT remove the root directory "
f"{directory}")
# get the directory containing '$HOME'
if directory == Path.home().parent:
raise FileExistsError(f"We WILL NOT remove the users/home "
f"directory {directory}")
if not directory.is_dir():
directory.unlink()
return
for item in directory.iterdir():
if item.is_dir():
_rmdir(item)
else:
item.unlink()
directory.rmdir()
def _is_executable(filename):
import os
if not os.path.exists(filename):
return None
if os.path.isdir(filename):
return None
# determining if this is executable
# on windows is really difficult, so just
# assume it is...
return filename
def _find_metawards(dirname):
import os
m = _is_executable(os.path.join(dirname, "metawards"))
if m:
return m
m = _is_executable(os.path.join(dirname, "metawards.exe"))
if m:
return m
m = _is_executable(os.path.join(dirname, "Scripts", "metawards"))
if m:
return m
m = _is_executable(os.path.join(dirname, "Scripts", "metawards.exe"))
if m:
return m
m = _is_executable(os.path.join(dirname, "bin", "metawards"))
if m:
return m
m = _is_executable(os.path.join(dirname, "bin", "metawards.exe"))
if m:
return m
return None
def _find_metawards_include(dirname):
import os
# this is from a metawards installation
m = os.path.abspath(os.path.join(dirname, "include", "metawards"))
if os.path.exists(m):
return m
# this is from a metawards source run (used for testing)
m = os.path.abspath(os.path.join(dirname, "src", "metawards"))
if os.path.exists(m):
return m
return None
def _find_metawards_lib(dirname):
import os
import glob
m = glob.glob(os.path.join(dirname, "lib", "libmetawards_*"))
if m is None:
m = []
if len(m) >= 1:
m = os.path.dirname(os.path.abspath(m[0]))
return m
m = glob.glob(os.path.join(dirname, "libmetawards_*"))
if m is None:
m = []
if len(m) >= 1:
m = os.path.dirname(os.path.abspath(m[0]))
return m
m = glob.glob(os.path.join(dirname, "lib*", "metawards_random.*"))
if m is None:
m = []
if len(m) >= 1:
m = os.path.dirname(os.path.abspath(m[0]))
return m
m = glob.glob(os.path.join(dirname, "metawards_random.*"))
if m is None:
m = []
if len(m) >= 1:
m = os.path.dirname(os.path.abspath(m[0]))
return m
return None
[docs]def find_mw_lib():
"""Try to find the directory containing the MetaWards libraries
(e.g. metawards_random).
This raises an exception if the libraries cannot be found.
It returns the full path to the library directory
"""
import metawards as _metawards
import os as _os
import sys as _sys
# Search through the path based on where the metawards module
# has been installed.
modpath = _metawards.__file__
metawards = None
# Loop only 100 times - this should break before now,
# We are not using a while loop to avoid an infinite loop
for i in range(0, 100):
metawards = _find_metawards_lib(modpath)
if metawards:
break
newpath = _os.path.dirname(modpath)
if newpath == modpath:
break
modpath = newpath
if metawards is not None:
return metawards
# Search from sys.prefix
modpath = _sys.prefix
# Loop only 100 times - this should break before now,
# We are not using a while loop to avoid an infinite loop
for i in range(0, 100):
metawards = _find_metawards_lib(modpath)
if metawards:
break
newpath = _os.path.dirname(modpath)
if newpath == modpath:
break
modpath = newpath
if metawards is not None:
return metawards
# This could have been put in the hostedtoolcache folder...
p = _os.path.abspath(_os.path.join(_os.path.dirname(_metawards.__file__),
"..", "hostedtoolcache"))
if _os.path.exists(p):
for dirpath, dirnames, filenames in _os.walk(p):
for filename in [f for f in filenames if (f.endswith(".lib") or
(f.endswith(".a")))]:
if filename.find("metawards") != -1:
metawards = dirpath
if metawards is None:
from .utils._console import Console
Console.error(
"Cannot find the metawards library directory, when starting from "
f"{_metawards.__file__}. Please could you "
"find it and then post an issue on the "
"GitHub repository (https://github.com/metawards/MetaWards) "
"as this may indicate a bug in the code.")
raise RuntimeError("Cannot locate the metawards library directory")
return metawards
[docs]def find_mw_include():
"""Try to find the directory containing the MetaWards include files.
This raises an exception if the include files cannot be found.
It returns the full path to the include files
"""
import metawards as _metawards
import os as _os
import sys as _sys
# Search through the path based on where the metawards module
# has been installed.
modpath = _metawards.__file__
metawards = None
# Loop only 100 times - this should break before now,
# We are not using a while loop to avoid an infinite loop
for i in range(0, 100):
metawards = _find_metawards_include(modpath)
if metawards:
break
newpath = _os.path.dirname(modpath)
if newpath == modpath:
break
modpath = newpath
if metawards is not None:
return metawards
# Search from sys.prefix
modpath = _sys.prefix
# Loop only 100 times - this should break before now,
# We are not using a while loop to avoid an infinite loop
for i in range(0, 100):
metawards = _find_metawards_include(modpath)
if metawards:
break
newpath = _os.path.dirname(modpath)
if newpath == modpath:
break
modpath = newpath
if metawards is None:
from .utils._console import Console
Console.error(
"Cannot find the metawards include directory, when starting from "
f"{_metawards.__file__}. Please could you "
"find it and then post an issue on the "
"GitHub repository (https://github.com/metawards/MetaWards) "
"as this may indicate a bug in the code.")
raise RuntimeError("Cannot locate the metawards include directory")
return metawards
[docs]def find_mw_exe():
"""Try to find the MetaWards executable. This should be findable
if MetaWards has been installed. This raises an exception
if it cannot be found. It returns the full path to the
executable
"""
import metawards as _metawards
import os as _os
import sys as _sys
# Search through the path based on where the metawards module
# has been installed.
modpath = _metawards.__file__
metawards = None
# Loop only 100 times - this should break before now,
# We are not using a while loop to avoid an infinite loop
for i in range(0, 100):
metawards = _find_metawards(modpath)
if metawards:
break
newpath = _os.path.dirname(modpath)
if newpath == modpath:
break
modpath = newpath
if metawards is not None:
return metawards
# Search from sys.prefix
modpath = _sys.prefix
# Loop only 100 times - this should break before now,
# We are not using a while loop to avoid an infinite loop
for i in range(0, 100):
metawards = _find_metawards(modpath)
if metawards:
break
newpath = _os.path.dirname(modpath)
if newpath == modpath:
break
modpath = newpath
if metawards is None:
# We couldn't find it that way - try another route...
dirpath = _os.path.join(_os.path.dirname(_sys.executable))
for option in [_os.path.join(dirpath, "metawards.exe"),
_os.path.join(dirpath, "metawards"),
_os.path.join(dirpath, "Scripts", "metawards.exe"),
_os.path.join(dirpath, "Scripts", "metawards")]:
if _os.path.exists(option):
metawards = option
break
if metawards is None:
# last attempt - is 'metawards' in the PATH?
from shutil import which
metawards = which("metawards")
if metawards is None:
from .utils._console import Console
Console.error(
"Cannot find the metawards executable. Please could you find "
"it and add it to the PATH. Or please post an issue on the "
"GitHub repository (https://github.com/metawards/MetaWards) "
"as this may indicate a bug in the code.")
raise RuntimeError("Cannot locate the metawards executable")
return metawards
[docs]def get_reticulate_command():
"""Print the reticulate command that you need to type
to be able to use the Python in which MetaWards is
installed
"""
import os as _os
import sys as _sys
pyexe = _os.path.abspath(_sys.executable)
return f"reticulate::use_python(\"{pyexe}\", required=TRUE)"
[docs]def run(help: bool = None,
version: bool = None,
dry_run: bool = None,
silent: bool = False,
auto_load: bool = False,
config: str = None,
input: _Union[str, VariableSet, VariableSets] = None,
line: int = None,
repeats: int = None,
seed: int = None,
additional: _Union[str, _List[str]] = None,
output: str = None,
disease: _Union[str, Disease] = None,
model: _Union[str, Wards, Ward] = None,
demographics: _Union[str, Demographics, Demographic] = None,
start_date: _Union[str, date] = None,
start_day: int = None,
parameters: _Union[str, Parameters] = None,
repository: str = None,
population: int = None,
nsteps: int = None,
user_variables: _Union[str, VariableSet] = None,
iterator: str = None,
extractor: str = None,
mixer: str = None,
mover: str = None,
star_as_E: bool = None,
star_as_R: bool = None,
disable_star: bool = None,
UV: float = None,
debug: bool = None,
debug_level: int = None,
outdir_scheme: str = None,
nthreads: int = None,
nprocs: int = None,
hostfile: str = None,
cores_per_node: int = None,
auto_bzip: bool = None,
no_auto_bzip: bool = None,
force_overwrite_output: bool = None,
profile: bool = None,
no_profile: bool = None,
mpi: bool = None,
scoop: bool = None) -> _Union[str, 'pandas.DataFrame']:
"""Run a MetaWards simulation
Parameters
----------
silent: bool
Run without printing the output to the screen
dry_run: bool
Don't run anything - just print what will be run
help: bool
Whether or not to print the full help
version: bool
Whether or not to print the metawards version info
output: str
The name of the directory in which to write the output. If this
is not set, then a new, random-named directory will be used.
force_overwrite_output: bool
Force overwriting the output directory - this will remove any
existing directory before running
auto_load: bool
Whether or not to automatically load and return a pandas dataframe
of the output/results.csv.bz2 file. If pandas is available then
this defaults to True, otherwise False
disease: Disease or str
The disease to model (or the filename of the json file containing
the disease, or name of the disease)
model: Ward, Wards or str
The network wards to run (of the filename of the json file
containing the network, or name of the network))
There are many more parameters, based on the arguments to
metawards --help.
Please set "help" to True to print out a full list of
help for all of the arguments
Returns
-------
results: str or pandas.DataFrame
The file containing the output results (output/results.csv.bz2),
or, if auto_load is True, the pandas.DataFrame containing
those results
"""
import sys
import os
import tempfile
from .utils._console import Console
metawards = find_mw_exe()
args = []
tmpdir = None
theme = "simple"
no_progress = True
no_spinner = True
if help:
args.append("--help")
output = None
elif version:
args.append("--version")
output = None
else:
if output is None and not dry_run:
output = tempfile.mkdtemp(prefix="output_", dir=".")
force_overwrite_output = True
if force_overwrite_output:
args.append("--force-overwrite-output")
else:
if output is None:
output = "output"
while os.path.exists(output):
import metawards as _metawards
print(f"Output directory {output} exists.")
output = _metawards.input("Please choose a new directory: ",
default="error")
if output is None:
return 0
output = output.strip()
if len(output) == 0:
return 0
if output.lower() == "error":
Console.error("You need to delete the directory or set "
"'force_overwrite_output' to TRUE")
return -1
try:
if config is not None:
args.append(f"--config {config}")
if input is not None:
if not isinstance(input, str):
if tmpdir is None:
tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")
input = _write_to_file(input, "input.dat", dir=tmpdir,
bzip=False, dry_run=dry_run)
args.append(f"--input {input}")
if line is not None:
args.append(f"--line {int(line)}")
if repeats is not None:
args.append(f"--repeats {int(repeats)}")
if seed is not None:
args.append(f"--seed {int(seed)}")
if additional is not None:
if isinstance(additional, list):
additional = "\\n".join(additional)
elif not isinstance(additional, str):
additional = str(int(additional))
if "\"" in additional:
if sys.platform.startswith("win"):
additional.replace("\"", "'")
args.append(f"--additional \"{additional}\"")
else:
args.append(f"--additional '{additional}'")
else:
args.append(f"--additional \"{additional}\"")
if output is not None:
args.append(f"--output {output}")
if disease is not None:
if not isinstance(disease, str):
if tmpdir is None:
tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")
disease = _write_to_file(disease, "disease.json",
dir=tmpdir,
bzip=False, dry_run=dry_run)
args.append(f"--disease {disease}")
if model is not None:
from ._ward import Ward
from ._wards import Wards
if isinstance(model, Ward):
m = Wards()
m.add(model)
model = m
if not isinstance(model, str):
if tmpdir is None:
tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")
model = _write_to_file(model, "model.json", dir=tmpdir,
bzip=True, dry_run=dry_run)
args.append(f"--model {model}")
if demographics is not None:
from ._demographic import Demographic
from ._demographics import Demographics
if isinstance(demographics, Demographic):
d = Demographics()
d.add(demographics)
demographics = demographics
if not isinstance(demographics, str):
if tmpdir is None:
tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")
demographics = _write_to_file(demographics,
"demographics.json",
dir=tmpdir,
bzip=False,
dry_run=dry_run)
args.append(f"--demographics {demographics}")
if start_date is not None:
from datetime import date
if isinstance(start_date, date):
start_date = date.isoformat()
args.append(f"--start-date {start_date}")
if start_day is not None:
args.append(f"--start-day {int(start_day)}")
if parameters is not None:
if not isinstance(parameters, str):
if tmpdir is None:
tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")
parameters = _write_to_file(parameters, "parameters.dat",
dir=tmpdir, bzip=False,
dry_run=dry_run)
args.append(f"--parameters {parameters}")
if repository is not None:
args.append(f"--repository {repository}")
if population is not None:
args.append(f"--population {int(population)}")
if nsteps is not None:
args.append(f"--nsteps {int(nsteps)}")
if user_variables is not None:
if not isinstance(user_variables, str):
if tmpdir is None:
tmpdir = tempfile.mkdtemp(prefix="input_", dir=".")
user_variables = _write_to_file(user_variables,
"user_variables.dat",
dir=tmpdir,
bzip=False,
dry_run=dry_run)
args.append(f"--user {user_variables}")
if iterator is not None:
args.append(f"--iterator {iterator}")
if extractor is not None:
args.append(f"--extractor {extractor}")
if mixer is not None:
args.append(f"--mixer {mixer}")
if mover is not None:
args.append(f"--mover {mover}")
if star_as_E:
args.append("--star-as-E")
elif star_as_R:
args.append("--star-as-R")
elif disable_star:
args.append("--disable-star")
if UV is not None:
args.append(f"--UV {UV}")
if theme is not None:
args.append(f"--theme {theme}")
if no_spinner:
args.append("--no-spinner")
if no_progress:
args.append("--no-progress")
if debug:
args.append("--debug")
if debug_level is not None:
args.append(f"--debug-level {debug_level}")
if outdir_scheme is not None:
args.append(f"--outdir-scheme {outdir_scheme}")
if nthreads is not None:
args.append(f"--nthreads {int(nthreads)}")
if nprocs is not None:
args.append(f"--nprocs {int(nprocs)}")
if hostfile is not None:
args.append(f"--hostfile {hostfile}")
if cores_per_node is not None:
args.append(f"--cores-per-node {int(cores_per_node)}")
if auto_bzip:
args.append("--auto-bzip")
elif no_auto_bzip:
args.append("--no-auto-bzip")
if profile:
args.append("--profile")
elif no_profile:
args.append("--no-profile")
if mpi:
args.append("--mpi")
if scoop:
args.append("--scoop")
except Exception as e:
Console.error(f"[ERROR] Error interpreting the arguments"
f"[ERROR] {e.__class__}: {e}")
_rmdir(tmpdir)
raise
return -1
cmd = f"{metawards} {' '.join(args)}"
if dry_run:
Console.info(f"[DRY-RUN] {cmd}")
return_val = 0
else:
if output is not None:
Console.info(
f"Writing output to directory {os.path.abspath(output)}")
Console.info(f"[RUNNING] {cmd}")
try:
if sys.platform.startswith("win"):
# shlex.split doesn't work, but the command can
# be passed as a single string
args = cmd
else:
import shlex
args = shlex.split(cmd)
import subprocess
# We have to specify all of the pipes (stdin, stdout, stderr)
# as below as otherwise we will break metawards on Windows
# (especially needed to allow metawards to run under
# reticulate via metawards$run. Without these specified
# we end up with Windows File Errors)
with subprocess.Popen(args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1, encoding="utf8",
errors="ignore",
text=True) as PROC:
while True:
line = PROC.stdout.readline()
if not line:
break
if not silent:
try:
sys.stdout.write(line)
sys.stdout.flush()
except UnicodeEncodeError:
# We get frequent unicode errors when run
# within RStudio. It is best just to ignore them
pass
except Exception as e:
Console.error(f"WRITE ERROR: {e.__class__} : {e}")
return_val = PROC.poll()
if return_val is None:
# get None if everything OK on Windows
# (sometimes windows returns 0 as None, which
# breaks things!)
return_val = 0
except Exception as e:
Console.error(f"[ERROR] {e.__class__}: {e}")
return_val = -1
if tmpdir is not None:
_rmdir(tmpdir)
if dry_run:
return
if output is None:
return
if return_val == 0:
results = os.path.join(output, "results.csv")
if not os.path.exists(results):
results += ".bz2"
if auto_load:
try:
import pandas
except ImportError:
Console.error("Cannot import pandas:\n{e}")
auto_load = False
if auto_load is None:
try:
import pandas
auto_load = True
except ImportError:
auto_load = False
if auto_load:
import pandas as pd
return pd.read_csv(results)
else:
return results
else:
output_file = os.path.join(output, "console.log.bz2")
Console.error(f"Something went wrong with the run. Please look "
f"at {output_file} for more information")
return None