Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.
"""LIGO_LW I/O registrations for gwpy.frequencyseries objects
"""
from ...io import registry as io_registry
from ...io.ligolw import is_ligolw
from ...types.io.ligolw import read_series
from .. import FrequencySeries
# -- registration -------------------------------------------------------------

# attach the series reader from types.io.ligolw and the LIGO_LW file
# identifier to FrequencySeries under the 'ligolw' format name
io_registry.register_reader(
    'ligolw', FrequencySeries, read_series)
io_registry.register_identifier(
    'ligolw', FrequencySeries, is_ligolw)
# write file object
if header:
print('# seg\tstart\tstop\tduration', file=target)
for i, seg in enumerate(segs):
print(
'\t'.join(map(
str, (i, coltype(seg[0]), coltype(seg[1]), coltype(abs(seg))),
)), file=target,
)
# -- register -----------------------------------------------------------------

# reader and writer for SegmentList under the 'segwizard' format name
registry.register_reader('segwizard', SegmentList, from_segwizard)
registry.register_writer('segwizard', SegmentList, to_segwizard)

# NOTE(review): these extensions are given without a leading dot, unlike
# other identify_factory() calls (e.g. '.root', '.ini') -- confirm that
# this is intended
registry.register_identifier(
    'segwizard',
    SegmentList,
    identify_factory('txt', 'dat'),
)
return filter_table(t, *filters)
return t
def table_to_root(table, filename, **kwargs):
    """Write a Table to a ROOT file

    The table is converted with ``table.as_array()`` and passed, together
    with ``filename``, to ``root_numpy.array2root``.  Any additional
    keyword arguments are forwarded to that call unchanged.
    """
    # imported inside the function, so root_numpy is only required
    # when this writer is actually called
    import root_numpy

    records = table.as_array()
    root_numpy.array2root(records, filename, **kwargs)
# register I/O

# both table classes get the same 'root' reader, writer and identifier
for table_class in (Table, EventTable):
    registry.register_reader('root', table_class, table_from_root)
    registry.register_writer('root', table_class, table_to_root)
    registry.register_identifier(
        'root',
        table_class,
        identify_factory('.root'),
    )
return False
finally:
fileobj.seek(loc)
elif filepath is not None:
return filepath.endswith(('.wav', '.wave'))
else:
try:
wave.open(args[0])
except (wave.Error, AttributeError):
return False
else:
return True
# -- register 'wav' reader, writer and identifier for TimeSeries
io_registry.register_reader(
    'wav', TimeSeries, read)
io_registry.register_writer(
    'wav', TimeSeries, write)
io_registry.register_identifier(
    'wav', TimeSeries, is_wav)
clist = out.get(group, 'channels')
except configparser.NoOptionError:
out.set(group, 'channels', '\n%s' % entry)
else:
out.set(group, 'channels', clist + '\n%s' % entry)
if isinstance(fobj, FILE_LIKE):
close = False
else:
fobj = open(fobj, 'w')
close = True
out.write(fobj)
if close:
fobj.close()
# -- register 'ini' I/O for ChannelList

registry.register_reader(
    'ini', ChannelList, read_channel_list_file)
# files ending in '.ini' or '.clf' are offered to this format
registry.register_identifier(
    'ini',
    ChannelList,
    identify_factory('.ini', '.clf'),
)
registry.register_writer(
    'ini', ChannelList, write_channel_list_file)
h5g = h5f
# write each timeseries
kwargs.setdefault('format', 'hdf5')
for key, series in tsdict.items():
series.write(h5g, path=str(key), **kwargs)
# -- register -----------------------------------------------------------------

# series classes: each gets a reader built for that class, plus the
# common series writer and hdf5 identifier
for series_class in (TimeSeries, StateVector):
    reader = read_hdf5_factory(series_class)
    io_registry.register_reader(
        'hdf5', series_class, reader)
    io_registry.register_writer(
        'hdf5', series_class, write_hdf5_series)
    io_registry.register_identifier(
        'hdf5', series_class, identify_hdf5)

# dict classes: same pattern, but written with the dict writer
for dict_class in (TimeSeriesDict, StateVectorDict):
    reader = read_hdf5_factory(dict_class)
    io_registry.register_reader(
        'hdf5', dict_class, reader)
    io_registry.register_writer(
        'hdf5', dict_class, write_hdf5_dict)
    io_registry.register_identifier(
        'hdf5', dict_class, identify_hdf5)
comment = '%'
class Omega(core.BaseReader):
    """Table reader for the 'omega' format

    Declares the format name and description, and points the base
    reader at the Omega header and data classes.
    """
    # format metadata
    _format_name = 'omega'
    _description = 'Omega format table'
    # NOTE(review): presumably tells the I/O registry that this format
    # may also be written -- confirm against core.BaseReader
    _io_registry_can_write = True

    # header and data handlers used for this format
    header_class = OmegaHeader
    data_class = OmegaData
# register ascii.omega for EventTable
# (uses whatever reader the registry returns for Table in this format)
registry.register_reader(
    'ascii.omega',
    EventTable,
    registry.get_reader('ascii.omega', Table),
)
return to_segwizard(segs, fobj, header=header, coltype=coltype)
# write file object
if header:
print('# seg\tstart\tstop\tduration', file=target)
for i, seg in enumerate(segs):
print(
'\t'.join(map(
str, (i, coltype(seg[0]), coltype(seg[1]), coltype(abs(seg))),
)), file=target,
)
# -- register -----------------------------------------------------------------

# reader and writer for SegmentList under the 'segwizard' format name
registry.register_reader('segwizard', SegmentList, from_segwizard)
registry.register_writer('segwizard', SegmentList, to_segwizard)

# NOTE(review): these extensions are given without a leading dot, unlike
# other identify_factory() calls (e.g. '.root', '.ini') -- confirm that
# this is intended
registry.register_identifier(
    'segwizard',
    SegmentList,
    identify_factory('txt', 'dat'),
)
h5g = h5f[group]
else:
h5g = h5f
# write each timeseries
kwargs.setdefault('format', 'hdf5')
for key, series in tsdict.items():
series.write(h5g, path=str(key), **kwargs)
# -- register -----------------------------------------------------------------

# series classes: each gets a reader built for that class, plus the
# common series writer and hdf5 identifier
for series_class in (TimeSeries, StateVector):
    reader = read_hdf5_factory(series_class)
    io_registry.register_reader(
        'hdf5', series_class, reader)
    io_registry.register_writer(
        'hdf5', series_class, write_hdf5_series)
    io_registry.register_identifier(
        'hdf5', series_class, identify_hdf5)

# dict classes: same pattern, but written with the dict writer
for dict_class in (TimeSeriesDict, StateVectorDict):
    reader = read_hdf5_factory(dict_class)
    io_registry.register_reader(
        'hdf5', dict_class, reader)
    io_registry.register_writer(
        'hdf5', dict_class, write_hdf5_dict)
    io_registry.register_identifier(
        'hdf5', dict_class, identify_hdf5)