Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def base_object(self, obj):
"""Extends the default base_object() to avoid using shortcodes as object ids.
"""
base_obj = super(Instagram, self).base_object(obj)
base_id = base_obj.get('id')
if base_id and not base_id.replace('_', '').isdigit():
# this isn't id. it's probably a shortcode.
del base_obj['id']
id = obj.get('id')
if id:
parsed = util.parse_tag_uri(id)
if parsed and '_' in parsed[1]:
base_obj['id'] = parsed[1].split('_')[0]
return base_obj
def finish(self, auth_entity, state=None):
if auth_entity:
user_json = json_loads(auth_entity.user_json)
# find instagram profile URL
urls = user_json.get('rel-me', [])
logging.info('rel-mes: %s', urls)
for url in util.trim_nulls(urls):
if util.domain_from_link(url) == gr_instagram.Instagram.DOMAIN:
username = urllib.parse.urlparse(url).path.strip('/')
break
else:
self.messages.add(
'No Instagram profile found. Please <a href="https://indieauth.com/setup">add an Instagram rel-me link</a>, then try again.')
return self.redirect('/')
# check that instagram profile links to web site
try:
actor = gr_instagram.Instagram(scrape=True).get_actor(
username, ignore_rate_limit=True)
except Exception as e:
code, _ = util.interpret_http_exception(e)
if code in Instagram.RATE_LIMIT_HTTP_CODES:
self.messages.add(
'<a href="https://github.com/snarfed/bridgy/issues/665#issuecomment-524977427">Apologies, Instagram is temporarily blocking us.</a> Please try again later!')
SILOS = (
'flickr',
'github',
'instagram',
'mastodon',
'twitter',
)
OAUTHS = { # maps oauth-dropins module name to module
name: importlib.import_module('oauth_dropins.%s' % name)
for name in SILOS
}
SILO_DOMAINS = {cls.DOMAIN for cls in (
Facebook,
Flickr,
GitHub,
Instagram,
Twitter,
)}
SCOPE_OVERRIDES = {
# https://developers.facebook.com/docs/reference/login/
'facebook': 'user_status,user_posts,user_photos,user_events',
# https://developer.github.com/apps/building-oauth-apps/scopes-for-oauth-apps/
'github': 'notifications,public_repo',
# https://docs.joinmastodon.org/api/permissions/
'mastodon': 'read',
}
class FrontPageHandler(handlers.TemplateHandler):
"""Renders and serves the front page."""
handle_exception = handlers.handle_exception
from oauth_dropins.webutil.handlers import TemplateHandler
from oauth_dropins.webutil.util import json_dumps, json_loads
import webapp2
from models import Source
import util
class Instagram(Source):
"""An Instagram account.
The key name is the username. Instagram usernames may have ASCII letters (case
insensitive), numbers, periods, and underscores:
https://stackoverflow.com/questions/15470180
"""
GR_CLASS = gr_instagram.Instagram
OAUTH_START_HANDLER = oauth_instagram.StartHandler
SHORT_NAME = 'instagram'
FAST_POLL = datetime.timedelta(minutes=120)
RATE_LIMITED_POLL = Source.SLOW_POLL
RATE_LIMIT_HTTP_CODES = ('401', '429', '503')
DISABLE_HTTP_CODES = ()
CAN_PUBLISH = False
URL_CANONICALIZER = util.UrlCanonicalizer(
domain=GR_CLASS.DOMAIN,
subdomain='www',
approve=r'https://www.instagram.com/p/[^/?]+/$',
trailing_slash=True,
headers=util.REQUEST_HEADERS)
# no reject regexp; non-private Instagram post URLs just 404
@staticmethod
# find instagram profile URL
urls = user_json.get('rel-me', [])
logging.info('rel-mes: %s', urls)
for url in util.trim_nulls(urls):
if util.domain_from_link(url) == gr_instagram.Instagram.DOMAIN:
username = urllib.parse.urlparse(url).path.strip('/')
break
else:
self.messages.add(
'No Instagram profile found. Please <a href="https://indieauth.com/setup">add an Instagram rel-me link</a>, then try again.')
return self.redirect('/')
# check that instagram profile links to web site
try:
actor = gr_instagram.Instagram(scrape=True).get_actor(
username, ignore_rate_limit=True)
except Exception as e:
code, _ = util.interpret_http_exception(e)
if code in Instagram.RATE_LIMIT_HTTP_CODES:
self.messages.add(
'<a href="https://github.com/snarfed/bridgy/issues/665#issuecomment-524977427">Apologies, Instagram is temporarily blocking us.</a> Please try again later!')
return self.redirect('/')
else:
raise
if not actor:
self.messages.add("Couldn't find Instagram user '%s'. Please check your site's rel-me link and your Instagram account." % username)
return self.redirect('/')
canonicalize = util.UrlCanonicalizer(redirects=False)
website = canonicalize(auth_entity.key.id())
obj = activity['object']
obj['ig_like_count'] = media['likes'].get('count', 0)
# multi-photo
children = media.get('edge_sidecar_to_children', {}).get('edges', [])
if children:
obj['attachments'] = []
for child in children:
child_activity = self._json_media_node_to_activity(child.get('node'))
for att in child_activity['object']['attachments']:
if not att.get('url'):
att['url'] = link
obj['attachments'].append(att)
self.postprocess_object(obj)
return super(Instagram, self).postprocess_activity(activity)