# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# PEP 249 module globals

# DB-API specification level advertised by this module.
apilevel = "2.0"

# Threads may share the module and connections.
threadsafety = 2

# Python extended format codes, e.g. ...WHERE name=%(name)s
paramstyle = "pyformat"

# Module-scoped logger, named after this module.
_logger = logging.getLogger(__name__)
class PrestoParamEscaper(common.ParamEscaper):
    """Parameter escaper that renders date/time values as typed SQL literals."""

    def escape_datetime(self, item, format):
        """Escape a date or datetime and prefix it with its SQL type keyword.

        :param item: the value to escape; ``datetime.datetime`` instances are
            tagged ``timestamp``, anything else is tagged ``date``.
        :param format: format argument forwarded unchanged to the parent
            escaper.
        :returns: the string ``"<keyword> <escaped value>"``.
        """
        if isinstance(item, datetime.datetime):
            keyword = "timestamp"
        else:
            keyword = "date"
        # The literal 3 is forwarded as the parent's third argument.
        # NOTE(review): presumably trims fractional seconds to milliseconds --
        # confirm against common.ParamEscaper.escape_datetime.
        parent = super(PrestoParamEscaper, self)
        escaped = parent.escape_datetime(item, format, 3)
        return "{} {}".format(keyword, escaped)
# Module-level PrestoParamEscaper instance, created once at import time.
# NOTE(review): presumably shared by the query-building code elsewhere in this
# module -- confirm against its call sites.
_escaper = PrestoParamEscaper()
def connect(*args, **kwargs):
    """Create a new connection to the database.

    Every positional and keyword argument is handed through unchanged to
    :py:class:`Connection`; see that class for the arguments it accepts.

    :returns: a :py:class:`Connection` object.
    """
    connection = Connection(*args, **kwargs)
    return connection
class Connection(object):
    """Presto does not have a notion of a persistent connection.

    Thus, these objects are small stateless factories for cursors, which do all the real work.
    """

    def escape_datetime(self, item, format):
        """Render a date or datetime as a typed SQL literal.

        Delegates to the module-level ``_escaper`` (a ``PrestoParamEscaper``),
        which holds the actual escaping logic.

        :param item: the date or datetime value to escape.
        :param format: format argument forwarded unchanged to the escaper.
        :returns: ``"timestamp <escaped>"`` for ``datetime.datetime`` values,
            ``"date <escaped>"`` otherwise.
        """
        # Calling super(PrestoParamEscaper, self) from here cannot work:
        # Connection derives from object, not PrestoParamEscaper, so the
        # two-argument super() raises TypeError. Delegate to the shared
        # escaper instance instead, which yields the intended result.
        return _escaper.escape_datetime(item, format)