from dataclasses import dataclass as _dataclass
from typing import List as _List
from typing import Union as _Union
from typing import Tuple as _Tuple
from enum import Enum as _Enum
from ._parameters import Parameters
from ._disease import Disease
from ._nodes import Nodes
from ._links import Links
from ._population import Population
from ._outputfiles import OutputFiles
from ._wardinfo import WardInfos
from ._wardid import WardID
__all__ = ["Network", "PersonType"]
class PersonType(_Enum):
"""The type of individual in the network."""
#: A WORKER is an individual that makes fixed movements between
#: their home and commute (work) ward
WORKER = 1
#: A PLAYER is an individual who makes random movements between
#: their home ward and the play wards linked to their home ward
PLAYER = 2
def __str__(self):
return self.name
def __repr__(self):
return str(self)
def __eq__(self, other):
if self.__class__ == other.__class__:
return self.value == other.value
else:
return self.name == other or self.value == other
[docs]@_dataclass
class Network:
"""This class represents a network of wards. The network comprises
nodes (representing wards), connected with links which represent
work (predictable movements) and play (unpredictable movements)
"""
#: The name of the Network. This equals the name of the demographic
#: if this is a multi-demographic sub-network
name: str = None
#: The list of nodes (wards) in the network
nodes: Nodes = None
#: The links between nodes (work)
links: Links = None
#: The links between nodes (play)
play: Links = None
#: The number of nodes in the network
nnodes: int = 0
#: The number of links in the network
nlinks: int = 0
#: The number of play links in the network
nplay: int = 0
#: The maximum allowable number of nodes in the network
max_nodes: int = 16384
#: The maximum allowable number of links in the network
max_links: int = 4194304
#: The metadata for all of the wards
info: WardInfos = WardInfos()
#: To seed provides additional seeding information
to_seed: _List[int] = None
#: The parameters used to generate this network
params: Parameters = None
#: The number of workers
work_population: int = None
#: The number of players
play_population: int = None
#: The index in the overall network's work matrix of the ith
#: index in this subnetworks work matrix. If this is None then
#: both this subnetwork has the same work matrix as the overall
#: network
_work_index = None
@property
def population(self) -> int:
"""Return the total population in the network"""
if self.nodes is None:
return 0
node_pop = self.nodes.population()
link_pop = self.links.population()
return int(node_pop + link_pop)
[docs] @staticmethod
def single(params: Parameters,
population: Population,
profiler=None):
"""Builds and returns a new Network that contains just a single
ward, in which 'population' individuals are resident.
"""
if profiler is None:
from .utils import NullProfiler
profiler = NullProfiler()
pop = float(population.population)
if pop <= 0:
pop = float(population.initial)
if pop <= 0:
raise ValueError(
f"You cannot create a Network with a zero or negative "
f"population ({population}).")
from .utils._console import Console
Console.print(
f"Creating a single ward Network with a population "
f"of {int(pop)}")
from ._wards import Wards
from ._ward import Ward
ward = Ward(name="single")
ward.set_num_players(pop)
wards = Wards()
wards.add(ward)
assert wards.population() == pop
network = Network.from_wards(wards, params=params)
network.params.input_files = params.input_files
return network
[docs] @staticmethod
def build(params: Parameters,
population: Population = None,
max_nodes: int = 16384,
max_links: int = 4194304,
nthreads: int = 1,
profiler=None):
"""Builds and returns a new Network that is described by the
passed parameters.
The network is built in allocated memory, so you need to specify
the maximum possible number of nodes and links. The memory buffers
will be shrunk back after building.
"""
if profiler is None:
from .utils import NullProfiler
profiler = NullProfiler()
p = profiler.start("Network.build")
if params.input_files is None:
from .utils._console import Console
Console.error("You must specify the model/network to use, e.g. "
"setting it from 'single', from a Wards object, "
"or specifying the model to load from "
"MetaWardsData")
raise AssertionError("You must specify the model to run")
elif params.input_files.is_wards_data:
from ._wards import Wards
wards = Wards.from_json(params.input_files.wards_data)
network = Network.from_wards(wards, params=params,
profiler=p, nthreads=nthreads)
network.params.input_files = params.input_files
p.stop()
return network
elif params.input_files.is_single:
if population is None:
population = Population(initial=1000)
network = Network.single(params=params,
population=population,
profiler=profiler)
p.stop()
return network
p = p.start("build_function")
from .utils import build_wards_network
network = build_wards_network(params=params,
profiler=p,
max_nodes=max_nodes,
max_links=max_links,
nthreads=nthreads)
p = p.stop()
# sanity-check that the network makes sense - there are specific
# requirements for the data layout
network.assert_sane(profiler=p)
p = p.start("add_distances")
from .utils._add_wards_network_distance \
import add_wards_network_distance
add_wards_network_distance(network, nthreads=nthreads)
from .utils._console import Console
p = p.stop()
# add metadata about the wards
p = p.start("add_lookup")
network._add_lookup(nthreads=nthreads)
p = p.stop()
if params.input_files.seed:
from .utils import read_done_file
p = p.start("read_done_file")
to_seed = read_done_file(params.input_files.seed)
nseeds = len(to_seed)
Console.print(to_seed)
Console.print(f"Number of seeds equals {nseeds}")
network.to_seed = to_seed
p = p.stop()
# By default, we initialise the network ready for a run,
# namely make sure everything is reset and the population
# is at work
p = p.start("reset_everything")
network.reset_everything(nthreads=nthreads, profiler=p)
p = p.stop()
p = p.start("rescale_play_matrix")
network.rescale_play_matrix(nthreads=nthreads, profiler=p)
p = p.stop()
p = p.start("move_from_play_to_work")
network.move_from_play_to_work(nthreads=nthreads, profiler=p)
p = p.stop()
if not p.is_null():
p = p.stop()
Console.print(str(p))
Console.print(f"[bold]Network loaded. Population: {network.population}, "
f"Workers: {network.work_population}, Players: "
f"{network.play_population}[/]", markup=True)
return network
[docs] def num_demographics(self) -> int:
"""Return the number of demographics (always 1 for a Network).
This function is added so that a single Network has
the same functions as a multi-demographics Networks
"""
return 1
[docs] def copy(self):
"""Return a copy of this Network. Use this to hold a copy of
the network that you can use to reset between runs
"""
from copy import copy, deepcopy
network = copy(self)
network.nodes = self.nodes.copy()
network.links = self.links.copy()
network.play = self.play.copy()
network.to_seed = deepcopy(self.to_seed)
network.params = deepcopy(self.params)
return network
[docs] def assert_sane(self, profiler: None):
"""Assert that this network is sane. This checks that the network
is laid out correctly in memory and that it doesn't have
anything unexpected. Checking here will prevent us from having
to check every time the network is accessed
"""
from .utils import assert_sane_network
assert_sane_network(network=self, profiler=profiler)
def _add_lookup(self, lookup_function=None, nthreads: int = 1):
"""Read in the ward lookup information that is used to
locate wards by name or region
"""
if lookup_function is None:
from .utils import add_lookup
lookup_function = add_lookup
lookup_function(self, nthreads=nthreads)
[docs] def get_index(self, id: WardID) -> _Tuple[PersonType, int, int]:
"""Return the index of the Node or Link(s) that corresponds
to the passed WardID.
This returns a tuple of three values;
(PersonType, start_idx, end_idx)
If this is a worker, then it will either return the
index of the Link for a specific work-link connection,
or the range of indicies for all of the work links
to this ward, so
(PersonType.WORKER, link_idx, link_idx+!) for a single link, or
(PersonType.WORKER, link.begin_to, link.end_to) for all links
If this is a player, then it will return the ID of the
Node (which is the index of the Node in Nodes), and
so
(PersonType.PLAYER, node_index, node_index+1)
This raises a KeyError if there is no ward or ward-link
that matches the WardID
"""
from .utils._network_functions import network_get_index
return network_get_index(self, id)
[docs] def get_node_index(self, index: _Union[str, int]):
"""Return the index of the node in this network that matches
'index'. This could be an integer, in which case this
will directly look up the index of the node in the
Nodes, or else it could be a string, in which case
the WardInfo will be used to identify the node and
look up the index from there.
"""
try:
index = int(index)
except Exception:
pass
if isinstance(index, int):
return self.nodes.get_index(index)
else:
matches = self.info.find(index)
if len(matches) == 0:
from .utils._console import Console
Console.error(f"Cannot find a ward that matches {index}")
raise KeyError(f"Cannot find a ward that matches {index}")
elif len(matches) > 1:
# do we have a perfect single match?
name = index.split("/")[0].strip()
for match in matches:
if self.info[match].name == name:
# perfect match
return match
from .utils._console import Console
err = [f"Too many wards match {index}"]
for match in matches:
err.append(f"* {self.info[match]}")
err.append("Please narrow down your search to match one.")
Console.error("\n".join(err), markdown=True)
raise KeyError(
f"Cannot find a single ward that matches {index}")
else:
return matches[0]
[docs] def initialise_infections(self, nthreads: int = 1):
"""Initialise and return the space that will be used
to track infections
"""
from ._infections import Infections
return Infections.build(network=self)
[docs] def recalculate_denominators(self, nthreads: int = 1, profiler=None):
"""Recalculate the denominators used in the calculation. This should
be called after you have changed the population of the
network, e.g. during a move function
"""
from .utils._recalculate_denominators import \
recalculate_play_denominator_day, \
recalculate_work_denominator_day
workers = recalculate_work_denominator_day(self, nthreads=nthreads,
profiler=profiler)
players = recalculate_play_denominator_day(self, nthreads=nthreads,
profiler=profiler)
self.work_population = workers
self.play_population = players
test_pop = int(self.work_population + self.play_population)
if test_pop != self.population:
from .utils._console import Console
# this could be because individuals are in the NULL ward
n_null = int(self.nodes.save_play_suscept[0]) + \
self.links.weight[0]
if test_pop + n_null != self.population:
Console.error(
f"Disagreement in the population size: "
f"{int(self.work_population)}+"
f"{int(self.play_population)} == "
f"{test_pop} != {self.population}")
raise AssertionError("Disagreement in population size")
[docs] def get_min_max_distances(self, nthreads: int = 1,
profiler=None):
"""Calculate and return the minimum and maximum distances
between nodes in the network
"""
try:
return self._min_max_distances
except Exception:
pass
from .utils import get_min_max_distances
self._min_max_distances = get_min_max_distances(network=self,
nthreads=nthreads)
return self._min_max_distances
[docs] def reset_everything(self, nthreads: int = 1,
profiler=None):
"""Resets the network ready for a new run of the model"""
from .utils import reset_everything
reset_everything(network=self, nthreads=nthreads, profiler=profiler)
[docs] def update(self, params: Parameters, demographics=None, population=None,
nthreads: int = 1, profiler=None):
"""Update this network with a new set of parameters
( and optionally demographics).
This is used to update the parameters for the network
for a new run. The network will be reset
and ready for a new run.
Parameters
----------
params: Parameters
The new parameters with which to update this Network
demographics: Demographics
The new demographics with which to update this Network.
Note that this will return a Network object that contains
the specilisation of this Network
nthreads: int
Number of threads over which to parallelise this update
profiler: Profiler
The profiler used to profile this update
Returns
-------
network: Network or Networks
Either this Network after it has been updated, or the
resulting Networks from specialising this Network using
Demographics
"""
if profiler is None:
from .utils._profiler import NullProfiler
profiler = NullProfiler()
p = profiler.start("Network.update")
if self.name is None or \
self.name not in params.specialised_demographics():
self.params = params
else:
self.params = params[self.name]
p = p.start("reset_everything")
self.reset_everything(nthreads=nthreads, profiler=p)
p = p.stop()
if demographics:
from .utils._worker import must_rebuild_network
if must_rebuild_network(network=self, params=self.params,
demographics=demographics):
network = demographics.build(params=self.params,
population=population,
nthreads=nthreads,
profiler=p)
else:
network = demographics.specialise(network=self,
profiler=p,
nthreads=nthreads)
else:
network = self
p = p.start("rescale_play_matrix")
network.rescale_play_matrix(nthreads=nthreads, profiler=p)
p = p.stop()
p = p.start("move_from_play_to_work")
network.move_from_play_to_work(nthreads=nthreads, profiler=p)
p = p.stop()
p = p.stop()
return network
[docs] def rescale_play_matrix(self, nthreads: int = 1,
profiler=None):
"""Rescale the play matrix"""
from .utils import rescale_play_matrix
rescale_play_matrix(network=self, nthreads=nthreads, profiler=profiler)
[docs] def move_from_play_to_work(self, nthreads: int = 1,
profiler=None):
"""Move the population from play to work"""
from .utils import move_population_from_play_to_work
move_population_from_play_to_work(network=self, nthreads=nthreads,
profiler=profiler)
[docs] def has_different_work_matrix(self):
"""Return whether or not the sub-network work matrix
is different to that of the overall network
"""
return self._work_index is not None
[docs] def get_work_index(self):
"""Return the mapping from the index in this sub-networks work
matrix to the mapping in the overall network's work matrix
"""
if self.has_different_work_matrix():
# remember this is 1-indexed, so work_index[1] is the first
# value
return self._work_index
else:
return range(1, self.nlinks + 1)
[docs] def specialise(self, demographic, profiler=None,
nthreads: int = 1):
"""Return a copy of this network that has been specialised
for the passed demographic. The returned network will
contain only members of that demographic, with the
parameters of the network adjusted according to the rules
of that demographic
Parameters
----------
demographic: Demographic
The demographic with which to specialise
Returns
-------
network: Network
The specialised network
"""
return demographic.specialise(network=self,
profiler=profiler,
nthreads=nthreads)
[docs] def scale_susceptibles(self, ratio: any = None,
work_ratio: any = None, play_ratio: any = None):
"""Scale the number of susceptibles in this Network
by the passed scale ratios. These can be values, e.g.
ratio = 2.0 will scale the total number of susceptibles
in each ward by 2.0. They can also be lists of values,
where ward[i] will be scaled by ratio[i]. They can also
be dictionaries, e.g. ward[i] scaled by ratio[i]
Parameters
----------
ratio: None, float, list or dict
The amount by which to scale the total population of
susceptibles - evenly scales the work and play populations
work_ratio: None, float, list or dict
Scale only the work population of susceptibles
play_ratio: None, float, list or dict
Scale only the play population of susceptibles
Returns
-------
None
"""
if ratio is not None:
work_ratio = ratio
play_ratio = ratio
if work_ratio is not None:
self.links.scale_susceptibles(work_ratio)
self.nodes.scale_susceptibles(work_ratio=work_ratio,
play_ratio=play_ratio)
[docs] def to_wards(self, profiler=None, nthreads: int = 1):
"""Return the ward-level data in this network converted to
a Wards object. This supports editing and save/restore
to JSON
"""
from .utils._network_wards import save_to_wards
return save_to_wards(self, profiler=profiler, nthreads=nthreads)
[docs] @staticmethod
def from_wards(wards, params: Parameters = None,
disease: Disease = None,
profiler=None,
nthreads: int = 1):
"""Construct a Network from the passed Wards object(e.g. after
editing, or restoring from JSON
"""
from .utils._network_wards import load_from_wards
return load_from_wards(wards, params=params, disease=disease,
profiler=profiler, nthreads=nthreads)
[docs] def run(self, population: Population,
output_dir: OutputFiles,
seed: int = None,
nsteps: int = None,
nthreads: int = None,
iterator=None,
extractor=None,
mixer=None,
mover=None,
profiler=None) -> Population:
"""Run the model simulation for the passed population.
The random number seed is given in 'seed'. If this
is None, then a random seed is used.
All output files are written to 'output_dir'
The simulation will continue until the infection has
died out or until 'nsteps' has passed(keep as 'None'
to prevent exiting early).
Parameters
- ---------
population: Population
The initial population at the start of the model outbreak.
This is also used to set start date and day of the model
outbreak
output_dir: OutputFiles
The directory to write all of the output into
seed: int
The random number seed used for this model run. If this is
None then a very random random number seed will be used
nsteps: int
The maximum number of steps to run in the outbreak. If None
then run until the outbreak has finished
profiler: Profiler
The profiler to use - a new one is created if one isn't passed
nthreads: int
Number of threads over which to parallelise this model run
iterator: function
Function that is called at each iteration to get the functions
that are used to advance the model
extractor: function
Function that is called at each iteration to get the functions
that are used to extract data for analysis or writing to files
mixer: function
Function that is used to mix demographic data. Not used
by a single Network(used by Networks)
mover: function
Function that is used to move the population between different
demographics. Not used by a single Network(used by Networks)
"""
# Create the random number generator
from .utils._ran_binomial import seed_ran_binomial, ran_binomial
if seed == 0:
# this is a special mode that a developer can use to force
# all jobs to use the same random number seed (15324) that
# is used for comparing outputs. This should NEVER be used
# for production code
from .utils._console import Console
Console.warning("Using special mode to fix all random number "
"seeds to 15324. DO NOT USE IN PRODUCTION!!!")
seed = 15324
rng = seed_ran_binomial(seed=seed)
else:
rng = seed_ran_binomial(seed=seed)
# Print the first five random numbers so that we can
# compare to other codes/runs, and be sure that we are
# generating the same random sequence
randnums = []
for i in range(0, 5):
randnums.append(str(ran_binomial(rng, 0.5, 100)))
from .utils._console import Console
Console.print(
f"* Using random number seed {seed}\n"
f"* First five random numbers equal **{'**, **'.join(randnums)}**",
markdown=True)
randnums = None
if nthreads is None:
from .utils._parallel import get_available_num_threads
nthreads = get_available_num_threads()
from .utils._parallel import create_thread_generators
rngs = create_thread_generators(rng, nthreads)
# Create space to hold the results of the simulation
infections = self.initialise_infections()
if nthreads == 1:
s = ""
else:
s = "s"
Console.rule(f"Running the model using {nthreads} thread{s}")
from .utils import run_model
population = run_model(network=self,
population=population,
infections=infections,
rngs=rngs, output_dir=output_dir,
nsteps=nsteps,
nthreads=nthreads,
profiler=profiler,
iterator=iterator, extractor=extractor,
mover=mover, mixer=mixer)
return population