Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
COPY_IN_RESPONSE: self.handle_COPY_IN_RESPONSE,
COPY_OUT_RESPONSE: self.handle_COPY_OUT_RESPONSE}
# Int32 - Message length, including self.
# Int32(196608) - Protocol version number. Version 3.0.
# Any number of key/value pairs, terminated by a zero byte:
# String - A parameter name (user, database, or options)
# String - Parameter value
protocol = 196608
val = bytearray(i_pack(protocol) + b("user\x00"))
val.extend(user.encode("ascii") + NULL_BYTE)
if database is not None:
val.extend(
b("database\x00") + database.encode("ascii") + NULL_BYTE)
val.append(0)
self._write(i_pack(len(val) + 4))
self._write(val)
self._flush()
self._cursor = self.cursor()
try:
try:
self._lock.acquire()
self.handle_messages(None)
finally:
self._lock.release()
except:
self.close()
raise exc_info()[1]
self.in_transaction = False
self.notifies = []
# Int16 - Number of parameter values.
# For each parameter value:
# Int32 - The length of the parameter value, in bytes, not
# including this length. -1 indicates a NULL parameter
# value, in which no value bytes follow.
# Byte[n] - Value of the parameter.
# Int16 - The number of result-column format codes.
# For each result-column format code:
# Int16 - The format code.
retval = bytearray(cursor.portal_name_bin + ps['bind_1'])
for value, send_func in zip(args, ps['param_funcs']):
if value is None:
val = NULL
else:
val = send_func(value)
retval.extend(i_pack(len(val)))
retval.extend(val)
retval.extend(ps['bind_2'])
self._send_message(BIND, retval)
self.send_EXECUTE(cursor)
self._write(SYNC_MSG)
self._flush()
self.handle_messages(cursor)
if not cursor.portal_suspended:
self.close_portal(cursor)
continue
if min_int2 < v < max_int2:
continue
int2_ok = False
if min_int4 < v < max_int4:
continue
int4_ok = False
if min_int8 < v < max_int8:
continue
int8_ok = False
if int2_ok:
array_oid = 1005 # INT2[]
oid, fc, send_func = (21, FC_BINARY, h_pack)
elif int4_ok:
array_oid = 1007 # INT4[]
oid, fc, send_func = (23, FC_BINARY, i_pack)
elif int8_ok:
array_oid = 1016 # INT8[]
oid, fc, send_func = (20, FC_BINARY, q_pack)
else:
raise ArrayContentNotSupportedError(
"numeric not supported as array contents")
else:
try:
oid, fc, send_func = self.make_params((first_element,))[0]
# If unknown, assume it's a string array
if oid == 705:
oid = 25
array_oid = pg_array_types[oid]
except KeyError:
raise ArrayContentNotSupportedError(
def _send_message(self, code, data):
try:
self._write(code)
self._write(i_pack(len(data) + 4))
self._write(data)
self._write(FLUSH_MSG)
except ValueError:
if str(exc_info()[1]) == "write to closed file":
raise pg8000.InterfaceError("connection is closed")
else:
raise exc_info()[1]
except AttributeError:
raise pg8000.InterfaceError("connection is closed")
# Byte1('P') - Identifies the message as a Parse command.
# Int32 - Message length, including self.
# String - Prepared statement name. An empty string selects the
# unnamed prepared statement.
# String - The query string.
# Int16 - Number of parameter data types specified (can be zero).
# For each parameter:
# Int32 - The OID of the parameter data type.
val = bytearray(statement_name_bin)
val.extend(statement.encode(self._client_encoding) + NULL_BYTE)
val.extend(h_pack(len(params)))
for oid, fc, send_func in params:
# Parse message doesn't seem to handle the -1 type_oid for NULL
# values that other messages handle. So we'll provide type_oid
# 705, the PG "unknown" type.
val.extend(i_pack(705 if oid == -1 else oid))
# Byte1('D') - Identifies the message as a describe command.
# Int32 - Message length, including self.
# Byte1 - 'S' for prepared statement, 'P' for portal.
# String - The name of the item to describe.
self._send_message(PARSE, val)
self._send_message(DESCRIBE, STATEMENT + statement_name_bin)
self._write(SYNC_MSG)
try:
self._flush()
except AttributeError:
if self._sock is None:
raise InterfaceError("connection is closed")
else:
raise exc_info()[1]
InterfaceError = property(lambda self: self._getError(InterfaceError))
DatabaseError = property(lambda self: self._getError(DatabaseError))
OperationalError = property(lambda self: self._getError(OperationalError))
IntegrityError = property(lambda self: self._getError(IntegrityError))
InternalError = property(lambda self: self._getError(InternalError))
ProgrammingError = property(lambda self: self._getError(ProgrammingError))
NotSupportedError = property(
lambda self: self._getError(NotSupportedError))
# Determines the number of rows to read from the database server at once.
# Reading more rows increases performance at the cost of memory. The
# default value is 100 rows. The effect of this parameter is transparent.
# That is, the library reads more rows when the cache is empty
# automatically.
_row_cache_size = 100
_row_cache_size_bin = i_pack(_row_cache_size)
def _getError(self, error):
warn(
"DB-API extension connection.%s used" %
error.__name__, stacklevel=3)
return error
def __init__(
self, user, host, unix_sock, port, database, password,
socket_timeout, ssl):
self._client_encoding = "ascii"
self._commands_with_count = (
b("INSERT"), b("DELETE"), b("UPDATE"), b("MOVE"),
b("FETCH"), b("COPY"), b("SELECT"))
self._lock = threading.Lock()
COPY_DATA = b("d")
COPY_IN_RESPONSE = b("G")
COPY_OUT_RESPONSE = b("H")
BIND = b("B")
PARSE = b("P")
EXECUTE = b("E")
FLUSH = b('H')
SYNC = b('S')
PASSWORD = b('p')
DESCRIBE = b('D')
TERMINATE = b('X')
CLOSE = b('C')
FLUSH_MSG = FLUSH + i_pack(4)
SYNC_MSG = SYNC + i_pack(4)
TERMINATE_MSG = TERMINATE + i_pack(4)
COPY_DONE_MSG = COPY_DONE + i_pack(4)
# DESCRIBE constants
STATEMENT = b('S')
PORTAL = b('P')
# ErrorResponse codes
RESPONSE_SEVERITY = b("S") # always present
RESPONSE_CODE = b("C") # always present
RESPONSE_MSG = b("M") # always present
RESPONSE_DETAIL = b("D")
RESPONSE_HINT = b("H")
RESPONSE_POSITION = b("P")
RESPONSE__POSITION = b("p")
RESPONSE__QUERY = b("q")
def handle_COPY_IN_RESPONSE(self, data, ps):
# Int16(2) - Number of columns
# Int16(N) - Format codes for each column (0 text, 1 binary)
is_binary, num_cols = bh_unpack(data)
# column_formats = unpack_from('!' + 'h' * num_cols, data, 3)
assert self._lock.locked()
if ps.stream is None:
raise CopyQueryWithoutStreamError()
if PY2:
while True:
data = ps.stream.read(8192)
if not data:
break
self._write(COPY_DATA + i_pack(len(data) + 4))
self._write(data)
self._flush()
else:
bffr = bytearray(8192)
while True:
bytes_read = ps.stream.readinto(bffr)
if bytes_read == 0:
break
self._write(COPY_DATA + i_pack(bytes_read + 4))
self._write(bffr[:bytes_read])
self._flush()
# Send CopyDone
# Byte1('c') - Identifier.
# Int32(4) - Message length, including self.
self._write(COPY_DONE_MSG)