Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment -- the `if` this `else` pairs with, any enclosing loop and
# the enclosing function are not part of this listing, and indentation was lost.
else:
# Copy elements 2 and 3 of rc_latlon into the point's LAT / LON fields before
# handing the point to p_grids.setPtValues (called with chgLatLon=False).
pt[LAT] = rc_latlon[2]
pt[LON] = rc_latlon[3]
p_grids.setPtValues(pt, chgLatLon=False)
#Set the monthly lst values and optim nnghs on the point
# np.arange(1,13) yields the month numbers 1..12.
for mth in np.arange(1,13):
# Source key is varname followed by the zero-padded two-digit month.
pt[get_lst_varname(mth)] = pt["%s%02d"%(varname,mth)]
# rgn_nnghs is indexed by the point's NEON value and then by month; each entry
# unpacks into two values.
pt[get_optim_varname(mth)],pt[get_optim_anom_varname(mth)] = rgn_nnghs[pt[NEON]][mth]
# gwr_predict returns a (mean, variance) pair; the standard error is taken as
# the square root of that variance.
tair_mean, tair_var = gwr.gwr_predict(pt, interp_mth)
tair_se = np.sqrt(tair_var)
# Fill the send buffer with rc_latlon[0], rc_latlon[1], the mean and the std. error.
interp_tair[:] = rc_latlon[0],rc_latlon[1],tair_mean,tair_se
# Buffer-based Send of the result to the writer rank, followed by a pickled send
# of this worker's rank to the coordinator rank.
MPI.COMM_WORLD.Send([interp_tair,MPI.DOUBLE],dest=RANK_WRITE,tag=TAG_DOWORK)
MPI.COMM_WORLD.send(rank,RANK_COORD,tag=TAG_DOWORK)
def proc_work(params,rank):
# Worker loop: receive station ids from the coordinator rank and run
# XvalTairMean.run_xval on each until a TAG_STOPWORK message arrives.
#   params -- indexed with P_PATH_DB, P_PATH_RLIB and P_VARNAME to build the validator.
#   rank   -- this worker's rank; in the visible code it is only used in log messages.
# Returns 0 once the stop message has been handled.
# NOTE(review): the listing ends right after the except block, so what is done
# with err / std_err afterwards is not visible here. Indentation was also lost.
status = MPI.Status()
xval = XvalTairMean(params[P_PATH_DB], params[P_PATH_RLIB], params[P_VARNAME])
while 1:
# Accept any tag from the coordinator; the tag (read back through `status`)
# decides between stopping and doing work.
stn_id = MPI.COMM_WORLD.recv(source=RANK_COORD,tag=MPI.ANY_TAG,status=status)
if status.tag == TAG_STOPWORK:
# Pass the stop tag on to the writer rank with a three-element list of None.
MPI.COMM_WORLD.send([None]*3, dest=RANK_WRITE, tag=TAG_STOPWORK)
print "".join(["Worker ",str(rank),": Finished"])
return 0
else:
try:
err,std_err = xval.run_xval(stn_id)
#in_ci = True if xval_stn[MEAN_OBS] >= ci[0] and xval_stn[MEAN_OBS] <= ci[1] else False
except Exception as e:
# NOTE(review): stn_id is joined directly to the exception text with no
# separator, and "".join requires stn_id to already be a string -- confirm.
print "".join(["ERROR: Worker ",str(rank),": could not xval ",stn_id,str(e)])
# On failure both results become length-13 arrays filled with netCDF4's
# default 'f8' fill value.
err = np.ones(13)*netCDF4.default_fillvals['f8']
std_err = np.ones(13)*netCDF4.default_fillvals['f8']
# NOTE(review): fragment of a coordinator routine -- the enclosing def, any outer
# loop over `neon`, and the initialisation of cnt / nrec / nwrkers are not shown,
# and indentation was lost.
# Keep the stations whose NEON value equals `neon` and that also pass mask_stns.
mask_stns_xval = np.logical_and(stn_da.stns[NEON]==neon,mask_stns)
stn_ids_xval = stn_da.stn_ids[mask_stns_xval]
for stn_id in stn_ids_xval:
# While fewer than nwrkers items have been sent, address worker ranks in order,
# starting at rank N_NON_WRKRS.
if cnt < nwrkers:
dest = cnt+N_NON_WRKRS
else:
# Otherwise block for a message from any rank and use its payload as the
# destination rank (workers elsewhere in this file send their own rank to
# RANK_COORD after finishing an item). nrec counts these receipts.
dest = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
nrec+=1
MPI.COMM_WORLD.send(stn_id, dest=dest, tag=TAG_DOWORK)
cnt+=1
# Send a TAG_STOPWORK message with a None payload to every worker rank.
# NOTE(review): presumably this runs after all dispatch loops -- nesting cannot
# be determined from this listing.
for w in np.arange(nwrkers):
MPI.COMM_WORLD.send(None, dest=w+N_NON_WRKRS, tag=TAG_STOPWORK)
print "coord_proc: done"
# NOTE(review): fragment of a coordinator routine -- the enclosing def and the
# definitions of stns / nwrkers are not shown, and indentation was lost.
# cnt counts work items sent; nrec counts messages received back.
cnt = 0
nrec = 0
for stn_id in stns[STN_ID]:
# The first nwrkers items go to worker ranks N_NON_WRKRS, N_NON_WRKRS+1, ...
if cnt < nwrkers:
dest = cnt+N_NON_WRKRS
else:
# After that, wait for a message from any rank and send the next item to the
# rank contained in its payload.
dest = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
nrec+=1
MPI.COMM_WORLD.send(stn_id, dest=dest, tag=TAG_DOWORK)
cnt+=1
# Send a TAG_STOPWORK message with a None payload to every worker rank.
# NOTE(review): presumably executed once the station loop has finished -- confirm
# against the properly indented original.
for w in np.arange(nwrkers):
MPI.COMM_WORLD.send(None, dest=w+N_NON_WRKRS, tag=TAG_STOPWORK)
print "coord_proc: done"
# NOTE(review): fragment of a worker loop -- the `if` this `else` pairs with and
# the enclosing function are not shown, and indentation was lost.
else:
try:
# run_xval is given the station id and params[P_NGH_RNG]; its result unpacks
# as (bias, mae, r2).
bias,mae,r2 = optim.run_xval(stn_id, params[P_NGH_RNG])
except Exception as e:
print "".join(["ERROR: Worker ",str(rank),": could not xval ",stn_id,"...",str(e)])
# On failure each statistic becomes a (params[P_NGH_RNG].size, 12) array filled
# with netCDF4's default 'f8' fill value.
mae = np.ones((params[P_NGH_RNG].size,12))*netCDF4.default_fillvals['f8']
bias = np.ones((params[P_NGH_RNG].size,12))*netCDF4.default_fillvals['f8']
r2 = np.ones((params[P_NGH_RNG].size,12))*netCDF4.default_fillvals['f8']
# Results go to the writer rank as (stn_id, mae, bias, r2) -- note that mae
# precedes bias here, unlike the unpacking order above.
MPI.COMM_WORLD.send((stn_id,mae,bias,r2), dest=RANK_WRITE, tag=TAG_DOWORK)
# Then this worker's rank is sent to the coordinator rank.
MPI.COMM_WORLD.send(rank, dest=RANK_COORD, tag=TAG_DOWORK)
def kill_workers():
    """Send a ``None`` message tagged 666 to every MPI rank except rank 0.

    The ranks addressed are 1 up to (but not including) the size of
    ``MPI.COMM_WORLD``, in increasing order.

    NOTE(review): presumably the receiving ranks treat tag 666 as a
    shutdown signal; that handling is not visible in this block.
    """
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    n_ranks = comm.Get_size()

    # Rank 0 is skipped; every other rank gets exactly one message.
    target = 1
    while target < n_ranks:
        comm.send(None, dest=target, tag=666)
        target += 1
# NOTE(review): fragment of a coordinator routine -- the enclosing def and the
# initialisation of cnt / nwrkers / climdivs are not shown, and indentation was lost.
# nrec counts messages received back from other ranks.
nrec = 0
for climdiv in climdivs:
# Keep the stations whose CLIMDIV value equals this division and that also
# pass mask_stns.
mask_stns_xval = np.logical_and(stn_da.stns[CLIMDIV] == climdiv, mask_stns)
stn_ids_xval = stn_da.stn_ids[mask_stns_xval]
for stn_id in stn_ids_xval:
# The first nwrkers items go to worker ranks N_NON_WRKRS, N_NON_WRKRS+1, ...
if cnt < nwrkers:
dest = cnt + N_NON_WRKRS
else:
# After that, wait for a message from any rank and send the next item to the
# rank contained in its payload.
dest = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
nrec += 1
MPI.COMM_WORLD.send(stn_id, dest=dest, tag=TAG_DOWORK)
cnt += 1
# NOTE(review): presumably printed once per climate division, after its stations
# have been dispatched -- nesting cannot be determined from this listing.
print "".join(["COORD: Finished xval of climate division: ", str(climdiv)])
# Send a TAG_STOPWORK message with a None payload to every worker rank.
for w in np.arange(nwrkers):
MPI.COMM_WORLD.send(None, dest=w + N_NON_WRKRS, tag=TAG_STOPWORK)
print "COORD: done"
# NOTE(review): fragment of a coordinator routine -- the enclosing def and the
# definitions of stn_ids / nwrkers are not shown, and indentation was lost.
#Send stn ids to all processes
# Broadcast rooted at the coordinator rank; the return value is not used here.
MPI.COMM_WORLD.bcast(stn_ids, root=RANK_COORD)
print "Coord: Done initialization. Starting to send work."
# cnt counts work items sent; nrec counts messages received back.
cnt = 0
nrec = 0
for stn_id in stn_ids:
# The first nwrkers items go to worker ranks N_NON_WRKRS, N_NON_WRKRS+1, ...
if cnt < nwrkers:
dest = cnt+N_NON_WRKRS
else:
# After that, wait for a message from any rank and send the next item to the
# rank contained in its payload.
dest = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
nrec+=1
MPI.COMM_WORLD.send(stn_id, dest=dest, tag=TAG_DOWORK)
cnt+=1
# Send a TAG_STOPWORK message with a None payload to every worker rank.
# NOTE(review): presumably executed once the station loop has finished -- confirm
# against the properly indented original.
for w in np.arange(nwrkers):
MPI.COMM_WORLD.send(None, dest=w+N_NON_WRKRS, tag=TAG_STOPWORK)
print "coord_proc: done"
# NOTE(review): fragment of a coordinator routine -- the enclosing def, the loop
# that supplies stn_id, and the creation of tair_vars / cnt / nrec are not shown,
# and indentation was lost.
# 'tmax' is added to the variable list only when the station is in stn_ids_tmax.
if stn_id in stn_ids_tmax:
tair_vars.append('tmax')
# One work item is dispatched per (station, variable) pair.
for tair_var in tair_vars:
# The first nwrkers items go to worker ranks N_NON_WRKRS, N_NON_WRKRS+1, ...
if cnt < nwrkers:
dest = cnt + N_NON_WRKRS
else:
# After that, wait for a message from any rank and send the next item to the
# rank contained in its payload.
dest = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
nrec += 1
MPI.COMM_WORLD.send((stn_id, tair_var), dest=dest, tag=TAG_DOWORK)
cnt += 1
# The stop message carries a (None, None) tuple, the same two-element shape as
# the work items sent above.
# NOTE(review): presumably executed after all stations have been dispatched.
for w in np.arange(nwrkers):
MPI.COMM_WORLD.send((None, None), dest=w + N_NON_WRKRS, tag=TAG_STOPWORK)
print "COORD: done"
# NOTE(review): fragment of a worker routine -- the enclosing def, the `try:` that
# pairs with the `except` below, the loop that `break` exits, and the
# initialisation of ntries / nngh_prcp are not shown, and indentation was lost.
fill_rslts.calc_obs_vs_fit_stats(yr_mth_masks)
ntries+=1
# Retry only while ntries is still below 2 after the increment, and only when
# hss is <= 0 or any of the three percent-error attributes has magnitude >= 100.
if ntries < 2 and (fill_rslts.hss <= 0.0 or
np.abs(fill_rslts.perr_freq) >= 100 or
np.abs(fill_rslts.perr_intsy) >= 100 or
np.abs(fill_rslts.perr_ttlamt) >= 100):
# The retry uses one more neighbor (nngh_prcp is incremented by 1).
nngh_prcp+=1
print "".join(["WARNING: ",stn_id," had large error when infilling prcp. Trying again with more nnghs."])
else:
break
# Results go to the writer rank, then this worker's rank goes to the coordinator.
MPI.COMM_WORLD.send(fill_rslts, dest=RANK_WRITE, tag=TAG_DOWORK)
MPI.COMM_WORLD.send(rank, dest=RANK_COORD, tag=TAG_DOWORK)
except Exception as e:
print "".join(["ERROR: Could not infill ",stn_id,"|",str(e)])
# On failure nothing is sent to the writer, but the coordinator still receives
# this worker's rank.
MPI.COMM_WORLD.send(rank, dest=RANK_COORD, tag=TAG_DOWORK)