Source code for source_release.prime_utils

import sys
import numpy as np
import csv

from scipy.stats import gamma, lognorm, norm
import matplotlib.pyplot as plt

def normal_logpdf(x,loc,scale):
    # log-pdf of a normal distribution; calls scipy's private norm._logpdf
    # directly, bypassing the argument validation of the public API
    return norm._logpdf((x-loc)/scale)-np.log(scale)

def lognorm_pdf(x,s,loc=0,scale=1):
    # pdf of a lognormal distribution via the private lognorm._pdf
    return lognorm._pdf((x - loc)/scale,s)/scale

def lognorm_cdf(x,s,loc=0,scale=1):
    # cdf of a lognormal distribution; thin wrapper around the public API
    return lognorm.cdf(x,s,loc=loc,scale=scale)

def gamma_pdf(x,s,loc=0,scale=1):
    # pdf of a gamma distribution via the private gamma._pdf
    return gamma._pdf((x - loc)/scale,s)/scale
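
def _check_dist_wrappers():
    # Illustrative sanity check (not part of the original module): the
    # wrappers above should agree with scipy's public pdf/logpdf/cdf calls,
    # which they bypass to skip argument validation. Parameter values are
    # arbitrary examples.
    x = np.linspace(0.1, 5.0, 8)
    assert np.allclose(normal_logpdf(x, 1.0, 2.0),
                       norm.logpdf(x, loc=1.0, scale=2.0))
    assert np.allclose(lognorm_pdf(x, 0.5, scale=1.5),
                       lognorm.pdf(x, 0.5, scale=1.5))
    assert np.allclose(lognorm_cdf(x, 0.5, scale=1.5),
                       lognorm.cdf(x, 0.5, scale=1.5))
    assert np.allclose(gamma_pdf(x, 2.0, scale=2.0),
                       gamma.pdf(x, 2.0, scale=2.0))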

def _runningAvgWgts(nDays):
    '''
    Compute uniform weights for an nDays running average: the full
    interior stencil plus the shorter stencils used near the ends of
    the data, where the full window does not fit
    '''
    disi = np.ones(nDays) / nDays
   
    ka = nDays // 2 + 1

    disb = []
    for i in range(ka,nDays):
        disb.append( np.ones(i) / i )

    return disi,disb
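
# Worked example (illustrative) for _runningAvgWgts(5):
#   disi = [0.2, 0.2, 0.2, 0.2, 0.2]      -- full 5-point interior stencil
#   disb = [[1/3]*3, [1/4]*4]             -- shorter uniform boundary stencils
# runningAvg applies the disb stencils near the two ends of the series.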

def runningAvg(f,nDays):
    r"""
    Apply nDays running average to the input f

    Parameters
    ----------
    f: numpy array
        array (with daily data for this project) to be filtered
    nDays: int
        window width for the running average

    Returns
    -------
    favg: numpy array
        filtered data
    """
    disi,disb = _runningAvgWgts(nDays)
    ka   = nDays // 2
    npts = f.shape[0]
    favg = np.empty_like(f)

    # interior: full nDays window centered on each point
    for i in range(ka,favg.shape[0]-ka):
        favg[i] = np.sum(disi*f[i-ka:i+ka+1])

    # boundaries: shorter one-sided stencils at both ends
    for i in range(ka):
        fwidth = len(disb[i])
        favg[i] = np.sum(disb[i]*f[0:fwidth])
        favg[npts-1-i] = np.sum(disb[i]*f[npts-1:npts-fwidth-1:-1])

    return favg
def test_runningAvg():
    # plot a noisy random-walk signal together with running averages of
    # increasing window width
    np.random.seed(2020)
    n = 100
    sig = np.random.randn(n)**3 + 3*np.random.randn(n).cumsum()
    plt.plot(sig, label='data')
    for nDays in (3, 5, 7, 9):
        plt.plot(runningAvg(sig, nDays), label=str(nDays))
    plt.legend()
    plt.show()
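
def test_runningAvg_constant():
    # Additional sanity check (illustrative, not from the original module):
    # averaging a constant series must return the same constant everywhere,
    # including at the boundary stencils.
    f = np.ones(10)
    assert np.allclose(runningAvg(f, 3), 1.0)
    assert np.allclose(runningAvg(f, 5), 1.0)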
def prediction_filename(run_setup):
    r"""
    Generate informative name for hdf5 file with prediction data

    Parameters
    ----------
    run_setup: dictionary
        detailed settings for the epidemiological model

    Returns
    -------
    filename: string
        file name ending with a .h5 extension
    """
    fh5 = run_setup["ppopts"]["fpredout"]
    return fh5+".h5"
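
# Usage sketch (illustrative): the nested keys are the ones read by the
# function above; the file stem "forecast" is a hypothetical example value.
#   run_setup = {"ppopts": {"fpredout": "forecast"}}
#   prediction_filename(run_setup)  # -> "forecast.h5"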
def output_epicurves(pred,daysPred,newcases,nskip,quantile_list,fileout):
    with open(fileout, mode='w') as output_file:
        csv_writer = csv.writer(output_file, delimiter=',', quotechar='"',
                                quoting=csv.QUOTE_MINIMAL)
        # header: date, requested quantiles, thinned samples, observed cases
        outputData = ["#Date"]
        for qk in quantile_list:
            outputData = outputData + ["quantile"+'%.3f'%(qk)]
        for j in range(1, pred.shape[0]+1, nskip):
            outputData = outputData + ["sample"+str((j-1)//nskip+1)]
        outputData = outputData + ["ConfirmedCases"]
        csv_writer.writerow(outputData)
        # one row per predicted day
        ndaysData = len(newcases)
        ndaysPred = pred.shape[1]
        for i in range(ndaysPred):
            outputData = [daysPred[i].date()]
            for qk in quantile_list:
                outputData = outputData + ["%d"%(np.quantile(pred, qk, axis=0)[i])]
            outputData = outputData + ["%d"%(pred[j,i]) for j in range(0, pred.shape[0], nskip)]
            if i < ndaysData:
                outputData = outputData + ["%d"%(newcases[i])]
            else:
                # -999 flags days beyond the range of the observed data
                outputData = outputData + [-999]
            csv_writer.writerow(outputData)

def output_infcurves(infc,datesmean,nskip,quantile_list,fileout):
    with open(fileout, mode='w') as output_file:
        csv_writer = csv.writer(output_file, delimiter=',', quotechar='"',
                                quoting=csv.QUOTE_MINIMAL)
        # header: date, requested quantiles, thinned samples
        outputData = ["#Date"]
        for qk in quantile_list:
            outputData = outputData + ["quantile"+'%.3f'%(qk)]
        for j in range(1, infc.shape[0]+1, nskip):
            outputData = outputData + ["sample"+str((j-1)//nskip+1)]
        csv_writer.writerow(outputData)
        # one row per day
        for i in range(len(datesmean)):
            outputData = [datesmean[i].date()]
            for qk in quantile_list:
                outputData = outputData + ["%d"%(np.quantile(infc, qk, axis=0)[i])]
            outputData = outputData + ["%d"%(infc[j,i]) for j in range(0, infc.shape[0], nskip)]
            csv_writer.writerow(outputData)

def _linear_error_weight(min_wgt,days):
    '''
    Compute weights that increase linearly, from close to min_wgt at the
    first data point to 1.0 at the most recent data point
    '''
    ndays = len(days)
    return min_wgt + (1.0 - min_wgt)*np.arange(1, int(ndays)+1) / ndays

def _gaussian_error_weight(min_wgt,tau,days):
    '''
    Compute semi-Gaussian increasing weights; the "mean" sits at the most
    recent data point and the weights increase from min_wgt to 1.0
    '''
    day_max = np.max(days)
    return min_wgt + (1.0 - min_wgt)*np.exp(-0.5 * ((days-day_max)/tau)**2)
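
def _example_output_infcurves():
    # Illustrative usage sketch (not part of the original module): write a
    # small infection-curve CSV for 4 posterior samples over 3 days. The
    # file name and array values are hypothetical.
    import datetime
    rng = np.random.RandomState(0)
    infc = np.abs(rng.randn(4, 3)) * 100.0
    dates = [datetime.datetime(2020, 3, d) for d in (1, 2, 3)]
    output_infcurves(infc, dates, nskip=2,
                     quantile_list=[0.25, 0.5, 0.75],
                     fileout="infcurves_example.csv")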
def compute_error_weight(error_info,days):
    r"""
    Compute array with specified weighting for the daily cases data.
    The underlying weights follow either a linear or a Gaussian expression,
    higher for recent data and lower for older data; the returned array
    holds their reciprocals.

    Parameters
    ----------
    error_info: list
        (error_type,min_wgt,[tau]); error_type is either 'linear' or
        'gaussian', min_wgt is the minimum weight, and tau is the standard
        deviation of the exponential term if the Gaussian formulation is
        chosen.
    days: numpy array
        array of day indices over which the weights are computed

    Returns
    -------
    error_weight: numpy array
        array of weights
    """
    error_type = error_info[0]
    assert error_info[1] > 0.0, "error_weight second parameter needs to be positive"
    assert error_info[1] < 1.0, "error_weight second parameter needs to be less than 1.0"
    inv_error_weight = None
    if error_type=="linear":
        inv_error_weight = _linear_error_weight(error_info[1],days)
    elif error_type=="gaussian":
        if len(error_info) < 3:
            sys.exit("Need to specify minimum weight and width for 'gaussian'")
        inv_error_weight = _gaussian_error_weight(error_info[1],error_info[2],days)
    else:
        sys.exit("Only current options for error weighting are 'linear' or 'gaussian'")

    # compute error_weight from reciprocal
    error_weight = 1./inv_error_weight
    return error_weight
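
def _example_error_weights():
    # Illustrative usage sketch (not part of the original module); the
    # parameter values below are arbitrary examples.
    days = np.arange(10)
    # linear: underlying weights grow linearly toward the most recent day
    w_lin = compute_error_weight(["linear", 0.5], days)
    # gaussian: needs a third entry, the width tau of the Gaussian term
    w_gau = compute_error_weight(["gaussian", 0.5, 3.0], days)
    # the returned reciprocals equal 1.0 at the most recent day and are
    # larger for older data
    assert np.isclose(w_lin[-1], 1.0) and np.isclose(w_gau[-1], 1.0)
    assert w_lin[0] > w_lin[-1] and w_gau[0] > w_gau[-1]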