Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def nds2_buffer(channel, data, epoch, sample_rate, unit,
                name=None, slope=1, offset=0):
    """Build a mock ``nds2.buffer`` populated from the given arguments.

    The mock is auto-specced against ``nds2.buffer``; its ``channel``
    attribute is whatever ``nds2_channel(channel, sample_rate, unit)``
    returns, and ``name`` falls back to that channel's name when not given.
    The GPS seconds/nanoseconds are taken from ``LIGOTimeGPS(epoch)``.
    """
    import nds2

    gps = LIGOTimeGPS(epoch)
    buf = mock.create_autospec(nds2.buffer)
    buf.length = len(data)

    # the channel must be attached before the name fallback can be resolved
    chan = nds2_channel(channel, sample_rate, unit)
    buf.channel = chan
    buf.name = name or chan.name

    # remaining attributes are simple copies of the inputs
    for attr, value in (
        ("sample_rate", sample_rate),
        ("gps_seconds", gps.gpsSeconds),
        ("gps_nanoseconds", gps.gpsNanoSeconds),
        ("signal_slope", slope),
        ("signal_offset", offset),
        ("data", data),
    ):
        setattr(buf, attr, value)
    return buf
def _iter_cache(cachefile, gpstype=LIGOTimeGPS):
"""Internal method that yields a `_CacheEntry` for each line in the file
This method supports reading LAL- and (nested) FFL-format cache files.
"""
# remember the absolute path of the file being read, when the object has
# a ``name`` attribute; it is compared against nested entries below
try:
path = os.path.abspath(cachefile.name)
except AttributeError:
path = None
for line in cachefile:
try:
# NOTE(review): this passes the module-level LIGOTimeGPS rather than the
# ``gpstype`` argument of this function -- confirm whether
# ``gpstype=gpstype`` was intended
yield _CacheEntry.parse(line, gpstype=LIGOTimeGPS)
except ValueError:
# virgo FFL format (seemingly) supports nested FFL files
parts = line.split()
# a three-field line whose first field is not this same file is opened
# as another cache file
if len(parts) == 3 and os.path.abspath(parts[0]) != path:
with open(parts[0], 'r') as cache2:
def _parse_entry_ffl(line, gpstype=LIGOTimeGPS):
    """Build a `_CacheEntry` from the five fields of an FFL cache line.

    ``line`` is an iterable of exactly five fields; the first three are
    used as the path, GPS start and duration, the last two are ignored.
    The observatory and description are read from the first two
    ``-``-separated parts of the file name, or are both `None` when the
    name has fewer than two such parts.
    """
    from ..segments import Segment

    path, gps_start, duration, _, _ = map(str, line)
    begin = gpstype(gps_start)
    finish = begin + float(duration)

    # first two hyphen-delimited tokens of the base name, if present
    tags = Path(path).name.split('-', 2)[:2]
    if len(tags) == 2:
        observatory, description = tags
    else:
        observatory = description = None
    return _CacheEntry(observatory, description, Segment(begin, finish), path)
# Parse one cache-file line (``str`` or UTF-8 ``bytes``) into an entry.
# NOTE(review): ``gpstype`` is accepted but is not forwarded to any of the
# calls visible in this fragment -- confirm against the full definition.
def parse(cls, line, gpstype=LIGOTimeGPS):
# format line string
if isinstance(line, bytes):
line = line.decode('utf-8')
parts = line.strip().split()
# if single entry, parse filename
if len(parts) == 1:
path = parts[0]
# the tuple returned by filename_metadata, with the path appended, is
# passed positionally to the class constructor
return cls(*filename_metadata(path) + (path,))
# otherwise try the FFL parser first and fall back to the LAL parser
try:
return _parse_entry_ffl(parts)
except (RuntimeError, TypeError, ValueError) as exc:
try:
return _parse_entry_lal(parts)
# if no start time was given, every series must share a single start time
if not start:
    starts = {LIGOTimeGPS(tsdict[c].x0.value) for c in tsdict}
    if len(starts) != 1:
        raise RuntimeError("Cannot write multiple TimeSeries to a single "
                           "frame with different start times, "
                           "please write into different frames")
    start = next(iter(starts))

# likewise for the end time; iterate with the same key that indexes
# ``tsdict`` so that each series' own span is actually compared
if not end:
    ends = {tsdict[c].span[1] for c in tsdict}
    if len(ends) != 1:
        raise RuntimeError("Cannot write multiple TimeSeries to a single "
                           "frame with different end times, "
                           "please write into different frames")
    end = next(iter(ends))

# duration is computed before ``start`` is converted to LIGOTimeGPS
duration = end - start
start = LIGOTimeGPS(start)

# keep only the interferometer prefixes for which frameCPP exposes a
# matching DETECTOR_LOCATION_<ifo> attribute
ifos = {
    ts.channel.ifo for ts in tsdict.values()
    if ts.channel and ts.channel.ifo and
    hasattr(frameCPP, 'DETECTOR_LOCATION_%s' % ts.channel.ifo)
}

# create frame
frame = io_gwf.create_frame(time=start, duration=duration, name=name,
                            run=run, ifos=ifos)

# append channels
for i, c in enumerate(tsdict):
    try:
        ctype = tsdict[c].channel._ctype or 'proc'
    except AttributeError:
        # the series has no usable channel/_ctype: use the default type
        ctype = 'proc'
    append_to_frame(frame, tsdict[c].crop(start, end),
                    type=ctype, channelid=i)
def _need_frame(frame, start, end):
    """Return `True` if ``frame`` overlaps the ``[start, end)`` request.

    A falsy ``start`` or ``end`` disables the corresponding bound.  The
    frame duration is only queried when the frame is not already excluded
    by its start time.
    """
    begins = LIGOTimeGPS(*frame.GetGTime())

    # frame starts at or after the requested end
    if end and begins >= end:
        return False

    # frame finishes at or before the requested start
    finishes = begins + frame.GetDt()
    return not (start and finishes <= start)
# number of frames in the stream, or None when it cannot be determined
try:
nframe = int(stream.GetNumberOfFrames())
except (AttributeError, ValueError):
nframe = None
# if single frame, trust filename to provide GPS epoch of data
# as required by the file-naming convention
epochs = None
try:
if nframe == 1:
epochs = [file_segment(framefile)[0]]
except ValueError:
pass
# otherwise build one epoch per (seconds, nanoseconds) pair listed in the
# table of contents
if epochs is None:
toc = stream.GetTOC()
epochs = [LIGOTimeGPS(s, n) for s, n in zip(toc.GTimeS, toc.GTimeN)]
toclist = {} # only get names once
for channel in channels:
# if ctype not declared, find it from the table-of-contents
if not ctype.get(channel, None):
toc = stream.GetTOC()
# the first type whose name list contains the channel is used
for typename in ['Sim', 'Proc', 'ADC']:
if typename not in toclist:
get_ = getattr(toc, 'Get%s' % typename)
# the getter's result is used through ``.keys()`` when it has that
# method, and as returned otherwise
try:
toclist[typename] = get_().keys()
except AttributeError:
toclist[typename] = get_()
if str(channel) in toclist[typename]:
ctype[channel] = typename.lower()
break
This method cannot convert exact leap seconds to
`datetime.datetime` because that type cannot represent them,
so you should consider using `astropy.time.Time` directly.
Examples
--------
>>> from_gps(1167264018)
datetime.datetime(2017, 1, 1, 0, 0)
>>> from_gps(1126259462.3910)
datetime.datetime(2015, 9, 14, 9, 50, 45, 391000)
"""
# try to build a LIGOTimeGPS from the input as given, and fall back to
# converting through float() when that is rejected
try:
gps = LIGOTimeGPS(gps)
except (ValueError, TypeError, RuntimeError):
gps = LIGOTimeGPS(float(gps))
sec, nano = gps.gpsSeconds, gps.gpsNanoSeconds
# only the integer seconds go through Time; the nanoseconds are added
# back as a timedelta on the final line
try:
date = Time(sec, format='gps', scale='utc').datetime
except ValueError as exc:
# replace the message of leap-second errors with a more helpful one
# before the exception is re-raised
if "within a leap second" in str(exc):
exc.args = (
"cannot represent leap second using datetime.datetime, "
"consider using "
"astropy.time.Time({}, format=\"gps\", scale=\"utc\") "
"directly".format(gps),
)
raise
# nano * 1e-3 expresses the nanoseconds in microseconds for timedelta
return date + datetime.timedelta(microseconds=nano*1e-3)
# Number/Decimal -> str
# (a Decimal becomes a str here, so the Number branch below no longer
# applies to it)
if isinstance(t, Decimal):
t = str(t)
if isinstance(t, Number):
# note, on python < 3, str() isn't very good, so we use repr
# for python > 3 we can just use str for both Decimal and Number
t = repr(t)
# -- convert to LIGOTimeGPS
# Time instances are handed to _time_to_gps together with any extra
# positional and keyword arguments
if isinstance(t, Time):
return _time_to_gps(t, *args, **kwargs)
# try direct construction first, then retry via float() on failure
try:
return LIGOTimeGPS(t)
except (TypeError, ValueError):
return LIGOTimeGPS(float(t))
def _row_from_frevent(frevent, columns, selection):
    """Turn an FrEvent into a list of values, one per requested column.

    ``selection`` is an iterable of ``(column, operator, threshold)``
    triples; the event is dropped (`None` is returned) as soon as one of
    them is not satisfied.  Filtering happens here so that it can use
    parameters that are not among the returned ``columns``.
    """
    # start from the event's own parameters, then add the fixed fields
    params = dict(frevent.GetParam())
    params.update({
        'time': float(LIGOTimeGPS(*frevent.GetGTime())),
        'amplitude': frevent.GetAmplitude(),
        'probability': frevent.GetProbability(),
        'timeBefore': frevent.GetTimeBefore(),
        'timeAfter': frevent.GetTimeAfter(),
        'comment': frevent.GetComment(),
    })

    # apply each filter in turn, stopping at the first failure
    for column, operator_, threshold in selection:
        if not operator_(params[column], threshold):
            return None

    # event passed: emit the requested columns in order
    return [params[column] for column in columns]