def test_init(self):
    """Test whether an archive is properly initialized"""

    archive_path = os.path.join(self.test_path, 'myarchive')
    _ = Archive.create(archive_path)

    archive = Archive(archive_path)
    self.assertEqual(archive.archive_path, archive_path)
    self.assertEqual(archive.created_on, None)
    self.assertEqual(archive.origin, None)
    self.assertEqual(archive.backend_name, None)
    self.assertEqual(archive.backend_version, None)
    self.assertEqual(archive.category, None)
    self.assertEqual(archive.backend_params, None)
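
# The fields asserted above stay None until `init_metadata` is called
# (exercised in `test_init_metadata` further down). A minimal sketch of
# that lifecycle, assuming only the `Archive` API used in these tests;
# the helper name below is hypothetical:

def _sketch_metadata_lifecycle():
    path = os.path.join(tempfile.mkdtemp(prefix='perceval_'), 'myarchive')
    Archive.create(path)

    archive = Archive(path)
    assert archive.origin is None  # no metadata written yet

    archive.init_metadata('example.com', 'mybackend', '0.1.0', 'issue', {})
    assert archive.origin == 'example.com'  # persisted in the archive file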
def setUp(self):
    self.test_path = tempfile.mkdtemp(prefix='perceval_')
    archive_path = os.path.join(self.test_path, 'myarchive')
    self.archive = Archive.create(archive_path)

def test_store(self):
    """Test whether data is properly stored in the archive"""

    data_requests = [
        ("https://example.com/", {'q': 'issues', 'date': '2017-01-10'}, {}),
        ("https://example.com/", {'q': 'issues', 'date': '2018-01-01'}, {}),
        ("https://example.com/tasks", {'task_id': 10}, {'Accept': 'application/json'}),
    ]

    httpretty.register_uri(httpretty.GET,
                           "https://example.com/",
                           body='{"hey": "there"}',
                           status=200)
    httpretty.register_uri(httpretty.GET,
                           "https://example.com/tasks",
                           body='{"task": "my task"}',
                           status=200)

    archive_path = os.path.join(self.test_path, 'myarchive')
    archive = Archive.create(archive_path)

    # Store data in the archive
    responses = []

    for dr in data_requests:
        response = requests.get(dr[0], params=dr[1], headers=dr[2])
        archive.store(dr[0], dr[1], dr[2], response)
        responses.append(response)

    # Check the rows written to the archive's SQLite backing file
    db = sqlite3.connect(archive.archive_path)
    cursor = db.cursor()
    cursor.execute("SELECT hashcode, data, uri, payload, headers FROM archive")
    data_stored = cursor.fetchall()
    cursor.close()

    self.assertEqual(len(data_stored), len(data_requests))
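
# Responses saved with `store` can be read back by request. A hedged
# sketch, assuming Perceval's `Archive.retrieve(uri, payload, headers)`
# counterpart, which returns the object stored for the same request
# triple; the helper name is hypothetical:

def _sketch_retrieve(archive, data_requests):
    for uri, payload, headers in data_requests:
        response = archive.retrieve(uri, payload, headers)
        print(uri, response.status_code)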
def test_init_archive(self):
    """Test whether the archive is properly initialized when executing the fetch method"""

    archive_path = os.path.join(self.test_path, 'myarchive')
    archive = Archive.create(archive_path)

    b = MockedBackend('test', archive=archive)
    _ = [item for item in b.fetch()]

    self.assertEqual(b.archive.backend_name, b.__class__.__name__)
    self.assertEqual(b.archive.backend_version, b.version)
    self.assertEqual(b.archive.origin, b.origin)
    self.assertEqual(b.archive.category, MockedBackend.DEFAULT_CATEGORY)
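
# A backend run with `archive=` set stamps the archive with its own
# identity the first time `fetch` runs; that is what the assertions
# above verify. Sketch using `MockedBackend`, this suite's test double;
# the helper name is hypothetical:

def _sketch_archive_stamping(archive):
    backend = MockedBackend('test', archive=archive)
    for _ in backend.fetch():
        pass
    print(archive.backend_name, archive.backend_version, archive.category)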
def test_error_archive_and_filter_classified(self):
    """Check if an error is raised when archive and classified fields filtering are both active"""

    archive_path = os.path.join(self.test_path, 'myarchive')
    archive = Archive.create(archive_path)

    backend = ClassifiedFieldsBackend('http://example.com/', archive=archive)

    msg_error = "classified fields filtering is not compatible with archiving items"
    with self.assertRaisesRegex(BackendError, msg_error):
        _ = [item for item in backend.fetch(category=ClassifiedFieldsBackend.DEFAULT_CATEGORY,
                                            filter_classified=True)]
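
# A sketch of the guard this test exercises (hypothetical shape, not
# necessarily Perceval's exact code): items fetched with
# `filter_classified=True` have sensitive fields removed, so writing
# them to an archive would store incomplete records, and the
# combination is rejected up front:

def _sketch_classified_guard(archive, filter_classified):
    if archive and filter_classified:
        cause = "classified fields filtering is not compatible with archiving items"
        raise BackendError(cause=cause)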
def test_fetch_from_archive_exception(self):
    """Test whether serialized exceptions are thrown"""

    archive_path = os.path.join(self.test_path, 'myarchive')
    archive = Archive.create(archive_path)

    httpretty.register_uri(httpretty.GET,
                           CLIENT_SPIDERMAN_URL,
                           body="bad",
                           status=404)

    # Populate the archive and check that an exception is thrown
    # when fetching data from the API
    client = MockedClient(CLIENT_API_URL, sleep_time=0.1, max_retries=1, archive=archive)
    with self.assertRaises(requests.exceptions.HTTPError):
        _ = client.fetch(CLIENT_SPIDERMAN_URL)

    # Retrieve data from the archive and check that the same exception
    # is thrown, as happened when fetching data from the API
    client = MockedClient(CLIENT_API_URL, sleep_time=0.1, max_retries=1, archive=archive, from_archive=True)
    with self.assertRaises(requests.exceptions.HTTPError):
        _ = client.fetch(CLIENT_SPIDERMAN_URL)
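
# The re-raise above works because errors raised while populating the
# archive are serialized alongside normal responses and raised again on
# retrieval. A self-contained sketch of that pattern (hypothetical, not
# Perceval's actual implementation):

import pickle

def _serialize_result(fn, *args, **kwargs):
    # Capture either the return value or the raised exception
    try:
        return pickle.dumps(fn(*args, **kwargs))
    except Exception as exc:
        return pickle.dumps(exc)

def _replay_result(blob):
    # Re-raise a stored exception exactly as it happened originally
    obj = pickle.loads(blob)
    if isinstance(obj, Exception):
        raise obj
    return obj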
def test_init_metadata(self):
    """Test whether metadata information is properly initialized"""

    archive_path = os.path.join(self.test_path, 'myarchive')
    archive = Archive.create(archive_path)

    before_dt = datetime_to_utc(datetime_utcnow())
    archive.init_metadata('marvel.com', 'marvel-comics-backend', '0.1.0',
                          'issue', {'from_date': before_dt})
    after_dt = datetime_to_utc(datetime_utcnow())

    archive_copy = Archive(archive_path)

    # Both copies should have the same parameters
    for arch in [archive, archive_copy]:
        self.assertEqual(arch.origin, 'marvel.com')
        self.assertEqual(arch.backend_name, 'marvel-comics-backend')
        self.assertEqual(arch.backend_version, '0.1.0')
        self.assertEqual(arch.category, 'issue')
        self.assertGreaterEqual(arch.created_on, before_dt)
        self.assertLessEqual(arch.created_on, after_dt)
def create_archive(self):
    """Create a new archive.

    :returns: a new `Archive` object

    :raises ArchiveManagerError: when an error occurs creating the
        new archive
    """
    hashcode = uuid.uuid4().hex

    archive_dir = os.path.join(self.dirpath, hashcode[0:2])
    archive_name = hashcode[2:] + self.STORAGE_EXT
    archive_path = os.path.join(archive_dir, archive_name)

    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)

    try:
        archive = Archive.create(archive_path)
    except ArchiveError as e:
        raise ArchiveManagerError(cause=str(e))

    return archive
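
# Hedged usage sketch for the method above, assuming it belongs to
# Perceval's `ArchiveManager`, constructed with the base directory it
# shards archives into (a two-character subdirectory plus the rest of
# the random hashcode with STORAGE_EXT appended, as the code shows);
# the helper name is hypothetical:

def _sketch_manager_usage():
    base_path = tempfile.mkdtemp(prefix='perceval_')
    manager = ArchiveManager(base_path)
    archive = manager.create_archive()
    print(archive.archive_path)  # <base_path>/<2-char dir>/<name + STORAGE_EXT>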