Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_project(self):
    """Seed projects and admin memberships, then log in with the token.

    Four projects are inserted and the 'admin' user is made an ADMIN member
    of the projects with ids 1, 2 and 4. The test then authenticates through
    the single-user token URL and expects a redirect to '/'.
    """
    db = self.application.DBSession()

    # (name, description) for each project, in insertion order.
    project_fields = [
        ("first project", ""),
        ("a test", "Test project"),
        ("private P", "admin can't see"),
        ("last project", "Other"),
    ]
    for project_name, project_description in project_fields:
        db.add(database.Project(name=project_name,
                                description=project_description))
    db.commit()

    # NOTE(review): assumes ids are assigned 1..4 in insertion order, which
    # would leave "private P" (the third project) without a membership.
    for member_project_id in (1, 2, 4):
        membership = database.ProjectMember(
            project_id=member_project_id,
            user_login='admin',
            privileges=database.Privileges.ADMIN,
        )
        db.add(membership)
    db.commit()

    # Authenticate with token
    login_url = '/?token=' + self.application.single_user_token
    response = self.get(login_url)
    self.assertEqual(response.code, 302)
    self.assertEqual(response.headers['Location'], '/')
def test_project(self):
    """Seed projects and admin memberships, then request the token URL.

    Inserts four projects, makes 'admin' an ADMIN member of the projects
    with ids 1, 2 and 4, and issues the token-authenticated request.
    """
    db = self.application.DBSession()

    # (name, description) for each project, in insertion order.
    project_fields = (
        ("first project", ""),
        ("a test", "Test project"),
        ("private P", "admin can't see"),
        ("last project", "Other"),
    )
    projects = [
        database.Project(name=project_name, description=project_description)
        for project_name, project_description in project_fields
    ]
    for project in projects:
        db.add(project)
    db.commit()

    # NOTE(review): assumes ids are assigned 1..4 in insertion order, which
    # would leave "private P" (the third project) without a membership.
    for member_project_id in (1, 2, 4):
        db.add(database.ProjectMember(
            project_id=member_project_id,
            user_login='admin',
            privileges=database.Privileges.ADMIN,
        ))
    db.commit()

    # Authenticate with token
    token = self.application.single_user_token
    response = self.get('/?token=' + token)
def post(self):
    """Create a project from the submitted form and redirect to it.

    Reads the 'name' and 'description' body arguments. If the name is
    empty, the 'project_new.html' form is rendered again with an error
    message and the render result is returned. Otherwise the project is
    created with the current user as ADMIN member and two default tags,
    the session is committed, and the client is redirected to the
    'project' URL for the new project id.
    """
    project_name = self.get_body_argument('name', '')
    project_description = self.get_body_argument('description', '')

    # Guard: a project needs a name; show the form again otherwise.
    if not project_name:
        return self.render(
            'project_new.html',
            name=project_name,
            description=project_description,
            error="Please enter a name",
        )

    # Create project
    new_project = database.Project(name=project_name,
                                   description=project_description)
    self.db.add(new_project)

    # Add user as admin
    self.db.add(database.ProjectMember(
        project=new_project,
        user_login=self.current_user,
        privileges=database.Privileges.ADMIN,
    ))

    # Add default set of tags, as (path, description) pairs
    default_tags = (
        ('interesting', "Further review required"),
        ('people', "Known people"),
    )
    for tag_path, tag_description in default_tags:
        self.db.add(database.Tag(project=new_project, path=tag_path,
                                 description=tag_description))

    self.db.commit()

    project_url = self.reverse_url('project', new_project.id)
    self.redirect(project_url)
@classmethod
def member_remove(cls, user_login, project_id, member_login):
    """Build an instance of ``cls`` describing the removal of a member.

    :param user_login: stored as the instance's ``user_login``.
    :param project_id: project identifier; must be an ``int``.
    :param member_login: stored under the payload's ``'member'`` key.
    :return: ``cls(...)`` with a ``'member_remove'`` payload.
    """
    assert isinstance(project_id, int)
    payload = {
        'type': 'member_remove',  # keep in sync above
        'member': member_login,
    }
    return cls(user_login=user_login, project_id=project_id, payload=payload)
# Attach to Project a column property evaluating to the highest Command.id
# among the commands whose project_id equals the project's id (a scalar
# subquery ordered by id descending and limited to one row).
Project.last_event = column_property(
    select([Command.id])
    .where(Command.project_id == Project.id)
    .order_by(Command.id.desc())
    .limit(1)
    .as_scalar()
)
# ORM model mapped to the 'highlights' table.
# NOTE(review): only the beginning of this class is visible in this snippet;
# more columns presumably follow `start_offset` -- confirm in the full file.
class Highlight(Base):
__tablename__ = 'highlights'
# Integer primary key.
id = Column(Integer, primary_key=True)
# Required, indexed foreign key to documents.id, declared with
# ON DELETE CASCADE.
document_id = Column(Integer, ForeignKey('documents.id',
ondelete='CASCADE'),
nullable=False, index=True)
# Relationship to 'Document', paired with its `highlights` attribute.
document = relationship('Document', back_populates='highlights')
# Required integer offset where the highlight starts (presumably a position
# in the document's contents -- confirm units against the caller).
start_offset = Column(Integer, nullable=False)
'member': member_login,
'privileges': privileges.name}
)
@classmethod
def member_remove(cls, user_login, project_id, member_login):
    """Create the ``cls`` instance that records a member being removed.

    :param user_login: passed through as ``user_login``.
    :param project_id: project identifier; asserted to be an ``int``.
    :param member_login: placed in the payload under ``'member'``.
    :return: a new ``cls`` carrying a ``'member_remove'`` payload.
    """
    assert isinstance(project_id, int)
    removal_payload = {
        'type': 'member_remove',  # keep in sync above
        'member': member_login,
    }
    return cls(
        user_login=user_login,
        project_id=project_id,
        payload=removal_payload,
    )
# Project.last_event: scalar subquery selecting the id of the newest Command
# (largest id) whose project_id matches this project's id.
Project.last_event = column_property(
    select([Command.id])
    .where(Command.project_id == Project.id)
    .order_by(Command.id.desc())
    .limit(1)
    .as_scalar()
)
# ORM model mapped to the 'highlights' table.
# NOTE(review): this snippet is cut off in the middle of the `document_id`
# column definition; the rest of the class is not visible here.
class Highlight(Base):
__tablename__ = 'highlights'
# Integer primary key.
id = Column(Integer, primary_key=True)
# Integer foreign key to documents.id (remaining arguments not visible).
document_id = Column(Integer, ForeignKey('documents.id',
# Handle the new-project form: read 'name' and 'description' from the request
# body, validate them, then create the project, make the current user an ADMIN
# member and add a default tag.
# NOTE(review): this snippet is truncated -- the handler(s) for the `try:`
# below and the rest of the method (commit, response) are not visible here.
def post(self):
name = self.get_body_argument('name', '')
description = self.get_body_argument('description', '')
try:
# Validation helpers; presumably they raise on invalid input, to be handled
# by the except clause that is not visible here -- confirm.
validate.project_name(name)
validate.project_description(description)
# Create project
project = database.Project(name=name, description=description)
self.db.add(project)
# Add user as admin
membership = database.ProjectMember(
project=project,
user_login=self.current_user,
privileges=database.Privileges.ADMIN
)
self.db.add(membership)
# Add default tags
# The tag path and description are passed through self.gettext(), so the
# stored values depend on the translation in effect for this request.
self.db.add(database.Tag(
project=project,
# TRANSLATORS: Default tag name
path=self.gettext("interesting"),
# TRANSLATORS: Default tag description
description=self.gettext("Further review required")),
)
def get_document(self, project_id, document_id, contents=False):
    """Load a document from a project the current user is a member of.

    :param project_id: project identifier from the request; anything
        accepted by ``int()``.
    :param document_id: document identifier from the request; anything
        accepted by ``int()``.
    :param contents: when true, also load the deferred
        ``Document.contents`` column.
    :return: the matching ``database.Document``, with its project and the
        project's members eagerly loaded.
    :raises HTTPError: 404 if an identifier is not an integer, or if no
        document with that id exists in that project for a project the
        current user belongs to.
    """
    try:
        project_id = int(project_id)
        document_id = int(document_id)
    except ValueError:
        raise HTTPError(404)

    q = (
        self.db.query(database.Document)
        # Explicit joins tie the Project and ProjectMember filters below to
        # this document's own project. The joinedload() joins are rendered
        # against anonymous aliases and cannot be targeted by filter(), so
        # without these joins the filters applied to an uncorrelated cross
        # join and any member of any project could load any document.
        # NOTE(review): assumes Project.members relates to ProjectMember.
        .join(database.Document.project)
        .join(database.Project.members)
        .options(joinedload(database.Document.project)
                 .joinedload(database.Project.members))
        .filter(database.Project.id == project_id)
        .filter(database.ProjectMember.user_login == self.current_user)
        .filter(database.Document.id == document_id)
    )
    if contents:
        # Contents are deferred by default; load them only on request.
        q = q.options(undefer(database.Document.contents))

    document = q.one_or_none()
    if document is None:
        raise HTTPError(404)
    return document
def get_document(self, project_id, document_id, contents=False):
    """Load a document from a project the current user is a member of.

    :param project_id: project identifier from the request; anything
        accepted by ``int()``.
    :param document_id: document identifier from the request; anything
        accepted by ``int()``.
    :param contents: when true, also load the deferred
        ``Document.contents`` column.
    :return: the matching ``database.Document``, with its project and the
        project's members eagerly loaded.
    :raises HTTPError: 404 if an identifier is not an integer, or if no
        document with that id exists in that project for a project the
        current user belongs to.
    """
    try:
        project_id = int(project_id)
        document_id = int(document_id)
    except ValueError:
        raise HTTPError(404)

    q = (
        self.db.query(database.Document)
        # Explicit joins tie the Project and ProjectMember filters below to
        # this document's own project. The joinedload() joins are rendered
        # against anonymous aliases and cannot be targeted by filter(), so
        # without these joins the filters applied to an uncorrelated cross
        # join and any member of any project could load any document.
        # NOTE(review): assumes Project.members relates to ProjectMember.
        .join(database.Document.project)
        .join(database.Project.members)
        .options(joinedload(database.Document.project)
                 .joinedload(database.Project.members))
        .filter(database.Project.id == project_id)
        .filter(database.ProjectMember.user_login == self.current_user)
        .filter(database.Document.id == document_id)
    )
    if contents:
        # Contents are deferred by default; load them only on request.
        q = q.options(undefer(database.Document.contents))

    document = q.one_or_none()
    if document is None:
        raise HTTPError(404)
    return document
engine = create_engine(db_url, **kwargs)
# logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
if db_url.startswith('sqlite:'):
@sqlalchemy.event.listens_for(sqlalchemy.engine.Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Turn on foreign-key enforcement for a newly opened connection.

    Registered as a "connect" listener on the ``sqlalchemy.engine.Engine``
    class, so it is not limited to one engine instance.

    :param dbapi_connection: the raw DBAPI connection that was just opened.
    :param connection_record: supplied by the event; unused.
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # Close the cursor even if the PRAGMA statement fails.
        cursor.close()
alembic_cfg = alembic.config.Config()
alembic_cfg.set_main_option('script_location', 'taguette:migrations')
alembic_cfg.set_main_option('sqlalchemy.url', db_url)
conn = engine.connect()
if not engine.dialect.has_table(conn, Project.__tablename__):
logger.warning("The tables don't seem to exist; creating")
Base.metadata.create_all(bind=engine)
# Mark this as the most recent Alembic version
alembic.command.stamp(alembic_cfg, "head")
# Set SQLite's "application ID"
if db_url.startswith('sqlite:'):
conn.execute("PRAGMA application_id=0x54677474;") # 'Tgtt'
else:
# Perform Alembic migrations if needed
context = MigrationContext.configure(conn)
current_rev = context.get_current_revision()
scripts = ScriptDirectory.from_config(alembic_cfg)
if [current_rev] != scripts.get_heads():
logger.warning("Database schema is out of date: %s", current_rev)