Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _edit(self, request, network_id, room_id, visibility):
requester = await self.auth.get_user_by_req(request)
if not requester.app_service:
raise AuthError(
403, "Only appservices can edit the appservice published room list"
)
await self.handlers.directory_handler.edit_published_appservice_room_list(
requester.app_service.id, network_id, room_id, visibility
)
return 200, {}
def on_query_auth(
self, origin, event_id, room_id, remote_auth_chain, rejects, missing
):
in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
event = yield self.store.get_event(
event_id, allow_none=False, check_room_id=room_id
)
# Just go through and process each event in `remote_auth_chain`. We
# don't want to fall into the trap of `missing` being wrong.
for e in remote_auth_chain:
try:
yield self._handle_new_event(origin, e)
except AuthError:
pass
# Now get the current auth_chain for the event.
local_auth_chain = yield self.store.get_auth_chain(
[auth_id for auth_id in event.auth_event_ids()], include_given=True
def on_GET(self, request, target_user_id):
"""Get request to search user table for specific users according to
search term.
This needs user to have a administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
# To allow all users to get the users list
# if not is_admin and target_user != auth_user:
# raise AuthError(403, "You are not a server admin")
if not self.hs.is_mine(target_user):
raise SynapseError(400, "Can only users a local user")
term = parse_string(request, "term", required=True)
logger.info("term: %s ", term)
ret = yield self.handlers.admin_handler.search_users(
term
)
defer.returnValue((200, ret))
Returns:
bool: Is the room currently blocked
list: The list of pinned events that are unrelated to limit blocking
This list can be used as a convenience in the case where the block
is to be lifted and the remaining pinned event references need to be
preserved
"""
currently_blocked = False
pinned_state_event = None
try:
pinned_state_event = yield self._state.get_current_state(
room_id, event_type=EventTypes.Pinned
)
except AuthError:
# The user has yet to join the server notices room
pass
referenced_events = []
if pinned_state_event is not None:
referenced_events = list(pinned_state_event.content.get("pinned", []))
events = yield self._store.get_events(referenced_events)
for event_id, event in iteritems(events):
if event.type != EventTypes.Message:
continue
if event.content.get("msgtype") == ServerNoticeMsgType:
currently_blocked = True
# remove event in case we need to disable blocking later on.
if event_id in referenced_events:
referenced_events.remove(event.event_id)
def _check_joined_room(self, member, user_id, room_id):
if not member or member.membership != Membership.JOIN:
raise AuthError(
403, "User %s not in room %s (%s)" % (user_id, room_id, repr(member))
)
return synapse.types.create_requester(user_id, app_service=app_service)
user_info = yield self.get_user_by_access_token(access_token, rights)
user = user_info["user"]
token_id = user_info["token_id"]
is_guest = user_info["is_guest"]
# Deny the request if the user account has expired.
if self._account_validity.enabled and not allow_expired:
user_id = user.to_string()
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
if (
expiration_ts is not None
and self.clock.time_msec() >= expiration_ts
):
raise AuthError(
403, "User account has expired", errcode=Codes.EXPIRED_ACCOUNT
)
# device_id may not be present if get_user_by_access_token has been
# stubbed out.
device_id = user_info.get("device_id")
if user and access_token and ip_addr:
yield self.store.insert_client_ip(
user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
device_id=device_id,
)
def edit_published_room_list(self, requester, room_id, visibility):
"""Edit the entry of the room in the published room list.
requester
room_id (str)
visibility (str): "public" or "private"
"""
user_id = requester.user.to_string()
if not self.spam_checker.user_may_publish_room(user_id, room_id):
raise AuthError(
403,
"This user is not permitted to publish rooms to the room list"
)
if requester.is_guest:
raise AuthError(403, "Guests cannot edit the published room list")
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
if visibility == "public" and not self.enable_room_list_search:
# The room list has been disabled.
raise AuthError(
403,
"This user is not permitted to publish rooms to the room list"
)
# we don't allow people to reject invites to the server notice
# room, but they can leave it once they are joined.
if (
old_membership == Membership.INVITE and
effective_membership_state == Membership.LEAVE
):
is_blocked = yield self._is_server_notice_room(room_id)
if is_blocked:
raise SynapseError(
http_client.FORBIDDEN,
"You cannot reject this invite",
errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
)
else:
if action == "kick":
raise AuthError(403, "The target user is not in the room")
is_host_in_room = yield self._is_host_in_room(current_state_ids)
if effective_membership_state == Membership.JOIN:
if requester.is_guest:
guest_can_join = yield self._can_guest_join(current_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
if not is_host_in_room:
inviter = yield self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
MAX_USERID_LENGTH,
),
Codes.INVALID_USERNAME
)
users = yield self.store.get_users_by_id_case_insensitive(user_id)
if users:
if not guest_access_token:
raise SynapseError(
400,
"User ID already taken.",
errcode=Codes.USER_IN_USE,
)
user_data = yield self.auth.get_user_by_access_token(guest_access_token)
if not user_data["is_guest"] or user_data["user"].localpart != localpart:
raise AuthError(
403,
"Cannot register taken user ID without valid guest "
"credentials for that user.",
errcode=Codes.FORBIDDEN,
)