Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# access the layer delete page but redirected to login
response = self.client.get(reverse('layer_remove', args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
# 4. change_resourcebase_metadata
# 4.1 without change_resourcebase_metadata: verify that an anonymous user
# cannot access the layer metadata page and is redirected to login or denied (302/403)
response = self.client.get(reverse('layer_metadata', args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
# 5 N/A? 6 is an integration test...
# 7. change_layer_style
# 7.1 without change_layer_style: verify that an anonymous user cannot access
# the layer style page and is redirected to login or denied (302/403)
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# Only for geoserver backend
response = self.client.get(reverse('layer_style_manage', args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
"kml", "kmz", "openlayers", "rss", "text/html; subtype=openlayers", "utfgrid"],
"attribution": {
"title": attribution
},
"infoFormats": ["text/plain", "application/vnd.ogc.gml", "text/xml", "application/vnd.ogc.gml/3.1.1",
"text/xml; subtype=gml/3.1.1", "text/html", "application/json"],
"styles": [sld_definition(s) for s in layer.styles.all()],
"prefix": layer.alternate.split(":")[0] if ":" in layer.alternate else "",
"keywords": [k.name for k in layer.keywords.all()] if layer.keywords else [],
"llbbox": decimal_encode(bbox) if layer.srid == 'EPSG:4326' else
decimal_encode(bbox_to_projection(
[float(coord) for coord in layer_bbox] + [layer.srid, ], target_srid=4326)[:4])
}
all_times = None
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
from geonode.geoserver.views import get_capabilities
workspace, layername = layer.alternate.split(
":") if ":" in layer.alternate else (None, layer.alternate)
# WARNING Please make sure to have enabled DJANGO CACHE as per
# https://docs.djangoproject.com/en/2.0/topics/cache/#filesystem-caching
wms_capabilities_resp = get_capabilities(
request, layer.id, tolerant=True)
if wms_capabilities_resp.status_code >= 200 and wms_capabilities_resp.status_code < 400:
wms_capabilities = wms_capabilities_resp.getvalue()
if wms_capabilities:
import xml.etree.ElementTree as ET
namespaces = {'wms': 'http://www.opengis.net/wms',
'xlink': 'http://www.w3.org/1999/xlink',
'xsi': 'http://www.w3.org/2001/XMLSchema-instance'}
e = ET.fromstring(wms_capabilities)
try:
tempdir, base_file = form.write_files()
if layer.is_vector() and is_raster(base_file):
out['success'] = False
out['errors'] = _(
"You are attempting to replace a vector layer with a raster.")
elif (not layer.is_vector()) and is_vector(base_file):
out['success'] = False
out['errors'] = _(
"You are attempting to replace a raster layer with a vector.")
else:
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# delete geoserver's store before upload
# cascading_delete(gs_catalog, layer.alternate)
out['ogc_backend'] = geoserver.BACKEND_PACKAGE
elif check_ogc_backend(qgis_server.BACKEND_PACKAGE):
try:
qgis_layer = QGISServerLayer.objects.get(
layer=layer)
qgis_layer.delete()
except QGISServerLayer.DoesNotExist:
pass
out['ogc_backend'] = qgis_server.BACKEND_PACKAGE
saved_layer = file_upload(
base_file,
layer=layer,
title=layer.title,
abstract=layer.abstract,
is_approved=layer.is_approved,
is_published=layer.is_published,
name=layer.name,
'.shp', tempdir=tempdir)
else:
for field in self.spatial_files:
f = self.cleaned_data[field]
if f is not None:
path = os.path.join(tempdir, f.name)
with open(path, 'wb') as writable:
for c in f.chunks():
writable.write(c)
absolute_base_file = os.path.join(tempdir,
self.cleaned_data["base_file"].name)
return tempdir, absolute_base_file
class NewLayerUploadForm(LayerUploadForm):
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
sld_file = forms.FileField(required=False)
if check_ogc_backend(qgis_server.BACKEND_PACKAGE):
qml_file = forms.FileField(required=False)
xml_file = forms.FileField(required=False)
abstract = forms.CharField(required=False)
layer_title = forms.CharField(required=False)
permissions = JSONField()
charset = forms.CharField(required=False)
metadata_uploaded_preserve = forms.BooleanField(required=False)
spatial_files = [
"base_file",
"dbf_file",
"shx_file",
"prj_file",
def filter_by_resource_ids(self, object_list, permitted_ids):
    """Narrow a Style queryset to the permitted resource ids.

    The field lookup passed to ``object_list.filter`` depends on which OGC
    backend ``check_ogc_backend`` reports as active:

    * GeoServer backend: ``layer_styles__id__in``
    * QGIS Server backend: ``layer_styles__layer__id__in``

    The GeoServer check runs first. If neither backend is active, nothing is
    filtered and ``None`` is returned.
    """
    if check_ogc_backend(geoserver.BACKEND_PACKAGE):
        lookup = {'layer_styles__id__in': permitted_ids}
    elif check_ogc_backend(qgis_server.BACKEND_PACKAGE):
        lookup = {'layer_styles__layer__id__in': permitted_ids}
    else:
        # No recognised backend: same result as falling off the end.
        return None
    return object_list.filter(**lookup)
"""Override build_bundle method to add additional info."""
if obj is None and self._meta.object_class:
obj = self._meta.object_class()
elif obj:
obj = self.populate_object(obj)
return Bundle(
obj=obj,
data=data,
request=request,
**kwargs)
# Publish one ``StyleResource`` name whose base class follows the active OGC
# backend. QGIS Server is checked first, then GeoServer; when neither check
# succeeds, ``StyleResource`` is not defined by this statement.
if check_ogc_backend(qgis_server.BACKEND_PACKAGE):
    class StyleResource(QGISStyleResource):
        """Wrapper for Generic Style Resource"""
elif check_ogc_backend(geoserver.BACKEND_PACKAGE):
    class StyleResource(GeoserverStyleResource):
        """Wrapper for Generic Style Resource"""
def _get_resource_counts(
resourcebase_filter_kwargs
):
"""Return a dict with counts of resources of various types
The ``resourcebase_filter_kwargs`` argument should be a dict with a suitable
queryset filter that can be applied to select only the relevant
views.layer_metadata_upload, name='layer_metadata_upload'),
url(r'^(?P[^/]*)/style_upload$',
views.layer_sld_upload, name='layer_sld_upload'),
url(r'^(?P[^/]*)/style_edit$',
views.layer_sld_edit, name='layer_sld_edit'),
url(r'^(?P[^/]*)/feature_catalogue$',
views.layer_feature_catalogue, name='layer_feature_catalogue'),
url(r'^metadata/batch/(?P[^/]*)/$',
views.layer_batch_metadata, name='layer_batch_metadata'),
url(r'^permissions/batch/(?P[^/]*)/$',
views.layer_batch_permissions, name='layer_batch_permissions'),
]
# -- Deprecated URL routes for GeoServer authentication -- remove after GeoNode 2.1
# -- Use /gs/acls, gs/resolve_user/, gs/download instead
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
    # Imported only when the GeoServer backend is active.
    from geonode.geoserver.views import layer_acls, resolve_user

    # The legacy routes are prepended, so they are listed ahead of the
    # patterns already collected in ``urlpatterns``.
    urlpatterns = [  # 'geonode.geoserver.views',
        url(r'^acls/?$', layer_acls, name='layer_acls_dep'),
        url(r'^resolve_user/?$', resolve_user, name='layer_resolve_user_dep'),
    ] + urlpatterns
from geonode.utils import default_map_config, forward_mercator, \
llbbox_to_mercator, check_ogc_backend
from geonode import geoserver, qgis_server
try:
from urllib.parse import urlsplit
except ImportError:
# Python 2 compatibility
from urlparse import urlsplit
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# FIXME: The post service providing the map_status object
# should be moved to geonode.geoserver.
from geonode.geoserver.helpers import ogc_server_settings
elif check_ogc_backend(qgis_server.BACKEND_PACKAGE):
from geonode.qgis_server.helpers import ogc_server_settings
from geonode.qgis_server.tasks.update import create_qgis_server_thumbnail
logger = logging.getLogger("geonode.maps.qgis_server_views")
class MapCreateView(CreateView):
model = Map
fields = '__all__'
template_name = 'leaflet/maps/map_view.html'
context_object_name = 'map'
def get_context_data(self, **kwargs):
request = self.request
if request and 'access_token' in request.session:
access_token = request.session['access_token']