import sys
from datetime import datetime
from typing import Iterable, Union

# TimeSeriesRange, TimeSeriesRangeResult and TimeSeriesEntry come from the
# client's time-series modules; their import paths are not shown in this snippet.


def get(self, from_date=None, to_date=None, start=0, page_size=sys.maxsize):
    # NOTE: this "def" line is reconstructed -- the snippet begins mid-method,
    # inside the session's time-series accessor class.
    # If the document is tracked and its metadata carries no time-series flag,
    # there is nothing to fetch.
    document = self._session.documents_by_id.get(self._document_id, None)
    info = self._session.documents_by_entity.get(document, None) if document else None
    if info and "HasTimeSeries" not in info.get('@metadata', {}).get('@flags', {}):
        return []

    # Default to the widest possible range when no bounds were given.
    from_date = from_date if from_date else datetime.min
    to_date = to_date if to_date else datetime.max

    cache = self._session.time_series_by_document_id.get(self._document_id, None)
    ranges_result = cache.get(self._name, None) if cache else None
    if ranges_result:
        if ranges_result[0].from_date > to_date or ranges_result[-1].to_date < from_date:
            # The entire range [from_date, to_date] is out of cache bounds,
            # so fetch it from the server in a single request.
            self._session.increment_requests_count()
            ranges = TimeSeriesRange(name=self._name, from_date=from_date, to_date=to_date)
            details = self._session.advanced.document_store.operations.send(
                GetTimeSeriesOperation(self._document_id, ranges, start, page_size))
            time_series_range_result = TimeSeriesRangeResult.create_time_series_range_entity(details)
            if not self._session.no_tracking:
                # Keep the cached ranges ordered: prepend or append the new one.
                index = 0 if ranges_result[0].from_date > to_date else len(ranges_result)
                ranges_result.insert(index, time_series_range_result)
            return time_series_range_result.entries

        # The request overlaps the cache: fetch only the missing pieces and
        # merge them back into the cached ranges.
        entries_to_return, cache_ranges = self._add_to_cache(
            ranges_result, from_date, to_date, start, page_size)
        if not self._session.no_tracking:
            cache[self._name] = cache_ranges
        return entries_to_return

    # Nothing cached for this series yet: fetch the whole requested range.
    self._session.increment_requests_count()
    range_ = TimeSeriesRange(name=self._name, from_date=from_date, to_date=to_date)
    details = self._session.advanced.document_store.operations.send(
        GetTimeSeriesOperation(self._document_id, range_, start, page_size))
    time_series_range_result = TimeSeriesRangeResult.create_time_series_range_entity(details)
    if time_series_range_result is None:
        return []
    entries = time_series_range_result.entries
    if not self._session.no_tracking and entries:
        if cache:
            cache.update({self._name: [time_series_range_result]})
        else:
            self._session.time_series_by_document_id[self._document_id] = {
                self._name: [time_series_range_result]}
    return [TimeSeriesEntry.create_time_series_entry(entry) for entry in entries]
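
# --- Usage sketch (added; not part of the original snippet) -------------------
# How get() is typically reached, assuming the enclosing class is handed out
# by the session -- the time_series_for(...) accessor name below is an
# assumption, not something this snippet confirms:
#
#     with store.open_session() as session:                     # "store" assumed
#         ts = session.time_series_for("users/1", "HeartRate")  # hypothetical
#         # First call goes to the server and seeds the session cache:
#         entries = ts.get(datetime(2023, 1, 1), datetime(2023, 1, 2))
#         # Repeating the call within the same bounds is served from the cache,
#         # without incrementing the session's request counter.
#         entries = ts.get(datetime(2023, 1, 1), datetime(2023, 1, 2))
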
def _add_to_cache(self, ranges, from_date, to_date, start, page_size):
    start_date = from_date
    index = 0
    time_series_ranges = []
    # Walk the cached ranges (kept sorted by date) and collect the gaps that
    # still have to be fetched from the server.
    for range_ in ranges:
        if range_.from_date <= start_date:
            if range_.to_date >= to_date:
                # A single cached range covers the whole request.
                return [entry for entry in range_.entries
                        if from_date <= entry.timestamp <= to_date], ranges
            # This range only covers the start; continue scanning from its end.
            start_date = range_.to_date
            continue
        if range_.from_date <= to_date:
            # Gap between the scan position and the start of this cached range.
            time_series_ranges.insert(index, TimeSeriesRange(self._name, start_date, range_.from_date))
            index += 1
            if range_.to_date >= to_date:
                start_date = None
                break
            start_date = range_.to_date
        else:
            # The remaining cached ranges begin after to_date: one final gap.
            time_series_ranges.insert(index, TimeSeriesRange(self._name, start_date, to_date))
            start_date = None
            break

    if start_date:
        # The request extends past the last cached range.
        time_series_ranges.insert(index, TimeSeriesRange(self._name, start_date, to_date))

    # Fetch all the missing ranges in a single server call.
    self._session.increment_requests_count()
    details = self._session.advanced.document_store.operations.send(
        GetTimeSeriesOperation(self._document_id, time_series_ranges, start, page_size))
    results = [TimeSeriesRangeResult.create_time_series_range_entity(i) for i in details['Entries']]

    time_series_range_result = None
    start_from, end_from = None, None
    cache_merge = []
    for range_ in ranges:
        # Add all the entries from the ranges to the result we fetched from
        # the server; build the cache again, merging ranges where needed.
        # The snippet is cut off here; the original condition began:
        # if range_.from_date <= from_date and range_.to_date >= to_date or ...
        ...
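
# --- Gap-scan illustration (added; not part of the original snippet) ----------
# A minimal standalone sketch of the scan above, assuming the cached ranges are
# sorted and non-overlapping. Plain (from, to) tuples stand in for the client's
# TimeSeriesRange objects.
def _missing_ranges_demo(cached, from_date, to_date):
    gaps, start = [], from_date
    for lo, hi in cached:
        if lo <= start:
            # Cached range covers the current scan position.
            if hi >= to_date:
                return []          # request fully covered by the cache
            start = hi             # keep scanning from the end of this range
        elif lo <= to_date:
            gaps.append((start, lo))   # gap before this cached range
            if hi >= to_date:
                return gaps
            start = hi
        else:
            break                  # remaining cached ranges start after to_date
    gaps.append((start, to_date))  # tail gap past the last cached range
    return gaps

# Cache holding [2, 4] and [6, 8], request [1, 9] -> fetch the three gaps:
assert _missing_ranges_demo([(2, 4), (6, 8)], 1, 9) == [(1, 2), (4, 6), (8, 9)]
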
class GetTimeSeriesOperation:
    # NOTE: the class line is reconstructed; in the client this operation
    # derives from an operation base class that the snippet does not show.
    def __init__(self, document_id: str,
                 ranges: Union[Iterable[TimeSeriesRange], TimeSeriesRange],
                 start: int = 0, page_size: int = sys.maxsize):
        """
        Build the time series get operation.
        """
        super().__init__()
        if not document_id:
            raise ValueError("Invalid document Id, please provide a valid value and try again")
        if not ranges:
            raise ValueError("Ranges property cannot be empty or None, please put a valid value and try again")
        # A single TimeSeriesRange is normalized to a one-element list.
        if isinstance(ranges, TimeSeriesRange):
            ranges = [ranges]
        self._ranges = ranges
        self._document_id = document_id
        self._start = start
        self._page_size = page_size
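
# --- Usage sketch (added; not part of the original snippet) -------------------
# The session code above sends this operation through the document store:
#
#     range_ = TimeSeriesRange(name="HeartRate",
#                              from_date=datetime(2023, 1, 1),
#                              to_date=datetime(2023, 1, 2))
#     details = store.operations.send(
#         GetTimeSeriesOperation("users/1", range_, 0, sys.maxsize))
#
# "store" is assumed to be an initialized document store; the document id and
# series name are placeholders. A single TimeSeriesRange is accepted here and
# normalized to a one-element list by __init__.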