How to use the perceval.archive.Archive.create function in perceval

To help you get started, we’ve selected a few perceval examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github chaoss / grimoirelab-perceval / tests / test_archive.py View on Github external
def test_init(self):
        """Test whether an archive is properly initialized"""

        archive_path = os.path.join(self.test_path, 'myarchive')

        # Create the archive first; the returned object is discarded
        # because the test opens the archive again from its path.
        Archive.create(archive_path)

        archive = Archive(archive_path)
        self.assertEqual(archive.archive_path, archive_path)

        # This test never initializes metadata, so every metadata
        # field is expected to be None.
        self.assertIsNone(archive.created_on)
        self.assertIsNone(archive.origin)
        self.assertIsNone(archive.backend_name)
        self.assertIsNone(archive.backend_version)
        self.assertIsNone(archive.category)
        self.assertIsNone(archive.backend_params)
github chaoss / grimoirelab-perceval / tests / base.py View on Github external
def setUp(self):
        """Create a temporary directory and a new archive stored under it"""

        self.test_path = tempfile.mkdtemp(prefix='perceval_')
        self.archive = Archive.create(os.path.join(self.test_path, 'myarchive'))
github chaoss / grimoirelab-perceval / tests / test_archive.py View on Github external
("https://example.com/", {'q': 'issues', 'date': '2017-01-10'}, {}),
            ("https://example.com/", {'q': 'issues', 'date': '2018-01-01'}, {}),
            ("https://example.com/tasks", {'task_id': 10}, {'Accept': 'application/json'}),
        ]

        httpretty.register_uri(httpretty.GET,
                               "https://example.com/",
                               body='{"hey": "there"}',
                               status=200)
        httpretty.register_uri(httpretty.GET,
                               "https://example.com/tasks",
                               body='{"task": "my task"}',
                               status=200)

        archive_path = os.path.join(self.test_path, 'myarchive')
        archive = Archive.create(archive_path)

        # Store data in the archive
        responses = []

        for dr in data_requests:
            response = requests.get(dr[0], params=dr[1], headers=dr[2])
            archive.store(dr[0], dr[1], dr[2], response)
            responses.append(response)

        db = sqlite3.connect(archive.archive_path)
        cursor = db.cursor()
        cursor.execute("SELECT hashcode, data, uri, payload, headers FROM archive")
        data_stored = cursor.fetchall()
        cursor.close()

        self.assertEqual(len(data_stored), len(data_requests))
github chaoss / grimoirelab-perceval / tests / test_nntp.py View on Github external
def setUp(self):
        """Set up a temporary working directory with a new archive in it"""

        tmp_dir = tempfile.mkdtemp(prefix='perceval_')
        self.test_path = tmp_dir
        self.archive = Archive.create(os.path.join(tmp_dir, 'myarchive'))
github chaoss / grimoirelab-perceval / tests / test_backend.py View on Github external
def test_init_archive(self):
        """Check that running the fetch method fills in the archive metadata"""

        path = os.path.join(self.test_path, 'myarchive')
        new_archive = Archive.create(path)
        backend = MockedBackend('test', archive=new_archive)

        # Consume everything fetch yields; the items themselves are not needed
        for _ in backend.fetch():
            pass

        # The archive attached to the backend must describe that backend
        self.assertEqual(backend.archive.backend_name, backend.__class__.__name__)
        self.assertEqual(backend.archive.backend_version, backend.version)
        self.assertEqual(backend.archive.origin, backend.origin)
        self.assertEqual(backend.archive.category, MockedBackend.DEFAULT_CATEGORY)
github chaoss / grimoirelab-perceval / tests / test_backend.py View on Github external
def test_error_archive_and_filter_classified(self):
        """Check that fetch fails when archiving and classified fields filtering are combined"""

        store = Archive.create(os.path.join(self.test_path, 'myarchive'))
        backend = ClassifiedFieldsBackend('http://example.com/', archive=store)

        expected_msg = "classified fields filtering is not compatible with archiving items"

        with self.assertRaisesRegex(BackendError, expected_msg):
            items = backend.fetch(category=ClassifiedFieldsBackend.DEFAULT_CATEGORY,
                                  filter_classified=True)
            # Iterate over the result as well, so the error is caught
            # whether it is raised by the call or during iteration
            for _ in items:
                pass
github chaoss / grimoirelab-perceval / tests / test_client.py View on Github external
def test_fetch_from_archive_exception(self):
        """Test whether an HTTP error is raised again when data is read from the archive"""

        archive = Archive.create(os.path.join(self.test_path, 'myarchive'))

        # GET requests to the endpoint are answered with a 404
        httpretty.register_uri(httpretty.GET, CLIENT_SPIDERMAN_URL,
                               body="bad", status=404)

        # First pass: fetch from the API with the archive attached and
        # check that an exception is thrown
        api_client = MockedClient(CLIENT_API_URL, sleep_time=0.1,
                                  max_retries=1, archive=archive)
        with self.assertRaises(requests.exceptions.HTTPError):
            api_client.fetch(CLIENT_SPIDERMAN_URL)

        # Second pass: retrieve the data from the archive and check that
        # an exception is thrown, as happened when fetching from the API
        archive_client = MockedClient(CLIENT_API_URL, sleep_time=0.1,
                                      max_retries=1, archive=archive,
                                      from_archive=True)
        with self.assertRaises(requests.exceptions.HTTPError):
            archive_client.fetch(CLIENT_SPIDERMAN_URL)
github chaoss / grimoirelab-perceval / tests / test_archive.py View on Github external
def test_init_metadata(self):
        """Test whether metadata is set on the archive and on a second object opened from the same path"""

        archive_path = os.path.join(self.test_path, 'myarchive')
        archive = Archive.create(archive_path)

        # Take a timestamp on each side of the call to bound `created_on`
        lower_dt = datetime_to_utc(datetime_utcnow())
        params = {'from_date': lower_dt}
        archive.init_metadata('marvel.com', 'marvel-comics-backend', '0.1.0',
                              'issue', params)
        upper_dt = datetime_to_utc(datetime_utcnow())

        reopened = Archive(archive_path)

        expected = (
            ('origin', 'marvel.com'),
            ('backend_name', 'marvel-comics-backend'),
            ('backend_version', '0.1.0'),
            ('category', 'issue'),
        )

        # The original object and the reopened one must report the same values
        for arch in (archive, reopened):
            for attr, value in expected:
                self.assertEqual(getattr(arch, attr), value)
            self.assertGreaterEqual(arch.created_on, lower_dt)
            self.assertLessEqual(arch.created_on, upper_dt)
github chaoss / grimoirelab-perceval / perceval / archive.py View on Github external
:returns: a new `Archive` object

        :raises ArchiveManagerError: when an error occurs creating the
            new archive
        """
        hashcode = uuid.uuid4().hex
        archive_dir = os.path.join(self.dirpath, hashcode[0:2])
        archive_name = hashcode[2:] + self.STORAGE_EXT
        archive_path = os.path.join(archive_dir, archive_name)

        if not os.path.exists(archive_dir):
            os.makedirs(archive_dir)

        try:
            archive = Archive.create(archive_path)
        except ArchiveError as e:
            raise ArchiveManagerError(cause=str(e))

        return archive