Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# ExampleE refers to ExampleF by name before ExampleF is declared; both
# schemas are registered against the same `definitions` object.
class ExampleE(typesystem.Schema, definitions=definitions):
    field_on_e = typesystem.Integer()
    # A list passed as `items` gives one field per array position.
    example_f = typesystem.Array(items=[typesystem.Reference("ExampleF")])


class ExampleF(typesystem.Schema, definitions=definitions):
    field_on_f = typesystem.Integer()


# String inputs are expected to come back as integers, and the nested dict as
# an ExampleF instance.
payload = {"field_on_e": "123", "example_f": [{"field_on_f": "456"}]}
value = ExampleE.validate(payload)
expected = ExampleE(field_on_e=123, example_f=[ExampleF(field_on_f=456)])
assert value == expected
# ExampleG refers to ExampleH by name from inside an Object field's
# `properties`; ExampleH is declared afterwards against the same `definitions`.
class ExampleG(typesystem.Schema, definitions=definitions):
    field_on_g = typesystem.Integer()
    example_h = typesystem.Object(
        properties={"h": typesystem.Reference("ExampleH")}
    )


class ExampleH(typesystem.Schema, definitions=definitions):
    field_on_h = typesystem.Integer()


# String inputs are expected to come back as integers, and the nested "h"
# dict as an ExampleH instance.
payload = {"field_on_g": "123", "example_h": {"h": {"field_on_h": "456"}}}
value = ExampleG.validate(payload)
expected = ExampleG(field_on_g=123, example_h={"h": ExampleH(field_on_h=456)})
assert value == expected
table_id = request.path_params["table_id"]
can_edit = check_can_edit(request, username)
datasource = await load_datasource_or_404(username, table_id)
form = await request.form()
data = await form["upload-file"].read()
encoding = chardet.detect(data)["encoding"]
lines = data.decode(encoding).splitlines()
rows = normalize_table([row for row in csv.reader(lines)])
column_identities = determine_column_identities(rows)
column_types, schema = determine_column_types(rows)
unvalidated_data = [dict(zip(column_identities, row)) for row in rows[1:]]
validated_data = [
dict(instance)
for instance in typesystem.Array(
items=typesystem.Reference(to=schema)
).validate(unvalidated_data, strict=False)
]
column_insert_values = [
{
"created_at": datetime.datetime.now(),
"name": name,
"identity": column_identities[idx],
"datatype": column_types[idx],
"table": datasource.table["pk"],
"position": idx + 1,
}
for idx, name in enumerate(rows[0])
]
query = tables.column.insert()
additional_properties=typesystem.Reference(
"Response", definitions=definitions
)
),
"parameters": typesystem.Object(
additional_properties=typesystem.Reference(
"Parameter", definitions=definitions
)
),
"requestBodies": typesystem.Object(
additional_properties=typesystem.Reference(
"RequestBody", definitions=definitions
)
),
"securitySchemes": typesystem.Object(
additional_properties=typesystem.Reference(
"SecurityScheme", definitions=definitions
)
),
# TODO: Other fields
},
pattern_properties={"^x-": typesystem.Any()},
additional_properties=False,
)
definitions["Tag"] = typesystem.Object(
properties={
"name": typesystem.String(),
"description": typesystem.Text(allow_blank=True),
"externalDocs": typesystem.Reference(
"ExternalDocumentation", definitions=definitions
),
# Union(X) is just X
return self.possibilities[0].isdataset(data)
elif len(self.possibilities) == 2 and any(isinstance(x, Null) for x in self.possibilities):
# support Union(Null(), X) for any X (i.e. "nullable X")
subtype = [x for x in self.possibilities if not isinstance(x, Null)][0]
if isinstance(data, numpy.ma.MaskedArray):
return subtype.isdataset(data.data)
else:
return False
else:
# don't support any other cases
return False
class Reference(NumpySchema, typesystem.Reference):
    """Reference schema combined with ``NumpySchema``; every check answers ``False``."""

    def supported(self):
        """Return ``False`` unconditionally."""
        return False

    def isinstance(self, datum):
        """Return ``False`` for any ``datum``."""
        return False

    def isdataset(self, data):
        """Return ``False`` for any ``data``."""
        return False
def toNumpySchema(schema):
return schema.copy({
"Anything": Anything,
"Nothing": Nothing,
"Null": Null,
"Boolean": Boolean,
"Number": Number,
"String": String,
"Tensor": Tensor,
additional_properties=False,
)
# Field declarations for the "Operation" definition, keyed by property name.
_operation_properties = {
    "tags": typesystem.Array(items=typesystem.String()),
    "summary": typesystem.String(allow_blank=True),
    "description": typesystem.Text(allow_blank=True),
    "externalDocs": typesystem.Reference(
        "ExternalDocumentation", definitions=definitions
    ),
    "operationId": typesystem.String(),
    "consumes": typesystem.Array(items=typesystem.String()),
    "produces": typesystem.Array(items=typesystem.String()),
    "parameters": typesystem.Array(
        items=typesystem.Reference("Parameter", definitions=definitions)
    ),  # TODO: | ReferenceObject
    "responses": typesystem.Reference("Responses", definitions=definitions),
    "schemes": typesystem.Array(
        items=typesystem.Choice(choices=["http", "https", "ws", "wss"])
    ),
    "deprecated": typesystem.Boolean(),
    "security": typesystem.Array(
        items=typesystem.Reference("SecurityRequirement", definitions=definitions)
    ),
}

# Besides the declared properties, only "x-" prefixed extension keys are
# described (they may hold any value); additional properties are switched off.
definitions["Operation"] = typesystem.Object(
    properties=_operation_properties,
    pattern_properties={"^x-": typesystem.Any()},
    additional_properties=False,
)
definitions["ExternalDocumentation"] = typesystem.Object(
properties={
"description": typesystem.Text(),
"url": typesystem.String(format="url"),
"maxProperties": typesystem.Integer(minimum=0),
"patternProperties": typesystem.Object(
additional_properties=typesystem.Reference(
"JSONSchema", definitions=definitions
)
),
"additionalProperties": (
typesystem.Reference("JSONSchema", definitions=definitions)
| typesystem.Boolean()
),
"required": typesystem.Array(items=typesystem.String(), unique_items=True),
# Array
"items": (
typesystem.Reference("JSONSchema", definitions=definitions)
| typesystem.Array(
items=typesystem.Reference("JSONSchema", definitions=definitions),
min_items=1,
)
),
"additionalItems": (
typesystem.Reference("JSONSchema", definitions=definitions)
| typesystem.Boolean()
),
"minItems": typesystem.Integer(minimum=0),
"maxItems": typesystem.Integer(minimum=0),
"uniqueItems": typesystem.Boolean(),
}
)
| typesystem.Boolean()
)
# Register JSON_SCHEMA under the name "JSONSchema" so that
# Reference("JSONSchema", definitions=definitions) fields have a target.
definitions["JSONSchema"] = JSON_SCHEMA
"externalDocs": typesystem.Reference(
"ExternalDocumentation", definitions=definitions
),
},
pattern_properties={"^x-": typesystem.Any()},
additional_properties=False,
required=["openapi", "info", "paths"],
)
# Field declarations for the "Info" definition, keyed by property name.
_info_properties = {
    "title": typesystem.String(allow_blank=True),
    "description": typesystem.Text(allow_blank=True),
    "termsOfService": typesystem.String(format="url"),
    "contact": typesystem.Reference("Contact", definitions=definitions),
    "license": typesystem.Reference("License", definitions=definitions),
    "version": typesystem.String(allow_blank=True),
}

# "title" and "version" are mandatory (both may be blank strings); apart from
# the declared keys only "x-" prefixed extension keys are described.
definitions["Info"] = typesystem.Object(
    properties=_info_properties,
    pattern_properties={"^x-": typesystem.Any()},
    additional_properties=False,
    required=["title", "version"],
)
# Field declarations for the "Contact" definition; none of them is required.
_contact_properties = {
    "name": typesystem.String(allow_blank=True),
    "url": typesystem.String(format="url"),
    "email": typesystem.String(format="email"),
}

# Apart from the declared keys only "x-" prefixed extension keys are described.
definitions["Contact"] = typesystem.Object(
    properties=_contact_properties,
    pattern_properties={"^x-": typesystem.Any()},
    additional_properties=False,
)
from apistar.schemas.jsonschema import JSON_SCHEMA
# Object holding a "$ref" string that points into the document's top-level
# "definitions" section, e.g. {"$ref": "#/definitions/Pet"}.
# The pattern must spell "definitions" exactly; a misspelled prefix can never
# match a real reference.
SCHEMA_REF = typesystem.Object(
    properties={"$ref": typesystem.String(pattern="^#/definitions/")}
)
# Object holding a "$ref" string that must start with "#/responses/".
RESPONSE_REF = typesystem.Object(
    properties={
        "$ref": typesystem.String(pattern="^#/responses/"),
    },
)
# Shared registry of named schemas: entries are added with
# `definitions["Name"] = ...` and looked up by the
# `typesystem.Reference("Name", definitions=definitions)` fields in this module.
definitions = typesystem.SchemaDefinitions()
# Root schema of a Swagger document. "swagger", "info" and "paths" are
# required; apart from the declared keys only "x-" prefixed extension keys are
# described.
SWAGGER = typesystem.Object(
    title="Swagger",
    properties={
        "swagger": typesystem.String(),
        "info": typesystem.Reference("Info", definitions=definitions),
        "paths": typesystem.Reference("Paths", definitions=definitions),
        "host": typesystem.String(),
        "basePath": typesystem.String(pattern="^/"),
        "schemes": typesystem.Array(
            items=typesystem.Choice(choices=["http", "https", "ws", "wss"])
        ),
        "consumes": typesystem.Array(items=typesystem.String()),
        "produces": typesystem.Array(items=typesystem.String()),
        "definitions": typesystem.Object(additional_properties=typesystem.Any()),
        # NOTE(review): these two maps point at the plural "Parameters" and
        # "Responses" definitions; confirm that both names are registered and
        # that the singular "Parameter" / "Response" objects are not intended.
        "parameters": typesystem.Object(
            additional_properties=typesystem.Reference(
                "Parameters", definitions=definitions
            )
        ),
        "responses": typesystem.Object(
            additional_properties=typesystem.Reference(
                "Responses", definitions=definitions
            )
        ),
        "securityDefinitions": typesystem.Object(
            additional_properties=typesystem.Reference(
                "SecurityScheme", definitions=definitions
            )
        ),
        "security": typesystem.Array(
            items=typesystem.Reference("SecurityRequirement", definitions=definitions)
        ),
        "tags": typesystem.Array(
            items=typesystem.Reference("Tag", definitions=definitions)
        ),
        "externalDocs": typesystem.Reference(
            "ExternalDocumentation", definitions=definitions
        ),
    },
    pattern_properties={"^x-": typesystem.Any()},
    additional_properties=False,
    required=["swagger", "info", "paths"],
)
definitions["Info"] = typesystem.Object(
properties={
"title": typesystem.String(allow_blank=True),
# Field declarations for the root OpenAPI document, keyed by property name.
_open_api_properties = {
    "openapi": typesystem.String(),
    "info": typesystem.Reference("Info", definitions=definitions),
    "servers": typesystem.Array(
        items=typesystem.Reference("Server", definitions=definitions)
    ),
    "paths": typesystem.Reference("Paths", definitions=definitions),
    "components": typesystem.Reference("Components", definitions=definitions),
    "security": typesystem.Array(
        items=typesystem.Reference("SecurityRequirement", definitions=definitions)
    ),
    "tags": typesystem.Array(
        items=typesystem.Reference("Tag", definitions=definitions)
    ),
    "externalDocs": typesystem.Reference(
        "ExternalDocumentation", definitions=definitions
    ),
}

# "openapi", "info" and "paths" are required; apart from the declared keys only
# "x-" prefixed extension keys are described.
OPEN_API = typesystem.Object(
    title="OpenAPI",
    properties=_open_api_properties,
    pattern_properties={"^x-": typesystem.Any()},
    additional_properties=False,
    required=["openapi", "info", "paths"],
)
definitions["Info"] = typesystem.Object(
properties={
"title": typesystem.String(allow_blank=True),
"description": typesystem.Text(allow_blank=True),
"termsOfService": typesystem.String(format="url"),
"contact": typesystem.Reference("Contact", definitions=definitions),