Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _test_all_graph_types_without_schema(self, schema, graphson):
    """
    Round-trip every type supported by dse_graph without priming a schema.

    A vertex is created for each type through a dse-tinkerpop traversal,
    fetched back from the server and compared with what was inserted.
    The graph is deliberately NOT primed with the correct schema first.
    Any schema other than ClassicGraphSchema causes the test to be skipped.

    @since 1.0.0
    @jira_ticket PYTHON-641
    @expected_result inserted objects are equivalent to those retrieved

    @test_category dse graph
    """
    is_classic = schema is ClassicGraphSchema
    if not is_classic:
        raise unittest.SkipTest('schema-less is only for classic graphs')

    # Delegate to the shared read/write routine with schema creation disabled.
    self._write_and_read_data_types(schema, graphson, use_schema=False)
def setUp(self):
    """Skip the test when the platform or the libev reactor is unusable.

    Raises:
        unittest.SkipTest: on Windows, when ``LibevConnection`` equals the
            ``-1`` marker (reported as monkey patching), or when
            ``LibevConnection`` is ``None`` (libev not installed properly).
    """
    on_windows = os.name == "nt"
    if on_windows:
        raise unittest.SkipTest("IPv6 is currently not supported under Windows")

    # The -1 check must come before the None check, as in the original chain.
    if LibevConnection == -1:
        raise unittest.SkipTest("Can't test libev with monkey patching")

    if LibevConnection is None:
        raise unittest.SkipTest("Libev does not appear to be installed properly")
def setUpBuilder(self):
    """Unconditionally skip: there is no builder to set up.

    Raises:
        unittest.SkipTest: always, with the reason ``"no builder"``.
    """
    reason = "no builder"
    raise unittest.SkipTest(reason)
def _test_vertex_property_properties(self, schema, graphson):
    """
    Verify that properties attached to a vertex property are returned.

    Creates property keys 'k0' and 'k1', a 'key' property key that carries
    them, and an 'MLP' vertex label using 'key'. A vertex is then added with
    a 'key' property holding both meta-properties, and the returned vertex
    is checked. Schemas other than ClassicGraphSchema skip the test.

    @since 1.0.0
    @jira_ticket PYTHON-487

    @test_category dse graph
    """
    if schema is not ClassicGraphSchema:
        raise unittest.SkipTest('skipped because rich properties are only supported with classic graphs')

    # Schema statements are executed one by one, in this exact order.
    schema_statements = (
        "schema.propertyKey('k0').Text().ifNotExists().create();",
        "schema.propertyKey('k1').Text().ifNotExists().create();",
        "schema.propertyKey('key').Text().properties('k0', 'k1').ifNotExists().create();",
        "schema.vertexLabel('MLP').properties('key').ifNotExists().create();",
    )
    for statement in schema_statements:
        self.execute_graph(statement, graphson)

    # The script text is kept exactly as-is; continuation lines are flush left
    # so that no extra whitespace ends up inside the literal.
    add_vertex_script = '''v = graph.addVertex('MLP')
v.property('key', 'value', 'k0', 'v0', 'k1', 'v1')
v'''
    results = self.execute_graph(add_vertex_script, graphson)
    vertex = results[0]

    # Exactly one property name, holding exactly one value.
    self.assertEqual(len(vertex.properties), 1)
    self.assertEqual(len(vertex.properties['key']), 1)

    key_property = vertex.properties['key'][0]
    self.assertEqual(key_property.label, 'key')
    self.assertEqual(key_property.value, 'value')
    self.assertEqual(key_property.properties, {'k0': 'v0', 'k1': 'v1'})
def __test_udt(self, schema, graphson, address_class, address_with_tags_class,
complex_address_class, complex_address_with_owners_class):
if schema is not CoreGraphSchema or DSE_VERSION < Version('6.8'):
raise unittest.SkipTest("Graph UDT is only supported with DSE 6.8+ and Core graphs.")
ep = self.get_execution_profile(graphson)
Address = address_class
AddressWithTags = address_with_tags_class
ComplexAddress = complex_address_class
ComplexAddressWithOwners = complex_address_with_owners_class
# setup udt
self.session.execute_graph("""
schema.type('address').property('address', Text).property('city', Text).property('state', Text).create();
schema.type('addressTags').property('address', Text).property('city', Text).property('state', Text).
property('tags', setOf(Text)).create();
schema.type('complexAddress').property('address', Text).property('address_tags', frozen(typeOf('addressTags'))).
property('city', Text).property('state', Text).property('props', mapOf(Text, Int)).create();
schema.type('complexAddressWithOwners').property('address', Text).
def setUp(self):
    """Run the shared connection set-up, skipping when libev is unusable.

    Raises:
        unittest.SkipTest: when ``is_monkey_patched()`` is true, or when
            ``LibevConnection`` is ``None``.
    """
    if is_monkey_patched():
        raise unittest.SkipTest("Can't test libev with monkey patching")

    libev_unavailable = LibevConnection is None
    if libev_unavailable:
        reason = 'libev does not appear to be installed properly'
        raise unittest.SkipTest(reason)

    # Only reached when libev can actually be exercised.
    ConnectionTests.setUp(self)
def setUp(self):
    """Connect to the server described by ``ARGS`` and log in.

    Stores the connection on ``self.oerp`` and the login result on
    ``self.user``.

    Raises:
        unittest.SkipTest: when the server version compares lower than '6.1'.
    """
    self.oerp = oerplib.OERP(
        ARGS.server,
        protocol=ARGS.protocol,
        port=ARGS.port,
        version=ARGS.version,
    )

    server_version = v(self.oerp.version)
    minimum_version = v('6.1')
    if server_version < minimum_version:
        reason = (
            "The targetted OpenERP server does not support the "
            "'execute_kw()' method."
        )
        raise unittest.SkipTest(reason)

    self.user = self.oerp.login(ARGS.user, ARGS.passwd, ARGS.database)
def test_replicas(self):
    """
    Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace

    Before connecting, the replica lookup for keyspace 'test3rf' must be
    empty. After connecting to 'test3rf', it must be non-empty, and the first
    replica must report datacenter 'dc1' and rack 'r1'.

    Raises:
        unittest.SkipTest: when the murmur3 extension is not available.
    """
    if murmur3 is None:
        raise unittest.SkipTest('the murmur3 extension is not available')

    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    # Shut the cluster down even when connect() or an assertion fails, so a
    # failing run does not leak the cluster's resources into later tests.
    try:
        self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])

        cluster.connect('test3rf')

        # Look the replicas up once and reuse the result for both checks.
        replicas = list(cluster.metadata.get_replicas('test3rf', six.b('key')))
        self.assertNotEqual(replicas, [])

        host = replicas[0]
        self.assertEqual(host.datacenter, 'dc1')
        self.assertEqual(host.rack, 'r1')
    finally:
        cluster.shutdown()
import threading
import tempfile
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
# Try to import modules
from multiprocessing import Process, Queue
# IronPython fails when creating a queue
Queue()
except ImportError:
# Some interpreters don't have support for multiprocessing
raise unittest.SkipTest("Interpreter doesn't support multiprocessing")
try:
import queue
except ImportError:
import Queue as queue
# Pelix
from pelix.framework import create_framework
from pelix.ipopo.constants import use_ipopo
import pelix.http
import pelix.remote
# Local utilities
from tests.http.test_basic_ssl import install_ipopo, make_certs, \
instantiate_server
def skip_not_available(self):
    """Skip the test when skipping is enabled and the engine is unavailable.

    The engine is built as ``self.CLS(**self.CONF)`` only when
    ``self.SKIP_IF_NOT_AVAILABLE`` is truthy.

    Raises:
        unittest.SkipTest: when skipping is enabled and the engine's
            ``is_available()`` returns a falsy value.
    """
    if not self.SKIP_IF_NOT_AVAILABLE:
        return

    engine = self.CLS(**self.CONF)
    if engine.is_available():
        return

    # Skip networked engines if not available to prevent spurious failures
    raise unittest.SkipTest()  # pragma: no cover