Source code for metawards.utils._worker

from typing import Union as _Union
from typing import Dict as _Dict

from .._network import Network
from .._networks import Networks
from .._demographics import Demographics
from .._parameters import Parameters
from .._outputfiles import OutputFiles

__all__ = ["run_worker", "prepare_worker", "must_rebuild_network"]

global_network = None

def must_rebuild_network(network: _Union[Network, Networks],
                         params: Parameters,
                         demographics: Demographics) -> bool:
    """Return whether a change in parameters or demographics
       would force a rebuild of the passed network
    if network is None or params is None:
        return True
    elif demographics is None and isinstance(network, Networks):
        return True
    elif demographics is not None:
        if len(demographics) <= 1 and isinstance(network, Networks):
            return True
        elif len(demographics) > 1 and isinstance(network, Network):
            return True

    if network.params.input_files != params.input_files:
        return True

    if demographics is None:
        return False

    if len(demographics) != len(network.demographics):
        return True

    if demographics.is_multi_network():
        # check if any of the networks has changed
        if not network.demographics.is_multi_network():
            return True

        for d1, d2 in zip(demographics, network.demographics):
            if !=
                return True

    return False

[docs]def prepare_worker(params: Parameters, demographics: Demographics, options: _Dict[str, any]) -> _Union[Network, Networks]: """Prepare a worker to receive work to run a model using the passed parameters. This will build the network specified by the parameters and will store it in global memory ready to be used for a model run. Note that these are silent, printing nothing to stdout or stderr Parameters ---------- params: Parameters Parameters used to build the network demographics: Demographics If not None, then demographics used to specialise the Network into Networks """ global global_network max_nodes = options["max_nodes"] max_links = options["max_links"] nthreads = options["nthreads"] del options["max_nodes"] del options["max_links"] profiler = options["profiler"] from ._console import Console if must_rebuild_network(network=global_network, params=params, demographics=demographics): Console.print("Must rebuild network...") if demographics is not None: network =, population=options.get("population", None), max_nodes=max_nodes, max_links=max_links, nthreads=nthreads, profiler=profiler) else: network =, population=options.get("population", None), profiler=profiler, nthreads=nthreads, max_nodes=max_nodes, max_links=max_links) global_network = network # always work in a copy network = global_network.copy() if params.adjustments is not None: Console.rule("Adjustable parameters to scan") Console.print("\n".join([f"* {x}" for x in params.adjustments]), markdown=True) Console.rule() network.update(params=params, demographics=demographics, nthreads=nthreads, profiler=profiler) return network
[docs]def run_worker(arguments): """Ask the worker to run a model using the passed variables and options. This will write to options['output_dir'] and will also return the population object that contains the final population data. WARNING - the iterator and extractor arguments rely on the workers starting in the same directory as the main process, so that they can load the same python files (if the user is using a custom iterator or extractor) """ params = arguments["params"] demographics = arguments["demographics"] options = arguments["options"] # next, run the job, writing to output outdir = options["output_dir"] auto_bzip = options["auto_bzip"] del options["auto_bzip"] from ._console import Console with OutputFiles(outdir, check_empty=False, force_empty=False, prompt=None, auto_bzip=auto_bzip) as output_dir: with Console.redirect_output(outdir=outdir, auto_bzip=auto_bzip): try: # first, build and prepare the Network(s). This is built once # from the parameters and demographics by loading files from # the filesystem, as sending this over the physical network # would be too expensive. Subsequent calls to this function # after the Network(s) has been built will call # network.update(params, demographics) network = prepare_worker(params=params, demographics=demographics, options=options) # if the user wanted to remove this directory then they would # have done so in the main process - no need to check again options["output_dir"] = output_dir output =**options) return output except Exception: Console.print_exception() raise