Source code for metawards._demographics

from __future__ import annotations

from dataclasses import dataclass as _dataclass
from dataclasses import field as _field
from typing import List as _List
from typing import Dict as _Dict
from typing import Union as _Union
import os as _os
import pathlib as _pathlib

from ._demographic import Demographic
from ._network import Network

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .utils._profiler import Profiler
    from .utils._profiler import Networks
    from ._population import Population
    from ._parameters import Parameters

__all__ = ["Demographics", "DemographicID", "DemographicIDs"]

_default_demographics_path = _os.path.join(_pathlib.Path.home(),
                                           "GitHub", "MetaWardsData")

_default_folder_name = "demographics"

DemographicID = _Union[str, int]
DemographicIDs = _List[DemographicID]


def _get_value(value):
    """Extract a numeric value from the passed value - this is used
       to allow the demographics.json file to store numbers is
       a variety of formats
    """
    from ._interpret import Interpret

    if value is None:
        return 0.0

    elif isinstance(value, list):
        lst = []
        for v in value:
            lst.append(Interpret.number(v))
        return lst

    elif isinstance(value, dict):
        d = []
        for k, v in value.items():
            d[k] = Interpret.number(v)

        return d
    else:
        return Interpret.number(value)


[docs]@_dataclass(eq=False) class Demographics: """This class holds metadata about all of the demographics being modelled """ #: The list of individual Demographic objects, one for each #: demographic being modelled demographics: _List[Demographic] = _field(default_factory=list) #: The random seed to used when using any random number generator #: to resolve decisions needed when allocating individuals to #: demographics. This is set here so that the Demographics #: are uniquely determined and reproducible across runs random_seed: int = None #: The interaction matrix between demographics. This should #: be a list of lists that shows how demographic 'i' affects #: demographic 'j' interaction_matrix: _List[_List[int]] = None #: Map from index to names of demographics - enables lookup by name _names: _Dict[str, int] = _field(default_factory=dict) _name: str = None _version: str = None _authors: str = None _contacts: str = None _references: str = None _filename: str = None _repository: str = None _repository_version: str = None _repository_branch: str = None
[docs] def __str__(self): d = "\n ".join([str(x) for x in self.demographics]) return f"[\n {d}\n]"
[docs] def __repr__(self): return self.__str__()
def __len__(self): return len(self.demographics)
[docs] def __eq__(self, other): if not isinstance(other, Demographics): return False elif len(self) != len(other): return False else: for name, index in self._names.items(): if other._names.get(name, None) != index: return False if self.demographics[index] != other.demographics[index]: return False return True
def __getitem__(self, item): if isinstance(item, str): # Lookup by name return self.demographics[self.get_index(item)] else: # Lookup by index return self.demographics[item]
[docs] def copy(self): """Return a copy of this demographics object that should allow a safe reset between runs. This deepcopies things that may change, while shallow copying things that won't """ from copy import copy, deepcopy demographics = copy(self) demographics.interaction_matrix = deepcopy(self.interaction_matrix) demographics.demographics = copy(self.demographics) return demographics
def __add__(self, other: Demographic): from copy import deepcopy r = deepcopy(self) r.add(other) return r def __radd__(self, other: Demographic): r = Demographics() r.add(other) for d in self.demographics: r.add(d) return r
[docs] def add(self, demographic: Demographic): """Add a demographic to the set to be modelled""" if demographic.name is None: raise ValueError( f"You can only add named demographics to the set.") if demographic.name in self._names: raise ValueError( f"There is already a demographic called " f"{demographic.name} in this set. Please rename " f"and try again.") from copy import deepcopy self.demographics.append(deepcopy(demographic)) self._names[demographic.name] = len(self.demographics) - 1
[docs] def get_name(self, item): """Return the name of the demographic at 'item'""" return self.demographics[self.get_index(item)].name
[docs] def get_index(self, item): """Return the index of the passed item""" try: item = int(item) except Exception: pass if isinstance(item, str): try: return self._names[item] except Exception: pass elif isinstance(item, int): try: if self.demographics[item] is not None: return item except Exception: pass elif isinstance(item, Demographic): for i, d in enumerate(self.demographics): if item == d: return i # haven't found the item raise KeyError(f"There is no demographic is this set that " f"matches {item}. Available names are " f"{self._names}. Available indexes are " f"0 -> {len(self._names)}")
[docs] def uses_named_network(self): """Return whether or not at least one of these demographics specifies the use of a named network model """ for demographic in self.demographics: if demographic.network is not None: return True return False
[docs] def is_multi_network(self): """Return whether or not these demographics need to use multiple custom networks (e.g. refer to different network models) """ if len(self) <= 1: return False else: first_network = self.demographics[0].network for demographic in self.demographics[1:]: if first_network != demographic.network: return True return False
[docs] @staticmethod def load(name: str = None, repository: str = None, folder: str = _default_folder_name, filename: str = None): """Load the parameters for the specified set of demographics. This will look for a file called f"{name}.json" in the directory f"{repository}/{folder}/{name}.json" By default this will load nothing. Alternatively you can provide the full path to the json file via the "filename" argument Parameters ---------- name: str The name of the demographics to load. This is the name that will be searched for in the METAWARDSDATA diseases directory repository: str The location of the cloned METAWARDSDATA repository folder: str The name of the folder within the METAWARDSDATA repository that contains the diseases filename: str The name of the file to load the disease from - this directly loads this file without searching through the METAWARDSDATA repository Returns ------- demographics: Demographics The constructed and validated demographics """ repository_version = None repository_branch = None if filename is None: import os if os.path.exists(name): filename = name elif os.path.exists(f"{name}.json"): filename = f"{name}.json" import os if filename is None: if repository is None: repository = os.getenv("METAWARDSDATA") if repository is None: repository = _default_demographics_path filename = os.path.join(repository, folder, f"{name}.json") from ._parameters import get_repository_version v = get_repository_version(repository) repository = v["repository"] repository_version = v["version"] repository_branch = v["branch"] json_file = filename try: demographics = Demographics.from_json(json_file) except Exception as e: from .utils._console import Console Console.error(f""" Could not find the demographics file {json_file}. "Either it does not exist or was corrupted. Error was {e.__class__} {e}. To download the disease data follow the instructions at [https://metawards.org/model_data](https://metawards.org/model_data).""") raise FileNotFoundError(f"Could not find or read {json_file}: " f"{e.__class__} {e}") demographics._name = name demographics._filename = json_file demographics._repository = repository demographics._repository_branch = repository_branch demographics._repository_version = repository_version return demographics
[docs] def to_data(self): """Return a data dictionary for this object that can be serialised to json """ data = {} if self.demographics is None: return data default = [1.0] * len(self.demographics) all_none = [None] * len(self.demographics) def _get_filename(x): if x is None: return None elif isinstance(x, str): import os if os.path.exists(x): from pathlib import Path return str(Path(x).expanduser().absolute()) else: return x else: if x._filename is None: raise IOError(f"Cannot locate file for {x}") return _get_filename(x._filename) demographics = [str(x.name) for x in self.demographics] work_ratios = [float(x.work_ratio) for x in self.demographics] play_ratios = [float(x.play_ratio) for x in self.demographics] diseases = [_get_filename(x.disease) for x in self.demographics] networks = [_get_filename(x.network) for x in self.demographics] adjustments = [x.adjustment for x in self.demographics] data["demographics"] = demographics if work_ratios != default: data["work_ratios"] = work_ratios if play_ratios != default: data["play_ratios"] = play_ratios if self.random_seed is not None: data["random_seed"] = int(self.random_seed) if diseases != all_none: data["diseases"] = diseases if networks != all_none: data["networks"] = networks if adjustments != all_none: data["adjustments"] = [x.to_data() if x is not None else None for x in adjustments] return data
[docs] def to_json(self, filename: str = None, indent: int = None, auto_bzip: bool = True) -> str: """Serialise the Demographics to JSON. This will write to a file if filename is set, otherwise it will return a JSON string. Parameters ---------- filename: str The name of the file to write the JSON to. The absolute path to the written file will be returned. If filename is None then this will serialise to a JSON string which will be returned. indent: int The number of spaces of indent to use when writing the json auto_bzip: bool Whether or not to automatically bzip2 the written json file Returns ------- str Returns either the absolute path to the written file, or the json-serialised string """ import json if indent is not None: indent = int(indent) if filename is None: return json.dumps(self.to_data(), indent=indent) else: from pathlib import Path filename = str(Path(filename).expanduser().resolve().absolute()) if auto_bzip: if not filename.endswith(".bz2"): filename += ".bz2" import bz2 with bz2.open(filename, "wt") as FILE: try: json.dump(self.to_data(), FILE, indent=indent) except Exception: import os FILE.close() os.unlink(filename) raise else: with open(filename, "w") as FILE: try: json.dump(self.to_data(), FILE, indent=indent) except Exception: import os FILE.close() os.unlink(filename) raise return filename
[docs] @staticmethod def from_data(data, json_dir=None) -> Demographics: """Construct and return a Demographics object constructed from a (json-deserialised) data dictionary """ demographics = data.get("demographics", []) work_ratios = data.get("work_ratios", [1.0] * len(demographics)) play_ratios = data.get("play_ratios", [1.0] * len(demographics)) random_seed = data.get("random_seed", None) diseases = data.get("diseases", None) networks = data.get("networks", None) adjustments = data.get("adjustments", None) if diseases is None: diseases = len(demographics) * [None] else: from ._disease import Disease diseases = [Disease.load(x, folder=json_dir) if x is not None else None for x in diseases] if networks is None: networks = len(demographics) * [None] else: from ._inputfiles import InputFiles networks = [InputFiles.load(x, folder=json_dir) if x is not None else None for x in networks] if adjustments is None: adjustments = len(demographics) * [None] else: from ._variableset import VariableSet adjustments = [VariableSet.from_data(x) if x is not None else None for x in adjustments] if (len(demographics) != len(work_ratios) or len(demographics) != len(play_ratios) or len(demographics) != len(diseases) or len(demographics) != len(networks) or len(adjustments) != len(networks)): raise ValueError( f"The number of work_ratios ({len(work_ratios)}) must " f"equal to number of play_ratios " f"({len(play_ratios)}) which must equal the number " f"of diseases ({len(diseases)}) which must equal " f"the number of demographics ({len(demographics)}), " f"which must equal the number of networks ({len(networks)}).") demos = Demographics(random_seed=random_seed, _authors=data.get("author(s)", None), _contacts=data.get("contact(s)", None), _references=data.get("reference(s)", None)) for i in range(0, len(demographics)): demographic = Demographic(name=demographics[i], work_ratio=_get_value(work_ratios[i]), play_ratio=_get_value(play_ratios[i]), disease=diseases[i], network=networks[i], adjustment=adjustments[i]) demos.add(demographic) return demos
[docs] @staticmethod def from_json(s: str): """Construct and return Demographics loaded from the passed json file """ import os import json json_dir = None if os.path.exists(s): json_dir = os.path.split(os.path.abspath(s))[0] try: import bz2 with bz2.open(s, "rt") as FILE: data = json.load(FILE) except Exception: data = None if data is None: with open(s, "rt") as FILE: data = json.load(FILE) else: try: data = json.loads(s) except Exception: data = None if data is None: from .utils._console import Console Console.error(f"Unable to load Demographics from '{s}'. Check that " f"this is valid JSON or that the file exists.") raise IOError(f"Cannot load Demographics from '{s}'") return Demographics.from_data(data, json_dir=json_dir)
[docs] def build(self, params: Parameters, population: Population = None, max_nodes: int = 16384, max_links: int = 4194304, nthreads: int = 1, profiler: Profiler = None) -> _Union[Network, Networks]: """Build the set of networks described by these demographics and the passed parameters Parameters ---------- params: Parameters Parameters used to help build the model networks max_nodes: int Initial guess for the maximum number of nodes(wards) max_links: int Initial guess for the maximum number of links between wards profiler: Profiler Profiler used to profile the specialisation nthreads: int Number of threads over which to parallelise the work Returns ------- Network or Networks The set of Networks that represent the model run over the full set of different demographics(or Network if there is just a single demographic) """ from .utils._console import Console if len(self) == 0: return Network.build(params=params, population=population, max_nodes=max_nodes, max_links=max_links, nthreads=nthreads, profiler=profiler) if len(self) == 1: demographic = self[0] if demographic.adjustment is not None: demographic.adjustment.adjust(params) if demographic.disease is not None: params.disease_params = demographic.disease if demographic.network is not None: params.input_files = demographic.network network = Network.build(params=params, population=population, max_nodes=max_nodes, max_links=max_links, nthreads=nthreads, profiler=profiler) if demographic.work_ratio != 1.0 or demographic.play_ratio != 1.0: network.scale_susceptibles(work_ratio=demographic.work_ratio, play_ratio=demographic.play_ratio) network.name = demographic.name return network if not self.uses_named_network(): # build a single network that is then specialised network = Network.build(params=params, population=population, max_nodes=max_nodes, max_links=max_links, nthreads=nthreads, profiler=profiler) Console.rule("Specialising into demographics") return self.specialise(network=network, profiler=profiler, nthreads=nthreads) # need to load each network separately, and then merge wards = {} shared_wards = {} from ._wards import Wards from copy import deepcopy for i, demographic in enumerate(self.demographics): if demographic.network is None: input_files = params.input_files else: input_files = demographic.network if input_files not in shared_wards: if input_files.is_wards_data: wards[input_files] = Wards.from_json( input_files.wards_data) else: network_params = deepcopy(params) network_params.input_files = input_files network = Network.build(params=network_params, population=population, max_nodes=max_nodes, max_links=max_links, nthreads=nthreads, profiler=profiler) wards[input_files] = network.to_wards() shared_wards[input_files] = [i] else: shared_wards[input_files].append(i) wardss = [None] * len(self) input_files = [None] * len(self) for key, value in shared_wards.items(): if len(value) > 1: # this is a combined network - need to divide the population # between multiple demographics. First create the network # and then use specialise to divide the population # between the demographics w = wards[key] network = Network.from_wards(w, params=params, nthreads=nthreads) ds = Demographics( demographics=[deepcopy(self.demographics[x]) for x in value]) for d in ds: d.network = None network = ds.specialise(network=network, nthreads=nthreads) for i, idx in enumerate(value): wardss[idx] = network.subnets[i].to_wards( nthreads=nthreads) input_files[idx] = key else: i = value[0] demographic = self.demographics[i] w = wards[key] if demographic.work_ratio != 1.0 or \ demographic.play_ratio != 1.0: w = w.scale(work_ratio=demographic.work_ratio, play_ratio=demographic.play_ratio) wardss[i] = w input_files[i] = key total_pop = worker_pop = player_pop = 0 for wards in wardss: total_pop += wards.population() worker_pop += wards.num_workers() player_pop += wards.num_players() overall, wardss = Wards.harmonise(wardss) assert overall.population() == total_pop assert overall.num_workers() == worker_pop assert overall.num_players() == player_pop overall = Network.from_wards(overall, params=params, nthreads=nthreads) subnets = [None] * len(self) total_pop = worker_pop = player_pop = 0 for i, demographic in enumerate(self.demographics): subparams = deepcopy(params) subparams.input_files = input_files[i] if demographic.adjustment is not None: demographic.adjustment.adjust(subparams) subnets[i] = Network.from_wards(wardss[i], params=subparams, nthreads=nthreads) subnets[i].name = demographic.name total_pop += subnets[i].population worker_pop += subnets[i].work_population player_pop += subnets[i].play_population assert total_pop == overall.population assert worker_pop == overall.work_population assert player_pop == overall.play_population from ._networks import Networks networks = Networks() networks.overall = overall networks.subnets = subnets networks.demographics = deepcopy(self) return networks
[docs] def specialise(self, network: Network, profiler: Profiler = None, nthreads: int = 1): """Build the set of networks that will model this set of demographics applied to the passed Network. Parameters ---------- network: Network The overall population model - this contains the base parameters, wards, work and play links that define the model outbreak profiler: Profiler Profiler used to profile the specialisation nthreads: int Number of threads over which to parallelise the work Returns ------- networks: Networks The set of Networks that represent the model run over the full set of different demographics """ if len(self) == 0: return network else: from ._networks import Networks return Networks.build(network=network, demographics=self, profiler=profiler, nthreads=nthreads)