Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
chunksize = 1
class ChunkSize2PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 2
class ChunkSize3PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 3
class ChunkSize4PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 4
class MarshalLifoDiskQueueTest(t.LifoDiskQueueTest):
def queue(self):
return MarshalLifoDiskQueue(self.qpath)
def test_serialize(self):
q = self.queue()
q.push('a')
q.push(123)
q.push({'a': 'dict'})
self.assertEqual(q.pop(), {'a': 'dict'})
self.assertEqual(q.pop(), 123)
self.assertEqual(q.pop(), 'a')
test_nonserializable_object = nonserializable_object_test
assert r2.meta['request'] is r2
class ChunkSize1PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 1
class ChunkSize2PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 2
class ChunkSize3PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 3
class ChunkSize4PickleFifoDiskQueueTest(PickleFifoDiskQueueTest):
chunksize = 4
class MarshalLifoDiskQueueTest(t.LifoDiskQueueTest):
def queue(self):
return MarshalLifoDiskQueue(self.qpath)
def test_serialize(self):
q = self.queue()
q.push('a')
q.push(123)
q.push({'a': 'dict'})
self.assertEqual(q.pop(), {'a': 'dict'})
self.assertEqual(q.pop(), 123)
self.assertEqual(q.pop(), 'a')
def test_nonserializable_object(self):
# Trigger Twisted bug #7989
import twisted.persisted.styles # NOQA