Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_map_async_started_simultaneously_timings(self):
    # Launch two map_async jobs (4 processes each) back to back and check:
    #   * neither launch takes longer than TIME_OVERHEAD to return,
    #   * both results are available within `time_limit` of the first launch,
    #   * both jobs return the input items unchanged.
    items = list(range(4))
    time_limit = 4 * TIME_PER_TEST + 2 * TIME_OVERHEAD
    mytime0 = time.time()
    # These are started in parallel:
    with parmap.map_async(_wait, items, pm_processes=4) as compute1:
        elapsed1 = time.time() - mytime0
        mytime = time.time()
        with parmap.map_async(_wait, items, pm_processes=4) as compute2:
            elapsed2 = time.time() - mytime
            # Both completion times are measured from the very first launch.
            result1 = compute1.get()
            elapsed3 = time.time() - mytime0
            result2 = compute2.get()
            elapsed4 = time.time() - mytime0
    # Specific assert methods report both operands when they fail.
    self.assertLess(elapsed1, TIME_OVERHEAD)
    self.assertLess(elapsed2, TIME_OVERHEAD)
    self.assertLess(elapsed3, time_limit)
    self.assertLess(elapsed4, time_limit)
    self.assertEqual(result1, result2)
    self.assertEqual(result1, items)
def test_map_async_noparallel_started_simultaneously_timings(self):
    # With pm_parallel=False the assertions below expect each map_async call
    # to take at least num_tasks * TIME_PER_TEST before returning, and the
    # two get() calls together to take no more than 2 * TIME_OVERHEAD.
    num_tasks = 4
    items = list(range(num_tasks))
    min_call_time = num_tasks * TIME_PER_TEST
    mytime = time.time()
    # pm_parallel=False: these calls are NOT expected to overlap.
    with parmap.map_async(_wait, items, pm_parallel=False) as compute1:
        elapsed1 = time.time() - mytime
        mytime = time.time()
        with parmap.map_async(_wait, items, pm_parallel=False) as compute2:
            elapsed2 = time.time() - mytime
            mytime = time.time()
            result1 = compute1.get()
            result2 = compute2.get()
            finished = time.time() - mytime
    # Specific assert methods report both operands when they fail.
    self.assertGreaterEqual(elapsed1, min_call_time)
    self.assertGreaterEqual(elapsed2, min_call_time)
    self.assertLessEqual(finished, 2 * TIME_OVERHEAD)
    self.assertEqual(result1, result2)
    self.assertEqual(result1, items)
def test_map_async_noparallel_started_simultaneously_timings(self):
    # With pm_parallel=False the assertions below expect each map_async call
    # to take at least num_tasks * TIME_PER_TEST before returning, and the
    # two get() calls together to take no more than 2 * TIME_OVERHEAD.
    num_tasks = 4
    items = list(range(num_tasks))
    min_call_time = num_tasks * TIME_PER_TEST
    mytime = time.time()
    # pm_parallel=False: these calls are NOT expected to overlap.
    with parmap.map_async(_wait, items, pm_parallel=False) as compute1:
        elapsed1 = time.time() - mytime
        mytime = time.time()
        with parmap.map_async(_wait, items, pm_parallel=False) as compute2:
            elapsed2 = time.time() - mytime
            mytime = time.time()
            result1 = compute1.get()
            result2 = compute2.get()
            finished = time.time() - mytime
    # Specific assert methods report both operands when they fail.
    self.assertGreaterEqual(elapsed1, min_call_time)
    self.assertGreaterEqual(elapsed2, min_call_time)
    self.assertLessEqual(finished, 2 * TIME_OVERHEAD)
    self.assertEqual(result1, result2)
    self.assertEqual(result1, items)
def test_map_async(self):
    # Run map_async once with pm_parallel=False and once with num_cpu
    # processes, then check that both give the same results as list(items)
    # and that the timings fall on the expected side of the thresholds.
    num_tasks = 6
    num_cpu = 6
    items = range(num_tasks)
    threshold = TIME_PER_TEST * (num_tasks - 1)

    mytime = time.time()
    pfalse = parmap.map_async(_wait, items, pm_parallel=False).get()
    elapsed_false = time.time() - mytime

    mytime = time.time()
    with parmap.map_async(_wait, items, pm_processes=num_cpu) as ptrue:
        # Time for the call to return, measured separately from get().
        elap_true_async = time.time() - mytime
        mytime = time.time()
        ptrue_result = ptrue.get()
        elap_true_get = time.time() - mytime

    noparmap = list(items)
    self.assertEqual(pfalse, ptrue_result)
    self.assertEqual(pfalse, noparmap)
    # Specific assert methods report both operands when they fail.
    self.assertGreater(elapsed_false, threshold)
    self.assertLess(elap_true_async, TIME_OVERHEAD)
    self.assertLess(elap_true_get, threshold)
def test_map_async_started_simultaneously_timings(self):
    # Launch two map_async jobs (4 processes each) back to back and check:
    #   * neither launch takes longer than TIME_OVERHEAD to return,
    #   * both results are available within `time_limit` of the first launch,
    #   * both jobs return the input items unchanged.
    items = list(range(4))
    time_limit = 4 * TIME_PER_TEST + 2 * TIME_OVERHEAD
    mytime0 = time.time()
    # These are started in parallel:
    with parmap.map_async(_wait, items, pm_processes=4) as compute1:
        elapsed1 = time.time() - mytime0
        mytime = time.time()
        with parmap.map_async(_wait, items, pm_processes=4) as compute2:
            elapsed2 = time.time() - mytime
            # Both completion times are measured from the very first launch.
            result1 = compute1.get()
            elapsed3 = time.time() - mytime0
            result2 = compute2.get()
            elapsed4 = time.time() - mytime0
    # Specific assert methods report both operands when they fail.
    self.assertLess(elapsed1, TIME_OVERHEAD)
    self.assertLess(elapsed2, TIME_OVERHEAD)
    self.assertLess(elapsed3, time_limit)
    self.assertLess(elapsed4, time_limit)
    self.assertEqual(result1, result2)
    self.assertEqual(result1, items)
def test_map_async(self):
    # Run map_async once with pm_parallel=False and once with num_cpu
    # processes, then check that both give the same results as list(items)
    # and that the timings fall on the expected side of the thresholds.
    num_tasks = 6
    num_cpu = 6
    items = range(num_tasks)
    threshold = TIME_PER_TEST * (num_tasks - 1)

    mytime = time.time()
    pfalse = parmap.map_async(_wait, items, pm_parallel=False).get()
    elapsed_false = time.time() - mytime

    mytime = time.time()
    with parmap.map_async(_wait, items, pm_processes=num_cpu) as ptrue:
        # Time for the call to return, measured separately from get().
        elap_true_async = time.time() - mytime
        mytime = time.time()
        ptrue_result = ptrue.get()
        elap_true_get = time.time() - mytime

    noparmap = list(items)
    self.assertEqual(pfalse, ptrue_result)
    self.assertEqual(pfalse, noparmap)
    # Specific assert methods report both operands when they fail.
    self.assertGreater(elapsed_false, threshold)
    self.assertLess(elap_true_async, TIME_OVERHEAD)
    self.assertLess(elap_true_get, threshold)
def test_starmap(self):
    # starmap must give equal results with pm_parallel=False and
    # pm_parallel=True for the same argument tuples and extra arguments.
    arg_tuples = [(1, 2), (3, 4), (5, 6)]
    extra_args = (5, 6)
    sequential = parmap.starmap(
        _identity, arg_tuples, *extra_args, pm_parallel=False
    )
    parallel = parmap.starmap(
        _identity, arg_tuples, *extra_args, pm_parallel=True
    )
    self.assertEqual(sequential, parallel)
def test_starmap(self):
    # starmap must give equal results with pm_parallel=False and
    # pm_parallel=True for the same argument tuples and extra arguments.
    arg_tuples = [(1, 2), (3, 4), (5, 6)]
    extra_args = (5, 6)
    sequential = parmap.starmap(
        _identity, arg_tuples, *extra_args, pm_parallel=False
    )
    parallel = parmap.starmap(
        _identity, arg_tuples, *extra_args, pm_parallel=True
    )
    self.assertEqual(sequential, parallel)
def test_starmap_async(self):
    # starmap_async must give equal results with pm_parallel=False and
    # pm_parallel=True for the same argument tuples and extra arguments.
    items = [(1, 2), (3, 4), (5, 6)]
    # Consume the async results through `with`, as the map_async tests in
    # this file do, so cleanup is left to the result's context manager even
    # if get() raises.
    # NOTE(review): assumes starmap_async returns the same kind of result
    # object as map_async -- confirm against the parmap documentation.
    with parmap.starmap_async(
        _identity, items, 5, 6, pm_parallel=False
    ) as pfalse:
        sequential = pfalse.get()
    with parmap.starmap_async(
        _identity, items, 5, 6, pm_parallel=True
    ) as ptrue:
        parallel = ptrue.get()
    self.assertEqual(sequential, parallel)
def test_starmap_async(self):
    # starmap_async must give equal results with pm_parallel=False and
    # pm_parallel=True for the same argument tuples and extra arguments.
    items = [(1, 2), (3, 4), (5, 6)]
    # Consume the async results through `with`, as the map_async tests in
    # this file do, so cleanup is left to the result's context manager even
    # if get() raises.
    # NOTE(review): assumes starmap_async returns the same kind of result
    # object as map_async -- confirm against the parmap documentation.
    with parmap.starmap_async(
        _identity, items, 5, 6, pm_parallel=False
    ) as pfalse:
        sequential = pfalse.get()
    with parmap.starmap_async(
        _identity, items, 5, 6, pm_parallel=True
    ) as ptrue:
        parallel = ptrue.get()
    self.assertEqual(sequential, parallel)