# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# generate output spectrum
create = find_typed_function(timeseries.dtype, 'Create', 'FrequencySeries')
lalfs = create(timeseries.name, lal.LIGOTimeGPS(timeseries.epoch.gps), 0,
1 / segmentlength, lal.StrainUnit,
int(segmentlength // 2 + 1))
# find LAL method (e.g. median-mean -> lal.REAL8AverageSpectrumMedianMean)
methodname = ''.join(map(str.title, re.split('[-_]', method)))
spec_func = find_typed_function(timeseries.dtype, '',
'AverageSpectrum{}'.format(methodname))
# calculate spectrum
spec_func(lalfs, timeseries.to_lal(), segmentlength, stride, window, plan)
# format and return
spec = FrequencySeries.from_lal(lalfs)
spec.name = timeseries.name
spec.channel = timeseries.channel
spec.override_unit(scale_timeseries_unit(
timeseries.unit, scaling='density'))
return spec
if size != required:
warnings.warn("Data array is the wrong size for the correct number "
"of averages given the input parameters. The trailing "
"%d samples will not be used in this calculation."
% (size - required))
timeseries = timeseries[:required]
# generate output spectrum
create = find_typed_function(timeseries.dtype, 'Create', 'FrequencySeries')
lalfs = create(timeseries.name, lal.LIGOTimeGPS(timeseries.epoch.gps), 0,
1 / segmentlength, lal.StrainUnit,
int(segmentlength // 2 + 1))
# find LAL method (e.g. median-mean -> lal.REAL8AverageSpectrumMedianMean)
methodname = ''.join(map(str.title, re.split('[-_]', method)))
spec_func = find_typed_function(timeseries.dtype, '',
'AverageSpectrum{}'.format(methodname))
# calculate spectrum
spec_func(lalfs, timeseries.to_lal(), segmentlength, stride, window, plan)
# format and return
spec = FrequencySeries.from_lal(lalfs)
spec.name = timeseries.name
spec.channel = timeseries.channel
spec.override_unit(scale_timeseries_unit(
timeseries.unit, scaling='density'))
return spec
if method == 'median-mean' and numsegs % 2:
numsegs -= 1
if not numsegs:
raise ValueError("Cannot calculate median-mean spectrum with "
"this small a TimeSeries.")
required = int((numsegs - 1) * stride + segmentlength)
if size != required:
warnings.warn("Data array is the wrong size for the correct number "
"of averages given the input parameters. The trailing "
"%d samples will not be used in this calculation."
% (size - required))
timeseries = timeseries[:required]
# generate output spectrum
create = find_typed_function(timeseries.dtype, 'Create', 'FrequencySeries')
lalfs = create(timeseries.name, lal.LIGOTimeGPS(timeseries.epoch.gps), 0,
1 / segmentlength, lal.StrainUnit,
int(segmentlength // 2 + 1))
# find LAL method (e.g. median-mean -> lal.REAL8AverageSpectrumMedianMean)
methodname = ''.join(map(str.title, re.split('[-_]', method)))
spec_func = find_typed_function(timeseries.dtype, '',
'AverageSpectrum{}'.format(methodname))
# calculate spectrum
spec_func(lalfs, timeseries.to_lal(), segmentlength, stride, window, plan)
# format and return
spec = FrequencySeries.from_lal(lalfs)
spec.name = timeseries.name
spec.channel = timeseries.channel
def nds2_buffer(channel, data, epoch, sample_rate, unit,
                name=None, slope=1, offset=0):
    """Build a mocked `nds2.buffer` carrying the given data and metadata.

    The returned object is an autospec mock of `nds2.buffer` with its
    channel, timing, calibration, and data attributes populated.
    """
    import nds2
    gps = LIGOTimeGPS(epoch)
    buf = mock.create_autospec(nds2.buffer)
    # channel metadata first, so the buffer name can default to it
    buf.channel = nds2_channel(channel, sample_rate, unit)
    buf.name = name or buf.channel.name
    buf.length = len(data)
    buf.sample_rate = sample_rate
    # split the epoch into integer seconds + nanoseconds, as nds2 does
    buf.gps_seconds = gps.gpsSeconds
    buf.gps_nanoseconds = gps.gpsNanoSeconds
    # linear calibration parameters
    buf.signal_slope = slope
    buf.signal_offset = offset
    buf.data = data
    return buf
def find_trigger_urls(channel, etg, gpsstart, gpsend, verbose=False):
    """Find the paths of trigger files that represent the given
    observatory, channel, and ETG (event trigger generator) for a given
    GPS [start, end) segment.
    """
    # replace the first letter with a glob wildcard so that both
    # 'Omicron' and 'omicron' directory spellings match
    if etg.lower().startswith('omicron'):
        etg = '?' + etg[1:]
    # construct search
    gpsstart = to_gps(gpsstart).seconds
    gpsend = to_gps(gpsend).seconds
    span = Segment(gpsstart, gpsend)
    # channel names are '<IFO>:<NAME>'; split once on the first colon
    ifo, channel = channel.split(':', 1)
    trigtype = "%s_%s" % (channel, etg.lower())
    # epoch directory is unknown, glob across all of them
    epoch = '*'
    searchbase = os.path.join(TRIGFIND_BASE_PATH, epoch, ifo, trigtype)
    # trigger files are sharded into directories keyed on the leading
    # 5 digits of the GPS time; enumerate every shard touching the span
    gpsdirs = range(int(str(gpsstart)[:5]), int(str(gpsend)[:5])+1)
    # filename glob: <IFO>-<CHANNEL>_<etg>-<10-digit GPS>-<duration>.xml[.gz]
    trigform = ('%s-%s_%s-%s-*.xml*'
                % (ifo, re_dash.sub('_', channel), etg.lower(), '[0-9]'*10))
    # test for channel-level directory
    if not glob.glob(searchbase):
        raise ValueError("No channel-level directory found at %s. Either the "
                         "channel name or ETG names are wrong, or this "
                         "channel is not configured for this ETG."
                         % searchbase)
def find_latest(observatory, frametype, gpstime=None, allow_tape=False,
                connection=None, **connection_kw):
    """Find the path of the latest file of a given data type.

    Parameters are passed through to the underlying discovery call; if
    ``gpstime`` is given, the file covering that time is returned,
    otherwise the most recent file known to ``connection`` is used.

    Raises
    ------
    RuntimeError
        if no files are found for this observatory/frametype
    IOError
        if the latest file is on tape and ``allow_tape`` is `False`

    See also
    --------
    gwdatafind.http.HTTPConnection.find_latest
    FflConnection.find_latest
        for details on the underlying method(s)
    """
    # only the leading character (e.g. 'L' from 'L1') identifies the site
    observatory = observatory[0]
    try:
        if gpstime is not None:
            gpstime = int(to_gps(gpstime))
            path = find_urls(observatory, frametype, gpstime, gpstime+1,
                             on_gaps='ignore', connection=connection)[-1]
        else:
            path = connection.find_latest(observatory, frametype,
                                          on_missing='ignore')[-1]
    # IndexError: the query returned an empty list;
    # RuntimeError: backend failure in the discovery call.
    # Chain the original exception so the root cause stays visible.
    except (IndexError, RuntimeError) as exc:
        raise RuntimeError(
            "no files found for {}-{}".format(observatory, frametype),
        ) from exc
    # strip the URL scheme/host, keeping only the filesystem path
    path = urlparse(path).path
    if not allow_tape and on_tape(path):
        raise IOError("Latest frame file for {}-{} is on tape "
                      "(pass allow_tape=True to force): "
                      "{}".format(observatory, frametype, path))
    return path
def _auto_epoch(self, axis):
    """Return a GPS epoch (int) derived from the axis' lower limit,
    rounded down to a whole unit appropriate for the axis scale.
    """
    # use the lower data/view limit as the epoch
    epoch = round(self._lim(axis)[0])
    # round epoch in successive units for large scales
    unit = self.get_unit()
    date = from_gps(epoch)
    fields = ('second', 'minute', 'hour', 'day')
    # walk up the calendar fields: while the axis unit is at least as
    # large as the next field, zero out the current (smaller) field
    for i, u in enumerate(fields[1:]):
        if unit < units.Unit(u):
            break
        if u in ('day',):
            # NOTE(review): here fields[i] == 'hour', so this sets hour=1
            # rather than day=1 — looks like the intent was to reset to
            # the first of the month; confirm against the real axes
            date = date.replace(**{fields[i]: 1})
        else:
            date = date.replace(**{fields[i]: 0})
    return int(to_gps(date))
def _iter_cache(cachefile, gpstype=LIGOTimeGPS):
    """Internal method that yields a `_CacheEntry` for each line in the file
    This method supports reading LAL- and (nested) FFL-format cache files.
    """
    # remember this file's own path so a nested entry pointing back at
    # itself is not followed (avoids infinite recursion)
    try:
        path = os.path.abspath(cachefile.name)
    except AttributeError:
        # file-like object with no .name (e.g. StringIO)
        path = None
    for line in cachefile:
        try:
            # NOTE(review): the gpstype parameter is ignored here —
            # LIGOTimeGPS is passed literally; presumably this should
            # be gpstype=gpstype. Confirm before changing.
            yield _CacheEntry.parse(line, gpstype=LIGOTimeGPS)
        except ValueError:
            # virgo FFL format (seemingly) supports nested FFL files
            parts = line.split()
            if len(parts) == 3 and os.path.abspath(parts[0]) != path:
                with open(parts[0], 'r') as cache2:
def _parse_entry_ffl(line, gpstype=LIGOTimeGPS):
    """Parse a five-column FFL cache line into a `_CacheEntry`.

    Only the path, start time, and duration columns are used; the GPS
    span is built from start and start + duration.
    """
    from ..segments import Segment
    # an FFL line has exactly five columns; the last two are unused
    entry_path, gps_start, duration, _, _ = map(str, line)
    seg_start = gpstype(gps_start)
    span = Segment(seg_start, seg_start + float(duration))
    try:
        # filenames follow '<OBS>-<DESCRIPTION>-...'; keep the first two
        obs, tag = Path(entry_path).name.split('-', 2)[:2]
    except ValueError:
        # filename doesn't match the convention; record span/path only
        return _CacheEntry(None, None, span, entry_path)
    return _CacheEntry(obs, tag, span, entry_path)
def parse(cls, line, gpstype=LIGOTimeGPS):
# format line string
if isinstance(line, bytes):
line = line.decode('utf-8')
parts = line.strip().split()
# if single entry, parse filename
if len(parts) == 1:
path = parts[0]
return cls(*filename_metadata(path) + (path,))
try:
return _parse_entry_ffl(parts)
except (RuntimeError, TypeError, ValueError) as exc:
try:
return _parse_entry_lal(parts)