def test_func_no_wrapping():
    # Select query with function
    select_query = select([
        func.ST_Buffer(Point.geom),                     # with wrapping (default behavior)
        func.ST_Buffer(Point.geom, type_=Geometry),     # with wrapping
        func.ST_Buffer(Point.geom, type_=RawGeometry)   # without wrapping
    ])

    # Check the query
    assert str(select_query) == (
        "SELECT "
        "ST_AsEWKB(ST_Buffer(point.geom)) AS \"ST_Buffer_1\", "
def setup_metadata(self, tablename=None):
    metadata = db.MetaData(schema='vector_layer' if tablename else None)
    geom_fldtype = _GEOM_TYPE_2_DB[self.geometry_type]

    class model(object):
        def __init__(self, **kwargs):
            for k, v in kwargs.items():
                setattr(self, k, v)

    table = db.Table(
        tablename if tablename else ('lvd_' + str(uuid.uuid4().hex)),
        metadata, db.Column('id', db.Integer, primary_key=True),
        db.Column('geom', ga.Geometry(
            dimension=2, srid=self.srs_id,
            geometry_type=geom_fldtype)),
        *map(lambda fld: db.Column(fld.key, _FIELD_TYPE_2_DB[
            fld.datatype]), self.fields)
    )

    db.mapper(model, table)

    self.metadata = metadata
    self.table = table
    self.model = model
def _init_table(self):
    """
    Make a new table with the original columns from the staging table
    """
    # Take most columns straight from the source.
    original_cols = [_copy_col(c) for c in self.staging.columns
                     if c.name != 'hash']

    # Take care that the hash column is designated the primary key.
    original_cols.append(Column('hash', String(32), primary_key=True))

    # We also expect geometry and date columns to be created.
    derived_cols = [
        Column('point_date', TIMESTAMP, nullable=True, index=True),
        Column('geom', Geometry('POINT', srid=4326),
               nullable=True, index=True)]
    new_table = Table(self.dataset.name, MetaData(),
                      *(original_cols + derived_cols))

    new_table.drop(postgres_engine, checkfirst=True)
    new_table.create(postgres_engine)
    return new_table
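The _copy_col helper referenced above is not shown; a minimal sketch of what it might do, assuming it only clones a column's name, type and nullability so the copy is detached from the staging table:

from sqlalchemy import Column

def _copy_col(col):
    # Hypothetical reconstruction: new Column with the same name and type,
    # not bound to the staging table.
    return Column(col.name, col.type, nullable=col.nullable)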
private = db.Column(db.Boolean, default=False) # Only allowed users can validate
featured = db.Column(
db.Boolean, default=False
) # Only PMs can set a project as featured
entities_to_map = db.Column(db.String)
changeset_comment = db.Column(db.String)
osmcha_filter_id = db.Column(
db.String
) # Optional custom filter id for filtering on OSMCha
due_date = db.Column(db.DateTime)
imagery = db.Column(db.String)
josm_preset = db.Column(db.String)
id_presets = db.Column(ARRAY(db.String))
last_updated = db.Column(db.DateTime, default=timestamp)
license_id = db.Column(db.Integer, db.ForeignKey("licenses.id", name="fk_licenses"))
geometry = db.Column(Geometry("MULTIPOLYGON", srid=4326))
centroid = db.Column(Geometry("POINT", srid=4326))
country = db.Column(ARRAY(db.String), default=[])
task_creation_mode = db.Column(
db.Integer, default=TaskCreationMode.GRID.value, nullable=False
)
organisation_id = db.Column(
db.Integer,
db.ForeignKey("organisations.id", name="fk_organisations"),
index=True,
)
organisation_tag = db.Column(db.String, index=True)
# Tags
mapping_types = db.Column(ARRAY(db.Integer), index=True)
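For illustration, a hypothetical sketch of populating the geometry and centroid columns above from a GeoJSON area of interest with shapely and geoalchemy2.shape (project and geojson_aoi are invented names for the example):

from shapely.geometry import shape
from geoalchemy2.shape import from_shape

aoi = shape(geojson_aoi)  # geojson_aoi: a GeoJSON MultiPolygon dict
project.geometry = from_shape(aoi, srid=4326)
project.centroid = from_shape(aoi.centroid, srid=4326)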
elif isinstance(itype, dt.Timestamp):
    # SQLAlchemy DateTimes do not store the timezone, just whether the db
    # supports timezones.
    return sa.TIMESTAMP(bool(itype.timezone))
elif isinstance(itype, dt.Array):
    ibis_type = itype.value_type
    if not isinstance(ibis_type, (dt.Primitive, dt.String)):
        raise TypeError(
            'Type {} is not a primitive type or string type'.format(
                ibis_type
            )
        )
    return sa.ARRAY(_to_sqla_type(ibis_type, type_map=type_map))
elif geospatial_supported and isinstance(itype, dt.GeoSpatial):
    if itype.geotype == 'geometry':
        return ga.Geometry
    elif itype.geotype == 'geography':
        return ga.Geography
    else:
        return ga.types._GISType
else:
    return type_map[type(itype)]
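As an illustration only, a hypothetical sketch of the SQLAlchemy/GeoAlchemy2 column types the branches above would yield (column names are invented; mapping string elements to Text is an assumption about type_map):

import sqlalchemy as sa
import geoalchemy2 as ga

# dt.Timestamp with a timezone      -> timezone-aware TIMESTAMP
observed_at = sa.Column('observed_at', sa.TIMESTAMP(True))
# dt.Array of strings               -> ARRAY of the element's type
tags = sa.Column('tags', sa.ARRAY(sa.Text))
# dt.GeoSpatial, geotype 'geometry' / 'geography'
footprint = sa.Column('footprint', ga.Geometry)
location = sa.Column('location', ga.Geography)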
def add_geometry_column(cls, srid=4326):
    cls.geom = Column(Geometry(geometry_type='POINT', srid=srid))
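A hypothetical sketch of how such a classmethod could attach the column to a declarative model after the fact (the City model is invented for the example):

from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base
from geoalchemy2 import Geometry

Base = declarative_base()


class City(Base):
    __tablename__ = 'cities'
    id = Column(Integer, primary_key=True)

    @classmethod
    def add_geometry_column(cls, srid=4326):
        cls.geom = Column(Geometry(geometry_type='POINT', srid=srid))


# Declarative intercepts the new attribute and adds the column to the table.
City.add_geometry_column(srid=4326)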
if d_type == 'C':
    col_type = col_type(f_len)
elif d_type == 'N':
    col_type = col_type(d_len)
if fname.lower() == self.business_key:
    kwargs['primary_key'] = True
columns.append(Column(fname.lower(), col_type, **kwargs))

self.multipolygon = False
for record in self.records:
    geo_type = record.shape.__geo_interface__['type']
    if 'multi' in geo_type.lower():
        self.multipolygon = True

geo_type = 'POLYGON'
if self.multipolygon:
    geo_type = 'MULTIPOLYGON'

columns.append(Column('geom', Geometry(geo_type)))

self.table = Table(self.dataset_name, Base.metadata, *columns, extend_existing=True)
self.table.create(engine, checkfirst=True)
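A minimal sketch of where self.records might come from, assuming the pyshp reader that exposes shape.__geo_interface__ (the file path is illustrative):

import shapefile  # pyshp

reader = shapefile.Reader('parcels.shp')
records = reader.shapeRecords()
print(records[0].shape.__geo_interface__['type'])  # e.g. 'Polygon' or 'MultiPolygon'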
@event.listens_for(DBSession, "before_flush")
def before_flush(session, flush_context, instances):
    for obj in session.dirty:
        if isinstance(obj, Task):
            obj.project.last_update = datetime.datetime.utcnow()


class Area(Base):
    __tablename__ = 'areas'
    id = Column(Integer, primary_key=True)
    geometry = Column(Geometry('MultiPolygon', srid=4326))
    centroid = Column(Geometry('Point', srid=4326))

    def __init__(self, geometry):
        self.geometry = ST_SetSRID(ST_Multi(geometry), 4326)


@event.listens_for(Area, "after_insert")
def area_after_insert(mapper, connection, target):
    area_table = Area.__table__
    connection.execute(
        area_table.update().
        where(area_table.c.id == target.id).
        values(centroid=ST_Centroid(target.geometry))
    )
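A hypothetical usage sketch, assuming DBSession is the session the listener above is bound to and the input geometry is WKT; flushing fires after_insert, which fills in the centroid server-side:

from geoalchemy2.elements import WKTElement

area = Area(WKTElement('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', srid=4326))
DBSession.add(area)
DBSession.flush()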
""" Describes an individual mapping Task """
__tablename__ = "tasks"
# Table has composite PK on (id and project_id)
id = db.Column(db.Integer, primary_key=True)
project_id = db.Column(
db.Integer, db.ForeignKey("projects.id"), index=True, primary_key=True
)
x = db.Column(db.Integer)
y = db.Column(db.Integer)
zoom = db.Column(db.Integer)
extra_properties = db.Column(db.Unicode)
# Tasks need to be split differently if they were created from an arbitrary grid or clipped to the edge of the AOI
is_square = db.Column(db.Boolean, default=True)
geometry = db.Column(Geometry("MULTIPOLYGON", srid=4326))
task_status = db.Column(db.Integer, default=TaskStatus.READY.value)
locked_by = db.Column(
db.BigInteger, db.ForeignKey("users.id", name="fk_users_locked")
)
mapped_by = db.Column(
db.BigInteger, db.ForeignKey("users.id", name="fk_users_mapper")
)
validated_by = db.Column(
db.BigInteger, db.ForeignKey("users.id", name="fk_users_validator")
)
# Mapped objects
task_history = db.relationship(TaskHistory, cascade="all")
task_annotations = db.relationship(TaskAnnotation, cascade="all")
lock_holder = db.relationship(User, foreign_keys=[locked_by])
mapper = db.relationship(User, foreign_keys=[mapped_by])
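Because the table has a composite primary key on (id, project_id), a hypothetical lookup needs both values, e.g. in Flask-SQLAlchemy style:

task = Task.query.get((task_id, project_id))  # key tuple follows the (id, project_id) declaration order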
from geoalchemy2 import Geometry
from sqlalchemy import event
from sqlalchemy.schema import CreateSchema
from sqlalchemy import Column, Sequence, Integer, BigInteger, Text
from sqlalchemy.ext.declarative import declarative_base
from forge.models import Vector
from forge.lib.helpers import isShapefile
Base = declarative_base()
event.listen(Base.metadata, 'before_create', CreateSchema('data'))
table_args = {'schema': 'data'}
# management=True is only needed for PostGIS 1.5
WGS84Polygon = Geometry(geometry_type='POLYGON', srid=4326, dimension=3, spatial_index=True, management=True)
def modelFactory(BaseClass, tablename, shapefiles, classname):
    sequence = Sequence('id_%s_seq' % tablename, schema=table_args['schema'])

    class NewClass(BaseClass, Vector):
        __tablename__ = tablename
        __table_args__ = table_args
        __shapefiles__ = shapefiles
        id = Column(BigInteger(), sequence, nullable=False, primary_key=True)
        shapefilepath = Column('shapefilepath', Text)
        the_geom = Column('the_geom', WGS84Polygon)

    NewClass.__name__ = classname
    return NewClass
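A hypothetical usage sketch of the factory (the table name, shapefile path and class name are invented for the example):

Lakes = modelFactory(Base, 'lakes', ['data/lakes.shp'], 'Lakes')
Base.metadata.create_all(engine)  # assumes a configured engine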