Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_hdf5_headonly(self):
stream = self.stream
with NamedTemporaryFile(suffix='.h5') as ft:
fname = ft.name
stream.write(fname, 'H5')
stream2 = read(fname, 'H5', headonly=True)
stream2[0].stats.header = -42
self.assertEqual(len(stream2[0]), 0)
stream2.write(fname, 'H5', mode='a', headonly=True)
stream2 = read(fname, 'H5')
self.assertEqual(stream2[0].stats.header, -42)
stream2[0].stats.header = 42
for tr in stream2:
del tr.stats._format
self.assertEqual(stream, stream2)
def test_trc_num(self):
stream = self.stream.copy()
set_index('waveforms/{trc_num:03d}')
with NamedTemporaryFile(suffix='.h5') as ft:
fname = ft.name
stream.write(fname, 'H5')
stream.write(fname, 'H5', mode='a', offset_trc_num=3)
stream2 = read(fname, 'H5')
for tr in stream2:
del tr.stats._format
set_index()
self.assertEqual(len(stream2), 6)
self.assertEqual(stream2[::2], stream)
self.assertEqual(stream2[1::2], stream)
def setUp(self):
self.stream = read().sort()
# add processing info
self.stream.decimate(2)
self.stream.differentiate()
self.stream[0].stats.onset = UTC()
self.stream[0].stats.header = 42
self.stream[0].stats.header2 = 'Test entry'
self.stream[0].stats.header3 = u'Test entry unicode'
stack = dict(group='all', count=5, type=['pw', 2])
self.stream[0].stats.stack = stack
for tr in self.stream:
if 'response' in tr.stats:
del tr.stats.response
import numpy as np
def FileToList(fileName):
f=open(fileName,"r",encoding=None)
fdata=f.readlines()
data=[]
for itr in fdata:
#print(itr)
if(itr[:2]=='20'):
data.append([ii.strip() for ii in itr.split() if(len(ii)>0)])
return data
stream = Stream()
stream = pread("after/SC.XJI.2008133160000.D.00.BHN.sac")
stream += pread("after/SC.XJI.2008133160000.D.00.BHZ.sac")
stream += pread("after/SC.XJI.2008133160001.D.00.BHE.sac")
stream.detrend()
ptd=[]
"""
for ii in range(1000):
ptd.append(stream[0].data[ii:ii+1000])
plt.matshow(ptd)
plt.show()
"""
nzy1,nzd1,nzs1=(stream[0].stats.sac.nzyear,
stream[0].stats.sac.nzjday,
stream[2].stats.sac.nzhour*3600+
stream[2].stats.sac.nzmin*60+
stream[2].stats.sac.nzsec+
stream[2].stats.sac.nzmsec/1000)
nzy2,nzd2,nzs2=(stream[1].stats.sac.nzyear,
from obspy import read
from obspy.signal import PPSD
from obspy.io.xseed import Parser
st = read("https://examples.obspy.org/BW.KW1..EHZ.D.2011.037")
parser = Parser("https://examples.obspy.org/dataless.seed.BW_KW1")
ppsd = PPSD(st[0].stats, metadata=parser)
ppsd.add(st)
st = read("https://examples.obspy.org/BW.KW1..EHZ.D.2011.038")
ppsd.add(st)
ppsd.plot()
zss=tshift(zss,tdelay[kfault])
eds=tshift(eds,tdelay[kfault])
nds=tshift(nds,tdelay[kfault])
zds=tshift(zds,tdelay[kfault])
if decimate!=None:
ess=stdecimate(ess,decimate)
nss=stdecimate(nss,decimate)
zss=stdecimate(zss,decimate)
eds=stdecimate(eds,decimate)
nds=stdecimate(nds,decimate)
zds=stdecimate(zds,decimate)
if decimate!=None and kfault==0:#Only need to do this once
#Load raw data, this will be used to trim the GFs
edata=read(datafiles[ksta]+'.e')
ndata=read(datafiles[ksta]+'.n')
udata=read(datafiles[ksta]+'.u')
edata=stdecimate(edata,decimate)
ndata=stdecimate(ndata,decimate)
udata=stdecimate(udata,decimate)
#How many points left in the tiem series
npts=edata[0].stats.npts
print "... "+str(npts)+" data points left over after decimation"
#Now time align stuff (This is where we might have some timing issues, check later, consider re-sampling to data time vector)
ess=resample_to_data(ess,edata)
ess=prep_synth(ess,edata)
nss=resample_to_data(nss,ndata)
nss=prep_synth(nss,ndata)
zss=resample_to_data(zss,udata)
zss=prep_synth(zss,udata)
eds=resample_to_data(eds,edata)
eds=prep_synth(eds,edata)
nds=resample_to_data(nds,ndata)
# #bandpass=None
### END AWFUL TERRIBLE REALLY VERY BAD HACK
if decimate!=None:
Ess[ktrace]=stdecimate(Ess[ktrace],decimate)
Nss[ktrace]=stdecimate(Nss[ktrace],decimate)
Zss[ktrace]=stdecimate(Zss[ktrace],decimate)
Eds[ktrace]=stdecimate(Eds[ktrace],decimate)
Nds[ktrace]=stdecimate(Nds[ktrace],decimate)
Zds[ktrace]=stdecimate(Zds[ktrace],decimate)
ktrace+=1
#Read time series
for ksta in range(Nsta):
edata=read(datafiles[ksta]+'.e')
ndata=read(datafiles[ksta]+'.n')
udata=read(datafiles[ksta]+'.u')
if decimate!=None:
edata[0]=stdecimate(edata[0],decimate)
ndata[0]=stdecimate(ndata[0],decimate)
udata[0]=stdecimate(udata[0],decimate)
if ksta==0:
Edata=edata.copy()
Ndata=ndata.copy()
Udata=udata.copy()
else:
Edata+=edata
Ndata+=ndata
Udata+=udata
#Finished reading, filtering, etc now time shift by rupture time and resmaple to data
ktrace=0
for k in range(source.shape[0]):
#Get subfault parameters
nfault='subfault'+rjust(str(int(source[k,0])),4,'0')
nsub='sub'+rjust(str(int(source[k,0])),4,'0')
zs=source[k,3]
ss_slip=source[k,8]
ds_slip=source[k,9]
rtime=source[k,12]
#Where's the data
strdepth='%.4f' % zs
syn_path=home+project_name+'/GFs/dynamic/'+model_name+'_'+strdepth+'.'+nsub+'/'
#Get synthetics
ess=read(syn_path+sta+'.'+nfault+'.SS.'+vord+'.e')
nss=read(syn_path+sta+'.'+nfault+'.SS.'+vord+'.n')
zss=read(syn_path+sta+'.'+nfault+'.SS.'+vord+'.z')
eds=read(syn_path+sta+'.'+nfault+'.DS.'+vord+'.e')
nds=read(syn_path+sta+'.'+nfault+'.DS.'+vord+'.n')
zds=read(syn_path+sta+'.'+nfault+'.DS.'+vord+'.z')
#Decide if resampling is required
if resample < (1/ess[0].stats.delta): #Downsample
ess[0].resample(resample)
nss[0].resample(resample)
zss[0].resample(resample)
eds[0].resample(resample)
nds[0].resample(resample)
zds[0].resample(resample)
elif resample > (1/ess[0].stats.delta): #Upsample
upsample(ess,1./resample)
upsample(nss,1./resample)
upsample(zss,1./resample)
upsample(eds,1./resample)
upsample(nds,1./resample)
Nfaults=source.shape[0] #Number of subfaults
kindex=0
for ksta in range(Nsta):
print('Reading green functions for station #'+str(ksta+1)+' of '+str(Nsta))
for kfault in range(Nfaults):
#Get subfault GF directory
nsub='sub'+str(int(source[kfault,0])).rjust(4,'0')
nfault='subfault'+str(int(source[kfault,0])).rjust(4,'0')
strdepth='%.4f' % source[kfault,3]
syn_path=home+project_name+'/GFs/dynamic/'+model_name+'_'+strdepth+'.'+nsub+'/'
#Get synthetics
if kfault==0 and ksta==0: #It's the first one, initalize stream object
Ess=read(syn_path+staname[ksta]+'.'+nfault+'.SS.'+vord+'.e')
Nss=read(syn_path+staname[ksta]+'.'+nfault+'.SS.'+vord+'.n')
Zss=read(syn_path+staname[ksta]+'.'+nfault+'.SS.'+vord+'.z')
Eds=read(syn_path+staname[ksta]+'.'+nfault+'.DS.'+vord+'.e')
Nds=read(syn_path+staname[ksta]+'.'+nfault+'.DS.'+vord+'.n')
Zds=read(syn_path+staname[ksta]+'.'+nfault+'.DS.'+vord+'.z')
else: #Just add to stream object
Ess+=read(syn_path+staname[ksta]+'.'+nfault+'.SS.'+vord+'.e')
Nss+=read(syn_path+staname[ksta]+'.'+nfault+'.SS.'+vord+'.n')
Zss+=read(syn_path+staname[ksta]+'.'+nfault+'.SS.'+vord+'.z')
Eds+=read(syn_path+staname[ksta]+'.'+nfault+'.DS.'+vord+'.e')
Nds+=read(syn_path+staname[ksta]+'.'+nfault+'.DS.'+vord+'.n')
Zds+=read(syn_path+staname[ksta]+'.'+nfault+'.DS.'+vord+'.z')
kindex+=1
print('Writting synthetics to miniSEED, hang on this might take a minute or two.')
Ess.write(home+project_name+'/GFs/matrices/'+G_name+'.Ess.'+vord+'.mseed',format='MSEED')
Nss.write(home+project_name+'/GFs/matrices/'+G_name+'.Nss.'+vord+'.mseed',format='MSEED')
Zss.write(home+project_name+'/GFs/matrices/'+G_name+'.Zss.'+vord+'.mseed',format='MSEED')
Eds.write(home+project_name+'/GFs/matrices/'+G_name+'.Eds.'+vord+'.mseed',format='MSEED')
Nds.write(home+project_name+'/GFs/matrices/'+G_name+'.Nds.'+vord+'.mseed',format='MSEED')
stream = obspy.read(fname, data_format)
except Exception:
return
t1 = stream[0].stats.starttime
t2 = stream[-1].stats.endtime
if t1 - day < 60:
fname = data.format(t=day - 1, **smeta)
try:
stream += obspy.read(fname, data_format, starttime=day - edge)
except Exception:
pass
if next_day - t2 < 60:
endtime = next_day + overlap + edge
fname = data.format(t=next_day, **smeta)
try:
stream += obspy.read(fname, data_format, endtime=endtime)
except Exception:
pass
if trim_and_merge:
stream.merge(method=1, interpolation_samples=10)
stream.trim(day, next_day + overlap)
return stream