Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def register_server(
body
): # noqa: E501
"""Register a server
Add an API server to the testbed. # noqa: E501
:param body:
:type body: dict | bytes
:rtype: str
"""
if connexion.request.is_json:
body = Server.from_dict(connexion.request.get_json()) # noqa: E501
return controller.register_server(
body=body
)
def put(id):
if connexion.request.is_json:
data = connexion.request.get_json()
film = Film.query.filter_by(id=id).first()
film.name = data["name"]
film.pub_date = datetime.strptime(data["pubDate"], "%Y-%m-%d").date()
FilmCast.query.filter_by(film=film).delete()
film_cast = []
for actor in data.get("cast", []):
if actor["id"]:
film_cast.append(FilmCast(film_id=film.id, actor_id=actor["id"]))
db.session.add(film)
db.session.add_all(film_cast)
db.session.commit()
return jsonify(FilmSchema().dump(film))
return jsonify({})
and updates are done. It must receive 6 positional
arguments:
- the name of the release
- the product name from the PartialReleaseForm
- the version from the PartialReleaseForm
- the data from the PartialReleaseForm
- the most recent version of the data for the
release from the database
- the old_data_version from the PartialReleaseForm
"""
new = True
product = connexion.request.get_json().get("product")
incomingData = json.loads(connexion.request.get_json().get("data"))
copyTo = list()
if connexion.request.get_json().get("copyTo"):
copyTo = json.loads(connexion.request.get_json().get("copyTo"))
alias = list()
if connexion.request.get_json().get("alias"):
alias = json.loads(connexion.request.get_json().get("alias"))
old_data_version = connexion.request.get_json().get("data_version")
# schema_version is an attribute at the root level of a blob.
# Endpoints that receive an entire blob can find it there.
# Those that don't have to pass it as a form element instead.
if connexion.request.get_json().get("schema_version"):
schema_version = connexion.request.get_json().get("schema_version")
elif incomingData.get("schema_version"):
schema_version = incomingData.get("schema_version")
or (change_type == "update" and field not in ["when", "options", "data_version"])
or (change_type == "insert" and field not in ["when", "options", "permission", "username"])
):
continue
what[field] = connexion.request.get_json()[field]
if change_type in ["update", "delete"] and not what.get("data_version", None):
return problem(400, "Bad Request", "Missing field", ext={"exception": "data_version is missing"})
if what.get("options", None):
what["options"] = json.loads(what["options"])
if len(what["options"]) == 0:
what["options"] = None
return super(PermissionScheduledChangeView, self)._post(sc_id, what, transaction, changed_by, connexion.request.get_json().get("sc_data_version"))
def create_user(user): # noqa: E501
"""Create user
This can only be done by the logged in user. # noqa: E501
:param user: Created user object
:type user: dict | bytes
:rtype: None
"""
if connexion.request.is_json:
user = User.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
def update_pet(body): # noqa: E501
"""Update an existing pet
# noqa: E501
:param body: Pet object that needs to be added to the store
:type body: dict | bytes
:rtype: None
"""
if connexion.request.is_json:
body = Pet.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
def mi_qa(contents=None): # noqa: E501
"""小米AI音箱
小米AI音箱 # noqa: E501
:param contents: 输入
:type contents: dict | bytes
:rtype: MiResponse
"""
if connexion.request.is_json:
contents = MiRequest.from_dict(connexion.request.get_json()) # noqa: E501
headers = connexion.request.headers
ip=headers.get('X_FORWARDED_FOR')
query=contents.query
print(ip,query)
if not validate(headers):
print("验证失败")
return {"version": "0.1"}
result = {"version": "0.1",
"session_attributes": {},
"is_session_end": False,
"response": {
"to_speak": {
"type": 0,
"text": query
def update_pet(pet): # noqa: E501
"""Update an existing pet
# noqa: E501
:param pet: Pet object that needs to be added to the store
:type pet: dict | bytes
:rtype: None
"""
if connexion.request.is_json:
pet = Pet.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
alias = json.loads(connexion.request.get_json().get("alias"))
old_data_version = connexion.request.get_json().get("data_version")
# schema_version is an attribute at the root level of a blob.
# Endpoints that receive an entire blob can find it there.
# Those that don't have to pass it as a form element instead.
if connexion.request.get_json().get("schema_version"):
schema_version = connexion.request.get_json().get("schema_version")
elif incomingData.get("schema_version"):
schema_version = incomingData.get("schema_version")
else:
return problem(400, "Bad Request", "schema_version is required")
if connexion.request.get_json().get("hashFunction"):
hashFunction = connexion.request.get_json().get("hashFunction")
elif incomingData.get("hashFunction"):
hashFunction = incomingData.get("hashFunction")
else:
hashFunction = None
allReleases = [release]
if copyTo:
allReleases += copyTo
for rel in allReleases:
try:
releaseInfo = dbo.releases.getReleases(name=rel, transaction=transaction)[0]
if existsCallback(rel, product):
new = False
# "release" is the one named in the URL (as opposed to the
# ones that can be provided in copyTo), and we treat it as