# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def _check_parameters(self, size, start, step):
    """Check and normalize the data slice parameters.

    Wraps raw values in ``DataSliceStep`` / ``DataSliceOffset`` as needed,
    applies defaults (``start`` falls back to the first index value and
    ``step`` falls back to ``size``), then validates them.

    Args:
        size : Length of each data slice (row count or time period).
        start : Offset at which slicing begins; defaults to the first index value.
        step : Offset between successive slices; defaults to ``size``.

    Returns:
        tuple : The normalized ``(size, start, step)`` offsets.

    Raises:
        AssertionError : If ``size`` or ``step`` is not positive, or a
            period-based offset is used without a time index.
    """
    if not isinstance(size, DataSliceStep):
        size = DataSliceStep(size)

    start = start or self._df.index[0]
    if not isinstance(start, DataSliceOffset):
        start = DataSliceOffset(start)

    step = step or size
    if not isinstance(step, DataSliceStep):
        step = DataSliceStep(step)

    info = 'offset must be positive'
    assert size._is_positive, info
    assert step._is_positive, info

    if any(offset._is_offset_period for offset in (size, start, step)):
        info = 'offset by time requires a time index'
        assert self._is_time_index, info

    # BUG FIX: this copy previously dropped the normalized values; return
    # them like the sibling implementations so callers receive the
    # coerced offsets rather than None.
    return size, start, step
def _check_parameters(self, size, start, step):
    """Normalize ``size``/``start``/``step`` into data slice offsets.

    ``start`` defaults to the first index value and ``step`` defaults to
    ``size``. Raises ``AssertionError`` when ``size`` or ``step`` is not
    positive, or when a period-based offset is used on a non-time index.

    Returns:
        tuple : The normalized ``(size, start, step)`` offsets.
    """
    size = size if isinstance(size, DataSliceStep) else DataSliceStep(size)

    start = start or self._df.index[0]
    if not isinstance(start, DataSliceOffset):
        start = DataSliceOffset(start)

    step = step or size
    step = step if isinstance(step, DataSliceStep) else DataSliceStep(step)

    positive_info = 'offset must be positive'
    assert size._is_positive, positive_info
    assert step._is_positive, positive_info

    uses_period = any(
        offset._is_offset_period for offset in (size, start, step)
    )
    if uses_period:
        assert self._is_time_index, 'offset by time requires a time index'

    return size, start, step
def _check_parameters(self, size, start, step):
    """Coerce and validate the slicing parameters.

    Each parameter is wrapped in its offset type when needed, with
    ``start`` defaulting to the first index value and ``step`` defaulting
    to ``size``. Positive sizes and steps are required, and period-based
    offsets demand a time index.

    Returns:
        tuple : The normalized ``(size, start, step)`` offsets.
    """
    if not isinstance(size, DataSliceStep):
        size = DataSliceStep(size)

    if not start:
        start = self._df.index[0]
    if not isinstance(start, DataSliceOffset):
        start = DataSliceOffset(start)

    if not step:
        step = size
    if not isinstance(step, DataSliceStep):
        step = DataSliceStep(step)

    message = 'offset must be positive'
    assert size._is_positive, message
    assert step._is_positive, message

    offsets = (size, start, step)
    if any(value._is_offset_period for value in offsets):
        message = 'offset by time requires a time index'
        assert self._is_time_index, message

    return size, start, step
def _check_parameters(self, size, start, step):
    """Check and normalize the data slice parameters.

    Wraps raw values in ``DataSliceStep`` / ``DataSliceOffset`` as needed,
    applies defaults (``start`` falls back to the first index value and
    ``step`` falls back to ``size``), then validates them.

    Args:
        size : Length of each data slice (row count or time period).
        start : Offset at which slicing begins; defaults to the first index value.
        step : Offset between successive slices; defaults to ``size``.

    Returns:
        tuple : The normalized ``(size, start, step)`` offsets.

    Raises:
        AssertionError : If ``size`` or ``step`` is not positive, or a
            period-based offset is used without a time index.
    """
    if not isinstance(size, DataSliceStep):
        size = DataSliceStep(size)

    start = start or self._df.index[0]
    if not isinstance(start, DataSliceOffset):
        start = DataSliceOffset(start)

    step = step or size
    if not isinstance(step, DataSliceStep):
        step = DataSliceStep(step)

    info = 'offset must be positive'
    assert size._is_positive, info
    assert step._is_positive, info

    if any(offset._is_offset_period for offset in (size, start, step)):
        info = 'offset by time requires a time index'
        # BUG FIX: this copy was truncated — the time-index assertion and
        # the return of the normalized values were missing; restored to
        # match the sibling implementations.
        assert self._is_time_index, info

    return size, start, step
def read_parquet(path, filename='label_times.parquet', load_settings=True):
    """Read label times in parquet format from disk.

    Args:
        path (str) : Directory on disk to read from.
        filename (str) : Filename for label times. Default value is `label_times.parquet`.
        load_settings (bool) : Whether to load the settings used to make the label times.

    Returns:
        LabelTimes : Deserialized label times.
    """
    location = os.path.join(path, filename)
    assert os.path.exists(location), "data not found: '%s'" % location

    # Wrap the deserialized frame in the LabelTimes subclass.
    label_times = LabelTimes(data=pd.read_parquet(location))

    if load_settings:
        label_times = label_times._load_settings(path)

    return label_times
def read_pickle(path, filename='label_times.pickle', load_settings=True):
    """Read label times in pickle format from disk.

    Args:
        path (str) : Directory on disk to read from.
        filename (str) : Filename for label times. Default value is `label_times.pickle`.
        load_settings (bool) : Whether to load the settings used to make the label times.

    Returns:
        LabelTimes : Deserialized label times.
    """
    file = os.path.join(path, filename)
    assert os.path.exists(file), "data not found: '%s'" % file

    # SECURITY NOTE: unpickling can execute arbitrary code; only read
    # pickle files from trusted sources.
    data = pd.read_pickle(file)
    label_times = LabelTimes(data=data)

    if load_settings:
        label_times = label_times._load_settings(path)

    return label_times
def read_csv(path, filename='label_times.csv', load_settings=True):
    """Read label times in csv format from disk.

    Args:
        path (str) : Directory on disk to read from.
        filename (str) : Filename for label times. Default value is `label_times.csv`.
        load_settings (bool) : Whether to load the settings used to make the label times.

    Returns:
        LabelTimes : Deserialized label times.
    """
    location = os.path.join(path, filename)
    assert os.path.exists(location), "data not found: '%s'" % location

    # The 'id' column was written as the index and is restored as such.
    frame = pd.read_csv(location, index_col='id')
    label_times = LabelTimes(data=frame)

    if load_settings:
        label_times = label_times._load_settings(path)

    return label_times
def _constructor(self):
    """Return the class pandas uses to construct derived objects.

    NOTE(review): presumably ``LabelTimes`` subclasses a pandas object and
    this hook keeps results of slicing/copying typed as ``LabelTimes`` —
    confirm against the enclosing class definition (not visible here).
    """
    return LabelTimes
# NOTE(review): this span is the tail of a search method whose signature
# lies outside this chunk; the comments below describe only what is
# visible here.

# A dict of examples-per-instance is interpreted as per-label counts
# (LabelSearch); any other value is a single count applied uniformly
# (ExampleSearch).
is_label_search = isinstance(num_examples_per_instance, dict)
search = (LabelSearch if is_label_search else ExampleSearch)(num_examples_per_instance)
records = self._run_search(
    df=df,
    search=search,
    gap=gap,
    min_data=minimum_data,
    drop_empty=drop_empty,
    verbose=verbose,
    *args,
    **kwargs,
)
# Wrap the raw search records together with the metadata needed to
# reproduce the search (counts, minimum data, window size, gap).
lt = LabelTimes(
    data=records,
    target_columns=list(self.labeling_function),
    target_entity=self.target_entity,
    search_settings={
        'num_examples_per_instance': num_examples_per_instance,
        'minimum_data': str(minimum_data),
        'window_size': str(self.window_size),
        'gap': str(gap),
    },
)
return lt
def _load_settings(self, path):
    """Read the settings in json format from disk.

    Args:
        path (str) : Directory on disk to read from.

    Returns:
        LabelTimes : Label times with the stored settings (and dtypes,
            when present) applied.
    """
    settings_file = os.path.join(path, 'settings.json')
    assert os.path.exists(settings_file), 'settings not found'

    with open(settings_file, 'r') as fp:
        settings = json.load(fp)

    # When dtypes were stored, cast the data back and rewrap it so the
    # result is a fresh LabelTimes instance.
    if 'dtypes' in settings:
        dtypes = settings.pop('dtypes')
        self = LabelTimes(self.astype(dtypes))

    self.settings.update(settings)
    return self