Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Parameters
----------
source : `str`, `file`, `lal.Cache`, `glue.lal.Cache`
data source to read
Returns
-------
stream : `lalframe.FrStream`
an open `FrStream`
Raises
------
ValueError
if the input format cannot be identified
"""
lalframe = import_method_dependency('lalframe')
import lal
if isinstance(source, (file, tempfile._TemporaryFileWrapper)): # pylint:disable=protected-access
source = source.name
if isinstance(source, GlueCacheEntry):
source = source.path
# read single file
if isinstance(source, string_types) and source.endswith('.gwf'):
return lalframe.FrStreamOpen(*os.path.split(source))
# read cache file
elif (isinstance(source, string_types) and
source.endswith(('.lcf', '.cache'))):
return lalframe.FrStreamCacheOpen(lal.CacheImport(source))
# read glue cache object
elif isinstance(source, GlueCache):
list of channel names to read
start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
GPS start time of required data,
any input parseable by `~gwpy.time.to_gps` is fine
end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
GPS end time of required data,
any input parseable by `~gwpy.time.to_gps` is fine
Returns
-------
timeseriesdict : `TimeSeriesDict`
a dict of `(channel, TimeSeries)` pairs of data read from the stream
"""
lalframe = import_method_dependency('lalframe')
import lal
from gwpy.utils import lal as lalutils
# parse times and restrict to available data
epoch = lal.LIGOTimeGPS(stream.epoch.gpsSeconds,
stream.epoch.gpsNanoSeconds)
streamdur = get_stream_duration(stream)
if start is None:
start = epoch
else:
start = max(epoch, lalutils.to_lal_ligotimegps(start))
if end is None:
offset = float(start - epoch)
duration = streamdur - offset
else:
end = min(epoch + streamdur, end)
any input parseable by `~gwpy.time.to_gps` is fine
end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
GPS end time of required data,
any input parseable by `~gwpy.time.to_gps` is fine
project : `str`, optional
name to write into frame header
run : `int`, optional
run number to write into frame header
frame : `int`, optional
frame number to write into frame header
"""
lalframe = import_method_dependency('lalframe')
import lal
from gwpy.utils import lal as lalutils
if not start:
start = tsdict.values()[0].xspan[0]
if not end:
end = tsdict.values()[0].xspan[1]
duration = end - start
# get ifos list
try:
detectors = 0
for series in tsdict.values():
idx = list(lalutils.LAL_DETECTORS.keys()).index(series.channel.ifo)
detectors |= 2**idx
except (KeyError, AttributeError):
def get_stream_duration(stream):
"""Find the duration of time stored in a frame stream
Parameters
----------
stream : `lal.FrStream`
stream of data to search
Returns
-------
duration : `float`
the duration (seconds) of the data for this channel
"""
lalframe = import_method_dependency('lalframe')
import lal
epoch = lal.LIGOTimeGPS(stream.epoch.gpsSeconds,
stream.epoch.gpsNanoSeconds)
# loop over each file in the stream cache and query its duration
nfile = stream.cache.length
duration = 0
for dummy_i in range(nfile):
for dummy_j in range(lalframe.FrFileQueryNFrame(stream.file)):
duration += lalframe.FrFileQueryDt(stream.file, 0)
lalframe.FrStreamNext(stream)
# rewind stream and return
lalframe.FrStreamSeek(stream, epoch)
return duration
def _read_channel(stream, channel, start, duration):
lalframe = import_method_dependency('lalframe')
from gwpy.utils import lal as lalutils
dtype = lalframe.FrStreamGetTimeSeriesType(channel, stream)
typestr = lalutils.LAL_TYPE_STR[dtype]
reader = getattr(lalframe, 'FrStreamRead%sTimeSeries' % typestr)
return reader(stream, channel, start, duration, 0)