Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_country_list_is_represented_with_classname_and_ids(self):
# Given
countries = CatalogList([test_country1, test_country2])
# When
countries_repr = repr(countries)
# Then
assert countries_repr == "[, ]"\
.format(id1=db_country1['id'], id2=db_country2['id'])
def test_get_by_slug_list(self, mocked_repo):
# Given
mocked_repo.return_value = [db_geography1, db_geography2]
repo = GeographyRepository()
# When
geographies = repo.get_by_id_list([db_geography1['slug'], db_geography2['slug']])
# Then
mocked_repo.assert_called_once_with({'slug': [db_geography1['slug'], db_geography2['slug']]})
assert isinstance(geographies, CatalogList)
assert geographies == test_geographies
def test_get_all(self, mocked_repo):
# Given
mocked_repo.return_value = [db_variable_group1, db_variable_group2]
repo = VariableGroupRepository()
# When
variables_groups = repo.get_all()
# Then
mocked_repo.assert_called_once_with(None)
assert isinstance(variables_groups, CatalogList)
assert variables_groups == test_variables_groups
def test_missing_fields_are_mapped_as_None(self, mocked_repo):
# Given
mocked_repo.return_value = [{'id': 'geography1'}]
repo = GeographyRepository()
expected_geographies = CatalogList([Geography({
'id': 'geography1',
'slug': None,
'name': None,
'description': None,
'provider_id': None,
'provider_name': None,
'country_id': None,
'lang': None,
'geom_coverage': None,
'geom_type': None,
'update_frequency': None,
'version': None,
'is_public_data': None,
'summary_json': None,
'available_in': None
})])
def test_country_list_is_printed_with_classname_and_ids(self):
# Given
countries = CatalogList([test_country1, test_country2])
# When
countries_str = str(countries)
# Then
assert countries_str == "[, ]" \
.format(id1=db_country1['id'], id2=db_country2['id'])
def test_get_all_datasets_credentials(self, mocked_repo):
# Given
mocked_repo.return_value = test_datasets
credentials = Credentials('fake_user', '1234')
# When
datasets = Dataset.get_all(credentials=credentials)
# Then
mocked_repo.assert_called_once_with(None, credentials)
assert isinstance(datasets, list)
assert isinstance(datasets, CatalogList)
assert datasets == test_datasets
def test_get_all(self, mocked_repo):
# Given
mocked_repo.return_value = [db_provider1, db_provider2]
repo = ProviderRepository()
# When
providers = repo.get_all()
# Then
mocked_repo.assert_called_once_with(None)
assert isinstance(providers, CatalogList)
assert providers == test_providers
def test_get_datasets_by_geography(self, mocked_repo):
# Given
mocked_repo.return_value = test_datasets
# When
datasets = test_geography1.datasets
# Then
mocked_repo.assert_called_once_with({'geography_id': test_geography1.id})
assert isinstance(datasets, list)
assert isinstance(datasets, CatalogList)
assert datasets == test_datasets
def test_get_by_slug_list(self, mocked_repo):
# Given
mocked_repo.return_value = [db_variable1, db_variable2]
repo = VariableRepository()
# When
variables = repo.get_by_id_list([db_variable1['slug'], db_variable2['slug']])
# Then
mocked_repo.assert_called_once_with({'slug': [db_variable1['slug'], db_variable2['slug']]})
assert isinstance(variables, CatalogList)
assert variables == test_variables
def _get_filtered_entities(self, filters=None):
cleaned_filters = self._get_filters(filters)
rows = self._get_rows(cleaned_filters)
if len(rows) == 0:
return None
normalized_data = [self._get_entity_class()(self._map_row(row)) for row in rows]
return CatalogList(normalized_data)