Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_commit(dsn, configuration):
connection = connect(dsn, **get_credentials(configuration))
connection.cursor().execute('CREATE TABLE test_commit (a INTEGER)')
connection.commit()
connection.close()
connection = connect(dsn, **get_credentials(configuration))
cursor = connection.cursor()
cursor.execute('SELECT * FROM test_commit')
results = cursor.fetchall()
assert results == []
cursor.execute('DROP TABLE test_commit')
connection.commit()
def test_closing_twice_is_ok(dsn, configuration):
connection = connect(dsn, **get_credentials(configuration))
connection.close()
connection.close()
def test_setinputsizes_does_not_raise(dsn, configuration):
"""
It is legal for setinputsizes() to do nothing, so anything except
raising an exception is ok
"""
cursor = connect(dsn, **get_credentials(configuration)).cursor()
cursor.setinputsizes([10, 20])
def open_connection(configuration,
rows_to_buffer=None,
parameter_sets_to_buffer=100,
**turbodbc_options):
dsn = configuration['data_source_name']
prefer_unicode = configuration.get('prefer_unicode', False)
read_buffer_size = turbodbc.Rows(rows_to_buffer) if rows_to_buffer else turbodbc.Megabytes(1)
options = turbodbc.make_options(read_buffer_size=read_buffer_size,
parameter_sets_to_buffer=parameter_sets_to_buffer,
prefer_unicode=prefer_unicode,
**turbodbc_options)
connection = turbodbc.connect(dsn, turbodbc_options=options, **get_credentials(configuration))
yield connection
connection.close()
def test_rollback_on_closed_connection_raises(dsn, configuration):
connection = connect(dsn, **get_credentials(configuration))
connection.close()
with pytest.raises(InterfaceError):
connection.rollback()
def test_connect_returns_connection_when_successful(dsn, configuration):
connection = connect(dsn, **get_credentials(configuration))
assert isinstance(connection, Connection)
def test_callproc_unsupported(dsn, configuration):
cursor = connect(dsn, **get_credentials(configuration)).cursor()
with pytest.raises(AttributeError):
cursor.callproc()
def test_nextset_unsupported(dsn, configuration):
cursor = connect(dsn, **get_credentials(configuration)).cursor()
with pytest.raises(AttributeError):
cursor.nextset()
def test_closing_twice_does_not_raise(dsn, configuration):
connection = connect(dsn, **get_credentials(configuration))
cursor = connection.cursor()
cursor.close()
cursor.close()
def test_cursor_on_closed_connection_raises(dsn, configuration):
connection = connect(dsn, **get_credentials(configuration))
connection.close()
with pytest.raises(InterfaceError):
connection.cursor()