Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
frametype_match = GRB_TYPE
# -- go
match = {}
ifos = set() # record IFOs we have queried to prevent duplication
# loop over set of names, which should get smaller as names are searched
while names:
# parse first channel name (to get IFO and channel type)
try:
name = next(iter(names))
except KeyError:
break
else:
chan = Channel(chans[name])
# parse IFO ID
try:
ifo = chan.ifo[0]
except TypeError: # chan.ifo is None
raise ValueError("Cannot parse interferometer prefix from channel "
"name %r, cannot proceed with find()" % str(chan))
# if we've already gone through the types for this IFO, skip
if ifo in ifos:
names.pop()
continue
ifos.add(ifo)
types = find_types(ifo, match=frametype_match, trend=chan.type,
connection=connection)
def channel(self, chan):
if isinstance(chan, Channel):
self._channel = chan
else:
self._channel = Channel(chan)
def channel(self, chan):
if isinstance(chan, Channel):
self._channel = chan
elif chan is None:
self._channel = None
else:
self._channel = Channel(chan)
def _create_series(ndschan, value, start, end, series_class=TimeSeries):
"""Create a timeseries to cover the specified [start, end) limits
To cover a gap in data returned from NDS
"""
channel = Channel.from_nds2(ndschan)
nsamp = int((end - start) * channel.sample_rate.value)
return series_class(numpy_ones(nsamp) * value, t0=start,
sample_rate=channel.sample_rate, unit=channel.unit,
channel=channel)
----------
data : `dict`
input data from CIS json query
Returns
-------
c : `Channel`
a `Channel` built from the data
"""
name = data['name']
sample_rate = data['datarate']
unit = data['units']
dtype = CIS_DATA_TYPE[data['datatype']]
model = data['source']
url = data['displayurl']
return Channel(data['name'], sample_rate=sample_rate, unit=unit,
dtype=dtype, model=model, url=url)
def _unpickle_channel(raw):
"""Try and unpickle a channel with sensible error handling
"""
try:
return pickle.loads(raw)
except (ValueError, pickle.UnpicklingError, EOFError, TypeError,
IndexError) as exc:
# maybe not pickled
if isinstance(raw, bytes):
raw = raw.decode('utf-8')
try: # test if this is a valid channel name
Channel.MATCH.match(raw)
except ValueError:
raise exc
return raw
import pickle
from decimal import Decimal
from operator import attrgetter
from astropy.units import (Quantity, UnitBase)
from ...detector import Channel
from ...io import (hdf5 as io_hdf5, registry as io_registry)
from ...time import LIGOTimeGPS
from .. import (Array, Series, Index)
__author__ = 'Duncan Macleod '
ATTR_TYPE_MAP = {
Quantity: attrgetter('value'),
Channel: str,
UnitBase: str,
Decimal: float,
LIGOTimeGPS: float,
}
# -- read ---------------------------------------------------------------------
@io_hdf5.with_read_hdf5
def read_hdf5_array(source, path=None, array_type=Array):
"""Read an `Array` from the given HDF5 object
Parameters
----------
source : `str`, :class:`h5py.HLObject`
path to HDF file on disk, or open `h5py.HLObject`.
def channel(self, chan):
if isinstance(chan, Channel):
self._channel = chan
else:
self._channel = Channel(chan)
"""
# this function is now horrendously complicated to support a large
# number of different use cases, hopefully the comments are sufficient
from ..detector import Channel
# format channel names as list
if isinstance(channel, (list, tuple)):
channels = channel
else:
channels = [channel]
# create set() of GWF channel names, and dict map back to user names
# this allows users to use nds-style names in this query, e.g.
# 'X1:TEST.mean,m-trend', and still get results
chans = {Channel(c).name: c for c in channels}
names = set(chans.keys())
# format GPS time(s)
if isinstance(gpstime, (list, tuple)):
gpssegment = LigoSegment(*gpstime)
gpstime = gpssegment[0]
else:
gpssegment = None
if gpstime is not None:
gpstime = int(to_gps(gpstime))
# if use gaps post-S5 GPStime, forcibly skip _GRBYYMMDD frametypes at CIT
if frametype_match is None and gpstime is not None and gpstime > 875232014:
frametype_match = GRB_TYPE
# -- go