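# The snippets below come from the federation library. A minimal set of standard-library
# imports they rely on is collected here for convenience; the library-internal helpers
# (fetch_document, handle_receive, RequestType, XRD, defaults, int_or_none, validate_handle,
# get_fetch_content_endpoint and friends) are imported from elsewhere in the package.
import json
import logging
import re
from copy import deepcopy
from typing import Callable
from urllib.parse import quote

# Logger name is an assumption; the snippets only require that ``logger`` exists.
logger = logging.getLogger("federation")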
def retrieve_and_parse_content(
id: str, guid: str, handle: str, entity_type: str, sender_key_fetcher: Callable[[str], str]=None,
):
"""Retrieve remote content and return an Entity class instance.
This is basically the inverse of receiving an entity. Instead, we fetch it, then call "handle_receive".
:param sender_key_fetcher: Function to use to fetch sender public key. If not given, network will be used
to fetch the profile and the key. Function must take handle as only parameter and return a public key.
:returns: Entity object instance or ``None``
"""
if not validate_handle(handle):
return
_username, domain = handle.split("@")
url = get_fetch_content_endpoint(domain, entity_type.lower(), guid)
document, status_code, error = fetch_document(url)
if status_code == 200:
request = RequestType(body=document)
_sender, _protocol, entities = handle_receive(request, sender_key_fetcher=sender_key_fetcher)
if len(entities) > 1:
logger.warning("retrieve_and_parse_content - more than one entity parsed from remote even though we"
"expected only one! ID %s", guid)
if entities:
return entities[0]
return
elif status_code == 404:
logger.warning("retrieve_and_parse_content - remote content %s not found", guid)
return
if error:
raise error
    raise Exception("retrieve_and_parse_content - unknown problem when fetching document: %s, %s, %s" % (
        document, status_code, error,
    ))
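# Rough usage sketch for retrieve_and_parse_content. All identifiers below are made-up
# placeholders and the call performs network fetches, so this is illustrative only:
#
#     entity = retrieve_and_parse_content(
#         id="https://example.com/posts/1234",
#         guid="1234",
#         handle="alice@example.com",
#         entity_type="Post",          # lower-cased when building the fetch endpoint
#         sender_key_fetcher=None,     # None means the sender's key is fetched over the network
#     )
#     if entity:
#         print(entity)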
def retrieve_and_parse_diaspora_webfinger(handle):
"""
    Retrieve and parse a remote Diaspora webfinger document.
:arg handle: Remote handle to retrieve
:returns: dict
"""
document = try_retrieve_webfinger_document(handle)
if document:
return parse_diaspora_webfinger(document)
host = handle.split("@")[1]
hostmeta = retrieve_diaspora_host_meta(host)
if not hostmeta:
return None
url = hostmeta.find_link(rels="lrdd").template.replace("{uri}", quote(handle))
document, code, exception = fetch_document(url)
if exception:
return None
return parse_diaspora_webfinger(document)
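# Rough usage sketch (hypothetical handle, requires network access):
#
#     webfinger = retrieve_and_parse_diaspora_webfinger("alice@example.com")
#     if webfinger:
#         print(webfinger.get("hcard_url"))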
# Fragment of the Mastodon instance-document parser. The signature and the first branch
# are assumptions inferred from the dangling "elif" and the comments below: Pleroma and
# Misskey expose the Mastodon instance API, so they are handed off to their own fetchers
# instead of being logged as Mastodon.
def parse_mastodon_document(doc, host):
    if doc.get('version', '').find('pleroma') > -1:
        # Use Pleroma instead, otherwise this is logged as Mastodon
        from federation.hostmeta.fetchers import fetch_nodeinfo_document
        return fetch_nodeinfo_document(host)
    elif doc.get('version', '').find('misskey') > -1:
        # Use Misskey instead, otherwise this is logged as Mastodon
        from federation.hostmeta.fetchers import fetch_misskey_document
        return fetch_misskey_document(host, mastodon_document=doc)
result = deepcopy(defaults)
result['host'] = host
result['name'] = doc.get('title', host)
result['platform'] = 'mastodon'
result['version'] = doc.get('version', '')
# Awkward parsing of signups from about page
# TODO remove if fixed, issue logged: https://github.com/tootsuite/mastodon/issues/9350
about_doc, _status_code, _error = fetch_document(host=host, path='/about')
if about_doc:
        result['open_signups'] = about_doc.find('<div class="closed-registrations-message">') == -1
version = re.sub(r'[^0-9.]', '', doc.get('version', ''))
version = [int(part) for part in version.split('.')]
if version >= [3, 0, 0]:
result['protocols'] = ['activitypub']
elif version >= [1, 6, 0]:
result['protocols'] = ['ostatus', 'activitypub']
else:
result['protocols'] = ['ostatus']
result['relay'] = False
result['activity']['users']['total'] = int_or_none(doc.get('stats', {}).get('user_count'))
result['activity']['local_posts'] = int_or_none(doc.get('stats', {}).get('status_count'))
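# A self-contained illustration of the version handling above: the reported version string
# is reduced to digits and dots, split into integer parts and compared against minimum
# versions as lists. The helper name is ad hoc, and the ``if part`` guard is an addition
# that avoids a ValueError on an empty version string.
import re

def _protocols_for(version_string):
    version = re.sub(r'[^0-9.]', '', version_string)
    version = [int(part) for part in version.split('.') if part]
    if version >= [3, 0, 0]:
        return ['activitypub']
    elif version >= [1, 6, 0]:
        return ['ostatus', 'activitypub']
    return ['ostatus']

assert _protocols_for('3.1.4') == ['activitypub']
assert _protocols_for('2.9.0rc1') == ['ostatus', 'activitypub']
assert _protocols_for('1.5.1') == ['ostatus']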
class OptionalRawContentMixin(RawContentMixin):
"""A version of the RawContentMixin where `raw_content` is not required."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._required.remove("raw_content")
class EntityTypeMixin(BaseEntity):
"""
Provides a field for entity type.
"""
entity_type = ""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._required += ["entity_type"]
class ProviderDisplayNameMixin(BaseEntity):
"""Provides a field for provider display name."""
provider_display_name = ""
    @property
    def tags(self):
        # Method header assumed for this raw_content fragment; extracts hashtags from raw_content.
        tags = {word.strip("#").lower() for word in self.raw_content.split() if word.startswith("#") and len(word) > 1}
        return sorted(tags)
def extract_mentions(self):
matches = re.findall(r'@{([\S ][^{}]+)}', self.raw_content)
if not matches:
return
for mention in matches:
splits = mention.split(";")
if len(splits) == 1:
self._mentions.add(splits[0].strip(' }'))
elif len(splits) == 2:
self._mentions.add(splits[1].strip(' }'))
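# Standalone sketch of the mention syntax handled by extract_mentions: "@{Name; handle}"
# yields the handle part and "@{handle}" yields the handle directly. Text and handles are
# made up for illustration; the helper name is ad hoc.
import re

def _mentions_in(text):
    mentions = set()
    for mention in re.findall(r'@{([\S ][^{}]+)}', text):
        splits = mention.split(";")
        if len(splits) == 1:
            mentions.add(splits[0].strip(' }'))
        elif len(splits) == 2:
            mentions.add(splits[1].strip(' }'))
    return mentions

assert _mentions_in("hi @{Alice; alice@example.com} and @{bob@example.com}") == {
    "alice@example.com", "bob@example.com",
}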
def fetch_nodeinfo2_document(host):
    """Fetch the remote NodeInfo2 document from /.well-known/x-nodeinfo2 and parse it."""
doc, status_code, error = fetch_document(host=host, path='/.well-known/x-nodeinfo2')
if not doc:
return
try:
doc = json.loads(doc)
except json.JSONDecodeError:
return
return parse_nodeinfo2_document(doc, host)
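# Rough usage sketch (hypothetical host, requires network access). Assuming the parsed
# result follows the same schema as the Mastodon parser above (host, name, platform,
# version, protocols, activity, ...):
#
#     info = fetch_nodeinfo2_document("example.com")
#     if info:
#         print(info['platform'], info['version'])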
def retrieve_diaspora_hcard(handle):
"""
Retrieve a remote Diaspora hCard document.
:arg handle: Remote handle to retrieve
:return: str (HTML document)
"""
    webfinger = retrieve_and_parse_diaspora_webfinger(handle)
    if not webfinger:
        return None
    document, code, exception = fetch_document(webfinger.get("hcard_url"))
if exception:
return None
return document
def retrieve_diaspora_host_meta(host):
"""
Retrieve a remote Diaspora host-meta document.
:arg host: Host to retrieve from
:returns: ``XRD`` instance
"""
document, code, exception = fetch_document(host=host, path="/.well-known/host-meta")
if exception:
return None
xrd = XRD.parse_xrd(document)
return xrd
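# Rough sketch of how the host-meta XRD feeds the webfinger lookup above: the "lrdd" link
# template is expanded with the URL-quoted handle. Host and handle are placeholders and
# the call requires network access, so this is illustrative only:
#
#     from urllib.parse import quote
#
#     hostmeta = retrieve_diaspora_host_meta("example.com")
#     if hostmeta:
#         template = hostmeta.find_link(rels="lrdd").template
#         webfinger_url = template.replace("{uri}", quote("alice@example.com"))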