Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# See the License for the specific language governing permissions and
# limitations under the License.
#
############################################################################
import phoenixdb
import phoenixdb.cursor
import sys
import os
if __name__ == '__main__':
database_url = os.environ.get('PHOENIXDB_TEST_DB_URL')
print("CREATING PQS CONNECTION")
conn = phoenixdb.connect(database_url, autocommit=True, auth="SPNEGO")
cursor = conn.cursor()
cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username VARCHAR)")
cursor.execute("UPSERT INTO users VALUES (?, ?)", (1, 'admin'))
cursor.execute("UPSERT INTO users VALUES (?, ?)", (2, 'user'))
cursor.execute("SELECT * FROM users")
print("RESULTS")
print(cursor.fetchall())
def commit(self):
"""Commits pending database changes.
Currently, this does nothing, because the RPC does not support
transactions. Only defined for DB API 2.0 compatibility.
You need to use :attr:`autocommit` mode.
"""
# TODO can support be added for this?
if self._closed:
raise ProgrammingError('the connection is already closed')
def transactionisolation(self, value):
if self._closed:
raise ProgrammingError('the connection is already closed')
props = self._client.connection_sync(self._id, {'transactionIsolation': bool(value)})
self._transactionisolation = props.transaction_isolation
def fetchone(self):
if self._frame is None:
raise ProgrammingError('no select statement was executed')
if self._pos is None:
return None
rows = self._frame.rows
row = self._transform_row(rows[self._pos])
self._pos += 1
if self._pos >= len(rows):
self._pos = None
if not self._frame.done:
self._fetch_next_frame()
return row
def close(self):
"""Closes the cursor.
No further operations are allowed once the cursor is closed.
If the cursor is used in a ``with`` statement, this method will
be automatically called at the end of the ``with`` block.
"""
if self._closed:
raise ProgrammingError('the cursor is already closed')
if self._id is not None:
self._connection._client.close_statement(self._connection._id, self._id)
self._id = None
self._signature = None
self._column_data_types = []
self._frame = None
self._pos = None
self._closed = True
def rollback(self):
if self._closed:
raise ProgrammingError('The connection is already closed.')
self._client.rollback(self._id)
def close(self):
"""Closes the connection.
No further operations are allowed, either on the connection or any
of its cursors, once the connection is closed.
If the connection is used in a ``with`` statement, this method will
be automatically called at the end of the ``with`` block.
"""
if self._closed:
raise ProgrammingError('the connection is already closed')
for cursor_ref in self._cursors:
cursor = cursor_ref()
if cursor is not None and not cursor._closed:
cursor.close()
self._client.close_connection(self._id)
self._client.close()
self._closed = True
if is_array:
if type(value) in [list, tuple]:
for element in value:
if mutate_to is not None:
element = mutate_to(element)
typed_element = common_pb2.TypedValue()
if element is None:
typed_element.null = True
else:
typed_element.type = rep
setattr(typed_element, field_name, element)
typed_value.array_value.append(typed_element)
typed_value.type = common_pb2.ARRAY
typed_value.component_type = rep
else:
raise ProgrammingError('scalar value specified for array parameter')
else:
if mutate_to is not None:
value = mutate_to(value)
typed_value.type = rep
setattr(typed_value, field_name, value)
typed_parameters.append(typed_value)
return typed_parameters
def readonly(self, value):
if self._closed:
raise ProgrammingError('the connection is already closed')
props = self._client.connection_sync(self._id, {'readOnly': bool(value)})
self._readonly = props.read_only
def fetchone(self):
if self._frame is None:
raise ProgrammingError('no select statement was executed')
if self._pos is None:
return None
rows = self._frame.rows
row = self._transform_row(rows[self._pos])
self._pos += 1
if self._pos >= len(rows):
self._pos = None
if not self._frame.done:
self._fetch_next_frame()
return row