Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_success_result(self, repo_id, offset):
started = datetime.datetime.now(dateutils.local_tz())
completed = started + datetime.timedelta(days=offset)
r = RepoSyncResult.expected_result(repo_id, 'foo', 'bar', dateutils.format_iso8601_datetime(started), dateutils.format_iso8601_datetime(completed), 1, 1, 1, '', '', RepoSyncResult.RESULT_SUCCESS)
RepoSyncResult.get_collection().save(r, safe=True)
"""
# Setup
self.repo_manager.create_repo('creeper')
for i in range(1, 6):
add_result('creeper', i)
# Test
entries = self.sync_manager.sync_history('creeper')
# Verify
self.assertEqual(5, len(entries))
# Verify descending order
for i in range(0, 4):
first = dateutils.parse_iso8601_datetime(entries[i]['completed'])
second = dateutils.parse_iso8601_datetime(entries[i + 1]['completed'])
self.assertTrue(first > second)
def _populate_for_date_queries(self):
'''
Populates the history with events scattered over the course of a year, suitable for
date range tests.
'''
e1 = ConsumerHistoryEvent(123, 'admin', consumer_history.TYPE_CONSUMER_CREATED, None)
e2 = ConsumerHistoryEvent(123, 'admin', consumer_history.TYPE_CONSUMER_DELETED, None)
e3 = ConsumerHistoryEvent(123, 'admin', consumer_history.TYPE_REPO_BOUND, None)
e4 = ConsumerHistoryEvent(123, 'admin', consumer_history.TYPE_REPO_UNBOUND, None)
e1.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 2, 1))
e2.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 4, 1))
e3.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 6, 1))
e4.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 10, 1))
self.consumer_history_api.collection.insert(e1)
self.consumer_history_api.collection.insert(e2)
self.consumer_history_api.collection.insert(e3)
self.consumer_history_api.collection.insert(e4)
'''
Populates the CDS history collection with entries staggered by a large date range,
suitable for being able to query within date ranges.
The events are manually created and stored in the database; using the API calls
will cause the timestamps to be set to 'now' in all cases.
'''
e1 = CDSHistoryEvent('cds1.example.com', 'admin', CDSHistoryEventType.REGISTERED)
e2 = CDSHistoryEvent('cds2.example.com', 'admin', CDSHistoryEventType.REPO_ASSOCIATED)
e3 = CDSHistoryEvent('cds3.example.com', 'admin', CDSHistoryEventType.REPO_UNASSOCIATED)
e4 = CDSHistoryEvent('cds4.example.com', 'admin', CDSHistoryEventType.UNREGISTERED)
e1.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 2, 1, tzinfo=dateutils.utc_tz()))
e2.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 4, 1, tzinfo=dateutils.utc_tz()))
e3.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 6, 1, tzinfo=dateutils.utc_tz()))
e4.timestamp = dateutils.format_iso8601_datetime(datetime.datetime(2000, 10, 1, tzinfo=dateutils.utc_tz()))
self.cds_history_api.collection.insert(e1)
self.cds_history_api.collection.insert(e2)
self.cds_history_api.collection.insert(e3)
self.cds_history_api.collection.insert(e4)
def test_add_distributor_no_distributor(self):
"""
Tests adding a distributor that doesn't exist.
"""
# Setup
self.repo_manager.create_repo('real-repo')
# Test
try:
self.distributor_manager.add_distributor('real-repo', 'fake-distributor', {}, True)
self.fail('No exception thrown for an invalid distributor type')
except exceptions.InvalidValue, e:
print(e) # for coverage
def test_enqueue_duplicate_with_same_kw_args(self):
# Setup
task1 = Task(kwargs, kwargs={'foo':1, 'bar':2})
self.queue.enqueue(task1)
self.assertEqual(1, len(list(self.queue._TaskQueue__storage._all_tasks())))
# Test
task2 = Task(kwargs, kwargs={'foo':1, 'bar':2})
self.queue.enqueue(task2, unique=True)
# Verify
self.assertEqual(1, len(list(self.queue._TaskQueue__storage._all_tasks())))
def test_at_time(self):
now = datetime.now()
then = timedelta(seconds=10)
at = AtScheduler(now + then)
task = Task(noop, scheduler=at)
self.queue.enqueue(task)
time.sleep(8)
self.assertTrue(task.state is task_waiting, 'state is %s' % task.state)
time.sleep(3)
self.assertTrue(task.state is task_finished, 'state is %s' % task.state)
def test_enqueue_duplicate_with_different_schedulers(self):
at = AtScheduler(datetime.now() + timedelta(minutes=10))
task1 = Task(noop, scheduler=at)
task2 = Task(noop, scheduler=ImmediateScheduler())
self.queue.enqueue(task1, True)
enqueued_2 = self.queue.enqueue(task2, True)
self.assertTrue(enqueued_2)
def test_task_with_immediate_scheduler_deserialization(self):
task1 = Task(noop, scheduler=ImmediateScheduler())
snapshot = task1.snapshot()
task2 = snapshot.to_task()
self.assertTrue(isinstance(task2.scheduler, ImmediateScheduler))
def test_find_multiple_matching(self):
# Setup
task1 = Task(noop)
task2 = Task(noop)
self.queue.enqueue(task1)
self.queue.enqueue(task2)
# Test
found = self.queue.find(state=task_waiting)
# Verify
self.assertTrue(task2 in found)