Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_isotime(self):
expect = timeutils.parse_isotime(self.skynet_self_aware_time_str)
skynet_self_aware_time_utc = self.skynet_self_aware_time.replace(
tzinfo=iso8601.iso8601.UTC)
self.assertEqual(skynet_self_aware_time_utc, expect)
def _make_date_index_continuous(self, target_date, time_delta):
"""Hace el índice de tiempo de los resultados continuo (según
el intervalo de resultados), sin saltos, hasta la fecha
especificada
"""
# Si no hay datos cargados no hay nada que hacer
if not len(self.data):
return
# Caso fecha target > última fecha (rellenar al final)
target_date = iso8601.parse_date(target_date)
last_date = iso8601.parse_date(self.data[-1][0])
delta = time_delta
row_len = len(self.data[0])
while last_date < target_date:
last_date = last_date + delta
date_str = self._format_timestamp(str(last_date.date()))
row = [date_str]
row.extend([None for _ in range(1, row_len)])
self.data.append(row)
# Caso fecha target < primera fecha (rellenar al principio)
first_date = iso8601.parse_date(self.data[0][0])
lead_rows = []
current_date = target_date
while current_date < first_date:
date_str = self._format_timestamp(str(current_date.date()))
row = [date_str]
def datetime_or_none(dt):
"""Validate a datetime or None value."""
if dt is None:
return None
elif isinstance(dt, datetime.datetime):
if dt.utcoffset() is None:
# NOTE(danms): Legacy objects from sqlalchemy are stored in UTC,
# but are returned without a timezone attached.
# As a transitional aid, assume a tz-naive object is in UTC.
return dt.replace(tzinfo=iso8601.iso8601.Utc())
else:
return dt
raise ValueError('A datetime.datetime is required here')
os.listdir(store_directory) if uuid_re.match(x)]
files = [(x, os.path.getsize(p),
datetime.fromtimestamp(os.path.getmtime(p), iso8601.Utc()))
for x, p in files if os.path.isfile(p)]
# Fetch the list of glance images
glance_images = []
kwargs = {'sort_key': 'id', 'sort_dir': 'asc', 'owner': None, 'filters': {}, 'is_public': None}
for x in glance.images.list(**kwargs):
if x.status == 'active':
tz_aware_time = parser.parse(x.created_at)
glance_images.append((x.id, x.size, tz_aware_time, is_remote_image(x)))
# Check all active images 1 hour or older are present
time_cutoff = datetime.now(iso8601.Utc()) - timedelta(0, 3600)
alert_squelch = datetime.now(iso8601.Utc()) - timedelta(0, 43200) # 12 hours
result = 0
for image in [x for x in glance_images if x[2] < time_cutoff]:
if not [x for x in files if x[0] == image[0]]:
if image[3] == False:
print "Glance image %s not found in %s" % (image[0], store_directory)
result = switch_on_criticality()
# Check all files have a corresponding glance image and ignore brand new / zero size files
for image_file in files:
if not [x for x in glance_images if x[0] == image_file[0]] and image_file[2] < alert_squelch and image_file[1] > 0:
print "Unknown file %s found in %s" % (image_file[0], store_directory)
result = switch_on_criticality()
from corehq.util.datadog.gauges import datadog_counter
reconciliation_soft_assert = soft_assert('jroth@dimagi.com', include_breadcrumbs=True)
def _validate_length(length):
def __inner(value):
if len(value) > length:
raise ValueError('Value exceeds allowed length: {}'.format(length))
return value
return __inner
PROPERTY_TYPE_MAPPING = {
'opened_on': iso8601.parse_date,
'name': _validate_length(255),
'type': _validate_length(255),
'owner_id': _validate_length(255),
'external_id': _validate_length(255),
}
def _convert_type_check_length(property_name, value):
try:
return PROPERTY_TYPE_MAPPING.get(property_name, lambda x: x)(value)
except ValueError as e:
raise CaseValueError('Error processing case update: Field: {}, Error: {}'.format(property_name, str(e)))
class SqlCaseUpdateStrategy(UpdateStrategy):
case_implementation_class = CommCareCaseSQL
UTCNOW = lambda: datetime.datetime.now(iso8601.iso8601.UTC)
UIDGEN = lambda: uuid.uuid4().hex
def _make_date_index_continuous(self, start_date, end_date):
"""Hace el índice de tiempo de los resultados continuo (según
el intervalo de resultados), sin saltos, entre start_date y end_date.
Esto implica llenar el diccionario self.data_dict con claves de los
timestamp faltantes para asegurar la continuidad
"""
# Si no hay datos cargados no hay nada que hacer
if not self.data_dict:
return
current_date = iso8601.parse_date(start_date)
end_date = iso8601.parse_date(end_date)
while current_date < end_date:
current_date += get_relative_delta(self.args[constants.PARAM_PERIODICITY])
self.data_dict.setdefault(str(current_date.date()), {})
def coerce(self, obj, attr, value):
if isinstance(value, six.string_types):
# NOTE(danms): Being tolerant of isotime strings here will help us
# during our objects transition
value = timeutils.parse_isotime(value)
elif not isinstance(value, datetime.datetime):
raise ValueError(_('A datetime.datetime is required '
'in field %(attr)s, not a %(type)') %
{'attr': attr, 'type': type(value).__name__})
if value.utcoffset() is None and self.tzinfo_aware:
# NOTE(danms): Legacy objects from sqlalchemy are stored in UTC,
# but are returned without a timezone attached.
# As a transitional aid, assume a tz-naive object is in UTC.
value = value.replace(tzinfo=iso8601.iso8601.Utc())
elif not self.tzinfo_aware:
value = value.replace(tzinfo=None)
return value