def addXcorrSides(self):
    self.setHI('starttime', UTCDateTime('2000-01-01'))
    for tr in self:
        N = tr.stats.npts
        # Add the time-reversed first (acausal) half to the second (causal)
        # half, so both sides of the cross-correlation are stacked.
        tr.data = tr.data[N // 2:] + tr.data[N // 2 + N % 2 - 1::-1]
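A minimal standalone sketch (not part of the original class) showing what the slicing above does on a small NumPy array:
import numpy as np

data = np.arange(7)  # toy cross-correlation with 7 samples, zero lag at index 3
N = len(data)
# Second half (zero lag onwards) plus the reversed first half.
folded = data[N // 2:] + data[N // 2 + N % 2 - 1::-1]
print(folded)  # [6 6 6 6]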
def main():
    stations = 'PB01 PB02 PB03 PB04 PB05 PB06 PB07 PB08 HMBCX MNMCX PATCX PSGCX LVC'
    #stations = 'PB09 PB10 PB11 PB12 PB13 PB14 PB15 PB16'
    stations2 = None
    components = 'Z'
    # Tocopilla earthquake: 2007-11-14 15:14
    t1 = UTC('2006-02-01')
    t2 = UTC('2012-10-01')
    shift = 100
    correlations = get_correlations(stations, components, stations2, only_auto=True)
    method = 'FINAL_filter1-3_1bit_auto'
    data = IPOC(xcorr_append='/' + method, use_local_LVC=False)
    data.setXLogger('_' + method)
    # pool = Pool()
    # prepare(data, stations.split(), t1, t2, component=components,
    #         filter=(1, 3, 2, True), downsample=20,
    #         eventremoval='waterlevel_env2', param_removal=(10, 0),
    #         whitening=False,
    #         normalize='1bit', param_norm=None,
    #         pool=pool)
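get_correlations and IPOC come from the surrounding project and are not shown here. Purely as a hypothetical illustration of what only_auto=True implies, autocorrelation pairs could be built from the station string like this (helper name and return format are assumptions, not the project's API):
def build_auto_pairs(stations, component='Z'):
    # Hypothetical helper: pair every station code with itself.
    codes = stations.split()
    return [(code + component, code + component) for code in codes]

print(build_auto_pairs('PB01 PB02 PB03'))
# [('PB01Z', 'PB01Z'), ('PB02Z', 'PB02Z'), ('PB03Z', 'PB03Z')]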
# Read fixed header.
fixed_header = file_string.read(48)
# Check if the header has 48 bytes. Otherwise end the loop.
if len(fixed_header) < 48:
    break
# Unpack the whole header at once. Fields not needed in this case are
# marked with a pad byte for speed reasons.
unpacked_tuple = unpack('>xxxxxxxxccccccccccccHHBBBBHHHHxxxxxxxxHH',
                        fixed_header)
# Create list containing network, location, station, channel,
# starttime, sampling_rate and number of samples.
# Time fields of the tuple: [12] year, [13] day of year, [14] hour,
# [15] minute, [16] second, [18] fractional seconds (units of 0.0001 s).
temp_list = [''.join([_i for _i in unpacked_tuple[10:12]]),
             ''.join([_i for _i in unpacked_tuple[5:7]]),
             ''.join([_i for _i in unpacked_tuple[0:5]]),
             ''.join([_i for _i in unpacked_tuple[7:10]]),
             UTCDateTime(unpacked_tuple[12], 1, 1, unpacked_tuple[14],
                         unpacked_tuple[15], unpacked_tuple[16],
                         unpacked_tuple[18] * 100) +
             (unpacked_tuple[13] - 1) * 24 * 60 * 60,
             get_samplingRate(unpacked_tuple[20], unpacked_tuple[21]),
             unpacked_tuple[19]]
# Loop through the blockettes until blockette 1000 is found. The file
# pointer is always supposed to be at the beginning of the next
# blockette.
while True:
    if unpack('>H', file_string.read(2))[0] == 1000:
        # Read encoding.
        file_string.seek(2, 1)
        encoding = unpack('>B', file_string.read(1))[0]
        break
    else:
        file_string.seek(starting_pointer +
                         unpack('>H', file_string.read(2))[0], 0)
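For reference, a standalone sketch of how the record start time above is assembled from the fixed-header time fields (year, day of year, hour, minute, second, fractional seconds); the field values below are invented for illustration:
from obspy import UTCDateTime

# Invented example: 2007-11-14 is day 318 of the year, 15:14:30 plus
# 5000 * 0.0001 s = 0.5 s of fractional seconds.
year, doy, hour, minute, second, fract = 2007, 318, 15, 14, 30, 5000

starttime = (UTCDateTime(year, 1, 1, hour, minute, second, fract * 100)
             + (doy - 1) * 24 * 60 * 60)
print(starttime)  # 2007-11-14T15:14:30.500000Z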
def zoom_last_month(self):
    """
    Zooms to the last 31 days including today.
    """
    now = UTCDateTime()
    starttime = UTCDateTime(now.year, now.month, now.day)
    endtime = starttime + 86400 - 1
    starttime -= 31 * 86400
    self.env.main_window.changeTimes(starttime, endtime)
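The same window arithmetic as a standalone sketch, outside the GUI class (self.env.main_window belongs to the surrounding application and is omitted here):
from obspy import UTCDateTime

now = UTCDateTime()
start_of_today = UTCDateTime(now.year, now.month, now.day)  # today at midnight
endtime = start_of_today + 86400 - 1                        # last second of today
starttime = start_of_today - 31 * 86400                     # 31 days earlier
print(starttime, endtime)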
#!/usr/bin/env python
# by TR
import argparse
from obspy.core import UTCDateTime as UTC
import logging
from obspy.core.utcdatetime import UTCDateTime
logging.basicConfig()
parser = argparse.ArgumentParser(description='Plot a day file, or the day plot of a given station and date.')
parser.add_argument('file_station',
help='file to plot or station to plot')
parser.add_argument('date', nargs='?', default=None, type=UTC,
help='if first argument is station: date')
parser.add_argument('-a', '--absolute-scale', type=float, default=0.0005,
help='display with different scale, default: 0.0005')
parser.add_argument('-r', '--relative-scale', type=float,
help='display with different relative scale - '
'overwrites ABSOLUTE_SCALE')
parser.add_argument('-s', '--save',
help='save plot to this file instead of showing')
parser.add_argument('-x', '--xcorr-append',
                    help="don't plot raw data and pass this argument to Data object")
parser.add_argument('-c', '--component', default='Z',
help='component to plot, default: Z')
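A usage sketch of the parser above with an invented command line; the rest of the script (reading and plotting the data) is omitted:
args = parser.parse_args(['PB01', '2007-11-14'])
print(args.file_station)    # PB01
print(args.date)            # 2007-11-14T00:00:00.000000Z (converted by the UTC type)
print(args.absolute_scale)  # 0.0005
print(args.component)       # Z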
def _read_extended_header_1(fi, start_byte):
    """
    Extract information contained in the extended header block number 1.
    """
    deployment_time = _read(fi, start_byte + 8, 8, 'binary') / 1e6
    pick_up_time = _read(fi, start_byte + 16, 8, 'binary') / 1e6
    start_time_ru = _read(fi, start_byte + 24, 8, 'binary') / 1e6
    extended_header_1 = dict(
        id_ru=_read(fi, start_byte, 8, 'binary'),
        deployment_time=UTCDateTime(deployment_time),
        pick_up_time=UTCDateTime(pick_up_time),
        start_time_ru=UTCDateTime(start_time_ru),
    )
    return extended_header_1
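_read is defined elsewhere in the module. Assuming the 'binary' mode returns a big-endian unsigned integer and that the time fields hold epoch microseconds, a self-contained stand-in might look like this (helper name and buffer contents are invented for illustration):
import io
from obspy import UTCDateTime


def _read_uint_be(fi, start_byte, length):
    # Hypothetical stand-in for _read(fi, start_byte, length, 'binary').
    fi.seek(start_byte)
    return int.from_bytes(fi.read(length), byteorder='big')


# 8-byte field holding an epoch time in microseconds (value invented).
buf = io.BytesIO((1554955208000000).to_bytes(8, 'big'))
print(UTCDateTime(_read_uint_be(buf, 0, 8) / 1e6))  # 2019-04-11T04:00:08.000000Z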
def _convertMSTimeToDatetime(timestring):
    """
    Takes a Mini-SEED timestamp and returns an ObsPy UTCDateTime object.

    :param timestring: Mini-SEED timestring (epoch time in HPT units,
        i.e. HPTMODULUS ticks per second).
    """
    return UTCDateTime(timestring / HPTMODULUS)
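Minimal usage sketch; HPTMODULUS is assumed here to be libmseed's high-precision time modulus of 1,000,000 ticks (microseconds) per second:
from obspy import UTCDateTime

HPTMODULUS = 1000000  # assumed: libmseed HPT ticks per second

# 2007-11-14T15:14:00Z expressed in HPT ticks.
timestring = 1195053240 * HPTMODULUS
print(UTCDateTime(timestring / HPTMODULUS))  # 2007-11-14T15:14:00.000000Z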