Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param connection_id:
ID of the current connection.
:param connProps:
Dictionary with the properties that should be changed.
:returns:
A ``common_pb2.ConnectionProperties`` object.
"""
if connProps:
props = connProps.copy()
else:
props = {}
request = requests_pb2.ConnectionSyncRequest()
request.connection_id = connection_id
request.conn_props.has_auto_commit = True
request.conn_props.has_read_only = True
if 'autoCommit' in props:
request.conn_props.auto_commit = props.pop('autoCommit')
if 'readOnly' in props:
request.conn_props.read_only = props.pop('readOnly')
if 'transactionIsolation' in props:
request.conn_props.transaction_isolation = props.pop('transactionIsolation', None)
if 'catalog' in props:
request.conn_props.catalog = props.pop('catalog', None)
if 'schema' in props:
request.conn_props.schema = props.pop('schema', None)
if props:
logger.warning("Unhandled connection property:" + props)
def connection_sync(self, connection_id, connProps=None):
    """Push connection properties to the server and return its view of them.

    Builds a ``ConnectionSyncRequest`` for ``connection_id``, fills its
    ``conn_props`` from ``connProps``, sends it through ``self._apply`` and
    parses the reply as a ``ConnectionSyncResponse``.

    Keys missing from ``connProps`` are sent with fallback values:
    ``autoCommit`` and ``readOnly`` fall back to ``False``,
    ``transactionIsolation`` to ``0``, ``catalog`` and ``schema`` to ``''``.

    :param connection_id:
        ID of the current connection.
    :param connProps:
        Dictionary with the properties that should be changed. ``None``
        is treated as an empty dictionary.
    :returns:
        A ``common_pb2.ConnectionProperties`` object (the ``conn_props``
        field of the parsed response).
    """
    settings = {} if connProps is None else connProps

    sync_request = requests_pb2.ConnectionSyncRequest()
    sync_request.connection_id = connection_id

    # Work on the nested properties message directly instead of repeating
    # the full attribute path for every field.
    wire_props = sync_request.conn_props
    wire_props.auto_commit = settings.get('autoCommit', False)
    # The has_* flags are always raised, whether or not the caller
    # supplied the corresponding key.
    wire_props.has_auto_commit = True
    wire_props.read_only = settings.get('readOnly', False)
    wire_props.has_read_only = True
    wire_props.transaction_isolation = settings.get('transactionIsolation', 0)
    wire_props.catalog = settings.get('catalog', '')
    wire_props.schema = settings.get('schema', '')

    raw_reply = self._apply(sync_request)
    sync_response = responses_pb2.ConnectionSyncResponse()
    sync_response.ParseFromString(raw_reply)
    return sync_response.conn_props
def connection_sync(self, connection_id, connProps=None):
    """Synchronize connection properties with the server.

    A ``ConnectionSyncRequest`` is populated from ``connProps`` and handed
    to ``self._apply``; the returned bytes are parsed into a
    ``ConnectionSyncResponse`` whose ``conn_props`` is returned.

    Any key absent from ``connProps`` is sent with a fallback value:
    ``False`` for ``autoCommit`` and ``readOnly``, ``0`` for
    ``transactionIsolation``, and ``''`` for ``catalog`` and ``schema``.

    :param connection_id:
        ID of the current connection.
    :param connProps:
        Dictionary with the properties that should be changed; when
        ``None``, an empty dictionary is used.
    :returns:
        A ``common_pb2.ConnectionProperties`` object.
    """
    if connProps is None:
        requested = {}
    else:
        requested = connProps

    outgoing = requests_pb2.ConnectionSyncRequest()
    outgoing.connection_id = connection_id

    lookup = requested.get
    fields = outgoing.conn_props

    # Boolean properties: the value is sent together with a has_* marker
    # that is set to True unconditionally.
    fields.auto_commit = lookup('autoCommit', False)
    fields.has_auto_commit = True
    fields.read_only = lookup('readOnly', False)
    fields.has_read_only = True

    # Remaining properties carry no companion marker.
    fields.transaction_isolation = lookup('transactionIsolation', 0)
    fields.catalog = lookup('catalog', '')
    fields.schema = lookup('schema', '')

    incoming = responses_pb2.ConnectionSyncResponse()
    incoming.ParseFromString(self._apply(outgoing))
    return incoming.conn_props