Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
except AuthError:
# This endpoint is supposed to return a 404 when the requester does
# not have permission to access the event
# https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
time_now = self.clock.time_msec()
if event:
event = await self._event_serializer.serialize_event(event, time_now)
return 200, event
return SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
class RoomEventContextServlet(RestServlet):
PATTERNS = client_patterns(
"/rooms/(?P[^/]*)/context/(?P[^/]*)$", v1=True
)
def __init__(self, hs):
super(RoomEventContextServlet, self).__init__()
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()
self.auth = hs.get_auth()
async def on_GET(self, request, room_id, event_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
limit = parse_integer(request, "limit", default=10)
# delete, so it should only throw if something is wrong
# that we ought to care about.
logger.exception("Failed to remove threepid")
raise SynapseError(500, "Failed to remove threepid")
if ret:
id_server_unbind_result = "success"
else:
id_server_unbind_result = "no-support"
defer.returnValue((200, {
"id_server_unbind_result": id_server_unbind_result,
}))
class WhoamiRestServlet(RestServlet):
PATTERNS = client_patterns("/account/whoami$")
def __init__(self, hs):
super(WhoamiRestServlet, self).__init__()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request)
defer.returnValue((200, {'user_id': requester.user.to_string()}))
def register_servlets(hs, http_server):
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
MsisdnPasswordRequestTokenRestServlet(hs).register(http_server)
self.message_handler = hs.get_message_handler()
self.auth = hs.get_auth()
async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
# Get all the current state for this room
events = await self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
)
return 200, events
# TODO: Needs unit testing
class RoomInitialSyncRestServlet(RestServlet):
PATTERNS = client_patterns("/rooms/(?P[^/]*)/initialSync$", v1=True)
def __init__(self, hs):
super(RoomInitialSyncRestServlet, self).__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
content = await self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
return 200, content
yield self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
return (
200,
{
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": aliases_for_room,
"new_room_id": new_room_id,
},
)
class ResetPasswordRestServlet(RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/reset_password/
@user:to_reset_password?access_token=admin_access_token
JsonBodyToSend:
{
"new_password": "secret"
}
Returns:
200 OK with empty object if success otherwise an error.
"""
PATTERNS = historical_admin_path_patterns(
"/reset_password/(?P[^/]*)"
)
"""
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_GET(self, request, purge_id):
yield assert_requester_is_admin(self.auth, request)
purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id)
return (200, purge_status.asdict())
class DeactivateAccountRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/deactivate/(?P[^/]*)")
def __init__(self, hs):
self._deactivate_account_handler = hs.get_deactivate_account_handler()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
yield assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request, allow_empty_body=True)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
Note that this should only be used for existing endpoints: new ones should just
register for the /_synapse/admin path.
"""
return list(
re.compile(prefix + path_regex)
for prefix in (
"^/_synapse/admin/v1",
"^/_matrix/client/api/v1/admin",
"^/_matrix/client/unstable/admin",
"^/_matrix/client/r0/admin"
)
)
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P[^/]*)")
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.handlers = hs.get_handlers()
@defer.inlineCallbacks
def on_GET(self, request, user_id):
target_user = UserID.from_string(user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
"""
yield assert_requester_is_admin(self.auth, request)
UserID.from_string(target_user_id)
order = "name" # order by name in user table
params = parse_json_object_from_request(request)
assert_params_in_dict(params, ["limit", "start"])
limit = params["limit"]
start = params["start"]
logger.info("limit: %s, start: %s", limit, start)
ret = yield self.handlers.admin_handler.get_users_paginate(order, start, limit)
return (200, ret)
class SearchUsersRestServlet(RestServlet):
"""Get request to search user table for specific users according to
search term.
This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_synapse/admin/v1/search_users/
@admin:user?access_token=admin_access_token&term=alice
Returns:
200 OK with json object {list[dict[str, Any]], count} or empty object.
"""
PATTERNS = historical_admin_path_patterns("/search_users/(?P[^/]*)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.hs = hs
self.auth = hs.get_auth()
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.api.errors import AuthError, NotFoundError, StoreError, SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.types import UserID
from ._base import client_patterns, set_timeline_upper_limit
logger = logging.getLogger(__name__)
class GetFilterRestServlet(RestServlet):
PATTERNS = client_patterns("/user/(?P[^/]*)/filter/(?P[^/]*)")
def __init__(self, hs):
super(GetFilterRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
self.filtering = hs.get_filtering()
async def on_GET(self, request, user_id, filter_id):
target_user = UserID.from_string(user_id)
requester = await self.auth.get_user_by_req(request)
if target_user != requester.user:
raise AuthError(403, "Cannot get filters for other users")
if not self.hs.is_mine(target_user):
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.groups_handler = hs.get_groups_local_handler()
async def on_GET(self, request, group_id):
requester = await self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
result = await self.groups_handler.get_invited_users_in_group(
group_id, requester_user_id
)
return 200, result
class GroupSettingJoinPolicyServlet(RestServlet):
"""Set group join policy
"""
PATTERNS = client_patterns("/groups/(?P[^/]*)/settings/m.join_policy$")
def __init__(self, hs):
super(GroupSettingJoinPolicyServlet, self).__init__()
self.auth = hs.get_auth()
self.groups_handler = hs.get_groups_local_handler()
async def on_PUT(self, request, group_id):
requester = await self.auth.get_user_by_req(request)
requester_user_id = requester.user.to_string()
content = parse_json_object_from_request(request)
def __init__(self, hs):
super(RoomInitialSyncRestServlet, self).__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
content = await self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
return 200, content
class RoomEventServlet(RestServlet):
PATTERNS = client_patterns(
"/rooms/(?P[^/]*)/event/(?P[^/]*)$", v1=True
)
def __init__(self, hs):
super(RoomEventServlet, self).__init__()
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
self.auth = hs.get_auth()
async def on_GET(self, request, room_id, event_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
try:
event = await self.event_handler.get_event(
requester.user, room_id, event_id