Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ys(self):
start_indices = self.start_indices // 2
flat_array = self.flat_array[1::2]
return RaggedArray({"start_indices": start_indices, "flat_array": flat_array})
def copy(self, deep=False):
data = dict(
flat_array=self.flat_array,
start_indices=self.start_indices)
return RaggedArray(data, copy=deep)
def _concat_same_type(cls, to_concat):
# concat flat_arrays
flat_array = np.hstack([ra.flat_array for ra in to_concat])
# offset and concat start_indices
offsets = np.hstack([
[0],
np.cumsum([len(ra.flat_array) for ra in to_concat[:-1]])])
start_indices = np.hstack([ra.start_indices + offset
for offset, ra in zip(offsets, to_concat)])
return RaggedArray(dict(
flat_array=flat_array, start_indices=start_indices),
copy=False)
def construct_array_type(cls):
return RaggedArray
def take(self, indices, allow_fill=False, fill_value=None):
if allow_fill:
invalid_inds = [i for i in indices if i < -1]
if invalid_inds:
raise ValueError("""
Invalid indices for take with allow_fill True: {inds}""".format(
inds=invalid_inds[:9]))
sequence = [self[i] if i >= 0 else fill_value
for i in indices]
else:
if len(self) == 0 and len(indices) > 0:
raise IndexError("cannot do a non-empty take")
sequence = [self[i] for i in indices]
return RaggedArray(sequence, dtype=self.flat_array.dtype)
def _from_sequence(cls, scalars, dtype=None, copy=False):
return RaggedArray(scalars, dtype=dtype)
all(k in data for k in
['start_indices', 'flat_array'])):
_validate_ragged_properties(
start_indices=data['start_indices'],
flat_array=data['flat_array'])
self._start_indices = data['start_indices']
self._flat_array = data['flat_array']
dtype = self._flat_array.dtype
if copy:
self._start_indices = self._start_indices.copy()
self._flat_array = self._flat_array.copy()
elif isinstance(data, RaggedArray):
self._flat_array = data.flat_array
self._start_indices = data.start_indices
dtype = self._flat_array.dtype
if copy:
self._start_indices = self._start_indices.copy()
self._flat_array = self._flat_array.copy()
else:
# Compute lengths
index_len = len(data)
buffer_len = sum(len(datum)
if not missing(datum)
else 0 for datum in data)
# Compute necessary precision of start_indices array
for nbits in [8, 16, 32, 64]: