Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_assign_remove_permissions(self):
# Assing
layer = Layer.objects.all().first()
perm_spec = layer.get_all_level_info()
self.assertNotIn(self.user, perm_spec["users"])
utils.set_layers_permissions("write", None, [self.username], None, None)
layer_after = Layer.objects.get(name=layer.name)
perm_spec = layer_after.get_all_level_info()
for perm in utils.WRITE_PERMISSIONS:
self.assertIn(perm, perm_spec["users"][self.user])
# Remove
utils.set_layers_permissions("write", None, [self.username], None, True)
layer_after = Layer.objects.get(name=layer.name)
perm_spec = layer_after.get_all_level_info()
for perm in utils.WRITE_PERMISSIONS:
self.assertNotIn(perm, perm_spec["users"][self.user])
def test_layer_detail_page_slave_site(self):
"""
Test that the detail page is not found of the resource is on another site
"""
# test that the CA layer detail page, that does not belong to the SlaveSite, is not found
self.client.login(username=self.user, password=self.passwd)
response = self.client.get(reverse('layer_detail', args=[Layer.objects.all()[0].alternate]))
self.assertEqual(response.status_code, 404)
def test_sync_resources_with_guardian_delay_true(self):
with self.settings(DELAYED_SECURITY_SIGNALS=True):
# Set geofence (and so the dirty state)
set_geofence_all(self._l)
# Retrieve the same layer
dirty_layer = Layer.objects.get(pk=self._l.id)
# Check dirty state (True)
self.assertTrue(dirty_layer.dirty_state)
# Call sync resources
sync_resources_with_guardian()
clean_layer = Layer.objects.get(pk=self._l.id)
# Check dirty state
self.assertFalse(clean_layer.dirty_state)
def test_download_url(self):
layer = Layer.objects.all().first()
self.client.login(username='admin', password='admin')
# ... all should be good
response = self.client.get(reverse('download', args=(layer.id,)))
# Espected 404 since there are no files available for this layer
self.assertEqual(response.status_code, 404)
data = response.content
self.assertTrue(
"No files have been found for this resource. Please, contact a system administrator." in data)
treeqs = treeqs | HierarchicalKeyword.get_tree(kw)
except BaseException:
# Ignore keywords not actually used?
pass
documents = documents.filter(Q(keywords__in=treeqs))
if not settings.SKIP_PERMS_FILTER:
documents = documents.filter(id__in=authorized)
counts = documents.values('doc_type').annotate(count=Count('doc_type'))
facets = dict([(count['doc_type'], count['count']) for count in counts])
return facets
else:
layers = Layer.objects.filter(title__icontains=title_filter)
if category_filter:
layers = layers.filter(category__identifier__in=category_filter)
if regions_filter:
layers = layers.filter(regions__name__in=regions_filter)
if owner_filter:
layers = layers.filter(owner__username__in=owner_filter)
if date_gte_filter:
layers = layers.filter(date__gte=date_gte_filter)
if date_lte_filter:
layers = layers.filter(date__lte=date_lte_filter)
if date_range_filter:
layers = layers.filter(date__range=date_range_filter.split(','))
layers = get_visible_resources(
layers,
request.user if request else None,
def layer_count(self):
return Layer.objects.filter(owner = self.o.user).count()
def handle(self, **options):
if options['layername']:
layers = Layer.objects.filter(name__icontains=options['layername'])
else:
layers = Layer.objects.all()
layers_count = layers.count()
count = 0
for layer in layers:
count += 1
try:
print 'Synchronizing permissions for layer %s/%s: %s' % (count, layers_count, layer.alternate)
perm_spec = json.loads(_perms_info_json(layer))
layer.set_default_permissions()
layer.set_permissions(perm_spec)
except:
print("Unexpected error:", sys.exc_info()[0])
print 'perm_spec is %s' % perm_spec
def print_map(request):
from .proxy.views import proxy
from .layers.models import Layer
permissions = {}
params = json.loads(request.body)
for layer in params['layers']:
if ogc_server_settings.LOCATION in layer['baseURL']:
for layer_name in layer['layers']:
layer_obj = Layer.objects.get(alternate=layer_name)
permissions[layer_obj] = _perms_info_json(layer_obj)
layer_obj.set_default_permissions()
try:
resp = proxy(request)
except Exception:
return HttpResponse('There was an error connecting to the printing server')
finally:
for layer_obj in permissions.keys():
layer_obj.set_permissions(json.loads(permissions[layer_obj]))
return resp
def workspace(request):
owner = request.user
apps = AppInstance.objects.filter(owner=owner)
created_apps = AppInstance.objects.all()
layers = Layer.objects.filter(owner=owner)
maps = Map.objects.filter(owner=owner)
maps_count = Map.objects.all().count()
layers_count = Layer.objects.all().count()
documents = Document.objects.filter(owner=owner)
documents_count = Document.objects.all().count()
groups = owner.group_list_all()
groups_count = GroupProfile.objects.all().count()
return render(
request,
template_name='workspace/workspace.html',
context={
'my_apps': apps,
'my_layers': layers,
'created_apps': created_apps,
'my_maps': maps,
'maps_count': maps_count,
'layers_count': layers_count,
"groups": groups,
"groups_count": groups_count,
maplayer = GXPLayer(
name=layer.alternate,
ows_url=layer.ows_url,
layer_params=json.dumps(config),
source_params=json.dumps(source_params)
)
else:
maplayer = GXPLayer(
name=layer.alternate,
ows_url=layer.ows_url,
layer_params=json.dumps(config))
# Update count for popularity ranking,
# but do not includes admins or resource owners
if request.user != layer.owner and not request.user.is_superuser:
Layer.objects.filter(
id=layer.id).update(popular_count=F('popular_count') + 1)
# center/zoom don't matter; the viewer will center on the layer bounds
map_obj = GXPMap(
projection=getattr(
settings,
'DEFAULT_MAP_CRS',
'EPSG:3857'))
NON_WMS_BASE_LAYERS = [
la for la in default_map_config(request)[1] if la.ows_url is None]
if request.method == "POST":
if layer.metadata_uploaded_preserve: # layer metadata cannot be edited
out = {
'success': False,