# The list of resolved hosts (`result`), `current_port` and the compiled `dsn_re`
# pattern are set up earlier in the enclosing method. DSN parts are walked in
# reverse so that a port given on a later part (e.g. "host1,host2:8563") also
# applies to the parts before it.
for part in reversed(dsn.split(',')):
    # Skip empty parts (e.g. produced by a trailing comma)
    if len(part) == 0:
        continue

    m = dsn_re.search(part)

    if not m:
        raise ExaConnectionDsnError(self, f'Could not parse connection string part [{part}]')

    # Optional port was specified
    if m.group('port'):
        current_port = int(m.group('port'))

    # If current port is still empty, use default port
    if current_port is None:
        current_port = constant.DEFAULT_PORT

    # Hostname or IP range was specified (e.g. "myexa01..16"), expand it
    if m.group('range_start'):
        if int(m.group('range_start')) > int(m.group('range_end')):
            raise ExaConnectionDsnError(self, f'Connection string part [{part}] contains an invalid range, '
                                              f'lower bound is higher than upper bound')

        # Preserve the zero-padding width of the lower bound (e.g. "01" -> "01", "02", ...)
        zfill_width = len(m.group('range_start'))

        for i in range(int(m.group('range_start')), int(m.group('range_end')) + 1):
            host = f"{m.group('host_prefix')}{str(i).zfill(zfill_width)}{m.group('host_suffix')}"
            result.extend(self._resolve_host(host, current_port))
    # Just a single hostname or single IP address
    else:
        result.extend(self._resolve_host(m.group('host_prefix'), current_port))
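
# Illustrative sketch (not part of the driver): how a DSN part with a zero-padded
# host range and a port, e.g. 'myexa01..03:8563', expands into individual hosts,
# mirroring the range/zfill logic above. The simplified pattern and the
# expand_dsn_part() helper below are hypothetical stand-ins for the real dsn_re.
import re

_part_re = re.compile(
    r'^(?P<host_prefix>[^,:]+?)'
    r'(?:(?P<range_start>\d+)\.\.(?P<range_end>\d+)(?P<host_suffix>[^,:]*))?'
    r'(?::(?P<port>\d+))?$'
)

def expand_dsn_part(part, default_port=8563):
    m = _part_re.search(part)

    if not m:
        raise ValueError(f'Could not parse connection string part [{part}]')

    port = int(m.group('port')) if m.group('port') else default_port

    if not m.group('range_start'):
        return [(m.group('host_prefix'), port)]

    # Keep the zero-padding width of the lower bound: 01..03 -> 01, 02, 03
    width = len(m.group('range_start'))

    return [
        (f"{m.group('host_prefix')}{str(i).zfill(width)}{m.group('host_suffix')}", port)
        for i in range(int(m.group('range_start')), int(m.group('range_end')) + 1)
    ]

# expand_dsn_part('myexa01..03:8563')
# -> [('myexa01', 8563), ('myexa02', 8563), ('myexa03', 8563)]
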
def _get_log_filename(self):
    # Log file name pattern: "<name>_<timestamp>.log"
    return f'{self.name}_{datetime.datetime.now().strftime(constant.LOGGER_FILENAME_TIMESTAMP_FORMAT)}.log'

def debug_json(self, message, data):
    # Encode and log a JSON payload only when DEBUG level is enabled,
    # truncating oversized output to keep log files manageable
    if self.isEnabledFor(logging.DEBUG):
        json_str = self.connection._json_encode(data, indent=4)

        if len(json_str) > constant.LOGGER_MAX_JSON_LENGTH:
            json_str = f'{json_str[0:constant.LOGGER_MAX_JSON_LENGTH]}\n------ TRUNCATED TOO LONG MESSAGE ------\n'

        self.debug(f'[{message}]\n{json_str}')
def __init__(self
, dsn=None
, user=None
, password=None
, schema=''
, autocommit=constant.DEFAULT_AUTOCOMMIT
, snapshot_transactions=False
, socket_timeout=constant.DEFAULT_SOCKET_TIMEOUT
, query_timeout=constant.DEFAULT_QUERY_TIMEOUT
, compression=False
, encryption=False
, fetch_dict=False
, fetch_mapper=None
, fetch_size_bytes=constant.DEFAULT_FETCH_SIZE_BYTES
, lower_ident=False
, quote_ident=False
, json_lib='json'
, verbose_error=True
, debug=False
, debug_logdir=None
, udf_output_bind_address=None
, udf_output_connect_address=None
, udf_output_dir=None
, http_proxy=None

def fetchmany(self, size=constant.DEFAULT_FETCHMANY_SIZE):
    # Return at most `size` rows by slicing the cursor's own row iterator
    return list(itertools.islice(self, size))
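
# Illustrative sketch (not part of the driver): fetchmany() relies on the cursor
# object being iterable; itertools.islice() takes at most `size` rows per call and
# a later call continues where the previous one stopped. A plain generator stands
# in for the cursor here.
import itertools

def _fake_rows():
    for i in range(5):
        yield (i, f'row_{i}')

rows = _fake_rows()

first_batch = list(itertools.islice(rows, 2))   # [(0, 'row_0'), (1, 'row_1')]
second_batch = list(itertools.islice(rows, 2))  # [(2, 'row_2'), (3, 'row_3')]
last_batch = list(itertools.islice(rows, 2))    # [(4, 'row_4')] - generator exhausted
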
def _init_logger(self):
    self.logger = self.cls_logger(self, constant.DRIVER_NAME)
    # Verbose logging only when the connection was opened with debug=True
    self.logger.setLevel('DEBUG' if self.options['debug'] else 'WARNING')
    self.logger.add_default_handler()