Source code for EoN.simulation

import networkx as nx
import random
import heapq
import numpy as np
import EoN
from collections import defaultdict
from collections import Counter

#######################
#                     #
#   Auxiliary stuff   #
#                     #
#######################



def _truncated_exponential_(rate, T):
    r'''returns a number between 0 and T from an
    exponential distribution conditional on the outcome being between 0 and T'''
    t = random.expovariate(rate)
    L = int(t/T)
    return t - L*T
   
class myQueue(object):
    r'''
    This class is used to store and act on a priority queue of events for 
    event-driven simulations.  It is based on heapq.

    Each queue is given a tmax (default is infinity) so that any event at later 
    time is ignored.
    
    This is a priority queue of 4-tuples of the form 
                   ``(t, counter, function, function_arguments)``

    The ``'counter'`` is present just to break ties, which generally only occur when 
    multiple events are put in place for the initial condition, but could also 
    occur in cases where events tend to happen at discrete times.

    note that the function is understood to have its first argument be t, and 
    the tuple ``function_arguments`` does not include this first t.

    So function is called as 
        ``function(t, *function_arguments)``

    Previously I used a class of events, but sorting using the __lt__ function 
    I wrote was significantly slower than simply using tuples.
    '''
    def __init__(self, tmax=float("Inf")):
        self._Q_ = []
        self.tmax=tmax
        self.counter = 0 #tie-breaker for putting things in priority queue
    def add(self, time, function, args = ()):
        r'''time is the time of the event.  args are the arguments of the
        function not including the first argument which must be time'''
        if time<self.tmax:   
            heapq.heappush(self._Q_, (time, self.counter, function, args))
            self.counter += 1
    def pop_and_run(self):
        r'''Pops the next event off the queue and performs the function'''
        t, counter, function, args = heapq.heappop(self._Q_)
        function(t, *args)
    def __len__(self): 
        r'''this will allow us to use commands like ``while Q:`` '''
        return len(self._Q_)
        
# from sortedcontainers import SortedList 
# 
#         
# class _MySortedList_(object):
#     r'''
#     This code is intended as a possible replacement for the ListDict that I've
#     been using.  
#     
#     I'm leaving it in place, but so far for my tests it is slower than ListDict.
#     It might do better in cases where the probabilities of events have a wider
#     range (in which case the rejection sampling in ListDict would be slow).  
#     
#     Since the cases I've tried may have fairly similar probabilities, the rejection
#     sampling seems to outperform it.  If rejection sampling often resulted in rejections, this
#     data type might perform better.
#      ''' 
#     
#      
#     def __init__(self, weighted = False):
#         if weighted is True:
#             self.weighted = True
#             self.SL = SortedList()
#             self.itemcounter = 0
#             #
#             #the SortedList self.SL will hold tuples that look like
#             #(weight, itemcounter, item).
#             #itemcounter is included to break any ties and make sure that
#             #the comparison algorithm never looks at item (since I don't know
#             #whether comparison is even defined for them.
#             #
#             #The entries will be ordered by weight first, and if there is
#             #a tie, then by itemcounter (which increments whenever something
#             #is added)
#             #
#             
#             self.item2weight = {}
#             #
#             #self.item2weight is a dict where self.item2weight[item] is a tuple
#             # of the form (weight, itemcounter), which will allow us to quickly
#             #find the item in the list if we ever need it.
#             #it also allows for a quick test to see if item has been put in.
#             self._total_weight = 0
#         else:
#             self.weighted = False #need to build up this case.  Should basically be a listdict
#             raise EoN.EoNError("you should be using a ListDict instead")
#             
#     def insert(self, item, weight = None):
#         r'''
#         If not present, then inserts the thing (with weight if appropriate)
#         if already there, replaces the weight unless weight is 0
#         
#         If weight is 0, then it removes the item and doesn't replace.
#         
#         WARNING:
#             if already present, then replaces weight.  To increment weight
#             use update
#         '''
#         
#         if item in self.item2weight:
#             datum = (*self.item2weight.pop(item), item)
#             self.SL.remove(datum)
#         if weight > 0:
#             self.update(item, weight_increment=weight)
# 
#     def update(self, item, weight_increment = None):
#         r'''
#         If not present, then inserts the thing (with weight if appropriate)
#         if already there, increments weight
#         
#         WARNING:
#             if already present, just increments weight.  To overwrite weight 
#             use insert.
#         '''
#         if weight_increment is not None: #will break if passing a weight to unweighted case
#             self._total_weight += weight_increment
#             if item in self.item2weight:
#                 datum = (*self.item2weight.pop(item), item)
#                 self.SL.remove(datum)
#                 new_weight = datum[0]+weight_increment
#             else:
#                 new_weight = weight_increment
#             if new_weight >0:
#                 self.itemcounter += 1
#                 self.SL.add((new_weight, self.itemcounter, item))
#                 self.item2weight[item] = (new_weight, self.itemcounter)
#         elif self.weighted:
#             raise Exception('if weighted, must assign weight_increment')
#     
#     
#     def __len__(self):
#         return len(self.items)
# 
#     def __contains__(self, item):
#         return item in self.item2weight
# 
#     def remove(self, item):
#         datum = (*self.item2weight.pop(item), item)
#         self.SL.remove(datum)
#         self._total_weight -= datum[0]
#         
#     def choose_random(self):
#         #presumes weighted and sorted in increasing order.  So start from end
#         #and work back.
#         current_index = len(self.SL)-1
#         largest_r = 0
#         q=0
#         p=1
#         while current_index >= 0:
#             datum = self.SL[current_index]
#             W = datum[0]
#             r = (q+p*random.random())**(1./W)
#             if r> largest_r:
#                 largest_r = r
#                 returnindex = current_index
#                 
#             q = largest_r**W
#             p = 1-q
#             current_index -= np.random.geometric(p)
#     
#         return self.SL[returnindex][2]
# 
#         
# 
#     def random_removal(self):
#         r'''uses other class methods to choose an entry by weight and then remove it'''
#         choice = self.choose_random()
#         self.remove(choice)
#         return choice
# 
#     def total_weight(self):
#         if self.weighted:
#             return self._total_weight
#         else:
#             return len(self)
# 
#     def update_total_weight(self):
#         self._total_weight = sum(self.weight[item] for item in self.items)


class _ListDict_(object):
    r'''
    The Gillespie algorithm will involve a step that samples a random element 
    from a set based on its weight.  This is awkward in Python.
    
    So I'm introducing a new class based on a stack overflow answer by
    Amber (http://stackoverflow.com/users/148870/amber) 
    for a question by
    tba (http://stackoverflow.com/users/46521/tba) 
    found at
    http://stackoverflow.com/a/15993515/2966723
    
    This will allow me to select a random element uniformly, and then use 
    rejection sampling to make sure it's been selected with the appropriate 
    weight.

    
    I believe a faster data structure can be created with a (binary) tree.
    We add an object with a weight to the tree.  The nodes track their weights
    and the sum of the weights below it.  So choosing a random object (by weight)
    means that we choose a random number between 0 and weight_sum.  Then 
    if it's less than the first node's weight, we choose that.  Otherwise,
    we see if the remaining bit is less than the total under the first child.  
    If so, go there, otherwise, it's the other child.  Then iterate.  Adding
    a node would probably involve placing higher weight nodes higher in
    the tree.  Currently I don't have a fast enough implementation of this
    for my purposes.  So for now I'm sticking to the mixture of lists & 
    dictionaries.
    
    I believe this structure I'm describing is similar to a "partial sum tree"
    or a "Fenwick tree", but they seem subtly different from this.
    '''
    def __init__(self, weighted = False):
        self.item_to_position = {}
        self.items = []

        self.weighted = weighted
        if self.weighted:
            self.weight = defaultdict(int) #presume all weights positive
            self.max_weight = 0
            self._total_weight = 0
            self.max_weight_count = 0
            

    def __len__(self):
        return len(self.items)

    def __contains__(self, item):
        return item in self.item_to_position

    def _update_max_weight(self):
        C = Counter(self.weight.values())  #may be a faster way to do this, we only need to count the max.
        self.max_weight = max(C.keys())
        self.max_weight_count = C[self.max_weight]

        
    def insert(self, item, weight = None):
        r'''
        If not present, then inserts the thing (with weight if appropriate)
        if already there, replaces the weight unless weight is 0
        
        If weight is 0, then it removes the item and doesn't replace.
        
        WARNING:
            replaces weight if already present, does not increment weight.
            
        
        '''
        if self.__contains__(item):
            self.remove(item)
        if weight != 0:
            self.update(item, weight_increment=weight)
        

    def update(self, item, weight_increment = None):
        r'''
        If not present, then inserts the thing (with weight if appropriate)
        if already there, increments weight
        
        WARNING:
            increments weight if already present, cannot overwrite weight.
        '''
        if weight_increment is not None: #will break if passing a weight to unweighted case
            if weight_increment >0 or self.weight[item] != self.max_weight:
                self.weight[item] = self.weight[item] + weight_increment
                self._total_weight += weight_increment
                if self.weight[item] > self.max_weight:
                    self.max_weight_count = 1
                    self.max_weight = self.weight[item]
                elif self.weight[item] == self.max_weight:
                    self.max_weight_count += 1
            else: #it's a negative increment and was at max
                self.max_weight_count -= 1
                self.weight[item] = self.weight[item] + weight_increment
                self._total_weight += weight_increment
                self.max_weight_count -= 1 
                if self.max_weight_count == 0:
                    self._update_max_weight               
        elif self.weighted:
            raise Exception('if weighted, must assign weight_increment')

        if item in self: #we've already got it, do nothing else
            return
        self.items.append(item)
        self.item_to_position[item] = len(self.items)-1

    def remove(self, choice):
        position = self.item_to_position.pop(choice)
        last_item = self.items.pop()
        if position != len(self.items):
            self.items[position] = last_item
            self.item_to_position[last_item] = position
            
        if self.weighted:
            weight = self.weight.pop(choice)
            self._total_weight -= weight
            if weight == self.max_weight:  
                #if we find ourselves in this case often
                #it may be better just to let max_weight be the
                #largest weight *ever* encountered, even if all remaining weights are less
                #
                self.max_weight_count -= 1
                if self.max_weight_count == 0 and len(self)>0:
                    self._update_max_weight()

    def choose_random(self):
        # r'''chooses a random node.  If there is a weight, it will use rejection
        # sampling to choose a random node until it succeeds'''
        if self.weighted:
            while True:
                choice = random.choice(self.items)
                if random.random() < self.weight[choice]/self.max_weight:
                    break
            # r = random.random()*self.total_weight
            # for item in self.items:
            #     r-= self.weight[item]
            #     if r<0:
            #         break
            return choice

        else:
            return random.choice(self.items)
        

    def random_removal(self):
        r'''uses other class methods to choose and then remove a random node'''
        choice = self.choose_random()
        self.remove(choice)
        return choice

    def total_weight(self):
        if self.weighted:
            return self._total_weight
        else:
            return len(self)
    def update_total_weight(self):
        self._total_weight = sum(self.weight[item] for item in self.items)

def _transform_to_node_history_(infection_times, recovery_times, tmin, SIR = True):
    r'''The original (v0.96 and earlier) returned infection_times and recovery_times.
    The new version returns node_history instead. This code transforms
    the former to the latter.
    
    It is only used for the continuous time cases.
    '''
    if SIR:
        node_history = defaultdict(lambda : ([tmin], ['S']))
        for node, time in infection_times.items():
            if time == tmin:
                node_history[node] = ([], [])
            node_history[node][0].append(time)
            node_history[node][1].append('I')
        for node, time in recovery_times.items():
            if time == tmin:
                node_history[node] = ([], [])
            node_history[node][0].append(time)
            node_history[node][1].append('R')        
    else:
        node_history = defaultdict(lambda : ([tmin], ['S']))
        for node, Itimes in infection_times.items():
            Rtimes = recovery_times[node]
            while Itimes:
                time = Itimes.pop(0)
                if time == tmin:
                    node_history[node] = ([], [])
                node_history[node][0].append(time)
                node_history[node][1].append('I')
                if Rtimes:
                    time = Rtimes.pop(0)
                    node_history[node][0].append(time)
                    node_history[node][1].append('S')
        
    return node_history


##########################
#                        #
#    SIMULATION CODE     #
#                        #
##########################

'''
    The code in the region below is used for stochastic simulation of 
    epidemics on networks
'''

def _simple_test_transmission_(u, v, p):
    r'''
    A simple test for whether u transmits to v assuming constant probability p
    
    From figure 6.8 of Kiss, Miller, & Simon.  Please cite the book if 
    using this test_transmission function for basic_discrete_SIR.

    This handles the simple case where transmission occurs with 
    probability p.

    :Arguments:

        u (node)
            the infected node
        v : node
            the susceptible node
        p : number between 0 and 1
            the transmission probability

    :Returns:
        
    
            
            True if u will infect v (given opportunity)
            False otherwise
    '''

    return random.random()<p


[docs]def discrete_SIR(G, test_transmission=_simple_test_transmission_, args=(), initial_infecteds=None, initial_recovereds = None, rho = None, tmin = 0, tmax = float('Inf'), return_full_data = False, sim_kwargs = None): #tested in test_discrete_SIR r''' Simulates an SIR epidemic on G in discrete time, allowing user-specified transmission rules From figure 6.8 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Return details of epidemic curve from a discrete time simulation. It assumes that individuals are infected for exactly one unit of time and then recover with immunity. This is defined to handle a user-defined function ``test_transmission(node1,node2,*args)`` which determines whether transmission occurs. So elaborate rules can be created as desired by the user. By default it uses ``_simple_test_transmission_`` in which case args should be entered as (p,) :Arguments: **G** NetworkX Graph (or some other structure which quacks like a NetworkX Graph) The network on which the epidemic will be simulated. **test_transmission** function(u,v,*args) (see below for args definition) A function that determines whether u transmits to v. It returns True if transmission happens and False otherwise. The default will return True with probability p, where args=(p,) This function can be user-defined. It is called like: test_transmission(u,v,*args) Note that if args is not entered, then args=(), and this call is equivalent to test_transmission(u,v) **args** a list or tuple The arguments of test_transmission coming after the nodes. If simply having transmission with probability p it should be entered as args=(p,) [note the comma is needed to tell Python that this is really a tuple] **initial_infecteds** node or iterable of nodes (default None) if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **initial_recovereds** as for initial_infecteds, but initially recovered nodes. **rho** number (default is None) initial fraction infected. initial number infected is int(round(G.order()*rho)). The default results in a single randomly chosen initial infection. **tmin** start time **tmax** stop time (default Infinity). **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **t, S, I, R** numpy arrays Or ``if return_full_data is True`` returns **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.fast_gnp_random_graph(1000,0.002) t, S, I, R = EoN.discrete_SIR(G, args = (0.6,), initial_infecteds=range(20)) plt.plot(t,I) Because this sample uses the defaults, it is equivalent to a call to basic_discrete_SIR ''' if rho is not None and initial_infecteds is not None: raise EoN.EoNError("cannot define both initial_infecteds and rho") if initial_infecteds is None: #create initial infecteds list if not given if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] #else it is assumed to be a list of nodes. if return_full_data: node_history = defaultdict(lambda : ([tmin], ['S'])) transmissions = [] for node in initial_infecteds: node_history[node] = ([tmin], ['I']) transmissions.append((tmin-1, None, node)) if initial_recovereds is not None: for node in initial_recovereds: node_history[node] = ([tmin], ['R']) N=G.order() t = [tmin] S = [N-len(initial_infecteds)] I = [len(initial_infecteds)] R = [0] susceptible = defaultdict(lambda: True) #above line is equivalent to u.susceptible=True for all nodes. for u in initial_infecteds: susceptible[u] = False if initial_recovereds is not None: for u in initial_recovereds: susceptible[u] = False infecteds = set(initial_infecteds) while infecteds and t[-1]<tmax: new_infecteds = set() infector = {} #used for returning full data. a waste of time otherwise for u in infecteds: for v in G.neighbors(u): if susceptible[v] and test_transmission(u, v, *args): new_infecteds.add(v) susceptible[v] = False infector[v] = [u] elif return_full_data and v in new_infecteds and test_transmission(u, v, *args): #if ``v`` already infected on this round, consider if it is #multiply infected this round. infector[v].append(u) if return_full_data: for v in infector.keys(): transmissions.append((t[-1], random.choice(infector[v]), v)) next_time = t[-1]+1 if next_time <= tmax: for u in infecteds: node_history[u][0].append(next_time) node_history[u][1].append('R') for v in new_infecteds: node_history[v][0].append(next_time) node_history[v][1].append('I') infecteds = new_infecteds R.append(R[-1]+I[-1]) I.append(len(infecteds)) S.append(S[-1]-I[-1]) t.append(t[-1]+1) if not return_full_data: return np.array(t), np.array(S), np.array(I), \ np.array(R) else: if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses = ['S', 'I', 'R'], **sim_kwargs)
[docs]def basic_discrete_SIR(G, p, initial_infecteds=None, initial_recovereds = None, rho = None, tmin = 0, tmax=float('Inf'), return_full_data = False, sim_kwargs = None): #tested in test_basic_discrete_SIR r''' Performs simple discrete SIR simulation assuming constant transmission probability p. From figure 6.8 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Does a simulation of the simple case of all nodes transmitting with probability p independently to each neighbor and then recovering. :Arguments: **G** networkx Graph The network the disease will transmit through. **p** number transmission probability **initial_infecteds** node or iterable of nodes (default None) if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **initial_recovereds** as for initial_infecteds, but for initially recovered nodes. **rho** number (default None) initial fraction infected. number initially infected is int(round(G.order()*rho)) The default results in a single randomly chosen initial infection. **tmin** float (default 0) start time **tmax** float (default infinity) stop time (if not extinct first). **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: if return_full_data is False returns **t, S, I, R** numpy arrays these numpy arrays give all the times observed and the number in each state at each time. Or ``if return_full_data is True`` returns **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.fast_gnp_random_graph(1000,0.002) t, S, I, R = EoN.basic_discrete_SIR(G, 0.6) plt.plot(t,S) #This sample may be boring if the randomly chosen initial infection #doesn't trigger an epidemic. ''' return discrete_SIR(G, _simple_test_transmission_, (p,), initial_infecteds, initial_recovereds, rho, tmin, tmax, return_full_data, sim_kwargs=sim_kwargs)
[docs]def basic_discrete_SIS(G, p, initial_infecteds=None, rho = None, tmin = 0, tmax = 100, return_full_data = False, sim_kwargs = None): '''Does a simulation of the simple case of all nodes transmitting with probability p independently to each susceptible neighbor and then recovering. This is not directly described in Kiss, Miller, & Simon. :Arguments: **G** networkx Graph The network the disease will transmit through. **p** number transmission probability **initial_infecteds** node or iterable of nodes (default None) if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: if return_full_data is False **t, S, I** All numpy arrays if return_full_data is True **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.fast_gnp_random_graph(1000,0.002) t, S, I = EoN.basic_discrete_SIS(G, 0.6, tmax = 20) plt.plot(t,S) ''' if rho is not None and initial_infecteds is not None: raise EoN.EoNError("cannot define both initial_infecteds and rho") if initial_infecteds is None: #create initial infecteds list if not given if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] #else it is assumed to be a list of nodes. if return_full_data: transmissions = [] node_history = defaultdict(lambda : ([tmin], ['S'])) for u in initial_infecteds: node_history[u] = ([tmin], ['I']) transmissions.append((tmin-1, None, u)) N=G.order() t = [tmin] S = [N-len(initial_infecteds)] I = [len(initial_infecteds)] infecteds = set(initial_infecteds) while infecteds and t[-1]<tmax: new_infecteds = set() infector={} for u in infecteds: for v in G.neighbors(u): if v not in infecteds and random.random()<p: if v not in new_infecteds: new_infecteds.add(v) infector[v] = [u] else: infector[v].append(u) # new_infecteds.union({v for v in G.neighbors(u) # if random.random()<p and v not in infecteds}) if return_full_data: for v in infector.keys(): transmissions.append((t[-1], random.choice(infector[v]), v)) next_time = t[-1]+1 if next_time<= tmax: for u in infecteds: node_history[u][0].append(next_time) node_history[u][1].append('S') for v in new_infecteds: node_history[v][0].append(next_time) node_history[v][1].append('I') infecteds = new_infecteds t.append(t[-1]+1) S.append(N-len(infecteds)) I.append(len(infecteds)) if not return_full_data: return np.array(t), np.array(S), np.array(I) else: if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses = ['S', 'I'], **sim_kwargs)
[docs]def percolate_network(G, p): #tested indirectly in test_basic_discrete_SIR r''' Performs percolation on a network G with each edge persisting with probability p From figure 6.10 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Performs bond percolation on the network G, keeping edges with probability p :Arguments: **G** networkx Graph The contact network **p** number between 0 and 1 the probability of keeping edge :Returns: **H** NetworkX Graph A network with same nodes as G, but with each edge retained independently with probability p. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.fast_gnp_random_graph(1000,0.002) H = EoN.percolate_network(G, 0.6) #H is now a graph with about 60% of the edges of G ''' H = nx.Graph() H.add_nodes_from(G.nodes()) for edge in G.edges(): if random.random()<p: H.add_edge(*edge) return H
def _edge_exists_(u, v, H): r''' Tests if directed edge ``u``, ``v`` exists in graph ``H``. From figure 6.10 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Tests whether ``H`` has an edge from ``u`` to ``v``. :Arguments: **u** node **v** node **H** networkx graph :Returns: **True** if ``H`` has the edge **False** if ``H`` does not have the edge ''' return H.has_edge(u,v)
[docs]def percolation_based_discrete_SIR(G, p, initial_infecteds=None, initial_recovereds = None, rho = None, tmin = 0, tmax = float('Inf'), return_full_data = False, sim_kwargs = None): #tested in test_basic_discrete_SIR r''' perfoms a simple SIR epidemic but using percolation as the underlying method. From figure 6.10 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. The simple case of all nodes transmitting with probability p independently to each neighbor and then recovering, but using a percolation-based approach. :Warning: You probably **shouldn't use this algorithm**. See ``basic_discrete_SIR`` which produces equivalent outputs. That algorithm will be faster than this one. The value of this function is that by performing many simulations we can see that the outputs of the two are equivalent. This algorithm leads to a better understanding of the theory, but it's not computationally efficient. :Arguments: **G** networkx Graph The network the disease will transmit through. **p** number transmission probability **initial_infecteds** node or iterable of nodes (default None) if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **initial_recovereds** as for initial_infecteds, but initially recovered nodes. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **tmin** start time **tmax** stop time (if not extinct first). Default step is 1. **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **t, S, I, R** numpy arrays OR if ``return_full_data is True``: **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.fast_gnp_random_graph(1000,0.002) t, S, I, R = EoN.percolation_based_discrete_SIR(G, p) plt.plot(t,S) This is equivalent to basic_discrete_epidemic (but many simulations may be needed before it's clear, since these are stochastic) ''' H = percolate_network(G, p) return discrete_SIR(H, test_transmission=H.has_edge, initial_infecteds=initial_infecteds, initial_recovereds = initial_recovereds, rho = rho, tmin = tmin, tmax = tmax, return_full_data=return_full_data, sim_kwargs=sim_kwargs)
[docs]def estimate_SIR_prob_size(G, p): #tested in test_estimate_SIR_prob_size r''' Uses percolation to estimate the probability and size of epidemics assuming constant transmission probability p From figure 6.12 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Provies an estimate of epidemic probability and size assuming a fixed transmission probability p. The estimate is found by performing bond percolation and then finding the largest connected component in the remaining network. This assumes that there is a single giant component above threshold. It will not be an appropriate measure if the network is made up of several densely connected components with very weak connections between these components. :Arguments: **G** networkx Graph The network the disease will transmit through. **p** number transmission probability :Returns: **PE, AR** both floats between 0 and 1 estimates of the probability and proportion infected (attack rate) in epidemics (the two are equal, but each given for consistency with ``estimate_directed_SIR_prob_size``) :SAMPLE USE: :: import networkx as nx import EoN G = nx.fast_gnp_random_graph(1000,0.002) PE, AR = EoN.estimate_SIR_prob_size(G, 0.6) ''' H = percolate_network(G, p) size = max((len(CC) for CC in nx.connected_components(H))) returnval = float(size)/G.order() return returnval, returnval
[docs]def directed_percolate_network(G, tau, gamma, weights = True): #indirectly tested in test_estimate_SIR_prob_size r''' performs directed percolation, assuming that transmission and recovery are Markovian From figure 6.13 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. This performs directed percolation corresponding to an SIR epidemic assuming that transmission is at rate tau and recovery at rate gamma :See Also: ``nonMarkov_directed_percolate_network`` which allows for duration and time to infect to come from other distributions. ``nonMarkov_directed_percolate_network`` which allows for more complex rules :Arguments: **G** networkx Graph The network the disease will transmit through. **tau** positive float transmission rate **gamma** positive float recovery rate **weights** boolean (default True) if True, then includes information on time to recovery and delay to transmission. If False, just the directed graph. :Returns: : **H** networkx DiGraph (directed graph) a u->v edge exists in H if u would transmit to v if ever infected. The edge has a time attribute (time_to_infect) which gives the delay from infection of u until transmission occurs. Each node u has a time attribute (duration) which gives the duration of its infectious period. :SAMPLE USE: :: import networkx as nx import EoN G = nx.fast_gnp_random_graph(1000,0.002) H = EoN.directed_percolate_network(G, 2, 1) ''' #simply calls directed_percolate_network_with_timing, using markovian rules. def trans_time_fxn(u, v, tau): if tau>0: return random.expovariate(tau) else: return float('Inf') trans_time_args = (tau,) def rec_time_fxn(u, gamma): if gamma>0: return random.expovariate(gamma) else: return float('Inf') rec_time_args = (gamma,) return nonMarkov_directed_percolate_network_with_timing(G, trans_time_fxn, rec_time_fxn, trans_time_args, rec_time_args, weights=weights)
def _out_component_(G, source): ''' rather than following the pseudocode in figure 6.15 of Kiss, Miller & Simon, this uses a built-in Networkx command. finds the set of nodes (including source) which are reachable from nodes in source. :Arguments: **G** networkx Graph The network the disease will transmit through. **source** either a node or an iterable of nodes (set, list, tuple) The nodes from which the infections start. We assume no node will ever have a name that is an iterable of other node names. It will run, but may not use source user expects. :Returns: **reachable_nodes** set the set of nodes reachable from source (including source). :Warning: if the graph G has nodes like 1, 2, 3, and (1,2,3), then a source of (1,2,3) is potentially ambiguous. This algorithm will interpret the source as the single node (1,2,3) ''' if G.has_node(source): source_nodes = {source} else: source_nodes = set(source) reachable_nodes = set().union(source_nodes) for node in source_nodes: reachable_nodes = reachable_nodes.union( set(nx.descendants(G, node))) return reachable_nodes def _in_component_(G, target): r''' creates the _in_component_ by basically reversing _out_component_. :Arguments: **G** networkx Graph The network the disease will transmit through. **target** a target node (or iterable of target nodes) The node whose infection we are interested in. In principle target could be an iterable, but in this case we would be finding those possible sources whose infection leads to infection of at least one target, not all. :Warning: if the graph G has nodes like 1, 2, 3, and (1,2,3), then a target of (1,2,3) is potentially ambiguous. It will interpret the target as the single node (1,2,3) :Returns: **source_nodes** (set) the set of nodes (including target) from which target is reachable ''' if G.has_node(target): #potential bug if for example G has nodes like 1, 2, 3, and also #(1,2,3). Then a target of (1,2,3) is ambiguous. target_nodes = {target} else: target_nodes = set(target) source_nodes = set().union(target_nodes) for node in target_nodes: source_nodes = source_nodes.union(set(nx.ancestors(G, node))) return source_nodes
[docs]def get_infected_nodes(G, tau, gamma, initial_infecteds=None, initial_recovereds = None): r''' Finds all eventually infected nodes in an SIR simulation, through a percolation approach From figure 6.15 of Kiss, Miller, & Simon. Please cite the book if using this algorithm Assumes that the intial infecteds are as given and transmission occurs with rate tau and recovery with rate gamma. Note that the output of this algorithm is stochastic. This code has similar run-time whether an epidemic occurs or not. There are much faster ways to implement an algorithm giving the same output, for example by actually running one of the epidemic simulation tools. :Warning: why are you using this command? If it's to better understand the relationship between percolation and SIR epidemics, that's fine. But this command IS NOT an efficient way to calculate anything. Don't do it like this. Use one of the other algorithms. Try ``fast_SIR``, for example. :Arguments: **G** networkx Graph The network the disease will transmit through. **tau** positive float transmission rate **gamma** positive float recovery rate **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then a randomly chosen node is initially infected. **initial_recovereds** node or iterable of nodes if a single node, then this node is initially recovered if an iterable, then whole set is initially recovered :Returns: **infected_nodes** set the set of nodes infected eventually in a simulation. :SAMPLE USE: :: import networkx as nx import EoN G = nx.fast_gnp_random_graph(1000,0.002) finalR = EoN.get_infected_nodes(G, 2, 1, initial_infecteds=[0, 5]) #finds the nodes infected if 0 and 5 are the initial nodes infected #and tau=2, gamma=1 ''' if initial_recovereds is None: initial_recovereds = set() elif G.has_node(initial_recovereds): initial_recovereds = set([initial_recovereds]) else: initial_recovereds = set(initial_recovereds) if initial_infecteds is None: while True: node = random.choice(G.nodes()) if node not in initial_recovereds: break initial_infecteds=set([node]) elif G.has_node(initial_infecteds): initial_infecteds=set([initial_infecteds]) else: initial_infecteds = set(initial_infecteds) if initial_infecteds.intersection(initial_recovereds): raise EoN.EoNError("initial infecteds and initial recovereds overlap") H = directed_percolate_network(G, tau, gamma) for node in initial_recovereds: H.remove_node(node) infected_nodes = _out_component_(H, initial_infecteds) return infected_nodes
[docs]def estimate_directed_SIR_prob_size(G, tau, gamma): #tested in test_estimate_SIR_prob_size ''' Predicts probability and attack rate assuming continuous-time Markovian SIR disease on network G From figure 6.17 of Kiss, Miller, & Simon. Please cite the book if using this algorithm :See Also: ``estimate_nonMarkov_SIR_prob_size`` which handles nonMarkovian versions :Arguments: **G** networkx Graph The network the disease will transmit through. **tau** positive float transmission rate **gamma** positive float recovery rate :Returns: **PE, AR** numbers (between 0 and 1) Estimates of epidemic probability and attack rate found by performing directed percolation, finding largest strongly connected component and finding its in/out components. :SAMPLE USE: :: import networkx as nx import EoN G = nx.fast_gnp_random_graph(1000,0.003) PE, AR = EoN.estimate_directed_SIR_prob_size(G, 2, 1) ''' H = directed_percolate_network(G, tau, gamma) return estimate_SIR_prob_size_from_dir_perc(H)
[docs]def estimate_SIR_prob_size_from_dir_perc(H): #indirectly tested in test_estimate_SIR_prob_size r''' Estimates probability and size of SIR epidemics for an input network after directed percolation From figure 6.17 of Kiss, Miller, & Simon. Please cite the book if using this algorithm :Arguments: **H** directed graph The outcome of directed percolation on the contact network G :Returns: **PE, AR** numbers Estimates of epidemic probability and attack rate found by finding largest strongly connected component and finding in/out components. :SAMPLE USE: :: import networkx as nx import EoN G = nx.fast_gnp_random_graph(1000,0.003) H = some_user_defined_operation_to_do_percolation(G, argument) PE, AR = EoN.estimate_SIR_prob_size_from_dir_perc(H) ''' Hscc = max((nx.strongly_connected_components(H)), key = len) u = list(Hscc)[0] #random.choice(Hscc) inC = _in_component_(H, u) #includes both H_{IN} and H_{SCC} outC = _out_component_(H, u) #includes both H_{OUT} and H_{SCC} N=float(H.order()) PE = len(inC)/N AR = len(outC)/N return PE, AR
[docs]def estimate_nonMarkov_SIR_prob_size_with_timing(G, trans_time_fxn, rec_time_fxn, trans_time_args=(), rec_time_args=()): ''' estimates probability and size for user-input transmission and recovery time functions. :Arguments: **G** Networkx Graph the input graph **trans_time_fxn** function trans_time_fxn(u, v, *trans_time_args) returns the delay from u's infection to transmission to v. **rec_time_fxn** function rec_time_fxn(u, *rec_time_args) returns the delay from u's infection to its recovery. **trans_time_args** tuple any additional arguments required by trans_time_fxn. For example weights of nodes. **rec_time_args** tuple any additional arguments required by rec_time_fxn :Returns: **PE, AR** numbers (between 0 and 1) Estimates of epidemic probability and attack rate found by finding largest strongly connected component and finding in/out components. :SAMPLE USE: :: #mimicking the standard version with transmission rate tau #and recovery rate gamma #equivalent to #PE, AR = EoN.estimate_SIR_prob_size(G, tau, gamma) import networkx as nx import EoN import random from collections import defaultdict G=nx.fast_gnp_random_graph(1000,0.002) tau = 2 gamma = 1 def trans_time_fxn(u,v, tau): return random.expovariate(tau) def rec_time_fxn(u, gamma): return random.expovariate(gamma) PE, AR = EoN.estimate_nonMarkov_SIR_prob_size(G, trans_time_fxn=trans_time_fxn, rec_time_fxn = rec_time_fxn, trans_time_args = (tau,), rec_time_args = (gamma,) ) ''' H = nonMarkov_directed_percolate_network_with_timing(G, trans_time_fxn, rec_time_fxn, trans_time_args, rec_time_args) return estimate_SIR_prob_size_from_dir_perc(H)
[docs]def estimate_nonMarkov_SIR_prob_size(G, xi, zeta, transmission): ''' Predicts epidemic probability and size using nonMarkov_directed_percolate_network. This is not directly described in Kiss, Miller, & Simon, but is based on (fig 6.18). :Warning: You probably DON'T REALLY WANT TO USE THIS. Check if estimate_nonMarkov_prob_size_with_timing fits your needs better. :Arguments: **G** (networkx Graph) The input graph **xi** dict xi[u] gives all necessary information to determine what u's infectiousness is. **zeta** dict zeta[v] gives everything needed about v's susceptibility **transmission** user-defined function transmission(xi[u], zeta[v]) determines whether u transmits to v. Returns True or False depending on whether the transmission would happen :Returns: **PE, AR** numbers (between 0 and 1) Estimates of epidemic probability and attack rate found by finding largest strongly connected component and finding in/out components. :SAMPLE USE: :: #mimicking the standard version with transmission rate tau #and recovery rate gamma import networkx as nx import EoN import random from collections import defaultdict G=nx.fast_gnp_random_graph(1000,0.002) tau = 2 gamma = 1 xi = {node:random.expovariate(gamma) for node in G.nodes()} #xi[node] is duration of infection of node. zeta = defaultdict(lambda : tau) #every node has zeta=tau, so same #transmission rate def my_transmission(infection_duration, trans_rate): #infect if duration is longer than time to infection. if infection_duration > random.expovariate(trans_rate): return True else: return False PE, AR = EoN.estimate_nonMarkov_SIR_prob_size(G, xi, zeta, my_transmission) ''' H = nonMarkov_directed_percolate_network(G, xi, zeta, transmission) return estimate_SIR_prob_size_from_dir_perc(H)
[docs]def nonMarkov_directed_percolate_network_with_timing(G, trans_time_fxn, rec_time_fxn, trans_time_args=(), rec_time_args=(), weights=True): r''' Performs directed percolation on G for user-specified transmission time and recovery time distributions. A generalization of figure 6.13 of Kiss, Miller & Simon :See Also: **directed_percolate_network** if it's just a constant transmission and recovery rate. **nonMarkov_directed_percolate_network** if your rule for creating the percolated network cannot be expressed as simply calculating durations and delays until transmission. :Arguments: The arguments are very much like in fast_nonMarkov_SIR **G** Networkx Graph the input graph **trans_time_fxn** user-defined function returns the delay from u's infection to transmission to v. delay=trans_time_fxn(u, v, *trans_time_args) **rec_time_fxn** user-defined function returns the duration of from u's infection (delay to its recovery). duration = rec_time_fxn(u, *rec_time_args) Note: if delay == duration, we assume infection happens. **trans_time_args** tuple any additional arguments required by trans_time_fxn. For example weights of nodes. **rec_time_args** tuple any additional arguments required by rec_time_fxn **weights** boolean if true, then return directed network with the delay and duration as weights. :Returns: **H** directed graph. The returned graph is built by assigning each node an infection duration and then each edge (in each direction) a delay until transmission. If delay<duration it adds directed edge to ``H``. if weights is True, then ``H`` contains the duration and delays as weights. Else it's just a directed graph. ''' H = nx.DiGraph() if weights: for u in G.nodes(): duration = rec_time_fxn(u, *rec_time_args) H.add_node(u, duration = duration) for v in G.neighbors(u): delay = trans_time_fxn(u, v, *trans_time_args) if delay<=duration: H.add_edge(u,v, delay_to_infection = delay) else: for u in G.nodes(): duration = rec_time_fxn(u, *rec_time_args) H.add_node(u) for v in G.neighbors(u): delay = trans_time_fxn(u, v, *trans_time_args) if delay<=duration: H.add_edge(u,v) return H
[docs]def nonMarkov_directed_percolate_network(G, xi, zeta, transmission): r''' performs directed percolation on a network following user-specified rules. From figure 6.18 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. This algorithm is particularly intended for a case where the duration and delays from infection to transmission are somehow related to one another. :Warning: You probably **shouldn't use this**. Check if nonMarkov_directed_percolate_with_timing fits your needs better. :See Also: ``nonMarkov_directed_percolate_network_with_timing`` if your rule for creating the percolated network is based on calculating a recovery time for each node and then calculating a separate transmission time for the edges this will be better. ``directed_percolate_network`` if it's just a constant transmission and recovery rate. :Arguments: **G** networkx Graph The input graph **xi** dict xi[u] gives all necessary information to determine what us infectiousness is. **zeta** dict zeta[v] gives everything needed about vs susceptibility **transmission** user-defined function transmission(xi[u], zeta[v]) determines whether u transmits to v. returns True if transmission happens and False if it does not :Returns: **H** networkx DiGraph (directed graph) Edge (u,v) exists in H if disease will transmit given the opportunity. :SAMPLE USE: for now, I'm being lazy. Look at the sample for estimate_nonMarkov_SIR_prob_size to infer it. ''' H = nx.DiGraph() for u in G.nodes(): H.add_node(u) for v in G.neighbors(u): if transmission(xi[u],zeta[v]): H.add_edge(u,v) return H
### Code starting here does event-driven simulations ### def _find_trans_and_rec_delays_SIR_(node, sus_neighbors, trans_time_fxn, rec_time_fxn, trans_time_args=(), rec_time_args=()): rec_delay = rec_time_fxn(node, *rec_time_args) trans_delay={} for target in sus_neighbors: trans_delay[target] = trans_time_fxn(node, target, *trans_time_args) return trans_delay, rec_delay def _process_trans_SIR_(time, G, source, target, times, S, I, R, Q, status, rec_time, pred_inf_time, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args = ()): r''' From figure A.4 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. :Arguments: time : number time of transmission **G** networkx Graph node : node node receiving transmission. times : list list of times at which events have happened S, I, R : lists lists of numbers of nodes of each status at each time Q : myQueue the queue of events status : dict dictionary giving status of each node rec_time : dict dictionary giving recovery time of each node pred_inf_time : dict dictionary giving predicted infeciton time of nodes trans_and_rec_time_fxn : function trans_and_rec_time_fxn(node, susceptible_neighbors, *trans_and_rec_time_args) returns tuple consisting of dict of delays until transmission from node to neighbors and float having delay until recovery of node An example of how to use this appears in the code fast_SIR where depending on whether inputs are weighted, it constructs different versions of this function and then calls fast_nonMarkov_SIR. trans_and_rec_time_args : tuple (default empty) see trans_and_rec_time_fxn :Returns: nothing returned :MODIFIES: status : updates status of newly infected node rec_time : adds recovery time for node times : appends time of event S : appends new S (reduced by 1 from last) I : appends new I (increased by 1) R : appends new R (same as last) Q : adds recovery and transmission events for newly infected node. pred_inf_time : updated for nodes that will receive transmission ''' if status[target] == 'S': #nothing happens if already infected. status[target] = 'I' times.append(time) transmissions.append((time, source, target)) S.append(S[-1]-1) #one less susceptible I.append(I[-1]+1) #one more infected R.append(R[-1]) #no change to recovered suscep_neighbors = [v for v in G.neighbors(target) if status[v]=='S'] trans_delay, rec_delay = trans_and_rec_time_fxn(target, suscep_neighbors, *trans_and_rec_time_args) rec_time[target] = time + rec_delay if rec_time[target]<=Q.tmax: Q.add(rec_time[target], _process_rec_SIR_, args = (target, times, S, I, R, status)) for v in trans_delay: inf_time = time + trans_delay[v] if inf_time<= rec_time[target] and inf_time < pred_inf_time[v] and inf_time<=Q.tmax: Q.add(inf_time, _process_trans_SIR_, args = (G, target, v, times, S, I, R, Q, status, rec_time, pred_inf_time, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args ) ) pred_inf_time[v] = inf_time def _process_rec_SIR_(time, node, times, S, I, R, status): r'''From figure A.3 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. :Arguments: event : event has details on node and time times : list list of times at which events have happened S, I, R : lists lists of numbers of nodes of each status at each time status : dict dictionary giving status of each node :Returns: : Nothing MODIFIES ---------- status : updates status of newly recovered node times : appends time of event S : appends new S (same as last) I : appends new I (decreased by 1) R : appends new R (increased by 1) ''' times.append(time) S.append(S[-1]) #no change to number susceptible I.append(I[-1]-1) #one less infected R.append(R[-1]+1) #one more recovered status[node] = 'R' def _trans_and_rec_time_Markovian_const_trans_(node, sus_neighbors, tau, rec_rate_fxn): r'''I introduced this with a goal of making the code run faster. It looks like the fancy way of selecting the infectees and then choosing their infection times is slower than just cycling through, finding infection times and checking if that time is less than recovery time. So I've commented out the more "sophisticated" approach. ''' duration = random.expovariate(rec_rate_fxn(node)) trans_prob = 1-np.exp(-tau*duration) number_to_infect = np.random.binomial(len(sus_neighbors),trans_prob) #print(len(suscep_neighbors),number_to_infect,trans_prob, tau, duration) transmission_recipients = random.sample(sus_neighbors,number_to_infect) trans_delay = {} for v in transmission_recipients: trans_delay[v] = _truncated_exponential_(tau, duration) return trans_delay, duration # duration = random.expovariate(rec_rate_fxn(node)) # trans_delay = {} # # # for v in sus_neighbors: # if tau == 0: # trans_delay[v] = float('Inf') # else: # trans_delay[v] = random.expovariate(tau) # # if delay<duration: # # trans_delay[v] = delay # return trans_delay, duration ##slow approach 1: # next_delay = random.expovariate(tau) # index, delay = int(next_delay//duration), next_delay%duration # while index<len(sus_neighbors): # trans_delay[sus_neighbors[index]] = delay # next_delay = random.expovariate(tau) # jump, delay = int(next_delay//duration), next_delay%duration # index += jump ##slow approach 2: #trans_prob = 1-np.exp(-tau*duration) #number_to_infect = np.random.binomial(len(sus_neighbors),trans_prob) #print(len(suscep_neighbors),number_to_infect,trans_prob, tau, duration) #transmission_recipients = random.sample(sus_neighbors,number_to_infect) #trans_delay = {} #for v in transmission_recipients: # trans_delay[v] = _truncated_exponential_(tau, duration) return trans_delay, duration
[docs]def fast_SIR(G, tau, gamma, initial_infecteds = None, initial_recovereds = None, rho = None, tmin = 0, tmax=float('Inf'), transmission_weight = None, recovery_weight = None, return_full_data = False, sim_kwargs = None): r''' fast SIR simulation for exponentially distributed infection and recovery times From figure A.3 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. :Arguments: **G** networkx Graph The underlying network **tau** number transmission rate per edge **gamma** number recovery rate per node **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **initial_recovereds** iterable of nodes (default None) this whole collection is made recovered. Currently there is no test for consistency with initial_infecteds. Understood that everyone who isn't infected or recovered initially is initially susceptible. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **tmin** number (default 0) starting time **tmax** number (default Infinity) maximum time after which simulation will stop. the default of running to infinity is okay for SIR, but not for SIS. **transmission_weight** string (default None) the label for a weight given to the edges. transmission rate is G.adj[i][j][transmission_weight]*tau **recovery_weight** string (default None)) a label for a weight given to the nodes to scale their recovery rates gamma_i = G.nodes[i][recovery_weight]*gamma **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **times, S, I, R** numpy arrays Or if ``return_full_data is True`` **full_data** Simulation_Investigation object from this we can extract the status history of all nodes. We can also plot the network at given times and create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.configuration_model([1,5,10]*100000) initial_size = 10000 gamma = 1. tau = 0.3 t, S, I, R = EoN.fast_SIR(G, tau, gamma, initial_infecteds = range(initial_size)) plt.plot(t, I) ''' #tested in test_SIR_dynamics if transmission_weight is not None or tau*gamma == 0: trans_rate_fxn, rec_rate_fxn = EoN._get_rate_functions_(G, tau, gamma, transmission_weight, recovery_weight) def trans_time_fxn(source, target, trans_rate_fxn): rate = trans_rate_fxn(source, target) if rate >0: return random.expovariate(rate) else: return float('Inf') def rec_time_fxn(node, rec_rate_fxn): rate = rec_rate_fxn(node) if rate >0: return random.expovariate(rate) else: return float('Inf') trans_time_args = (trans_rate_fxn,) rec_time_args = (rec_rate_fxn,) return fast_nonMarkov_SIR(G, trans_time_fxn = trans_time_fxn, rec_time_fxn = rec_time_fxn, trans_time_args = trans_time_args, rec_time_args = rec_time_args, initial_infecteds = initial_infecteds, initial_recovereds = initial_recovereds, rho=rho, tmin = tmin, tmax = tmax, return_full_data = return_full_data, sim_kwargs=sim_kwargs) else: #the transmission rate is tau for all edges. We can use this #to speed up the code. #get rec_rate_fxn (recovery rate may be variable) trans_rate_fxn, rec_rate_fxn = EoN._get_rate_functions_(G, tau, gamma, transmission_weight, recovery_weight) return fast_nonMarkov_SIR(G, trans_and_rec_time_fxn=_trans_and_rec_time_Markovian_const_trans_, trans_and_rec_time_args=(tau, rec_rate_fxn), initial_infecteds = initial_infecteds, initial_recovereds = initial_recovereds, rho=rho, tmin = tmin, tmax = tmax, return_full_data = return_full_data, sim_kwargs=sim_kwargs)
[docs]def fast_nonMarkov_SIR(G, trans_time_fxn=None, rec_time_fxn=None, trans_and_rec_time_fxn = None, trans_time_args=(), rec_time_args=(), trans_and_rec_time_args = (), initial_infecteds = None, initial_recovereds = None, rho=None, tmin = 0, tmax = float('Inf'), return_full_data = False, sim_kwargs = None): r''' A modification of the algorithm in figure A.3 of Kiss, Miller, & Simon to allow for user-defined rules governing time of transmission. Please cite the book if using this algorithm. This is useful if the transmission rule is non-Markovian in time, or for more elaborate models. Allows the user to define functions (details below) to determine the rules of transmission times and recovery times. There are two ways to do this. The user can define a function that calculates the recovery time and another function that calculates the transmission time. If recovery is after transmission, then transmission occurs. We do this if the time to transmission is independent of the time to recovery. Alternately, the user may want to model a situation where time to transmission and time to recovery are not independent. Then the user can define a single function (details below) that would determine both recovery and transmission times. :Arguments: **G** Networkx Graph **trans_time_fxn** a user-defined function returns the delay until transmission for an edge. May depend on various arguments and need not be Markovian. Returns float Will be called using the form ``trans_delay = trans_time_fxn(source_node, target_node, *trans_time_args)`` Here trans_time_args is a tuple of the additional arguments the functions needs. the source_node is the infected node the target_node is the node that may receive transmission rec_delay is the duration of source_node's infection, calculated by rec_time_fxn. **rec_time_fxn** a user-defined function returns the delay until recovery for a node. May depend on various arguments and need not be Markovian. Returns float. Called using the form ``rec_delay = rec_time_fxn(node, *rec_time_args)`` Here rec_time_args is a uple of additional arguments the function needs. **trans_and_rec_time_fxn** a user-defined function returns both a dict giving delay until transmissions for all edges from source to susceptible neighbors and a float giving delay until recovery of the source. Can only be used **INSTEAD OF** ``trans_time_fxn`` AND ``rec_time_fxn``. Gives an **ERROR** if these are also defined Called using the form ``trans_delay_dict, rec_delay = trans_and_rec_time_fxn( node, susceptible_neighbors, *trans_and_rec_time_args)`` here trans_delay_dict is a dict whose keys are those neighbors who receive a transmission and rec_delay is a float. **trans_time_args** tuple see trans_time_fxn **rec_time_args** tuple see rec_time_fxn **trans_and_rec_time_args** tuple see trans_and_rec_time_fxn **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **initial_recovereds** iterable of nodes (default None) this whole collection is made recovered. Currently there is no test for consistency with initial_infecteds. Understood that everyone who isn't infected or recovered initially is initially susceptible. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **tmin** number (default 0) starting time **tmax** number (default infinity) final time **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **times, S, I, R** numpy arrays Or if ``return_full_data is True`` **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import EoN import networkx as nx import matplotlib.pyplot as plt import random N=1000000 G = nx.fast_gnp_random_graph(N, 5/(N-1.)) #set up the code to handle constant transmission rate #with fixed recovery time. def trans_time_fxn(source, target, rate): return random.expovariate(rate) def rec_time_fxn(node,D): return D D = 5 tau = 0.3 initial_inf_count = 100 t, S, I, R = EoN.fast_nonMarkov_SIR(G, trans_time_fxn=trans_time_fxn, rec_time_fxn=rec_time_fxn, trans_time_args=(tau,), rec_time_args=(D,), initial_infecteds = range(initial_inf_count)) # note the comma after ``tau`` and ``D``. This is needed for python # to recognize these are tuples # initial condition has first 100 nodes in G infected. ''' if rho and initial_infecteds: raise EoN.EoNError("cannot define both initial_infecteds and rho") if rho and initial_recovereds: raise EoN.EoNError("cannot define both initial_recovereds and rho") if (trans_time_fxn and not rec_time_fxn) or (rec_time_fxn and not trans_time_fxn): raise EoN.EoNError("must define both trans_time_fxn and rec_time_fxn or neither") elif trans_and_rec_time_fxn and trans_time_fxn: raise EoN.EoNError("cannot define trans_and_rec_time_fxn at the same time as trans_time_fxn and rec_time_fxn") elif not trans_and_rec_time_fxn and not trans_time_fxn: raise EoN.EoNError("if not defining trans_and_rec_time_fxn, must define trans_time_fxn and rec_time_fxn") if not trans_and_rec_time_fxn: #we define the joint function. trans_and_rec_time_fxn = _find_trans_and_rec_delays_SIR_ trans_and_rec_time_args = (trans_time_fxn, rec_time_fxn, trans_time_args, rec_time_args) #now we define the initial setup. status = defaultdict(lambda: 'S') #node status defaults to 'S' rec_time = defaultdict(lambda: tmin-1) #node recovery time defaults to -1 if initial_recovereds is not None: for node in initial_recovereds: status[node] = 'R' rec_time[node] = tmin-1 #default value for these. Ensures that the recovered nodes appear with a time pred_inf_time = defaultdict(lambda: float('Inf')) #infection time defaults to \infty --- this could be set to tmax, #probably with a slight improvement to performance. Q = myQueue(tmax) if initial_infecteds is None: #create initial infecteds list if not given if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] #else it is assumed to be a list of nodes. times, S, I, R= ([tmin], [G.order()], [0], [0]) transmissions = [] for u in initial_infecteds: pred_inf_time[u] = tmin Q.add(tmin, _process_trans_SIR_, args=(G, None, u, times, S, I, R, Q, status, rec_time, pred_inf_time, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args ) ) #Note that when finally infected, pred_inf_time is correct #and rec_time is correct. #So if return_full_data is true, these are correct while Q: #all the work is done in this while loop. Q.pop_and_run() #the initial infections were treated as ordinary infection events at #time 0. #So each initial infection added an entry at time 0 to lists. #We'd like to get rid these excess events. times = times[len(initial_infecteds):] S=S[len(initial_infecteds):] I=I[len(initial_infecteds):] R=R[len(initial_infecteds):] if not return_full_data: return np.array(times), np.array(S), np.array(I), \ np.array(R) else: #strip pred_inf_time and rec_time down to just the values for nodes #that became infected #could use iteritems for Python 2, by try ... except AttributeError infection_times = {node:time for (node,time) in pred_inf_time.items() if status[node]!='S'} recovery_times = {node:time for (node,time) in rec_time.items() if status[node] =='R'} node_history = _transform_to_node_history_(infection_times, recovery_times, tmin, SIR = True) if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses = ['S', 'I', 'R'], **sim_kwargs)
def _find_trans_and_rec_delays_SIS_(node, neighbors, trans_time_fxn, rec_time_fxn, trans_time_args=(), rec_time_args=()): rec_delay = rec_time_fxn(node, *rec_time_args) trans_delays={} for target in neighbors: trans_delays[target] = trans_time_fxn(node, target, rec_delay, *trans_time_args) return trans_delays, rec_delay def _process_trans_SIS_Markov(time, G, source, target, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_rate_fxn, rec_rate_fxn): r'''From figure A.6 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Does the Markovian version. So it doesn't take in a list of transmission times :Arguments: **time** number current time **G** networkx Graph **source** node node causing transmission **target** node node receiving transmission. **times** list list of times at which events have happened **S, I** lists lists of numbers of nodes of each status at each time **Q** myQueue the queue of events **status** dict dictionary giving status of each node **rec_time** dict dictionary giving recovery time of each node **infection_times** **recovery_times** **transmissions** list) list of tuples (t, source, target) for each transmission event. **trans_rate_fxn** User-defined function transmission rate trans_rate_fxn(u,v) gives transmission rate from u to v **rec_rate_fxn** User-defined function recovery rate rec_rate_fxn(u) is recovery rate of u. :Returns: nothing returned :MODIFIES: status : updates status of target rec_time : adds recovery time for target times : appends time of event infection_times[node] : appends time of infection. S : appends new S (reduced by 1 from last) I : appends new I (increased by 1) Q : adds recovery and transmission events for target. ''' if status[target] == 'S': status[target] = 'I' transmissions.append((time, source, target)) I.append(I[-1]+1) #one more infected S.append(S[-1]-1) #one less susceptible times.append(time) rec_rate = rec_rate_fxn(target) if rec_rate>0: rec_time[target] = time + random.expovariate(rec_rate_fxn(target)) elif rec_rate == 0: rec_time[target] = float('Inf') else: raise EoN.EoNError('recovery rate must be non-negative') if rec_time[target]<Q.tmax: Q.add(rec_time[target], _process_rec_SIS_, args = (target, times, recovery_times, S, I, status)) for v in G.neighbors(target): #target plays role of source here _find_next_trans_SIS_Markov(Q, time, trans_rate_fxn(target, v), target, v, status, rec_time, trans_event_args = (G, target, v, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_rate_fxn, rec_rate_fxn ) ) infection_times[target].append(time) if source is not None: _find_next_trans_SIS_Markov(Q, time, trans_rate_fxn(source, target), source, target, status, rec_time, trans_event_args = (G, source, target, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_rate_fxn, rec_rate_fxn ) ) def _process_trans_SIS_nonMarkov_(time, G, source, target, future_transmissions, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args=()): r'''From figure A.6 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. Does the nonMarkovian version. :Arguments: time : number current time **G** networkx Graph **source** node node causing transmission **target** node node receiving transmission. times : list list of times at which events have happened S, I: lists lists of numbers of nodes of each status at each time Q : myQueue the queue of events status : dict dictionary giving status of each node rec_time : dict dictionary giving recovery time of each node infection_times : recovery_times : trans_and_rec_time_fxn : function trans_and_rec_time_args : :Returns: nothing returned :MODIFIES: status : updates status of target rec_time : adds recovery time for target times : appends time of event infection_times[node] : appends time of infection. S : appends new S (reduced by 1 from last) I : appends new I (increased by 1) Q : adds recovery and transmission events for target. ''' if status[target] == 'S': status[target] = 'I' transmissions.append((time, source, target)) infection_times[target].append(time) times.append(time) I.append(I[-1]+1) #one more infected S.append(S[-1]-1) #one less susceptible trans_delays, rec_delay = trans_and_rec_time_fxn(target, G.neighbors(target), *trans_and_rec_time_args) rec_time[target] = time + rec_delay if rec_time[target]<Q.tmax: Q.add(rec_time[target], _process_rec_SIS_, args = (target, times, recovery_times, S, I, status)) for v in G.neighbors(target): #target plays role of source here if trans_delays[v]: trans_times = [time + td for td in trans_delays[v]] #when do transmissions happen if status[v] == 'I': #only care about those after current infectious period trans_times = [time for time in trans_times if time>rec_time[v]] following_transmissions = trans_times[1:] if trans_times: #no point adding any if there are none Q.add(trans_times[0], _process_trans_SIS_nonMarkov_, args = (G, target, v, following_transmissions, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args)) #target is definitely infected now. It has some future_transmissions stored. #do they happen? trans_times = [time for time in future_transmissions if time> rec_time[target]] following_transmissions = trans_times[1:] if trans_times: Q.add(trans_times[0], _process_trans_SIS_nonMarkov_, args = (G, source, target, following_transmissions, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args)) def _find_next_trans_SIS_Markov(Q, time, tau, source, target, status, rec_time, trans_event_args=()): r'''From figure A.6 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. determines if a transmission from source to target will occur and if so puts into Q :Arguments: Q : myQueue A priority queue of events t : current time tau : transmission rate **source** infected node that may transmit **target** the possibly susceptible node that may receive a transmission status : a dict giving the current status of every node rec_time : a dict giving the recovery time of every node that has been infected. :Returns: : nothing returned MODIFIES -------- Q : if a transmission time is potentially valid, add the first event. when this transmission occurs later we will consider adding another event. note that the event includes the source, so we can later check if same source will transmit again. Entry requirement: ------- Only enter this if the source node is INFECTED. ''' #assert(status[source]=='I') if rec_time[target]<rec_time[source]: #if target is susceptible, then rec_time[target]<time if tau>0: delay = random.expovariate(tau) elif tau == 0: delay = float('Inf') else: raise EoN.EoNError('rate must be non-negative') #transmission_time = max(time, rec_time[target]) + delay transmission_time = time + delay if transmission_time<rec_time[target]: delay = random.expovariate(tau) transmission_time = rec_time[target]+delay if transmission_time < rec_time[source] and transmission_time < Q.tmax: Q.add(transmission_time, _process_trans_SIS_Markov, args = trans_event_args ) def _process_rec_SIS_(time, node, times, recovery_times, S, I, status): r'''From figure A.6 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. ''' times.append(time) recovery_times[node].append(time) S.append(S[-1]+1) #one more susceptible I.append(I[-1]-1) #one less infected status[node] = 'S'
[docs]def fast_SIS(G, tau, gamma, initial_infecteds=None, rho = None, tmin=0, tmax=100, transmission_weight = None, recovery_weight = None, return_full_data = False, sim_kwargs = None): r'''Fast SIS simulations for epidemics on weighted or unweighted networks, allowing edge and node weights to scale the transmission and recovery rates. Assumes exponentially distributed times to recovery and to transmission. From figure A.5 of Kiss, Miller, & Simon. Please cite the book if using this algorithm. :Arguments: **G** networkx Graph The underlying network **tau** positive float transmission rate per edge **gamma** number recovery rate per node **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **rho** number initial fraction infected. number infected is int(round(G.order()*rho)) **tmin** number (default 0) starting time **tmax** number (default 100) stop time **transmission_weight** string (default None) the label for a weight given to the edges. transmission rate is G.adj[i][j][transmission_weight]*tau **recovery_weight** string (default None) a label for a weight given to the nodes to scale their recovery rates ``gamma_i = G.nodes[i][recovery_weight]*gamma`` **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **times, S, I** each a numpy array times and number in each status for corresponding time or if return_full_data=True **full_data** Simulation_Investigation object from this we can extract the status history of all nodes. We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.configuration_model([1,5,10]*100000) initial_size = 10000 gamma = 1. tau = 0.2 t, S, I = EoN.fast_SIS(G, tau, gamma, tmax = 10, initial_infecteds = range(initial_size)) plt.plot(t, I) ''' if rho is not None and initial_infecteds is not None: raise EoN.EoNError("cannot define both initial_infecteds and rho") trans_rate_fxn, rec_rate_fxn = EoN._get_rate_functions_(G, tau, gamma, transmission_weight, recovery_weight) if initial_infecteds is None: if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] times = [tmin] S = [G.order()] I = [0] Q = myQueue(tmax) status = defaultdict(lambda: 'S') #node status defaults to 'S' rec_time = defaultdict(lambda: tmin-1) #node recovery time defaults to -1 infection_times = defaultdict(lambda: []) #defaults to empty list recovery_times = defaultdict(lambda: []) transmissions = [] for u in initial_infecteds: Q.add(tmin, _process_trans_SIS_Markov, args = (G, None, u, times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_rate_fxn, rec_rate_fxn) ) while Q: Q.pop_and_run() #the initial infections were treated as ordinary infection events at #time 0. #So each initial infection added an entry at time tmin to lists. #We'd like to get rid these excess events. times = times[len(initial_infecteds):] S=S[len(initial_infecteds):] I=I[len(initial_infecteds):] if not return_full_data: return np.array(times), np.array(S), np.array(I) else: node_history = _transform_to_node_history_(infection_times, recovery_times, tmin, SIR = False) if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses=['S', 'I'], **sim_kwargs)
[docs]def fast_nonMarkov_SIS(G, trans_time_fxn=None, rec_time_fxn=None, trans_and_rec_time_fxn = None, trans_time_args=(), rec_time_args = (), trans_and_rec_time_args=(), initial_infecteds = None, rho = None, tmin=0, tmax = 100, return_full_data = False, sim_kwargs = None): r'''Similar to fast_nonMarkov_SIR. :Warning: trans_time_fxn (or trans_and_rec_time_fxn) need to return lists of times. Not just the next time. So this is different from the SIR version. :Arguments: **G** networkx Graph The underlying network **trans_time_fxn** User-defined function returning a list **RETURNS A LIST** **has slightly different arguments than the SIR version** a user-defined function that returns list of delays until transmission for an edge. All delays are before recovery. Each entry is the delay from time of infection of node to time of the given transmission (i.e., it's not looking at delays from one transmission to the next) May depend on various arguments and need not be Markovian. Called using the form trans_delays = trans_time_fxn(source_node, target_node, rec_delay, *trans_time_args) the source_node is the infected node the target_node is the node that may receive transmission **rec_time_fxn** user-designed function returning a float Returns the duration of infection of a node. May depend on various arguments and need not be Markovian. Called using the form duration = rec_time_fxn(node, *rec_time_args) **trans_and_rec_time_fxn** user-defined function returning a dict and a float returns both a dict whose values are lists of delays until transmissions for all edges from source to neighbors and a float giving duration of infection of the source. can only be **used instead of** ``trans_time_fxn`` and ``rec_time_fxn``. there is an **error** if these are also defined. Called using the form trans_delay_dict, duration = trans_and_rec_time_fxn(node, susceptible_neighbors, *trans_and_rec_time_args) here trans_delay_dict is a dict whose keys are those neighbors who receive a transmission and whose values are lists of delays duration is a float. **trans_time_args** tuple see trans_time_fxn **rec_time_args** tuple see rec_time_fxn **trans_and_rec_time_args** tuple see trans_and_rec_time_fxn **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **tmin** number (default 0) starting time **tmax** number (default 100) stop time **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **times, S, I** each a numpy array giving times and number in each status for corresponding time or if return_full_data=True: **full_data** a Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: ''' if rho and initial_infecteds: raise EoN.EoNError("cannot define both initial_infecteds and rho") if (trans_time_fxn and not rec_time_fxn) or (rec_time_fxn and not trans_time_fxn): raise EoN.EoNError("must define both trans_time_fxn and rec_time_fxn or neither") if trans_and_rec_time_fxn and trans_time_fxn: raise EoN.EoNError("cannot define trans_and_rec_time_fxn at the same time as either trans_time_fxn or rec_time_fxn") elif not trans_and_rec_time_fxn and not trans_time_fxn: raise EoN.EoNError("if not defining trans_and_rec_time_fxn, must define trans_time_fxn and rec_time_fxn") if not trans_and_rec_time_fxn: #we define the joint function. trans_and_rec_time_fxn = _find_trans_and_rec_delays_SIS_ trans_and_rec_time_args = (trans_time_fxn, rec_time_fxn, trans_time_args, rec_time_args) if initial_infecteds is None: #create initial infecteds list if not given if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] times, S, I = ([tmin], [G.order()], [0]) Q = myQueue(tmax) status = defaultdict(lambda: 'S') #node status defaults to 'S' rec_time = defaultdict(lambda: tmin-1) #node recovery time defaults to -1 infection_times = defaultdict(lambda: []) #defaults to empty list recovery_times = defaultdict(lambda: []) transmissions = [] for u in initial_infecteds: Q.add(tmin, _process_trans_SIS_nonMarkov_, args=(G, None, u, [], times, S, I, Q, status, rec_time, infection_times, recovery_times, transmissions, trans_and_rec_time_fxn, trans_and_rec_time_args ) ) while Q: #all the work is done in this while loop. Q.pop_and_run() #the initial infections were treated as ordinary infection events at #time 0. #So each initial infection added an entry at time tmin to lists. #We'd like to get rid these excess events. times = times[len(initial_infecteds):] S=S[len(initial_infecteds):] I=I[len(initial_infecteds):] if not return_full_data: return np.array(times), np.array(S), np.array(I) else: node_history = _transform_to_node_history_(infection_times, recovery_times, tmin, SIR = False) if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses = ['S', 'I'], **sim_kwargs)
#####Now dealing with Gillespie code#####
[docs]def Gillespie_SIR(G, tau, gamma, initial_infecteds=None, initial_recovereds = None, rho = None, tmin = 0, tmax=float('Inf'), recovery_weight = None, transmission_weight = None, return_full_data = False, sim_kwargs = None): #tested in test_SIR_dynamics r''' Performs SIR simulations for epidemics. For unweighted networks, the run time is usually slower than fast_SIR, but they are close. If we add weights, then this Gillespie implementation slows down much more. I think there are better ways to implement the algorithm to remove this. This will need a new data type that allows us to quickly sample a random event with appropriate weight. I think this is doable through a binary tree and it is in development. Rather than using figure A.1 of Kiss, Miller, & Simon, this uses a method from Petter Holme "Model versions and fast algorithms for network epidemiology" which focuses on SI edges (versions before 0.99.2 used a method more like fig A.1). This approach will not work for nonMarkovian transmission. Boguna et al "Simulating non-Markovian stochastic processes" have looked at how to handle nonMarkovian transmission in a Gillespie Algorithm. At present I don't see a way to efficientl adapt their approach - I think each substep will take O(N) time. So the full algorithm will be O(N^2). For this, it will be much better to use fast_SIR which I believe is O(N log N) :See Also: **fast_SIR** which has the same inputs but uses a different method to run faster, particularly in the weighted case. :Arguments: **G** networkx Graph The underlying network **tau** positive float transmission rate per edge **gamma** number recovery rate per node **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **initial_recovereds** iterable of nodes (default None) this whole collection is made recovered. Currently there is no test for consistency with initial_infecteds. Understood that everyone who isn't infected or recovered initially is initially susceptible. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **tmin** number (default 0) starting time **tmax** number (default Infinity) stop time **recovery_weight** string (default None) the string used to define the node attribute for the weight. Assumes that the recovery rate is gamma*G.nodes[u][recovery_weight]. If None, then just uses gamma without scaling. **transmission_weight** string (default None) the string used to define the edge attribute for the weight. Assumes that the transmission rate from u to v is tau*G.adj[u][v][transmission_weight] If None, then just uses tau without scaling. **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **times, S, I, R** each a numpy array giving times and number in each status for corresponding time OR if return_full_data=True: **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.configuration_model([1,5,10]*100000) initial_size = 10000 gamma = 1. tau = 0.3 t, S, I, R = EoN.Gillespie_SIR(G, tau, gamma, initial_infecteds = range(initial_size)) plt.plot(t, I) ''' if rho is not None and initial_infecteds is not None: raise EoN.EoNError("cannot define both initial_infecteds and rho") if return_full_data: infection_times = defaultdict(lambda: []) #defaults to an empty list for each node recovery_times = defaultdict(lambda: []) if transmission_weight is not None: def edgeweight(u,v): return G.adj[u][v][transmission_weight] else: def edgeweight(u,v): return None if recovery_weight is not None: def nodeweight(u): return G.nodes[u][recovery_weight] else: def nodeweight(u): return None tau = float(tau) #just to avoid integer division problems in python 2. gamma = float(gamma) if initial_infecteds is None: if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] if initial_recovereds is None: initial_recovereds = [] I = [len(initial_infecteds)] R = [len(initial_recovereds)] S = [G.order()-I[0]-R[0]] times = [tmin] transmissions = [] t = tmin status = defaultdict(lambda : 'S') for node in initial_infecteds: status[node] = 'I' if return_full_data: infection_times[node].append(t) transmissions.append((t, None, node)) for node in initial_recovereds: status[node] = 'R' if return_full_data: recovery_times[node].append(t) if recovery_weight is not None: infecteds = _ListDict_(weighted=True) else: infecteds = _ListDict_() #unweighted - code is faster for this case if transmission_weight is not None: IS_links = _ListDict_(weighted=True) else: IS_links = _ListDict_() for node in initial_infecteds: infecteds.update(node, weight_increment = nodeweight(node)) #weight is none if unweighted for nbr in G.neighbors(node): #must have this in a separate loop #from assigning status if status[nbr] == 'S': IS_links.update((node, nbr), weight_increment = edgeweight(node,nbr)) total_recovery_rate = gamma*infecteds.total_weight() #gamma*I_weight_sum total_transmission_rate = tau*IS_links.total_weight()#IS_weight_sum total_rate = total_recovery_rate + total_transmission_rate delay = random.expovariate(total_rate) t += delay while infecteds and t<tmax: if random.random()<total_recovery_rate/total_rate: #recover recovering_node = infecteds.random_removal() #does weighted choice and removes it status[recovering_node]='R' if return_full_data: recovery_times[recovering_node].append(t) for nbr in G.neighbors(recovering_node): if status[nbr] == 'S': IS_links.remove((recovering_node, nbr)) times.append(t) S.append(S[-1]) I.append(I[-1]-1) R.append(R[-1]+1) else: #transmit transmitter, recipient = IS_links.choose_random() #we don't use remove since that complicates the later removal of edges. status[recipient]='I' if return_full_data: transmissions.append((t, transmitter, recipient)) infection_times[recipient].append(t) infecteds.update(recipient, weight_increment = nodeweight(recipient)) for nbr in G.neighbors(recipient): if status[nbr] == 'S': IS_links.update((recipient, nbr), weight_increment=edgeweight(recipient, nbr)) elif status[nbr]=='I' and nbr != recipient: #self edge would break this without last test.elif IS_links.remove((nbr, recipient)) times.append(t) S.append(S[-1]-1) I.append(I[-1]+1) R.append(R[-1]) total_recovery_rate = gamma*infecteds.total_weight()#I_weight_sum total_transmission_rate = tau*IS_links.total_weight()#IS_weight_sum total_rate = total_recovery_rate + total_transmission_rate if total_rate>0: delay = random.expovariate(total_rate) else: delay = float('Inf') t += delay if not return_full_data: return np.array(times), np.array(S), np.array(I), \ np.array(R) else: #print(infection_times) #print(recovery_times) infection_times = {node: L[0] for node, L in infection_times.items()} recovery_times = {node: L[0] for node, L in recovery_times.items()} #print(infection_times) #print(recovery_times) node_history = _transform_to_node_history_(infection_times, recovery_times, tmin, SIR = True) if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses = ['S', 'I', 'R'], **sim_kwargs)
[docs]def Gillespie_SIS(G, tau, gamma, initial_infecteds=None, rho = None, tmin = 0, tmax=100, recovery_weight=None, transmission_weight = None, return_full_data = False, sim_kwargs = None): r''' Performs SIS simulations for epidemics on networks with or without weighted edges. It assumes that the edges have a weight associated with them and that the transmission rate for an edge is tau*weight[edge] Based on an algorithm by Petter Holme. It requires a weighted choice of edges and this will be done by tracking the maximum edge weight and then using repeated rejection samples until a successful selection. :See Also: **fast_SIS** which has the same inputs but uses a faster method (esp for weighted graphs). :Arguments: **G** (NetworkX Graph) The underlying network **tau** (positive float) transmission rate per edge **gamma** number recovery rate per node **initial_infecteds** node or iterable of nodes if a single node, then this node is initially infected if an iterable, then whole set is initially infected if None, then choose randomly based on rho. If rho is also None, a random single node is chosen. If both initial_infecteds and rho are assigned, then there is an error. **rho** number initial fraction infected. number is int(round(G.order()*rho)) **tmin** number (default 0) starting time **tmax** number stop time **recovery_weight** string (default None) the string used to define the node attribute for the weight. Assumes that the recovery rate is gamma*G.nodes[u][recovery_weight]. If None, then just uses gamma without scaling. **transmission_weight** string (default None) the string used to define the edge attribute for the weight. Assumes that the transmission rate from u to v is tau*G.adj[u][v][transmission_weight] **return_full_data** boolean (default False) Tells whether a Simulation_Investigation object should be returned. **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **times, S, I** numpy arrays giving times and number in each status for corresponding time or if ``return_full_data==True`` **full_data** Simulation_Investigation object from this we can extract the status history of all nodes We can also plot the network at given times and even create animations using class methods. :SAMPLE USE: :: import networkx as nx import EoN import matplotlib.pyplot as plt G = nx.configuration_model([1,5,10]*100000) initial_size = 10000 gamma = 1. tau = 0.2 t, S, I = EoN.Gillespie_SIS(G, tau, gamma, tmax = 20, initial_infecteds = range(initial_size)) plt.plot(t, I) ''' if rho is not None and initial_infecteds is not None: raise EoN.EoNError("cannot define both initial_infecteds and rho") if return_full_data: infection_times = defaultdict(lambda: []) #defaults to an empty list recovery_times = defaultdict(lambda: []) #for each node if transmission_weight is not None: def edgeweight(u,v): return G.adj[u][v][transmission_weight] else: def edgeweight(u,v): return None if recovery_weight is not None: def nodeweight(u): return G.nodes[u][recovery_weight] else: def nodeweight(u): return None tau = float(tau) #just to avoid integer division problems. gamma = float(gamma) if initial_infecteds is None: if rho is None: initial_number = 1 else: initial_number = int(round(G.order()*rho)) initial_infecteds=random.sample(G.nodes(), initial_number) elif G.has_node(initial_infecteds): initial_infecteds=[initial_infecteds] I = [len(initial_infecteds)] S = [G.order()-I[0]] times = [tmin] t = tmin transmissions = [] status = defaultdict(lambda : 'S') for node in initial_infecteds: status[node] = 'I' if return_full_data: infection_times[node].append(t) transmissions.append((t, None, node)) if recovery_weight is None: infecteds = _ListDict_() else: infecteds = _ListDict_(weighted=True) if transmission_weight is None: IS_links = _ListDict_() else: IS_links = _ListDict_(weighted=True) for node in initial_infecteds: infecteds.update(node, weight_increment = nodeweight(node)) for nbr in G.neighbors(node): #must have this in a separate loop #after assigning status of node if status[nbr] == 'S': IS_links.update((node, nbr), weight_increment=edgeweight(node, nbr)) total_recovery_rate = gamma*infecteds.total_weight()#I_weight_sum total_transmission_rate = tau*IS_links.total_weight()#IS_weight_sum total_rate = total_recovery_rate + total_transmission_rate delay = random.expovariate(total_rate) t = t+delay while infecteds and t<tmax: if random.random()<total_recovery_rate/total_rate: #recover recovering_node = infecteds.random_removal() status[recovering_node]='S' if return_full_data: recovery_times[recovering_node].append(t) for nbr in G.neighbors(recovering_node): if nbr == recovering_node: #move past self edges continue elif status[nbr] == 'S': IS_links.remove((recovering_node, nbr)) else: IS_links.update((nbr, recovering_node), weight_increment = edgeweight(recovering_node, nbr)) times.append(t) S.append(S[-1]+1) I.append(I[-1]-1) else: transmitter, recipient = IS_links.choose_random() status[recipient]='I' if return_full_data: infection_times[recipient].append(t) transmissions.append((t, transmitter, recipient)) infecteds.update(recipient, weight_increment = nodeweight(recipient)) for nbr in G.neighbors(recipient): if status[nbr] == 'S': IS_links.update((recipient, nbr), weight_increment = edgeweight(recipient, nbr)) elif nbr != recipient: #otherwise a self-loop breaks the code IS_links.remove((nbr, recipient)) times.append(t) S.append(S[-1]-1) I.append(I[-1]+1) total_recovery_rate = gamma*infecteds.total_weight()#I_weight_sum total_transmission_rate = tau*IS_links.total_weight()#IS_weight_sum total_rate = total_recovery_rate + total_transmission_rate if total_rate>0: delay = random.expovariate(total_rate) else: delay = float('Inf') t += delay if not return_full_data: return np.array(times), np.array(S), np.array(I) else: node_history = _transform_to_node_history_(infection_times, recovery_times, tmin, SIR = False) if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, possible_statuses=['S', 'I'], **sim_kwargs)
[docs]def Gillespie_complex_contagion(G, rate_function, transition_choice, get_influence_set, IC, return_statuses, tmin = 0, tmax=100, parameters = None, return_full_data = False, sim_kwargs = None): r''' Initially intended for a complex contagion. However, this can allow influence from any nodes, not just immediate neighbors. The complex contagion must be something that all nodes do something simultaneously **This is not the same as if node** ``v`` **primes node** ``u`` **and later node** ``w`` **causes** ``u`` **to transition. This will require that both** ``v`` **and** ``w`` **have the relevant states at the moment** **of transition and it has forgotten any previous history.** :Arguments: **G** (NetworkX Graph) The underlying network **rate_function** A function that will take the network, a node, and the statuses of all the nodes and calculate the rate at which the node changes its status. The function is called like if parameters is None: rate_function(G, node, status) else: rate_function(G, node, status, parameters) where G is the graph, node is the node, status is a dict such that status[u] returns the status of u, and parameters is the parameters passed to the function. it returns a number, the combined rate at which ``node`` might change status. **transition_choice** A function that takes the network, a node, and the statuses of all the nodes and chooses which event will happen. The function should be called [with or without ``parameters``] if ``parameters`` is ``None``: ``transition_choice(G, node, status)`` else: ``transition_choice(G, node, status, parameters)`` where ``G`` is the graph, ``node` is the node, ``status` is a dict such that ``status[u]`` returns the status of u, and ``parameters`` is the parameters passed to the function. It should return the new status of ``node`` based on the fact that the node is changing status. **get_influence_set** When a node ``u`` changes status, we want to know which nodes may have their rate altered. We need to update their rates. This function returns all nodes that may be affected by ``u`` (either in its previous state or its current state). We will go through and recalculate the rates for all of these nodes. For a contagion, we can simply choose all neighbors, but it may be faster to leave out any nodes that it wouldn't have affected before or after its transition (e.g., R or I neighbors in SIR). if ``parameters`` is None: ``get_influence_set(G, node, status)`` else: ``get_influence_set(G, node, status, parameters)`` where G is the graph, node is the node, status is a dict such that status[u] returns the status of u, and parameters is the parameters passed to the function. it should return the set of nodes whose rates need to be recalculated. Most likely, it is def get_influence_set(G, node, status): return G.neighbors(node) **IC** A dict. IC[node] returns the status of node. **return_statuses** list or other iterable (but not a generator) The statuses that we will return information for, in the order we will return them. **tmin** number (default 0) starting time **tmax** number (default 100) stop time **return_full_data** boolean currently needs to be False. True raises an error. **parameters** list/tuple. Any parameters of the functions rate_function, transition_choice, influence_set We assume all three functions can accept parameters. Examples: recovery rate, transmission rate, ... :Returns: **(times, status1, status2, ...)** tuple of numpy arrays first entry is the times at which events happen. second (etc) entry is an array with the same number of entries as ``times`` giving the number of nodes of status ordered as they are in ``return_statuses`` :SAMPLE USE: This simply does an SIR epidemic, by saying that the rate of becoming infected is tau times the number of infected neighbors. :: import networkx as nx import EoN import matplotlib.pyplot as plt from collections import defaultdict #makes defining the initial condition easier def rate_function(G, node, status, parameters): #This function needs to return the rate at which node changes status. # tau,gamma = parameters if status[node] == 'I': return gamma elif status[node] == 'S': return tau*len([nbr for nbr in G.neighbors(node) if status[nbr] == 'I']) else: return 0 def transition_choice(G, node, status, parameters): #this function needs to return the new status of node. We already #know it is changing status. # #this function could be more elaborate if there were different #possible transitions that could happen. if status[node] == 'I': return 'R' elif status[node] == 'S': return 'I' def get_influence_set(G, node, status, parameters): #this function needs to return any node whose rates might change #because ``node`` has just changed status. # #the only neighbors a node might affect are the susceptible ones. return {nbr for nbr in G.neighbors(node) if status[nbr] == 'S'} G = nx.fast_gnp_random_graph(100000,0.00005) gamma = 1. tau = 0.5 parameters = (tau, gamma) IC = defaultdict(lambda: 'S') for node in range(200): IC[node] = 'I' t, S, I, R = EoN.Gillespie_complex_contagion(G, rate_function, transition_choice, get_influence_set, IC, return_statuses=('S', 'I', 'R'), parameters=parameters) plt.plot(t, I) ''' if parameters is None: parameters = () status = {node: IC[node] for node in G.nodes()} if return_full_data: node_history = {node:([tmin], [status[node]]) for node in G.nodes()} times = [tmin] t = tmin data = {} C = Counter(status.values()) for return_status in return_statuses: data[return_status] = [C[return_status]] nodes_by_rate = _ListDict_(weighted=True) for u in G.nodes(): rate = rate_function(G, u, status, parameters) if rate>0: nodes_by_rate.insert(u, weight = rate) if nodes_by_rate.total_weight()>0: delay = random.expovariate(nodes_by_rate.total_weight()) else: delay = float('Inf') t += delay while nodes_by_rate.total_weight()>0 and t< tmax: times.append(t) # print(delta_t, nodes_by_rate.total_weight()) node = nodes_by_rate.choose_random() new_status = transition_choice(G, node, status, parameters) for x in data.keys(): data[x].append(data[x][-1]) if status[node] in return_statuses: data[status[node]][-1] -= 1 if new_status in return_statuses: data[new_status][-1] += 1 status[node] = new_status if return_full_data: node_history[node][0].append(t) node_history[node][1].append(new_status) #update self weight = rate_function(G, node, status, parameters) nodes_by_rate.insert(node, weight = weight) influence_set = get_influence_set(G, node, status, parameters) for nbr in influence_set: weight = rate_function(G, nbr, status, parameters) nodes_by_rate.insert(nbr, weight=weight) if nodes_by_rate.total_weight()>0: delay = random.expovariate(nodes_by_rate.total_weight()) else: delay = float('Inf') t += delay if not return_full_data: returnval = [] times = np.array(times) returnval.append(times) for return_status in return_statuses: data[return_status] = np.array(data[return_status]) returnval.append(data[return_status]) return returnval else: if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, possible_statuses = return_statuses, **sim_kwargs)
[docs]def Gillespie_Arbitrary(G, spontaneous_transition_graph, nbr_induced_transition_graph, IC, return_statuses, tmin = 0, tmax=100, spont_kwargs = None, nbr_kwargs=None, return_full_data = False, sim_kwargs = None): r'''Calls Gillespie_simple_contagion. This is here for legacy reasons. Gillespie_Arbitrary has been replaced by Gillespie_simple_contagion. It will be removed in future versions. ''' print("Gillespie_Arbitrary has been replaced by Gillespie_simple_contagion.\n", "It will be removed in future versions.") return Gillespie_simple_contagion(G, spontaneous_transition_graph, nbr_induced_transition_graph, IC, return_statuses, tmin = tmin, tmax=tmax, return_full_data = return_full_data, **sim_kwargs)
[docs]def Gillespie_simple_contagion(G, spontaneous_transition_graph, nbr_induced_transition_graph, IC, return_statuses, tmin = 0, tmax=100, spont_kwargs = None, nbr_kwargs=None, return_full_data = False, sim_kwargs = None): r''' Performs simulations for epidemics, allowing more flexibility than SIR/SIS. This does not handle complex contagions. It assumes that when an individual changes status either he/she has received a "transmission" from a *single* neighbor or he/she is changing status independently of any neighbors. So this is like SIS or SIR. Other examples would be like SEIR, SIRS, etc There is an example below demonstrating an SEIR epidemic. We allow for nodes to undergo two types of transitions. They can be: - spontaneous - so a node of status A becomes B without any neighbor's influence - neighbor-induced - so an edge between status A and status B nodes suddenly becomes an edge between a status A and a status C node because the status B node changes status due to the existence of the edge. (in principle, both could change status, but the code currently only allows the second node to change status). Both types of transitions can be represented by weighted directed graphs. We describe two weighted graphs whose nodes represent the possible statuses and whose edges represent possible transitions of statuses. - The spontaneous transitions can be represented by a graph whose nodes are the possible statuses and an edge from ``'A'`` to ``'B'`` represent that in the absence of any influence from others an indivdiual of status ``'A'`` transitions to status ``'B'`` with default rate given by the weight of the edge in this spontaneous transition graph. The rate may be modified by properties of the nodes in the contact network ``G``. - The neighbor-induced transitions can be represented by a "transitions graph" whose nodes are length-2 tuples. The first entry represents the first individual of a partnership and the second represents the second individual. **only the second individual changes status**. An edge in the transitions graph from the node ``('A', 'B')`` to the node ``('A', 'C')`` represents that an ``'AB'`` partnership in the contact network can cause the second individual to transition to status ``'C'``. The weight of the edge in the represents the default transition rate. The rate may be modified by properties of the nodes or the edge in the contact network ``G``. [for reference, if you look at Fig 4.3 on pg 122 of Kiss, Miller & Simon the graphs for **SIS** would be: ``spontaneous_transition_graph``: ``'I'``-> ``'S'`` with the edge weighted by ``gamma`` and ``nbr_induced_transition_graph``: ``('I', 'S')`` -> ``('I', 'I')`` with the edge weighted by ``tau``. For **SIR** they would be: ``spontaneous_transition_graph``: ``'I'``->``'R'`` with weight ``gamma`` and ``nbr_induced_transition_graph``: ``('I', 'S')`` -> ``('I', 'I')`` with rate ``tau``. ] These graphs must be defined and then input into the algorithm. It is possible to weight edges or nodes in the contact network ``G`` (that is, not the 2 directed networks defined above, but the original contact network) so that some of these transitions have different rates for different individuals/partnerships. These are included as attributes in the contact network. In the most general case, the transition rate depends on some function of the attributes of the nodes or edge in the contact network ``G``. There are two ways we introduce individual or pair-level heterogeneity in the population. The first way is through introducing weights to individuals or their contacts which simply multiply the default rate. The second is through including a function of nodes or pairs of nodes which will explicitly calculate the scaling factor, possibly including more information than we can include with the weights. For a given transition, you can use at most one of these. - We first describe examples of weighting nodes/edges in the population So for the SIR case, if some people have higher recovery rate, we might define a node attribute ``'recovery_weight'`` for each node in ``G``, and the recovery would occur with rate G.nodes[node]['recovery_weight']*gamma. So a value of 1.1 would correspond to a 10% increased recovery rate. Since I don't know what name you might choose for the weight label as I write this algorithm, in defining the spontaneous transition graph (H), the ``'I'``->``'R'`` edge would be given an attribute ``'weight_label'`` so that ``H.adj['I']['R']['weight_label'] = 'recovery_weight'``. If you define the attribute ``'weight_label'`` for an edge in ``H``, then it will be assumed that every node in ``G`` has a corresponding weight. If no attribute is given, then it is assumed that all transitions happen with the original rate. We similarly define the weight_labels as edge attributes in the neighbor-induced transition graph. The edges of the graph ``'G'`` have a corresponding ``G[u,v]['transmission_weight']`` - Alternately we might introduce a function. So for the SIR case if the recovery rate depends on two attributes of a node (say, age and gender), we define a function ``rate_function(G,node)`` which will then look at G.nodes[node]['age'] and G.nodes[node]['gender'] and then return a factor which will be multiplied by ``gamma`` to give the recovery rate. Similarly, if we are considering a neighbor induced transition and the rate depends on properties of both nodes we define another function ``rate_function(G,u,v)`` which may use attributes of u or v or the edge to find the appropriate scaling factor. :Arguments: **G** NetworkX Graph The underlying contact network. If ``G`` is directed, we assume that "transmissions" can only go in the same direction as the edge. **spontaneous_transition_graph** Directed networkx graph The nodes of this graph are the possible statuses of a node in ``G``. An edge in this graph is a possible transition in ``G`` that occurs without any influence from neighbors. An edge in this directed graph is labelled with attributes - ``'rate'`` [a number, the default rate of the transition] - ``'weight_label'`` (optional) [a string, giving the label of a **node** attribute in the contact network ``G`` that scales the transition rate] - ``'rate_function'`` (optional not combinable with ``'weight_label'`` for some edge.) [a user-defined function of the contact network and node that will scale the transition rate. This cannot depend on the statuses of any nodes - we must be able to calculate it once at the beginning of the process.] It will be called as ``rate_function(G, node,**spont_kwargs)`` where ``spont_kwargs`` is described below. Only one of ``'weight_label'`` and ``'rate_function'`` can be given. In the description below, let's use - ``rate = spontaneous_transition_graph.adj[Status1][Status2]['rate']`` - ``weight_label = spontaneous_transition_graph.adj[Status1][Status2]['weight_label']`` - ``rate_function = spontaneous_transition_graph.adj[Status1][Status2]['rate_function']`` For a node ``u`` whose status is ``Status1``, the rate at which ``u`` transitions to ``Status2`` is - ``rate`` if neither ``weight_label`` nor ``rate_function`` is defined. - ``rate*G.nodes[u][weight_label]`` if ``weight_label`` is defined. - ``rate*rate_function(G, u, **spont_kwargs)`` if ``rate_function`` is defined. So for example in the case of an SIR disease, this would be a graph with an isolated node ``'S'`` and an edge from node ``'I'`` to ``'R'`` with ``rate`` equal to ``gamma`` (the recovery rate). It would not actually be necessary to have the node ``'S'``. Note that the rate_function version is more general, and in principle we don't need the weight_label option. It's here for backwards compatibility and general simplicity purposes. **nbr_induced_transition_graph** Directed networkx graph The nodes of this graph are tuples with possible statuses of nodes at the end of an edge. The first node in the tuple is the node that could be affecting the second. So for example for the SIR model we would expect a node ``('I', 'S')`` with an edge to ``('I', 'I')``. An edge in this directed graph is labelled with attributes - ``'rate'`` [a number, the default rate of the transition] - ``'weight_label'`` (optional) [a string, giving the label of an *edge** attribute in the contact network ``G`` that scales the transition rate] - ``'rate_function'`` (optional not combinable with ``'weight_label'`` for some edge.) [a user-defined function of the contact network and source and target nodes that will scale the transition rate. This cannot depend on the statuses of any nodes - we must be able to calculate it once at the beginning of the process. It will be called as ``rate_function(G, source, target, *nbr_kwargs)``] Only one of ``'weight_label'`` and ``'rate_function'`` can be given. In the description below, let's use - ``rate = spontaneous_transition_graph.adj[Status1][Status2]['rate']`` - ``weight_label = spontaneous_transition_graph.adj[Status1][Status2]['weight_label']`` - ``rate_function = spontaneous_transition_graph.adj[Status1][Status2]['rate_function']`` If the transition is (A,B) to (A,C) and we have a source node of status A joined to a target node of status B then the target node transitions from B to C with rate - ``rate`` if neither ``weight_label`` nor ``rate_function`` is defined. - ``rate*G.adj[source][target][weight_label]`` if ``weight_label`` is defined. - ``rate*rate_function(G, source, target, **nbr_kwargs)`` if ``rate_function`` is defined So for example in the case of an SIR disease with transmission rate tau, this would be a graph with an edge from the node ``('I','S')`` to ``('I', 'I')``. The attribute ``'rate'`` for the edge would be ``tau``. **IC** dict states the initial status of each node in the network. **return_statuses** list or other iterable (but not a generator) The statuses that we will return information for, in the order we will return them. **tmin** number (default 0) starting time **tmax** number (default 100) stop time **spont_kwargs** dict or None (default None) Any parameters which might be needed if the user has defined a rate function for a spontaneous transition. If any of the spontaneous transition rate functions accepts these, they all need to (even if not used). It's easiest to define the function as def f(..., **kwargs) **nbr_kwargs** dict or None (default None) Any parameters which might be needed if the user has defined a rate function for a neighbor-induced transition. If any of the neighbor-induced transition rate functions accepts these, they all need to (even if not used). It's easiest to define the function as def f(..., **kwargs) **return_full_data** boolean Tells whether to return a Simulation_Investigation object or not **sim_kwargs** keyword arguments Any keyword arguments to be sent to the Simulation_Investigation object Only relevant if ``return_full_data=True`` :Returns: **(times, status1, status2, ...)** tuple of numpy arrays first entry is the times at which events happen. second (etc) entry is an array with the same number of entries as ``times`` giving the number of nodes of status ordered as they are in ``return_statuses`` :SAMPLE USE: This does an SEIR epidemic. It treats the nodes and edges as weighted. So some individuals have a higher E->I transition rate than others, and some edges have a higher transmission rate than others. The recovery rate is taken to be the same for all nodes. There are more examples in the online documentation at :ref:``simple-contagion-section``. :: import EoN import networkx as nx from collections import defaultdict import matplotlib.pyplot as plt import random N = 100000 G = nx.fast_gnp_random_graph(N, 5./(N-1)) #they will vary in the rate of leaving exposed class. #and edges will vary in transition rate. #there is no variation in recovery rate. node_attribute_dict = {node: 0.5+random.random() for node in G.nodes()} edge_attribute_dict = {edge: 0.5+random.random() for edge in G.edges()} nx.set_node_attributes(G, values=node_attribute_dict, name='expose2infect_weight') nx.set_edge_attributes(G, values=edge_attribute_dict, name='transmission_weight') H = nx.DiGraph() H.add_node('S') H.add_edge('E', 'I', rate = 0.6, weight_label='expose2infect_weight') H.add_edge('I', 'R', rate = 0.1) J = nx.DiGraph() J.add_edge(('I', 'S'), ('I', 'E'), rate = 0.1, weight_label='transmission_weight') IC = defaultdict(lambda: 'S') for node in range(200): IC[node] = 'I' return_statuses = ('S', 'E', 'I', 'R') t, S, E, I, R = EoN.Gillespie_simple_contagion(G, H, J, IC, return_statuses, tmax = float('Inf')) plt.semilogy(t, S, label = 'Susceptible') plt.semilogy(t, E, label = 'Exposed') plt.semilogy(t, I, label = 'Infected') plt.semilogy(t, R, label = 'Recovered') plt.legend() plt.savefig('SEIR.png') ''' if spont_kwargs is None: spont_kwargs = {} if nbr_kwargs is None: nbr_kwargs = {} if sim_kwargs is None: sim_kwargs = {} status = {node: IC[node] for node in G.nodes()} if return_full_data: transmissions = [] node_history = {node:([tmin], [status[node]]) for node in G.nodes()} times = [tmin] data = {} C = Counter(status.values()) for return_status in return_statuses: data[return_status] = [C[return_status]] # try: spontaneous_transitions = sorted(spontaneous_transition_graph.edges()) induced_transitions = sorted(nbr_induced_transition_graph.edges()) except TypeError: print("note that because your states are not sortable, your simulations "+ "may produce different outcomes even if you specify the random seed") spontaneous_transitions = list(spontaneous_transition_graph.edges()) induced_transitions = list(nbr_induced_transition_graph.edges()) potential_transitions = {} rate = {}# intrinsic rate of a transition #weight_sum = defaultdict(lambda: 0) #weights = defaultdict(lambda: None) #max_weight = defaultdict(lambda: 0) get_weight = defaultdict(lambda: defaultdict(lambda:None)) #SET UP THE POSSIBLE EVENTS, STARTING WITH SPONTANEOUS for transition in spontaneous_transitions: rate[transition] = spontaneous_transition_graph.adj[transition[0]][transition[1]]['rate'] if 'weight_label' in spontaneous_transition_graph.adj[transition[0]][transition[1]]: if 'rate_function' in spontaneous_transition_graph.adj[transition[0]][transition[1]]: raise EoN.EoNError('cannot define both "weight_label" and "rate_function" in',\ 'spontaneous_transitions graph') wl = spontaneous_transition_graph.adj[transition[0]][transition[1]]['weight_label'] get_weight[transition] = nx.get_node_attributes(G, wl) #This is a dict mapping node to its weight. potential_transitions[transition] = _ListDict_(weighted=True)#max_weight[transition] = max(get_weight[transition].values()) elif 'rate_function' in spontaneous_transition_graph.adj[transition[0]][transition[1]]: rf = spontaneous_transition_graph.adj[transition[0]][transition[1]]['rate_function'] get_weight[transition] = {node: rf(G, node, **spont_kwargs) for node in G}#This is a dict mapping node to its weight. potential_transitions[transition] = _ListDict_(weighted=True) else: potential_transitions[transition] = _ListDict_()#max_weight[transition]=1 #print(spontaneous_transitions) #print([rate[transition] for transition in spontaneous_transitions]) #CONTINUING SETTING UP POSSIBLE EVENTS, NOW WITH INDUCED TRANSITIONS. for transition in induced_transitions: if transition[0][0] != transition[1][0]: raise EoN.EoNError("transition {} -> {} not allowed: first node must keep same status".format(transition[0],transition[1])) rate[transition] = nbr_induced_transition_graph.adj[transition[0]][transition[1]]['rate'] if 'weight_label' in nbr_induced_transition_graph.adj[transition[0]][transition[1]]: if 'rate_function' in nbr_induced_transition_graph.adj[transition[0]][transition[1]]: raise EoN.EoNError('cannot define both "weight_label" and "rate_function" in',\ 'nbr_induced_transitions graph') wl = nbr_induced_transition_graph.adj[transition[0]][transition[1]]['weight_label'] get_weight[transition] = nx.get_edge_attributes(G, wl)#a dict mapping edge to its weight. #note if G is directed, then this has all edges. If undirected #edge will appear only once. But we may care about the opposite direction. #so we need to add those now. if not nx.is_directed(G): get_weight[transition].update({(source, target): G.adj[source][target][wl] for target, source in get_weight[transition]}) potential_transitions[transition] = _ListDict_(weighted=True) elif 'rate_function' in nbr_induced_transition_graph.adj[transition[0]][transition[1]]: rf = nbr_induced_transition_graph.adj[transition[0]][transition[1]]['rate_function'] edges = list(G.edges()) get_weight[transition] = {(source, target): rf(G, source, target, **nbr_kwargs) for source, target in edges} if not nx.is_directed(G): get_weight[transition].update({(source, target): rf(G, source, target, **nbr_kwargs) for target, source in edges}) potential_transitions[transition] = _ListDict_(weighted=True) else: potential_transitions[transition] = _ListDict_() #initialize all potential events to start. for node in G.nodes(): if spontaneous_transition_graph.has_node(status[node]):# and spontaneous_transition_graph.degree(status[node])>0: for transition in spontaneous_transition_graph.edges(status[node]): potential_transitions[transition].update(node, weight_increment = get_weight[transition][node]) #weight increment defaults to None if not present for nbr in G.neighbors(node): #print(status[node],status[nbr]) if nbr_induced_transition_graph.has_node((status[node],status[nbr])):# and nbr_induced_transition_graph.degree((status[node],status[nbr])) >0: for transition in nbr_induced_transition_graph.edges((status[node],status[nbr])): potential_transitions[transition].update((node, nbr), weight_increment = get_weight[transition][(node, nbr)]) t = tmin #NOW WE'RE READY TO GET STARTED WITH THE SIMULATING total_rate = sum(rate[transition]*potential_transitions[transition].total_weight() for transition in spontaneous_transitions+induced_transitions) if total_rate>0: delay = random.expovariate(total_rate) else: delay = float('Inf') t = t+delay while total_rate>0 and t<tmax: times.append(t) r = random.random() for transition in spontaneous_transitions+induced_transitions: r -= rate[transition]*potential_transitions[transition].total_weight()/total_rate if r<0: break #either node doing spontaneous or edge doing an induced event if transition in spontaneous_transitions: spontaneous = True else: spontaneous = False actor = potential_transitions[transition].choose_random() if spontaneous: modified_node = actor old_status = transition[0] new_status = transition[1] #node changes status else: source, target = actor modified_node = target old_status = transition[0][1] new_status = transition[1][1] if return_full_data: transmissions.append((t, source, modified_node)) #modified_node changes status status[modified_node] = new_status if return_full_data: node_history[modified_node][0].append(t) node_history[modified_node][1].append(new_status) #it might look like there is a cleaner way to do this, but what if it #happens that old_status == status[modified_node]??? This way still works. for x in data.keys(): data[x].append(data[x][-1]) if old_status in return_statuses: data[old_status][-1] -= 1 if status[modified_node] in return_statuses: data[status[modified_node]][-1] += 1 #UPDATE OUR POTENTIAL TRANSITIONS for transition in spontaneous_transitions: #can probably make more efficient, but there aren't many #remove modified_node from any spontaneous lists #add modified_node to any spontaneous lists if transition[0] == old_status: potential_transitions[transition].remove(modified_node) if transition[0] == status[modified_node]: potential_transitions[transition].update(modified_node, weight_increment = get_weight[transition][modified_node]) #roundoff error can kill the calculation, but it's slow to do this right. #so we'll only deal with it if the value is small enough that roundoff #error might matter. if potential_transitions[transition].total_weight() < 10**(-7) and potential_transitions[transition].total_weight()!=0: potential_transitions[transition].update_total_weight() #print(modified_node, old_status, status[modified_node]) #print(potential_transitions[ for transition in induced_transitions: if G.is_directed(): for nbr in G.neighbors(modified_node): #remove edge from any induced lists #add edge to any induced lists nbr_status = status[nbr] if (modified_node, nbr) not in get_weight[transition]: get_weight[transition][(modified_node,nbr)] = get_weight[transition][(nbr,modified_node)] if transition[0] == (old_status, nbr_status): potential_transitions[transition].remove((modified_node, nbr)) if transition[0] == (status[modified_node], nbr_status): potential_transitions[transition].update((modified_node, nbr), weight_increment = get_weight[transition][(modified_node, nbr)]) for pred in G.predecessors(modified_node): #remove edge from any induced lists #add edge to any induced lists pred_status = status[pred] if (pred, modified_node) not in get_weight[transition]: get_weight[transition][(pred, modified_node)] = get_weight[transition][(pred, modified_node)] if transition[0] == (pred_status, old_status): potential_transitions[transition].remove((pred, modified_node)) if transition[0] == (pred_status, status[modified_node]): potential_transitions[transition].update((pred, modified_node), weight_increment = get_weight[transition][(pred, modified_node)]) else: for nbr in G.neighbors(modified_node): #remove edge from any induced lists #add edge to any induced lists nbr_status = status[nbr] if (modified_node, nbr) not in get_weight[transition]: get_weight[transition][(modified_node,nbr)] = get_weight[transition][(nbr,modified_node)] elif (nbr, modified_node) not in get_weight[transition]: get_weight[transition][(nbr, modified_node)] = get_weight[transition][(modified_node, nbr)] if transition[0] == (nbr_status, old_status): potential_transitions[transition].remove((nbr, modified_node)) if transition[0] == (old_status, nbr_status): potential_transitions[transition].remove((modified_node, nbr)) if transition[0] == (nbr_status, status[modified_node]): potential_transitions[transition].update((nbr, modified_node), weight_increment = get_weight[transition][(nbr, modified_node)]) if transition[0] == (status[modified_node], nbr_status): potential_transitions[transition].update((modified_node, nbr), weight_increment = get_weight[transition][(modified_node, nbr)]) #roundoff error can kill the calculation, but it's slow to do this right. #so we'll only deal with it if the value is small enough that roundoff #error might matter. if potential_transitions[transition].total_weight() < 10**(-7) and potential_transitions[transition].total_weight()!=0: potential_transitions[transition].update_total_weight() total_rate = sum(rate[transition]*potential_transitions[transition].total_weight() for transition in spontaneous_transitions+induced_transitions) if total_rate>0: delay = random.expovariate(total_rate) else: delay = float('Inf') t += delay if not return_full_data: returnval = [] times = np.array(times) returnval.append(times) for return_status in return_statuses: data[return_status] = np.array(data[return_status]) returnval.append(data[return_status]) return returnval else: if sim_kwargs is None: sim_kwargs = {} return EoN.Simulation_Investigation(G, node_history, transmissions, possible_statuses= return_statuses, **sim_kwargs)