Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def connect(self):
    """Attempt to connect to Exasol and return the connection.

    The keyword arguments passed to ``pyexasol.connect`` are built from
    ``self.config``: compression is enabled, ``schema`` is taken from the
    config when the key is present (otherwise ``None``), and every entry
    of ``self.config.params`` is applied on top and may override those
    defaults. Rows are requested as dictionaries (``fetch_dict=True``).

    On success the new connection is stored in ``self.connection``.
    A ``pyexasol.exceptions.ExaError`` raised while connecting is logged
    with its traceback and is not re-raised.

    Returns:
        The value of ``self.connection``. If connecting failed, this is
        whatever was stored there before the call, or ``None`` when the
        attribute has never been set.
    """
    schema = self.config.schema if "schema" in self.config else None
    params = {"schema": schema, "compression": True}
    # User-supplied params are applied last so they win over the defaults.
    params.update(self.config.params)
    try:
        self.connection = pyexasol.connect(
            dsn=self.dsn,
            user=self.config.user,
            password=self.config.password,
            fetch_dict=True,
            **params,
        )
    except pyexasol.exceptions.ExaError as err:
        # Best effort: report the failure but let the caller continue.
        log.exception(err)
    # When the very first attempt fails, ``self.connection`` may never have
    # been assigned; fall back to None instead of raising AttributeError,
    # which would mask the connection error logged above.
    return getattr(self, "connection", None)
password=config.password,
fetch_dict=True,
fetch_mapper=pyexasol.exasol_mapper,
**params,
)
except pyexasol.exceptions.ExaConnectionDsnError as exc:
raise ConnectionEstablishError(config.name, reason="Bad dsn", dsn=config.dsn) from exc
except pyexasol.exceptions.ExaAuthError as exc:
raise ConnectionEstablishError(
config.name, reason="Authentication failed", dsn=config.dsn, user=config.user,
) from exc
except pyexasol.exceptions.ExaConnectionFailedError as exc:
raise ConnectionEstablishError(
config.name, reason="Connection refused", dsn=config.dsn,
) from exc
except pyexasol.exceptions.ExaError as exc:
raise ConnectionEstablishError(config.name, dsn=config.dsn) from exc
if "schema" in config:
params["schema"] = config.schema
params.update(config.params)
try:
return pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
fetch_dict=True,
fetch_mapper=pyexasol.exasol_mapper,
**params,
)
except pyexasol.exceptions.ExaConnectionDsnError as exc:
raise ConnectionEstablishError(config.name, reason="Bad dsn", dsn=config.dsn) from exc
except pyexasol.exceptions.ExaAuthError as exc:
raise ConnectionEstablishError(
config.name, reason="Authentication failed", dsn=config.dsn, user=config.user,
) from exc
except pyexasol.exceptions.ExaConnectionFailedError as exc:
raise ConnectionEstablishError(
config.name, reason="Connection refused", dsn=config.dsn,
) from exc
except pyexasol.exceptions.ExaError as exc:
raise ConnectionEstablishError(config.name, dsn=config.dsn) from exc
try:
return pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
fetch_dict=True,
fetch_mapper=pyexasol.exasol_mapper,
**params,
)
except pyexasol.exceptions.ExaConnectionDsnError as exc:
raise ConnectionEstablishError(config.name, reason="Bad dsn", dsn=config.dsn) from exc
except pyexasol.exceptions.ExaAuthError as exc:
raise ConnectionEstablishError(
config.name, reason="Authentication failed", dsn=config.dsn, user=config.user,
) from exc
except pyexasol.exceptions.ExaConnectionFailedError as exc:
raise ConnectionEstablishError(
config.name, reason="Connection refused", dsn=config.dsn,
) from exc
except pyexasol.exceptions.ExaError as exc:
raise ConnectionEstablishError(config.name, dsn=config.dsn) from exc
"""Establish connection with exasol"""
params = {"compression": True}
if "schema" in config:
params["schema"] = config.schema
params.update(config.params)
try:
return pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
fetch_dict=True,
fetch_mapper=pyexasol.exasol_mapper,
**params,
)
except pyexasol.exceptions.ExaConnectionDsnError as exc:
raise ConnectionEstablishError(config.name, reason="Bad dsn", dsn=config.dsn) from exc
except pyexasol.exceptions.ExaAuthError as exc:
raise ConnectionEstablishError(
config.name, reason="Authentication failed", dsn=config.dsn, user=config.user,
) from exc
except pyexasol.exceptions.ExaConnectionFailedError as exc:
raise ConnectionEstablishError(
config.name, reason="Connection refused", dsn=config.dsn,
) from exc
except pyexasol.exceptions.ExaError as exc:
raise ConnectionEstablishError(config.name, dsn=config.dsn) from exc