Source code for metawards._demographics


from dataclasses import dataclass as _dataclass
from dataclasses import field as _field
from typing import List as _List
from typing import Dict as _Dict
import os as _os
import pathlib as _pathlib

from ._demographic import Demographic
from ._network import Network

__all__ = ["Demographics"]

_default_demographics_path = _os.path.join(_pathlib.Path.home(),
                                           "GitHub", "MetaWardsData")

_default_folder_name = "demographics"


def _get_value(value):
    """Extract a numeric value from the passed value - this is used
       to allow the demographics.json file to store numbers is
       a variety of formats
    """
    from metawards.utils import safe_eval_float

    if value is None:
        return 0.0

    elif isinstance(value, list):
        lst = []
        for v in value:
            lst.append(safe_eval_float(v))
        return lst

    elif isinstance(value, dict):
        d = []
        for k, v in value.items():
            d[k] = safe_eval_float(v)

        return d
    else:
        return safe_eval_float(value)


[docs]@_dataclass(eq=False) class Demographics: """This class holds metadata about all of the demographics being modelled """ #: The list of individual Demographic objects, one for each #: demographic being modelled demographics: _List[Demographic] = _field(default_factory=list) #: The random seed to used when using any random number generator #: to resolve decisions needed when allocating individuals to #: demographics. This is set here so that the Demographics #: are uniquely determined and reproducible across runs random_seed: int = None #: The interaction matrix between demographics. This should #: be a list of lists that shows how demographic 'i' affects #: demographic 'j' interaction_matrix: _List[_List[int]] = None #: Map from index to names of demographics - enables lookup by name _names: _Dict[str, int] = _field(default_factory=dict) _name: str = None _version: str = None _authors: str = None _contacts: str = None _references: str = None _filename: str = None _repository: str = None _repository_version: str = None _repository_branch: str = None def __str__(self): d = "\n ".join([str(x) for x in self.demographics]) return f"Demographics {self._name}\n" \ f"loaded from {self._filename}\n" \ f"version: {self._version}\n" \ f"author(s): {self._authors}\n" \ f"contact(s): {self._contacts}\n" \ f"references(s): {self._references}\n" \ f"repository: {self._repository}\n" \ f"repository_branch: {self._repository_branch}\n" \ f"repository_version: {self._repository_version}\n" \ f"demographics = [\n {d}\n]" def __len__(self): return len(self.demographics) def __eq__(self, other): if not isinstance(other, Demographics): return False elif len(self) != len(other): return False else: for name, index in self._names.items(): if other._names.get(name, None) != index: return False if self.demographics[index] != other.demographics[index]: return False return True def __getitem__(self, item): if isinstance(item, str): # Lookup by name return self.demographics[self._names[item]] else: # Lookup by index return self.demographics[item]
[docs] def copy(self): """Return a copy of this demographics object that should allow a safe reset between runs. This deepcopies things that may change, while shallow copying things that won't """ from copy import copy, deepcopy demographics = copy(self) demographics.interaction_matrix = deepcopy(self.interaction_matrix) demographics.demographics = copy(self.demographics) return demographics
[docs] def add(self, demographic: Demographic): """Add a demographic to the set to be modelled""" if demographic.name is None: raise ValueError( f"You can only add named demographics to the set.") if demographic.name in self._names: raise ValueError( f"There is already a demographic called " f"{demographic.name} in this set. Please rename " f"and try again.") self.demographics.append(demographic) self._names[demographic.name] = len(self.demographics) - 1
[docs] def get_name(self, item): """Return the name of the demographic at 'item'""" return self.demographics[self.get_index(item)].name
[docs] def get_index(self, item): """Return the index of the passed item""" try: item = int(item) except Exception: pass if isinstance(item, str): try: return self._names[item] except Exception: pass elif isinstance(item, int): try: if self.demographics[item] is not None: return item except Exception: pass elif isinstance(item, Demographic): for i, d in enumerate(self.demographics): if item == d: return i # haven't found the item raise KeyError(f"There is no demographic is this set that " f"matches {item}.")
[docs] @staticmethod def load(name: str = None, repository: str = None, folder: str = _default_folder_name, filename: str = None): """Load the parameters for the specified set of demographics. This will look for a file called f"{name}.json" in the directory f"{repository}/{folder}/{name}.json" By default this will load nothing. Alternatively you can provide the full path to the json file via the "filename" argument Parameters ---------- name: str The name of the demographics to load. This is the name that will be searched for in the METAWARDSDATA diseases directory repository: str The location of the cloned METAWARDSDATA repository folder: str The name of the folder within the METAWARDSDATA repository that contains the diseases filename: str The name of the file to load the disease from - this directly loads this file without searching through the METAWARDSDATA repository Returns ------- demographics: Demographics The constructed and validated demographics """ repository_version = None repository_branch = None if filename is None: import os if os.path.exists(name): filename = name elif os.path.exists(f"{name}.json"): filename = f"{name}.json" import os if filename is None: if repository is None: repository = os.getenv("METAWARDSDATA") if repository is None: repository = _default_demographics_path filename = os.path.join(repository, folder, f"{name}.json") from ._parameters import get_repository_version v = get_repository_version(repository) repository = v["repository"] repository_version = v["version"] repository_branch = v["branch"] json_file = filename try: with open(json_file, "r") as FILE: import json data = json.load(FILE) except Exception as e: print(f"Could not find the demographics file {json_file}") print(f"Either it does not exist of was corrupted.") print(f"Error was {e.__class__} {e}") print(f"To download the disease data type the command:") print(f" git clone https://github.com/metawards/MetaWardsData") print(f"and then re-run this function passing in the full") print(f"path to where you downloaded this directory") raise FileNotFoundError(f"Could not find or read {json_file}: " f"{e.__class__} {e}") demographics = data.get("demographics", []) work_ratios = data.get("work_ratios", []) play_ratios = data.get("play_ratios", []) random_seed = data.get("random_seed", None) if (len(demographics) != len(work_ratios) or len(demographics) != len(play_ratios)): raise ValueError( f"The number of work_ratios ({len(work_ratios)}) must " f"equal to number of play_ratios " f"({len(play_ratios)}) which must equal the number " f"of demographics ({len(demographics)})") demos = Demographics(random_seed=random_seed, _name=name, _authors=data.get("author(s)", "unknown"), _contacts=data.get("contact(s)", "unknown"), _references=data.get("reference(s)", "none"), _filename=json_file, _repository=repository, _repository_branch=repository_branch, _repository_version=repository_version) for i in range(0, len(demographics)): demographic = Demographic(name=demographics[i], work_ratio=_get_value(work_ratios[i]), play_ratio=_get_value(play_ratios[i])) demos.add(demographic) return demos
[docs] def specialise(self, network: Network, profiler=None, nthreads: int = 1): """Build the set of networks that will model this set of demographics applied to the passed Network. Parameters ---------- network: Network The overall population model - this contains the base parameters, wards, work and play links that define the model outbreak profiler: Profiler Profiler used to profile the specialisation nthreads: int Number of threads over which to parallelise the work Returns ------- networks: Networks The set of Networks that represent the model run over the full set of different demographics """ if len(self) == 0: return network else: from ._networks import Networks return Networks.build(network=network, demographics=self, profiler=profiler, nthreads=nthreads)