Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if type(value) == date:
value = datetime.combine(value, time(0, 0, 0))
days = (value - cls._base_date).days
ms = value.microsecond // 1000
tm = (value.hour * 60 * 60 + value.minute * 60 + value.second) * 300 + int(round(ms * 3 / 10.0))
return cls._struct.pack(days, tm)
@classmethod
def decode(cls, days, time):
    """Build a datetime from a (days, time) pair.

    :param days: day offset added to ``cls._base_date``
    :param time: time of day counted in units of 1/300 of a second
    :return: ``cls._base_date`` shifted by the decoded offset
    """
    # 300 units make one second; the remainder is converted to milliseconds.
    whole_seconds, leftover_units = divmod(time, 300)
    millis = int(round(leftover_units * 10 / 3.0))
    offset = timedelta(days=days, seconds=whole_seconds, milliseconds=millis)
    return cls._base_date + offset
# Shared instance stored on the class itself; DateTimeN.__init__ looks it up
# as the subtype for 8-byte values.
DateTime.instance = DateTime()
# Datetime type whose concrete handling depends on a size of 4 or 8
# (see __init__, which selects SmallDateTime or DateTime accordingly).
class DateTimeN(BaseType):
# Type identifier for this class; SYBDATETIMN is defined outside this excerpt.
type = SYBDATETIMN
# NOTE(review): presumably the epoch that encoded day counts are relative to,
# as in DateTime.decode — confirm where this attribute is read.
_base_date = datetime(1900, 1, 1)
# NOTE(review): looks like the lowest and highest representable values
# (the maximum carries 997 ms) — confirm where these bounds are enforced.
_min_date = datetime(1753, 1, 1, 0, 0, 0)
_max_date = datetime(9999, 12, 31, 23, 59, 59, 997000)
def __init__(self, size):
    """Remember the value size and select the matching concrete subtype.

    :param size: 4 selects ``SmallDateTime.instance``, 8 selects
        ``DateTime.instance``; any other value fails the assertion
    """
    assert size in (4, 8)
    self._size = size
    subtype_by_size = {4: SmallDateTime.instance, 8: DateTime.instance}
    self._subtype = subtype_by_size[size]
@classmethod
def from_stream(self, r):
size = r.get_byte()
if size not in (4, 8):
raise InterfaceError('Invalid SYBDATETIMN size', size)
'hour(s)': 3600,
'day(s)': 86400,
'week(s)': 604800,
'month(s)': 2592000,
'year(s)': 31536000}
# If I snooze by n minutes, the alarm in general no longer lines up with ***
# the exact minutes
# Check carefully everywhere that I was not relying on the
# exact-minute separation for other reasons as well
newalarm = ((alarmswindow.number.get() *
mult[alarmswindow.unit.get()] +
int(_time.time())) // 60 + 1) * 60
elif mode == 'dismiss':
newalarm = 'NULL'
if not group:
group = databases.memory.get_next_history_group()
qconn = databases.dbs[filename].connection.get()
cursor = qconn.cursor()
query_redo = queries.occurrences_update_id_alarm.format(newalarm, # @UndefinedVariable
self.id_)
query_undo = queries.occurrences_update_id_alarm.format(alarm, # @UndefinedVariable
self.id_)
cursor.execute(query_redo)
databases.dbs[filename].connection.give(qconn)
qmemory = databases.memory.get()
cursorh = qmemory.cursor()
cursorh.execute(queries.history_insert, (group, filename, self.id_,
'alarm', query_redo, '', query_undo, ''))
databases.memory.give(qmemory)
timer.refresh_timer()
self.close()
newalarm = ((alarmswindow.number.get() *
mult[alarmswindow.unit.get()] +
int(_time.time())) // 60 + 1) * 60
elif mode == 'dismiss':
newalarm = 'NULL'
if not group:
group = databases.memory.get_next_history_group()
qconn = databases.dbs[filename].connection.get()
cursor = qconn.cursor()
query_redo = queries.occurrences_update_id_alarm.format(newalarm, # @UndefinedVariable
self.id_)
query_undo = queries.occurrences_update_id_alarm.format(alarm, # @UndefinedVariable
self.id_)
cursor.execute(query_redo)
databases.dbs[filename].connection.give(qconn)
qmemory = databases.memory.get()
cursorh = qmemory.cursor()
cursorh.execute(queries.history_insert, (group, filename, self.id_,
'alarm', query_redo, '', query_undo, ''))
databases.memory.give(qmemory)
timer.refresh_timer()
self.close()
async def test_iterate_outside_transaction_with_temp_table(database_url):
    """
    Iterate over the rows of a temporary table without opening a transaction.

    Same as test_iterate_outside_transaction_with_values, except that the
    rows come from a temporary table instead of a list of values.
    """
    database_url = DatabaseURL(database_url)
    if database_url.dialect == "sqlite":
        pytest.skip("SQLite interface does not work with temporary tables.")

    setup_statements = (
        "CREATE TEMPORARY TABLE no_transac(num INTEGER)",
        "INSERT INTO no_transac(num) VALUES (1), (2), (3), (4), (5)",
    )

    async with Database(database_url) as database:
        # Create and populate the temporary table, in order.
        for statement in setup_statements:
            await database.execute(statement)

        select_all = "SELECT * FROM no_transac"
        rows = [row async for row in database.iterate(query=select_all)]

        assert len(rows) == 5
import databases
import orm
import sqlalchemy
from orm.models import QuerySet
from starlette.authentication import BaseUser
from .resources import hasher
# Database handle and SQLAlchemy metadata shared by the models defined below.
# NOTE(review): force_rollback=True presumably keeps test writes from
# persisting in tests/test.db — confirm against the `databases` documentation.
database = databases.Database("sqlite:///tests/test.db", force_rollback=True)
metadata = sqlalchemy.MetaData()
class UserQuerySet(QuerySet):
    """QuerySet with a helper that hashes the password before creating a row."""

    async def create_user(self, *, password: str, **kwargs):
        """Create a user, storing the hashed form of ``password``.

        :param password: plain-text password, passed through ``hasher.make``
        :param kwargs: remaining field values forwarded to ``create``
        :return: whatever ``self.create`` returns
        """
        hashed = await hasher.make(password)
        # "password" goes last, matching insertion into the kwargs dict.
        fields = {**kwargs, "password": hashed}
        return await self.create(**fields)
class User(BaseUser, orm.Model):
    # Model bound to the "user" table of the module-level database/metadata.
    __tablename__ = "user"
    __database__ = database
    __metadata__ = metadata

    # Custom manager providing create_user().
    objects = UserQuerySet()
async def test_concurrent_access_on_single_connection(database_url):
    """Run two `pg_sleep(1)` lookups concurrently on one force_rollback Database."""
    parsed_url = DatabaseURL(database_url)
    if parsed_url.dialect != "postgresql":
        pytest.skip("Test requires `pg_sleep()`")

    async with Database(parsed_url, force_rollback=True) as database:

        async def db_lookup():
            await database.fetch_one("SELECT pg_sleep(1)")

        # Two lookups scheduled together; gather awaits both.
        pending = [db_lookup() for _ in range(2)]
        await asyncio.gather(*pending)
from starlette.authentication import BaseUser, requires
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, PlainTextResponse
from starlette_auth_toolkit.backends import MultiAuth
from starlette_auth_toolkit.base.backends import BaseTokenAuth
from starlette_auth_toolkit.contrib.orm import ModelBasicAuth
from starlette_auth_toolkit.cryptography import (
PBKDF2Hasher,
generate_random_string,
)
# Database setup
# The connection URL is read from the DATABASE_URL environment variable.
# NOTE(review): os.getenv returns a string or None, so force_rollback receives
# the raw TESTING value; if the library treats it as a boolean, any non-empty
# string (even "0" or "false") enables it — confirm this is intended.
database = databases.Database(
os.getenv("DATABASE_URL"), force_rollback=os.getenv("TESTING")
)
metadata = sqlalchemy.MetaData()
class UserQuerySet(QuerySet):
    """QuerySet that adds a password-hashing ``create_user`` helper."""

    async def create_user(self, *, password: str, **kwargs):
        """Hash ``password`` with ``hasher.make`` and create the record.

        :param password: plain-text password to hash
        :param kwargs: other field values passed on to ``create``
        :return: the result of ``self.create``
        """
        password_hash = await hasher.make(password)
        # Keep "password" as the last key, as in a plain kwargs assignment.
        record_fields = {**kwargs, "password": password_hash}
        return await self.create(**record_fields)
class User(BaseUser, orm.Model):
    # Model bound to the "user" table of the module-level database/metadata.
    __tablename__ = "user"
    __database__ = database
    __metadata__ = metadata
async def sqlalchemy_user_db() -> AsyncGenerator[SQLAlchemyUserDatabase, None]:
    """Yield a SQLAlchemyUserDatabase backed by a throwaway SQLite file.

    Creates the tables for a locally declared ``User`` model, connects the
    ``Database``, and yields the adapter. On exit the connection is closed
    and the tables are dropped, even if the consumer raised.
    """
    Base: DeclarativeMeta = declarative_base()

    class User(SQLAlchemyBaseUserTable, Base):
        pass

    DATABASE_URL = "sqlite:///./test.db"
    database = Database(DATABASE_URL)

    engine = sqlalchemy.create_engine(
        DATABASE_URL, connect_args={"check_same_thread": False}
    )
    Base.metadata.create_all(engine)

    try:
        await database.connect()
        try:
            yield SQLAlchemyUserDatabase(database, User.__table__)
        finally:
            # Release the connection opened above; it was previously leaked.
            await database.disconnect()
    finally:
        # Always remove the tables so later runs start from a clean schema.
        Base.metadata.drop_all(engine)
def test_database_url_repr():
    """repr() shows the URL, with the password replaced by asterisks."""
    cases = [
        (
            "postgresql://localhost/name",
            "DatabaseURL('postgresql://localhost/name')",
        ),
        (
            "postgresql://username@localhost/name",
            "DatabaseURL('postgresql://username@localhost/name')",
        ),
        (
            "postgresql://username:password@localhost/name",
            "DatabaseURL('postgresql://username:********@localhost/name')",
        ),
    ]
    for raw_url, expected_repr in cases:
        assert repr(DatabaseURL(raw_url)) == expected_repr
def test_replace_database_url_components():
    """replace() yields a new URL with one component changed; the source URL is reused unchanged."""
    base_url = DatabaseURL("postgresql://localhost/mydatabase")

    # Database name.
    assert base_url.database == "mydatabase"
    with_database = base_url.replace(database="test_" + base_url.database)
    assert with_database.database == "test_mydatabase"
    assert str(with_database) == "postgresql://localhost/test_mydatabase"

    # Driver.
    assert base_url.driver == ""
    with_driver = base_url.replace(driver="asyncpg")
    assert with_driver.driver == "asyncpg"
    assert str(with_driver) == "postgresql+asyncpg://localhost/mydatabase"

    # Port.
    assert base_url.port is None
    with_port = base_url.replace(port=123)
    assert with_port.port == 123
    assert str(with_port) == "postgresql://localhost:123/mydatabase"