from typing import List as _List
from typing import Dict as _Dict
from typing import Union as _Union
from datetime import date as _date
__all__ = ["VariableSets", "VariableSet"]
def _set_beta(params, name: str, index: _Union[int, str], value: float):
"""Adjust the Disease.beta parameter"""
index = params.disease_params.get_index(index)
params.disease_params.beta[index] = float(value)
def _set_progress(params, name: str, index: int, value: float):
"""Adjust the Disease.progress parameter"""
index = params.disease_params.get_index(index)
params.disease_params.progress[index] = float(value)
def _set_too_ill_to_move(params, name: str, index: int, value: float):
"""Adjust the Disease.too_ill_to_move parameter"""
index = params.disease_params.get_index(index)
params.disease_params.too_ill_to_move[index] = float(value)
def _set_contrib_foi(params, name: str, index: int, value: float):
"""Adjust the Disease.contrib_foi parameter"""
index = params.disease_params.get_index(index)
params.disease_params.contrib_foi[index] = float(value)
def _set_lengthday(params, name: str, index: int, value: float):
"""Adjust the Parameters.length_day parameter"""
if index is not None:
raise IndexError("You cannot index the lengthday")
params.length_day = float(value)
def _set_uv(params, name: str, index: int, value: float):
"""Adjust the Parameters.UV parameter"""
if index is not None:
raise IndexError("You cannot index the UV parameter")
params.UV = float(value)
def _set_UV_max(params, name: str, index: int, value: _date):
"""Adjust the Parameters.UV parameter"""
if index is not None:
raise IndexError("You cannot index the UV_max parameter")
if not isinstance(value, _date):
raise TypeError("The passed UV_max must be a date")
params.UV_max = value
def _set_initial_inf(params, name: str, index: int, value: float):
"""Adjust the Parameters.initial_inf parameter"""
if index is not None:
raise IndexError("You cannot index the initial_inf parameter")
params.initial_inf = float(value)
def _set_static_play_at_home(params, name: str, index: int, value: float):
"""Adjust the Parameters.static_play_at_home parameter"""
if index is not None:
raise IndexError("You cannot index the static_play_at_home parameter")
params.static_play_at_home = float(value)
def _set_dyn_play_at_home(params, name: str, index: int, value: float):
"""Adjust the Parameters.dyn_play_at_home parameter"""
if index is not None:
raise IndexError("You cannot index the dyn_play_at_home parameter")
params.dyn_play_at_home = float(value)
def _set_dyn_dist_cutoff(params, name: str, index: int, value: float):
"""Adjust the Parameters.dyn_dist_cutoff parameter"""
if index is not None:
raise IndexError("You cannot index the dyn_dist_cutoff parameter")
params.dyn_dist_cutoff = float(value)
def _set_play_to_work(params, name: str, index: int, value: float):
"""Adjust the Parameters.play_to_work parameter"""
if index is not None:
raise IndexError("You cannot index the play_to_work parameter")
params.play_to_work = float(value)
def _set_work_to_play(params, name: str, index: int, value: float):
"""Adjust the Parameters.work_to_play parameter"""
if index is not None:
raise IndexError("You cannot index the work_to_play parameter")
params.work_to_play = float(value)
def _set_scale_uv(params, name: str, index: int, value: float):
"""Adjust the Parameters.scale_uv parameter"""
if index is not None:
raise IndexError("You cannot index the scale_uv parameter")
value = float(value)
if value < 0:
raise ValueError("Negative values of scale_uv are not permitted.")
params.scale_uv = value
def _set_bg_foi(params, name: str, index: int, value: float):
"""Adjust the Parameters.bg_foi parameter"""
if index is not None:
raise IndexError("You cannot index the bg_foi parameter")
params.bg_foi = float(value)
def _set_daily_imports(params, name: str, index: int, value: float):
"""Adjust the Parameters.daily_imports parameter"""
if index is not None:
raise IndexError("You cannot index the daily_imports parameter")
params.daily_imports = float(value)
def _set_user_params(params, name: str, index: int, value: float):
"""Adjust a custom user-supplied parameter, held in
Parameters.user_params[name]. Set a user parameter
called 'name' via 'user.name' or '.name'.
"""
if name.startswith("user."):
name = name[5:]
elif name.startswith("."):
name = name[1:]
if params.user_params is None:
params.user_params = {}
if index is None:
params.user_params[name] = value
else:
if name not in params.user_params:
params.user_params[name] = []
while len(params.user_params[name]) <= index:
params.user_params[name].append(None)
params.user_params[name][index] = value
_adjustable = {}
_adjustable["beta"] = _set_beta
_adjustable["progress"] = _set_progress
_adjustable["too_ill_to_move"] = _set_too_ill_to_move
_adjustable["contrib_foi"] = _set_contrib_foi
_adjustable["user"] = _set_user_params
_adjustable["length_day"] = _set_lengthday
_adjustable["UV"] = _set_uv
_adjustable["initial_inf"] = _set_initial_inf
_adjustable["static_play_at_home"] = _set_static_play_at_home
_adjustable["dyn_play_at_home"] = _set_dyn_play_at_home
_adjustable["dyn_dist_cutoff"] = _set_dyn_dist_cutoff
_adjustable["play_to_work"] = _set_play_to_work
_adjustable["work_to_play"] = _set_work_to_play
_adjustable["daily_imports"] = _set_daily_imports
_adjustable["scale_uv"] = _set_scale_uv
_adjustable["bg_foi"] = _set_bg_foi
_adjustable["UV_max"] = _set_UV_max
def _clean(x):
"""Clean the passed string by stripping off unnecesary characters,
and turning "True" and "False" into 1 and 0. Also change
'=' and ':' into '=='
"""
x = x.strip()
if x.lower() == "true":
return True
elif x.lower() == "false":
return False
elif x == "=" or x == ":":
return "=="
elif x.endswith("=="):
return x[0:-2], "=="
elif x.endswith(":") or x.endswith("="):
return x[0:-1], "=="
else:
return x
def _interpret(value):
if not isinstance(value, str):
return value
canonical = value.lower().replace('"', "'")
from ._interpret import Interpret
if canonical.startswith("d'"):
# this is a date
try:
return Interpret.date(value[2: -1])
except Exception:
pass
from datetime import date
return date.fromisoformat(value[2: -1])
elif canonical.startswith("f'"):
# this is a floating point number
return float(Interpret.number(value[2: -1]))
elif canonical.startswith("i'"):
# this is an integer
return Interpret.integer(value[2:-1])
elif canonical.startswith("s'"):
# this is a string
return Interpret.string(value[2:-1])
elif canonical.startswith("b'"):
# this is a boolean
return Interpret.boolean(value[2:-1])
elif canonical.startswith("[") and canonical.endswith("]"):
return Interpret.list(value)
elif canonical.startswith("(") and canonical.endswith(")"):
return Interpret.list(value)
# now we have to guess...
try:
v = float(value)
if v.is_integer():
return int(v)
else:
return v
except Exception:
pass
try:
from datetime import date
return date.fromisoformat(value)
except Exception:
pass
try:
from ._interpret import Interpret
return Interpret.number(value)
except Exception:
pass
try:
from ._interpret import Interpret
return Interpret.boolean(value)
except Exception:
pass
# don't try to interpret complex dates from strings as this
# leads to weird errors when the string is definitely not a date,
# but dateparser thinks it is, e.g. "a" is a date!
# this can only be a string...
return Interpret.string(value)
def _wrap(text, width=70):
"""Return 'text' wrapped to at most 'width' characters, as
a list of lines
"""
import textwrap
text = text.strip().replace(r"\s+", " ").replace("\n", " ")
return textwrap.wrap(text, width)
[docs]class VariableSet:
"""This class holds a single set of adjustable variables that
are used to adjust the variables as part of a model run
Examples
--------
>>> v = VariableSet()
>>> v["beta[1]"] = 0.95
>>> v["beta[2]"] = 0.9
>>> print(v.fingerprint())
(beta[1]=0.95, beta[2]=0.9)[repeat 1]
>>> params = Parameters()
>>> params.set_disease("ncov")
>>> v.adjust(params)
>>> print(params.disease_params.beta[1],
>>> params.disease_params.beta[2])
0.95 0.9
"""
[docs] def __init__(self,
variables: _Dict[str, float] = None,
repeat_index: int = 1,
names: _List[str] = None,
values: _List[float] = None):
"""Construct a new VariableSet from the passed adjusted variable
values.
Parameters
----------
names: List[str]
The list of the names of the variables to adjust
values: List[float]
The values of the variables to adjust (same order as names)
variables: Dict[str, float]
names and values of variables to adjust passed as a dictionary
repeat_index: int
the index used to distinguish different repeats of the same
VariableSet from one another
Examples
--------
>>> v = VariableSet()
>>> v["beta[1]"] = 0.95
>>> v["beta[2]"] = 0.9
>>> print(v.fingerprint())
(beta[1]=0.95, beta[2]=0.9)[repeat 1]
"""
self._names = None
self._vals = None
self._varnames = None
self._varidxs = None
self._idx = None
self._nrepeats = None
self._output = None
if variables is not None:
for name, value in variables.items():
self._add(name, value)
if values is not None:
if names is None:
names = ["beta[2]", "beta[3]", "progress[1]",
"progress[2]", "progress[3]"]
if len(names) != len(values):
raise IndexError(
f"The number of variable values '{values}' must equal "
f"the number of variable names '{names}'")
for name, value in zip(names, values):
self._add(name, value)
self._idx = repeat_index
self._nrepeats = max(1, repeat_index)
[docs] def __str__(self):
"""Return a printable representation of the variables to
be adjusted
"""
s = []
if self._vals is not None and len(self._vals) > 0:
for key, value in zip(self._names, self._vals):
s.append(f"{key}={value}")
if len(s) == 0:
return f"(NO_CHANGE)[repeat {self._idx}]"
else:
return f"({', '.join(s)})[repeat {self._idx}]"
[docs] def __repr__(self):
return str(self)
[docs] def __eq__(self, other):
if isinstance(other, dict):
other = VariableSet(variables=other)
if self.__class__ != other.__class__:
return False
if self._idx != other._idx:
return False
if len(self) != len(other):
return False
if self._vals is None:
return
for i in range(0, len(self._vals)):
if self._vals[i] != other._vals[i]:
return False
if self._varnames[i] != other._varnames[i]:
return False
if self._varidxs[i] != other._varidxs[i]:
return False
if self._names[i] != other._names[i]:
return False
return True
def __len__(self):
if self._vals is None:
return 0
else:
return len(self._vals)
[docs] def is_empty(self):
"""Return whether or not there is nothing to change"""
return len(self) == 0
def __getitem__(self, key):
if self._vals is None:
raise KeyError(f"No adjustable parameter {key} in an empty set")
if key.startswith("user."):
key = key[4:]
for i, name in enumerate(self._names):
if key == name:
return self._vals[i]
raise KeyError(f"No adjustable parameter {key}. Available parameters "
f"are '{self._names}'")
def __setitem__(self, key, value):
if self._names is None:
self._names = []
for i, name in enumerate(self._names):
if key == name:
self._vals[i] = value
return
self._add(key, value)
[docs] def set_value(self, key, value):
"""Set the value of 'key' to 'value'"""
self.__setitem__(key, value)
[docs] def get_value(self, key):
"""Return the value of 'key'"""
return self.__getitem__(key)
def _add(self, name, value):
"""Internal function to add a new variable called 'name' to
be varied - it will be set equal to 'value'
"""
import re
if self._vals is None:
self._names = []
self._vals = []
self._varnames = []
self._varidxs = []
name = name.strip()
if name == "output":
# this is the output directory for this run. This will
# replace the default name, which is based on the fingerprint
value = str(value).strip()
if len(value) == 0:
raise ValueError("You cannot use an empty string as the "
"output directory")
self._output = value
return
# look for 'variable[index]', demographic:variable[index],
# .variable[index], user.variable[index],
# variable["key"], variable['key'] and all combinations of above
m = re.search(r"([\s:\.\w]+)\[\s*([\d+]|\".+\"|'.+')\s*\]", name)
if m:
varname = m.group(1)
index = m.group(2)
if index.startswith('"') or index.startswith("'"):
index = index[1:-1]
else:
index = int(index)
value = value
else:
varname = name
index = None
value = value
if varname.find(":") != -1:
# this sets the variable in a demographic
parts = varname.split(":")
if len(parts) != 2:
raise KeyError(
f"It is not possible to adjust the variable {name} to "
f"equal {value} as it is not in the format "
f"variable=value or demographic:variable=value")
demographic = parts[0]
varname = parts[1]
else:
demographic = None
if not (varname.startswith("user.") or varname.startswith(".") or
varname in _adjustable):
from .utils._console import Console
Console.error(f"It is not possible to adjust the variable "
f"{name} to equal {value}. Available variables "
f"are {list(_adjustable.keys())}, or to set "
f"a user parameter 'user.parameter' or "
f"'.parameter'. To set an index use "
f"'parameter[index]', e.g. 'beta[2]', "
f"'parameter[\"key\"]', e.g. 'beta[\"I1\"]'")
raise KeyError(f"It is not possible to adjust {name} to "
f"equal {value}.")
if varname.startswith("user."):
# strip off 'user' so that it just starts with a dot
varname = varname[4:]
if name.startswith("user."):
name = name[4:]
if demographic is None:
self._varnames.append(varname)
else:
self._varnames.append((demographic, varname))
self._varidxs.append(index)
self._names.append(name)
self._vals.append(value)
[docs] @staticmethod
def adjustable_help():
"""Return a string that contains some help that is useful
for finding out which variables can be adjusted
"""
lines = []
names = list(_adjustable.keys())
names.sort()
for name in names:
func = _adjustable[name]
docs = _wrap(func.__doc__, 60)
lines.append("")
lines.append(f"{name}:")
for doc in docs:
lines.append(f" {doc}")
return "\n".join(lines)
[docs] def variable_names(self):
"""Return the names of the variables that will be adjusted
by this VariableSet
Returns
-------
names: List[str]
The list of names of variables to be adjusted
"""
if self._vals is None or len(self._vals) == 0:
return None
else:
from copy import deepcopy
return deepcopy(self._names)
[docs] def variable_values(self):
"""Return the values that the variables will be adjusted to.
Note that 'None' means that the variable won't be adjusted
from its default (original) value
Returns
-------
values: List[float]
The list of values for variables to be adjusted to
"""
if self._vals is None or len(self._vals) == 0:
return None
else:
from copy import deepcopy
return deepcopy(self._vals)
[docs] def variables(self):
"""Return the variables (name and values) to be adjusted
Returns
-------
variables: Dict[str, float]
The dictionary mapping the names of the variables that
with be adjusted to their desired values
"""
v = {}
for name, value in zip(self._names, self._vals):
v[name] = value
return v
[docs] def repeat_index(self):
"""Return the repeat index of this set. The repeat index is the
ID of this set if the VariableSet is repeated. The index should
range from 1 to nrepeats
Returns
-------
index: int
The repeat index of this set
"""
return self._idx
[docs] def make_compatible_with(self, other):
"""Return a copy of this VariableSet which has been made
compatible with 'other'. This means that it will change
the same variables as 'other', e.g. by adding 'None'
changes for missing variables. This will raise an error
if it is not possible to make this set compatible
Parameters
----------
other: VariableSet
The passed VariableSet for which this should be made compatible
Returns
-------
result: VariableSet
A copy of this VariableSet which is now compatible with 'other'
Example
-------
>>> v1 = VariableSet()
>>> v1["beta[1]"] = 0.9
>>> v1["beta[2]"] = 0.8
>>> v2 = VariableSet()
>>> v2["beta[1]"] = 0.6
>>> v2 = v2.make_compatible_with(v1)
>>> print(v2)
(beta[1]=0.6, beta[2]=0.8)[repeat 1]
"""
from copy import deepcopy
if self._names is None:
v = deepcopy(other)
v._idx = self._idx
return v
if other._names is None:
raise ValueError(f"VariableSet {self} is not compatible with "
f"VariableSet {other}")
nmatch = 0
for name in self._names:
if name not in other._names:
raise ValueError(f"VariableSet {self} is not compatible with "
f"VariableSet {other}")
nmatch += 1
if len(other._names) == nmatch:
# fully compatible
return deepcopy(self)
v = deepcopy(self)
for name in other._names:
if name not in self._names:
v[name] = other[name]
return v
@staticmethod
def _extract_values(fingerprint: str):
fingerprint = str(fingerprint)
# has this fingerprint been added onto the end of a filename
# using a hyphen or underscore? If so, then remove this
if fingerprint.find("-") != -1:
fingerprint = fingerprint.split("-")[-1]
if fingerprint.find("_") != -1:
fingerprint = fingerprint.split("_")[-1]
fingerprint.strip()
# does this have a repeat index - if so, this is the last
# value at the end after the 'x'
parts = fingerprint.split("x")
if len(parts) > 1:
repeat_index = int(parts[-1])
fingerprint = "x".join(parts[0:-1])
else:
repeat_index = None
# now get the values
values = []
if fingerprint != "REPEAT":
for part in fingerprint.split("v"):
try:
if part == "T":
values.append(True)
elif part == "F":
values.append(False)
else:
values.append(float(part.replace("i", ".")))
except Exception:
# this is not part of the fingerprint
pass
return (values, repeat_index)
[docs] @staticmethod
def create_fingerprint(vals: _List[float], index: int = None,
include_index: bool = False):
"""Create the fingerprint for the passed values"""
f = None
if vals is None or len(vals) == 0:
f = "REPEAT"
else:
for val in vals:
if isinstance(val, bool):
if val:
v = "T"
else:
v = "F"
elif isinstance(val, list):
v = [str(x) for x in val]
v = "|".join(v)
else:
v = float(val)
if v.is_integer():
v = int(val)
v = f"{v}.0"
v = str(v).replace(".", "i")
if f is None:
f = v
else:
f += "v" + v
if include_index:
return "%sx%03d" % (f, index)
else:
return f
[docs] def fingerprint(self, include_index: bool = False):
"""Return a fingerprint for this VariableSet. This can be
used to quickly identify and distinguish the values of
the variables in this set from the values in other
VariableSets which have the same adjustable variables,
but different parameters
Parameters
----------
include_index: bool
Whether or not to include the repeat_index in the fingerprint
Returns
-------
fingerprint: str
The fingerprint for this VariableSet
"""
return VariableSet.create_fingerprint(vals=self._vals,
index=self._idx,
include_index=include_index)
[docs] def output_dir(self):
"""Return the output directory in which runs using this
variable set should be placed. Normally this would
be the fingerprint of the variable set, but users may
prefer to specify their own naming scheme, which
can be added via a design file
"""
if self._output is None:
return self.fingerprint(include_index=True)
else:
if self._nrepeats is None or self._nrepeats < 2:
return self._output
else:
return "%sx%03d" % (self._output, self._idx)
[docs] @staticmethod
def read(filename: str):
"""Read a single set of adjustable variables from the passed
file. The file can either write the variables in horizontal
or vertical mode, using space or comma separated values.
This is useful for when you want to set a global set of parameters
at the start of a calculation and don't want to use
a large VariableSets
Parameters
----------
filename: str
The name of the file containing the VariableSet
Returns
-------
variables: VariableSet
The VariableSet that has been read
"""
variables = VariableSets.read(filename)
if len(variables) == 0:
return VariableSet()
elif len(variables) != 1:
raise ValueError(
"More than one set of variables was read from {filename}")
else:
return variables[0]
[docs] def adjust(self, params): # should be 'Parameters' but circular include
"""Use the variables in this set to adjust the passed parameters.
Note that this directly modifies 'params'
Parameters
----------
params: Parameters
The parameters whose variables will be adjusted
Returns
-------
None
Examples
--------
>>> v = VariableSet()
>>> v["beta[1]"] = 0.95
>>> v["beta[2]"] = 0.9
>>> print(v.fingerprint())
(beta[1]=0.95, beta[2]=0.9)[repeat 1]
>>> params = Parameters()
>>> params.set_disease("ncov")
>>> v.adjust(params)
>>> print(params.disease_params.beta[1],
>>> params.disease_params.beta[2])
0.95 0.9
"""
if self._vals is None or len(self._vals) == 0:
return
try:
# loop over all adjustable variables, and adjust global
# (all demographic) variables first
specialised = params.specialised_demographics()
for varname, varidx, value in zip(self._varnames, self._varidxs,
self._vals):
if isinstance(varname, tuple):
continue
if varname.startswith("user.") or varname.startswith("."):
_adjustable["user"](params=params,
name=varname,
index=varidx,
value=value)
for s in specialised:
_adjustable["user"](params=params[s],
name=varname,
index=varidx,
value=value)
elif varname in _adjustable:
_adjustable[varname](params=params,
name=varname,
index=varidx,
value=value)
for s in specialised:
_adjustable[varname](params=params[s],
name=varname,
index=varidx,
value=value)
else:
raise KeyError(
f"Cannot set unrecognised parameter {varname} "
f"to {value}")
# now loop over and adjust the demographic-specific values
# (except for 'overall', which must be done last)
need_overall = False
for varname, varidx, value in zip(self._varnames, self._varidxs,
self._vals):
if isinstance(varname, tuple):
demographic, varname = varname
else:
continue
if demographic == "overall":
need_overall = True
continue
if varname.startswith("user.") or varname.startswith("."):
_adjustable["user"](params=params[demographic],
name=varname,
index=varidx,
value=value)
elif varname in _adjustable:
_adjustable[varname](params=params[demographic],
name=varname,
index=varidx,
value=value)
else:
raise KeyError(
f"Cannot set unrecognised parameter {varname} "
f"to {value} in demographic {demographic}")
# finally(!) loop over and adjust the overall values
if need_overall:
for varname, varidx, value in zip(self._varnames,
self._varidxs,
self._vals):
if isinstance(varname, tuple):
demographic, varname = varname
else:
continue
if demographic != "overall":
continue
if varname.startswith("user.") or varname.startswith("."):
_adjustable["user"](params=params[demographic],
name=varname,
index=varidx,
value=value)
elif varname in _adjustable:
_adjustable[varname](params=params[demographic],
name=varname,
index=varidx,
value=value)
else:
raise KeyError(
f"Cannot set unrecognised parameter {varname} "
f"to {value} in demographic {demographic}")
# save this adjustment to the record in the parameters
# object
if params.adjustments is None:
params.adjustments = []
params.adjustments.append(self)
except Exception as e:
raise ValueError(
f"Unable to set parameters from {self}. Error "
f"equals {e.__class__}: {e}")
[docs] def to_data(self):
"""Return a data dictionary that can be serialised to json"""
data = {}
if self._names is None:
return data
d = {}
for name, value in zip(self._names, self._vals):
name = str(name)
try:
value = float(value)
if value.is_integer():
d[name] = int(value)
else:
d[name] = value
except Exception:
d[name] = str(value)
data["variables"] = d
if self._idx != 1:
data["index"] = self._idx
if self._nrepeats != 1:
data["nrepeats"] = self._nrepeats
if self._output:
data["output"] = str(self._output)
return data
[docs] @staticmethod
def from_data(data):
"""Build a VariableSet from the passed data dictionary (which
may have been deserialised from json
"""
v = VariableSet()
d = data.get("variables", {})
for key, value in d.items():
v[key] = value
v._idx = data.get("index", 1)
v._nrepreats = data.get("nrepeats", 1)
v._output = data.get("output", None)
return v
[docs] def to_json(self, filename: str = None, indent: int = None,
auto_bzip: bool = True) -> str:
"""Serialise the VariableSet to JSON. This will write to a file
if filename is set, otherwise it will return a JSON string.
Parameters
----------
filename: str
The name of the file to write the JSON to. The absolute
path to the written file will be returned. If filename is None
then this will serialise to a JSON string which will be
returned.
indent: int
The number of spaces of indent to use when writing the json
auto_bzip: bool
Whether or not to automatically bzip2 the written json file
Returns
-------
str
Returns either the absolute path to the written file, or
the json-serialised string
"""
import json
if indent is not None:
indent = int(indent)
if filename is None:
return json.dumps(self.to_data(), indent=indent)
else:
from pathlib import Path
filename = str(Path(filename).expanduser().resolve().absolute())
if auto_bzip:
if not filename.endswith(".bz2"):
filename += ".bz2"
import bz2
with bz2.open(filename, "wt") as FILE:
try:
json.dump(self.to_data(), FILE, indent=indent)
except Exception:
import os
FILE.close()
os.unlink(filename)
raise
else:
with open(filename, "w") as FILE:
try:
json.dump(self.to_data(), FILE, indent=indent)
except Exception:
import os
FILE.close()
os.unlink(filename)
raise
return filename
[docs] @staticmethod
def from_json(s: str):
"""Construct and return VariableSet loaded from the passed
json file
"""
import os
import json
if os.path.exists(s):
try:
import bz2
with bz2.open(s, "rt") as FILE:
data = json.load(FILE)
except Exception:
data = None
if data is None:
with open(s, "rt") as FILE:
data = json.load(FILE)
else:
try:
data = json.loads(s)
except Exception:
data = None
if data is None:
from .utils._console import Console
Console.error(
f"Unable to load the VariableSet from '{s}'. Check that "
f"this is valid JSON or that the file exists.")
raise IOError(f"Cannot load VariableSet from '{s}'")
return VariableSet.from_data(data)
[docs]class VariableSets:
"""This class holds the collection of all VariableSet objects
that contain the set of adjustable variables that are used
to control a single run of the model
Examples
--------
>>> v = VariableSets()
>>> v.append({"beta[2]": 0.95, "beta[3]": 0.9})
>>> v.append({"beta[1]": 0.86, "beta[2]": 0.89})
>>> print(v)
{(beta[2]=0.95, beta[3]=0.9)[repeat 1], (beta[1]=0.86,
beta[2]=0.89)[repeat 1]}
>>> v = v.repeat(2)
>>> print(v)
{(beta[2]=0.95, beta[3]=0.9)[repeat 1], (beta[1]=0.86,
beta[2]=0.89)[repeat 1], (beta[2]=0.95, beta[3]=0.9)[repeat 2],
(beta[1]=0.86, beta[2]=0.89)[repeat 2]}
"""
[docs] def __init__(self):
"""Initialise an empty VariableSets object
Parameters
----------
None
Returns
-------
None
"""
self._vars = []
[docs] def __str__(self):
s = []
for v in self._vars:
s.append(str(v))
if len(s) == 1:
return s[0]
elif len(s) > 0:
return "{" + ",\n ".join(s) + "\n}"
else:
return "VariableSets:empty"
[docs] def __repr__(self):
return str(self)
[docs] def __eq__(self, other):
if isinstance(other, dict):
v = VariableSet(variables=other)
other = VariableSets()
other.append(v)
if len(self._vars) != len(other._vars):
return False
for v0, v1 in zip(self._vars, other._vars):
if v0 != v1:
return False
return True
def __len__(self):
if self._vars is None:
return 0
else:
return len(self._vars)
[docs] def __getitem__(self, i: int):
"""Return the VariableSet at the specified index"""
if self._vars is None:
raise IndexError("Cannot index an empty VariableSets object")
else:
return self._vars[i]
[docs] def append(self, variables: VariableSet):
"""Append the passed set of variables to the set that will
be used to run a model. If there are any existing
VariableSet objects in this list, then the new VariableSet
must adjust the same variables
Parameters
----------
variables: VariableSet
The VariableSet to append to this list. If you pass a
dict of {str: float} values, then this will automatically
be converted into a VariableSet. Note that all VariableSet
objects in a VariableSets must adjust the same variables
Returns
-------
None
"""
if isinstance(variables, dict):
variables = VariableSet(variables=variables)
if self._vars is None:
self._vars = []
if len(self._vars) > 0:
variables = variables.make_compatible_with(self._vars[0])
self._vars.append(variables)
[docs] def repeat(self, nrepeats: _Union[_List[int], int]):
"""Return a copy of this VariableSet in which all of the
unique VaribleSet objects have been repeated 'nrepeats'
times
Parameters
----------
nrepeats: int or list[int]
The number of repeats of the VariableSet objects to
perform. If this is a list, then nrepeats[i] will be
the number of times to repeat variables[i]
Returns
-------
repeats: VariableSets
A new VariableSets object containing 'nrepeats' copies
of the VariableSet objects from this set
"""
if not isinstance(nrepeats, list):
nrepeats = [nrepeats]
if len(nrepeats) == 1 and nrepeats[0] <= 1:
return self
from copy import deepcopy
repeats = VariableSets()
if len(nrepeats) == 1:
nrepeats = nrepeats[0]
for i in range(1, nrepeats+1):
for v in self._vars:
v2 = deepcopy(v)
v2._idx = i
v2._nrepeats = nrepeats
repeats.append(v2)
else:
if len(nrepeats) != len(self._vars):
raise ValueError(
f"Disagreement of the number of repeats {len(nrepeats)} "
f"and the number of variables {len(self._vars)}")
added = True
n = 1
while added:
added = False
for i, v in enumerate(self._vars):
if n <= nrepeats[i]:
v2 = deepcopy(v)
v2._idx = n
v2._nrepeats = nrepeats[i]
repeats.append(v2)
added = True
n += 1
return repeats
[docs] @staticmethod
def read(filename: str, line_numbers: _List[int] = None):
"""Read and return collection of VariableSet objects from the
specified line number(s) of the specified file
Parameters
----------
filename: str
The name of the file from which to read the VariableSets
line_numbers: List[int]
The line numbers from the file to read. This is 0-indexed,
meaning that the first line is line 0. If this is None,
then all lines are read and used
Returns
-------
variables: VariableSets
The collection of VariableSet objects that have been read,
in the order they were read from the file
"""
if not isinstance(line_numbers, list):
if line_numbers is not None:
line_numbers = [line_numbers]
# parse all lines using the csv module
import csv
lines = open(filename, "r").readlines()
csvlines = []
for line in lines:
line = line.strip()
if len(line) > 0 and (not line.startswith("#")):
csvlines.append(line)
lines = []
# first try to guess the dialect of the file (space or comma
# separated, newline character etc.)
try:
dialect = csv.Sniffer().sniff(csvlines[0], delimiters=[" ", ","])
except Exception:
words = csvlines[0].strip().split(" ")
if len(words) > 1:
from .utils._console import Console
Console.warning(
f"Could not identify what sort of separator to use to "
f"read {filename}, so will assume commas. If this is "
f"wrong, then could you add commas to separate the "
f"fields?")
dialect = csv.excel # default comma-separated file
for line in csv.reader(csvlines, dialect=dialect,
quoting=csv.QUOTE_ALL,
skipinitialspace=True):
cleaned = [_clean(x) for x in line]
line = []
if len(cleaned) > 0:
for clean in cleaned:
if isinstance(clean, list) or isinstance(clean, tuple):
for c in clean:
if len(c) > 0:
line.append(c)
else:
line.append(clean)
lines.append(line)
if len(lines) == 0:
# there is nothing to read?
return VariableSets()
if lines[0][0].find("=") != -1 or lines[0][0].find(":") != -1:
# this is a vertical file that should be split on spaces
lines = []
for line in csvlines:
cleaned = [_clean(x) for x in line.split()]
line = []
if len(cleaned) > 0:
for clean in cleaned:
if isinstance(clean, list) or isinstance(clean, tuple):
for c in clean:
if len(c) > 0:
line.append(c)
else:
line.append(clean)
lines.append(line)
if len(lines) == 0:
# there is nothing to read?
return VariableSets()
if len(lines[0]) > 1 and lines[0][1] == "==":
# this is a vertical file
if line_numbers is not None:
raise ValueError(
"You cannot specify line numbers for a vertical file!")
return VariableSets._read_vertical(lines)
else:
return VariableSets._read_horizontal(lines=lines,
line_numbers=line_numbers)
@staticmethod
def _read_vertical(lines):
"""Read the data from the vertical lines"""
variables = VariableSets()
variable = VariableSet()
for line in lines:
if len(line) > 1:
key = line[0]
if line[1] == "==":
if len(line) > 3:
value = " ".join(line[2:])
else:
value = line[2]
else:
if len(line) > 2:
value = " ".join(line[1:])
else:
value = line[1]
variable[key] = _interpret(value)
variables.append(variable)
return variables
@staticmethod
def _read_horizontal(lines, line_numbers):
"""Read the data from the horizontal lines"""
variables = VariableSets()
# are there any strings on the first line? If so, then these
# are the titles
has_titles = False
for v in lines[0]:
try:
float(v)
except Exception:
has_titles = True
break
if has_titles:
titles = lines[0]
lines = lines[1:]
else:
# default adjustable variables
titles = ["beta[2]", "beta[3]", "progress[1]",
"progress[2]", "progress[3]"]
if "repeats" in titles:
repeats_index = titles.index("repeats")
titles.pop(repeats_index)
else:
repeats_index = None
repeats = []
for i, line in enumerate(lines):
if line_numbers is None or i in line_numbers:
values = [_interpret(x) for x in line]
if repeats_index is not None:
repeats.append(int(values.pop(repeats_index)))
variable = VariableSet()
for j, key in enumerate(titles):
variable[key] = values[j]
variables.append(variable)
if line_numbers is not None:
if len(line_numbers) == len(variables):
if repeats_index is not None:
variables = variables.repeat(repeats)
return variables
# get here if we can't find this line in the file (or if we
# are supposed to read all lines)
if line_numbers is None:
if repeats_index is not None:
variables = variables.repeat(repeats)
return variables
else:
raise ValueError(
f"Cannot read parameters from line {line_numbers} "
f"as the number of lines in the file is {i+1}")
[docs] def set_outdir_from_number(self):
"""This function resets the names of all of the output directories
for each run so that they are numbered sequentially from one
"""
max_i = len(self._vars) + 1
nchars = len(str(max_i))
fmt = f"%0{nchars}d"
for i, variable in enumerate(self._vars):
variable._output = fmt % (i+1)
variable._idx = 1
variable._nrepeats = 1
[docs] def set_outdir_from_uid(self):
"""This function resets the names of all of the output directories
for each run so that they all have a globally unique UID
"""
from uuid import uuid4
for variable in self._vars:
variable._output = str(uuid4())
variable._idx = 1
variable._nrepeats = 1