Source code for metawards._networks


from ._outputfiles import OutputFiles
from ._population import Population
from ._demographics import Demographics
from ._parameters import Parameters
from ._network import Network

from dataclasses import dataclass as _dataclass
from dataclasses import field as _field
from typing import List as _List


__all__ = ["Networks"]


[docs]@_dataclass class Networks: """This is a combination of Network objects which together represent an entire diverse population. Each individual Network is used to model the disease outbreak within a single demographic of the population. Multiple demographics are modelled by combining multiple networks. Special merge functions enable joint FOIs to be calculated, through which an outbreak in one network can cross-infect a demographic in another network. The Networks can be very independent, and don't necessarily need to have the same links. However, it is assumed (and checked) that each network will have the same nodes. """ #: The overall Network, which contains a combination of all of the #: sub-networks. This is used for summary analysis and also as #: a means of merging and distributing data between sub-networks overall: Network = None #: The list of Networks, one for each demographic, ordered in the #: same order as the "Demographics" object. This is empty if #: only a single demographic is modelled subnets: _List[Network] = _field(default_factory=list) #: Metadata about each of the demographics being modelled. This is #: None if only a single demographic is modelled demographics: Demographics = None @property def params(self) -> int: """The overall parameters that are then specialised for the different demographics. Note that this returns a copy, so changing this will not change any parameters in the networks """ if self.overall is not None: # return the parameters for all of the demographics params = self.overall.params.copy() params._subparams = {} for subnet in self.subnets: params._subparams[subnet.name] = subnet.params return params else: return None
[docs] def num_demographics(self) -> int: """Return the number of demographics""" return len(self.subnets)
[docs] def assert_sane(self, profiler: None): """Assert that the networks is sane. This checks that the networks and all of the demographic sub-networks are laid out correctly in memory and that they don't have anything unexpected. Checking here will prevent us from having to check every time the networks are accessed """ if self.overall: self.overall.assert_sane(profiler=profiler) for subnet in self.subnets: subnet.assert_sane(profiler=profiler)
# SHOULD ASSERT HERE THAT THE POPULATIONS OF ALL OF THE SUBNETS # IN EACH WARD SUM UP TO THE POPULATION IN THE OVERALL NETWORK # WARDS
[docs] @staticmethod def build(network: Network, demographics: Demographics, profiler=None, nthreads: int = 1): """Build the set of networks that will model the passed demographics based on the overall population model in the passed network Parameters ---------- network: Network The overall population model - this contains the base parameters, wards, work and play links that define the model outbreak demographics: Demographics Information about each of the demographics to be modelled. Note that the sum of the "work" and "play" populations across all demographics must be 1.0 in all wards in the model profiler: Profiler Optional profiler used to profile this build nthreads: int Number of threads over which to distribute the work Returns ------- networks: Networks The set of Networks that represent the model run over the full set of different demographics """ if not isinstance(network, Network): raise TypeError(f"You can only specialise a Network") if demographics is None or len(demographics) < 2: raise ValueError(f"You can only create a Networks object " f"with a valid Demographics that contains " f"more than one demographic") if demographics.uses_named_network(): raise ValueError( f"You cannot specialise an existing network with demographics " f"that specify named networks - instead you need to call " f"demographics.build(...)") if profiler is None: from .utils._profiler import NullProfiler profiler = NullProfiler() p = profiler.start("specialise") subnets = [] # specialise the network for each demographic for i in range(0, len(demographics)): p = p.start(f"demographic_{i}") subnets.append(network.specialise(demographic=demographics[i], profiler=p, nthreads=nthreads)) p = p.stop() p = p.start("distribute_remainders") from .utils._scale_susceptibles import distribute_remainders distribute_remainders(network=network, subnets=subnets, demographics=demographics, profiler=p, nthreads=nthreads, random_seed=demographics.random_seed) # we have changed the population, so need to recalculate the # denominators again... for subnet in subnets: subnet.reset_everything(nthreads=nthreads, profiler=p) subnet.rescale_play_matrix(nthreads=nthreads, profiler=p) subnet.move_from_play_to_work(nthreads=nthreads, profiler=p) p = p.stop() total_pop = network.population sum_pop = 0 from .utils._console import Console Console.print(f"Specialising network - population: {total_pop}, " f"workers: {network.work_population}, " f"players: {network.play_population}") for i, subnet in enumerate(subnets): pop = subnet.population sum_pop += pop Console.print(f" {demographics[i].name} - population: {pop}, " f"workers: {subnet.work_population}, " f"players: {subnet.play_population}") if subnet.work_population + subnet.play_population != pop: Console.error( f"Disagreement in subnet population. Should be " f"{pop} but is instead " f"{subnet.work_population+subnet.play_population}.") raise AssertionError("Disagreement in subnet population.") if total_pop != sum_pop: raise AssertionError( f"The sum of the population of the demographic " f"sub-networks ({sum_pop}) does not equal the population " f"of the total network ({total_pop}). This is a bug!") result = Networks() result.overall = network result.subnets = subnets result.demographics = demographics p = p.stop() return result
[docs] def copy(self): """Return a copy of this Networks. Use this to hold a copy of the networks that you can use to reset between runs """ from copy import copy networks = copy(self) networks.overall = self.overall.copy() subnets = [] for subnet in self.subnets: subnets.append(subnet.copy()) networks.subnets = subnets networks.demographics = self.demographics.copy() return networks
[docs] def aggregate(self, profiler=None, nthreads: int = 1): """Aggregate all of the sub-network population infection data so that this is available in the overall network """ from .utils._aggregate import aggregate_networks aggregate_networks(network=self, profiler=profiler, nthreads=nthreads)
[docs] def run(self, population: Population, output_dir: OutputFiles, seed: int = None, nsteps: int = None, nthreads: int = None, iterator=None, extractor=None, mover=None, mixer=None, profiler=None) -> Population: """Run the model simulation for the passed population. The random number seed is given in 'seed'. If this is None, then a random seed is used. All output files are written to 'output_dir' The simulation will continue until the infection has died out or until 'nsteps' has passed (keep as 'None' to prevent exiting early). Parameters ---------- population: Population The initial population at the start of the model outbreak. This is also used to set start date and day of the model outbreak output_dir: OutputFiles The directory to write all of the output into seed: int The random number seed used for this model run. If this is None then a very random random number seed will be used nsteps: int The maximum number of steps to run in the outbreak. If None then run until the outbreak has finished profiler: Profiler The profiler to use - a new one is created if one isn't passed nthreads: int Number of threads over which to parallelise this model run iterator: function Function that is called at each iteration to get the functions that are used to advance the model extractor: function Function that is called at each iteration to get the functions that are used to extract data for analysis or writing to files mixer: function Function that is called to mix the data calculated for each of the sub-networks for the different demographics and merge it together so that this is shared mover: function Function that is called to move the population between different demographics Returns ------- population: Population The final population at the end of the run """ # Create the random number generator from .utils._ran_binomial import seed_ran_binomial, ran_binomial if seed == 0: # this is a special mode that a developer can use to force # all jobs to use the same random number seed (15324) that # is used for comparing outputs. This should NEVER be used # for production code from .utils._console import Console Console.warning("Using special mode to fix all random number " "seeds to 15324. DO NOT USE IN PRODUCTION!!!") rng = seed_ran_binomial(seed=15324) else: rng = seed_ran_binomial(seed=seed) # Print the first five random numbers so that we can # compare to other codes/runs, and be sure that we are # generating the same random sequence randnums = [] for i in range(0, 5): randnums.append(str(ran_binomial(rng, 0.5, 100))) from .utils._console import Console Console.print( f"* First five random numbers equal **{'**, **'.join(randnums)}", markdown=True) randnums = None if nthreads is None: from .utils._parallel import get_available_num_threads nthreads = get_available_num_threads() from .utils._parallel import create_thread_generators rngs = create_thread_generators(rng, nthreads) # Create space to hold the results of the simulation infections = self.initialise_infections() Console.rule("Running the model") from .utils import run_model population = run_model(network=self, population=population, infections=infections, rngs=rngs, output_dir=output_dir, nsteps=nsteps, nthreads=nthreads, profiler=profiler, iterator=iterator, extractor=extractor, mixer=mixer, mover=mover) return population
[docs] def reset_everything(self, nthreads: int = 1, profiler=None): """Resets the networks ready for a new run of the model""" if self.overall: self.overall.reset_everything(nthreads=nthreads, profiler=profiler) for subnet in self.subnets: subnet.reset_everything(nthreads=nthreads, profiler=profiler)
[docs] def update(self, params: Parameters, demographics=None, population=None, nthreads: int = 1, profiler=None): """Update this network with a new set of parameters (and optionally demographics). This is used to update the parameters for the network for a new run. The network will be reset and ready for a new run. Parameters ---------- params: Parameters The new parameters with which to update this Network demographics: Demographics The new demographics with which to update this Network. Note that this will return a Network object that contains the specilisation of this Network nthreads: int Number of threads over which to parallelise this update profiler: Profiler The profiler used to profile this update Returns ------- network: Network or Networks Either this Network after it has been updated, or the resulting Networks from specialising this Network using Demographics """ if profiler is None: from .utils import NullProfiler profiler = NullProfiler() p = profiler.start("overall.update") self.overall.update(params, profiler=p) p = p.stop() if demographics is not None: if demographics != self.demographics: from .utils._worker import must_rebuild_network if must_rebuild_network(network=self, params=self.params, demographics=demographics): networks = demographics.build( params=self.params, population=population, nthreads=nthreads, profiler=p) else: # we have a change in demographics, so need to re-specialise networks = demographics.specialise(network=self.overall, profiler=p, nthreads=nthreads) p.stop() return networks for i in range(0, len(self.demographics)): demographic = self.demographics[i] p = p.start(f"{demographic.name}.update") if demographic.name in params.specialised_demographics(): subnet_params = params[demographic.name] else: subnet_params = params if demographic.adjustment: subnet_params = subnet_params.set_variables( demographic.adjustment) self.subnets[i].update(subnet_params, profiler=p) p = p.stop()
[docs] def initialise_infections(self, nthreads: int = 1): """Initialise and return the space that will be used to track infections """ from ._infections import Infections return Infections.build(network=self)
[docs] def rescale_play_matrix(self, nthreads: int = 1, profiler=None): """Rescale the play matrix""" if self.overall: self.overall.reset_everything(nthreads=nthreads, profiler=profiler) for subnet in self.subnets: subnet.rescale_play_matrix(nthreads=nthreads, profiler=profiler)
[docs] def move_from_play_to_work(self, nthreads: int = 1, profiler=None): """Move the population from play to work""" if self.overall: self.overall.move_from_play_to_work(nthreads=nthreads, profiler=profiler) for subnet in self.subnets: subnet.move_from_play_to_work(nthreads=nthreads, profiler=profiler)