def event_only_picks(self, picks):
    return ev.Event(picks=picks)

def event(self):
    origin = Origin(time=self.time, latitude=47, longitude=-111.7)
    return Event(origins=[origin])
def test_put_new_events(self, bing_ebank):
    """ Ensure a new event can be put into the bank. """
    ori = ev.Origin(time=obspy.UTCDateTime("2016-01-01"))
    event = ev.Event(origins=[ori])
    event.origins[0].depth_errors = None  # see obspy issue 2173
    bing_ebank.put_events(event)
    event_out = bing_ebank.get_events(event_id=event.resource_id)
    assert len(event_out) == 1
    assert event_out[0] == event
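The same round trip can be sketched outside the test fixtures. A minimal sketch, assuming obsplus.EventBank can be pointed at a writable directory that the bank manages itself:

import obspy
import obspy.core.event as ev
import obsplus

# Assumption: EventBank takes a directory path as its first argument and
# creates its own layout inside it.
bank = obsplus.EventBank("./event_bank")
event = ev.Event(origins=[ev.Origin(time=obspy.UTCDateTime("2016-01-01"))])
bank.put_events(event)
fetched = bank.get_events(event_id=event.resource_id)
assert len(fetched) == 1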
warnings.warn(msg, ObsPyNDKWarning)
continue

# Use one creation info for essentially every item.
creation_info = CreationInfo(
    agency_id="GCMT",
    version=record["version_code"]
)

# Use the ObsPy Flinn Engdahl region determiner as the region in the
# NDK files is oftentimes trimmed.
region = fe.get_region(record["centroid_longitude"],
                       record["centroid_latitude"])

# Create an event object.
event = Event(
    force_resource_id=False,
    event_type="earthquake",
    event_type_certainty="known",
    event_descriptions=[
        EventDescription(text=region, type="Flinn-Engdahl region"),
        EventDescription(text=record["cmt_event_name"],
                         type="earthquake name")
    ]
)

# Assemble the time for the reference origin.
try:
    time = _parse_date_time(record["date"], record["time"])
except ObsPyNDKException:
    msg = ("Invalid time in event %i. '%s' and '%s' cannot be "
           "assembled to a valid time. Event will be skipped.") % \
""" Given a catalog or event, return a dataframe of picks """
return picks_to_df(cat_or_event)
obspy.core.event.Catalog.picks_to_df = picks_to_dataframe
obspy.core.event.Event.picks_to_df = picks_to_dataframe
# arrivals_to_dataframe
def arrivals_to_dataframe(cat_or_event):
""" Given a catalog or event, return a dataframe of arrivals """
return arrivals_to_df(cat_or_event)
obspy.core.event.Catalog.arrivals_to_df = arrivals_to_dataframe
obspy.core.event.Event.arrivals_to_df = arrivals_to_dataframe
obspy.core.event.Origin.arrivals_to_df = arrivals_to_dataframe
# amplitudes_to_dataframe
def amplitudes_to_dataframe(cat_or_event):
""" Given a catalog or event, return a dataframe of amplitudes """
return amplitudes_to_df(cat_or_event)
obspy.core.event.Catalog.amplitudes_to_df = amplitudes_to_dataframe
obspy.core.event.Event.amplitudes_to_df = amplitudes_to_dataframe
# station_magnitudes_to_dataframe
def station_magnitudes_to_dataframe(cat_or_event):
""" Given a catalog or event, return a dataframe of station magnitudes """
:type sfile: str
:param sfile: Path to the s-file
:returns: :class:`obspy.core.event.Event`

>>> event = readheader('eqcorrscan/tests/test_data/REA/TEST_/' +
...                    '01-0411-15L.S201309')
>>> print(event.origins[0].time)
2013-09-01T04:11:15.700000Z
"""
import warnings
from obspy.core.event import Event, Origin, Magnitude, Comment
from obspy.core.event import EventDescription, CreationInfo

f = open(sfile, 'r')
# Base populate to allow for empty parts of file
new_event = Event()
topline = f.readline()
if not len(topline.rstrip()) == 80:
    raise IOError('s-file has a corrupt header, not 80 char long')
f.seek(0)
for line in f:
    if line[79] in [' ', '1']:
        topline = line
        break
    if line[79] == '7':
        raise IOError('No header found, corrupt s-file?')
try:
    sfile_seconds = int(topline[16:18])
    if sfile_seconds == 60:
        sfile_seconds = 0
        add_seconds = 60
    else:
assert len(detect_streams) > 0, "No appropriate data found, check your " \
                                "family and detections - make sure seed " \
                                "ids match"
if len(detect_streams) != len(family):
    Logger.warning("Not all detections have matching data. "
                   "Proceeding anyway. HINT: Make sure SEED IDs match")
# The correlation function needs a list of streams; we need to maintain order.
ccc, chans = _concatenate_and_correlate(
    streams=detect_streams, template=family.template.st, cores=cores)
for i, detection_id in enumerate(detection_ids):
    detection = [d for d in family.detections if d.id == detection_id][0]
    correlations = ccc[i]
    picked_chans = chans[i]
    detect_stream = detect_streams_dict[detection_id]
    checksum, cccsum, used_chans = 0.0, 0.0, 0
    event = Event()
    for correlation, stachan in zip(correlations, picked_chans):
        if not stachan.used:
            continue
        tr = detect_stream.select(
            station=stachan.channel[0], channel=stachan.channel[1])[0]
        if interpolate:
            shift, cc_max = _xcorr_interp(correlation, dt=delta)
        else:
            cc_max = np.amax(correlation)
            shift = np.argmax(correlation) * delta
        if np.isnan(cc_max):  # pragma: no cover
            Logger.error(
                'Problematic trace, no cross correlation possible')
            continue
        picktime = tr.stats.starttime + shift
        checksum += cc_max
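The non-interpolated branch above converts a sample lag into a time shift by multiplying the argmax index by the sample spacing. A tiny self-contained illustration with made-up numbers:

import numpy as np

# Made-up values, only to illustrate the shift calculation above.
delta = 0.01                              # sample spacing in seconds (100 Hz)
correlation = np.array([0.1, 0.4, 0.9, 0.3])
cc_max = np.amax(correlation)             # 0.9, the peak correlation
shift = np.argmax(correlation) * delta    # 2 samples * 0.01 s = 0.02 s
# The pick time is then the trace start time plus this shift.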
.. rubric:: Example

>>> from obspy import read
>>> from eqcorrscan.utils.picker import cross_net
>>> st = read()
>>> event = cross_net(st, env=True)
>>> print(event.creation_info.author)
EQcorrscan

.. warning::
    This routine is not designed for accurate picking; rather, it can be
    used for a first pass at picks to obtain simple locations. It is based
    on the waveform-envelope cross-correlation method.
"""
event = Event()
event.origins.append(Origin())
event.creation_info = CreationInfo(author='EQcorrscan',
                                   creation_time=UTCDateTime())
event.comments.append(Comment(text='cross_net'))
samp_rate = stream[0].stats.sampling_rate
if not env:
    Logger.info('Using the raw data')
    st = stream.copy()
    st.resample(samp_rate)
else:
    st = stream.copy()
    Logger.info('Computing envelope')
    for tr in st:
        tr.resample(samp_rate)
        tr.data = envelope(tr.data)
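The envelope step above uses ObsPy's waveform envelope (magnitude of the analytic signal). A quick standalone check on the bundled example data:

import obspy
from obspy.signal.filter import envelope

# Standalone check of the envelope transform used above, run on ObsPy's
# example stream rather than on real detection data.
tr = obspy.read()[0]
env_data = envelope(tr.data)
print(tr.data.shape, env_data.shape)  # same length; envelope is non-negative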
if not master:
    if len(userid) != 4:
        raise NordicParsingError('%s User ID must be 4 characters long'
                                 % userid)
# Check that outdir exists
if not os.path.isdir(outdir):
    raise NordicParsingError('Out path does not exist, I will not '
                             'create this: ' + outdir)
# Check that evtype is one of L, R, D
if evtype not in ['L', 'R', 'D']:
    raise NordicParsingError('Event type must be either L, R or D')
if explosion:
    evtype += 'E'
# Check that there is one event
if isinstance(event, Catalog) and len(event) == 1:
    event = event[0]
elif isinstance(event, Event):
    event = event
else:
    raise NordicParsingError('Needs a single event')
if not isinstance(wavefiles, list):
    wavefiles = [str(wavefiles)]
# Determine name from origin time
try:
    origin = event.preferred_origin() or event.origins[0]
except IndexError:
    msg = 'Need at least one origin with at least an origin time'
    raise NordicParsingError(msg)
evtime = origin.time
if not evtime:
    msg = ('event has an origin, but time is not populated. ' +
           'This is required!')
    raise NordicParsingError(msg)
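For reference, a minimal sketch of inputs that would satisfy the checks above (the variable names mirror the fragment; the surrounding function signature is not shown here):

from obspy import UTCDateTime
from obspy.core.event import Catalog, Event, Origin

# Illustrative inputs that pass the validation above: a single-event Catalog
# whose event carries an origin with a time, a 4-character user ID, and an
# event type of 'L', 'R' or 'D'.
event = Catalog(events=[Event(origins=[Origin(
    time=UTCDateTime(2013, 9, 1, 4, 11, 15))])])
userid = "OBSP"
evtype = "L"
explosion = False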
class Magnitude(obspy.core.event.Magnitude, CommonEventHelper):
    def __init__(self, *args, **kwargs):
        super(Magnitude, self).__init__()
        self.newID()
        self._CommonEventHelper__set_creation_info()


class Catalog(obspy.core.event.Catalog, CommonEventHelper):
    def __init__(self, *args, **kwargs):
        super(Catalog, self).__init__()
        self.newID()
        self._CommonEventHelper__set_creation_info()


class Event(obspy.core.event.Event, CommonEventHelper):
    def __init__(self, *args, **kwargs):
        super(Event, self).__init__()
        self.newID()
        self._CommonEventHelper__set_creation_info()

    def set_creation_info_username(self, username):
        if not self.creation_info:
            self._CommonEventHelper__set_creation_info()
        self.creation_info.author = username


class Origin(obspy.core.event.Origin, CommonEventHelper):
    def __init__(self, *args, **kwargs):
        super(Origin, self).__init__()
        self.newID()
        self._CommonEventHelper__set_creation_info()
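The mixin these subclasses rely on is not part of this fragment. A hypothetical sketch of what a compatible CommonEventHelper could look like (the names and behaviour here are assumptions, not the project's actual implementation):

from obspy import UTCDateTime
from obspy.core.event import CreationInfo, ResourceIdentifier

class CommonEventHelper:
    """Hypothetical mixin compatible with the subclasses above."""

    def newID(self):
        # Assumed behaviour: give the object a fresh resource identifier.
        self.resource_id = ResourceIdentifier()

    def __set_creation_info(self):
        # The double underscore mangles the name to
        # _CommonEventHelper__set_creation_info, which is exactly how the
        # subclasses above invoke it.
        self.creation_info = CreationInfo(creation_time=UTCDateTime())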