Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def signed_cookie_to_id(self, sesh_cookie):
    """Decode a signed session cookie back into the value it carries.

    A ``URLSafeTimedSerializer`` keyed with ``self.secret_key`` loads
    ``sesh_cookie``.

    Returns:
        The deserialized payload, or ``None`` when loading raises
        ``BadSignature``.
    """
    verifier = URLSafeTimedSerializer(self.secret_key)
    try:
        payload = verifier.loads(sesh_cookie)
    except BadSignature:
        # The cookie could not be verified; report "no id" to the caller.
        return None
    return payload
def check_token(token):
    """Recover the value stored in a signed, time-limited token.

    The serializer is keyed with the app's ``SECRET_KEY`` config value.
    ``TOKEN_MAX_AGE_SECONDS`` is passed as ``max_age`` and the optional
    ``DANGEROUS_SALT`` config value as ``salt``.

    Returns:
        The loaded value, or ``None`` (after an info-level log entry) when
        loading raises ``SignatureExpired``.

    Only ``SignatureExpired`` is handled here; any other exception raised
    while loading the token propagates to the caller.
    """
    serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
    try:
        return serializer.loads(
            token,
            max_age=current_app.config['TOKEN_MAX_AGE_SECONDS'],
            salt=current_app.config.get('DANGEROUS_SALT'),
        )
    except SignatureExpired as e:
        current_app.logger.info('token expired %s' % e)
        return None
def get_signing_serializer(self, app):
    """Build the timed serializer used to sign data for ``app``.

    Args:
        app: object exposing a ``secret_key`` attribute.

    Returns:
        A ``URLSafeTimedSerializer`` configured from this object's
        ``salt``, ``serializer``, ``key_derivation`` and ``digest_method``
        attributes, or ``None`` when ``app.secret_key`` is falsy.
    """
    if app.secret_key:
        signing_options = {
            'key_derivation': self.key_derivation,
            'digest_method': self.digest_method,
        }
        return URLSafeTimedSerializer(
            app.secret_key,
            salt=self.salt,
            serializer=self.serializer,
            signer_kwargs=signing_options,
        )
    # Without a secret key nothing can be signed.
    return None
def get_signing_serializer(self, app):
    """Return a signing serializer for ``app``, or ``None`` without a key.

    The serializer is a ``URLSafeTimedSerializer`` keyed with
    ``app.secret_key``. Its salt, inner serializer, key derivation and
    digest method are taken from the attributes of the same name on
    ``self``.
    """
    # Guard: a falsy secret key means signing is unavailable.
    if not app.secret_key:
        return None

    options_for_signer = {
        'key_derivation': self.key_derivation,
        'digest_method': self.digest_method,
    }
    signing_serializer = URLSafeTimedSerializer(
        app.secret_key,
        salt=self.salt,
        serializer=self.serializer,
        signer_kwargs=options_for_signer,
    )
    return signing_serializer
rv = {}
for e, name in zip(authors.author_emails, authors.author_names):
rv[e] = _USER_FB_ITSD.dumps([name, id])
return rv
def check_author_key(key):
    """Load an author key through the ``_USER_FB_ITSD`` serializer.

    Returns:
        Whatever ``_USER_FB_ITSD.loads(key)`` yields, or the pair
        ``(None, None)`` if loading raises any ``Exception``.
    """
    try:
        decoded = _USER_FB_ITSD.loads(key)
    except Exception:
        # Any failure while loading is reported as an empty pair.
        return None, None
    return decoded
"""
Emails
"""
# Set of admin e-mail addresses, parsed from the JSON value of the
# ADMIN_EMAILS environment variable (KeyError at import time if it is unset).
_ADMIN_EMAILS = set(json.loads(os.environ['ADMIN_EMAILS']))
# Timed serializer keyed with the ITSD_KEY environment variable and salted
# with 'loginemail'; send_login_email uses it to sign the user id that is
# embedded in the login URL.
_LOGIN_EMAIL_ITSD = itsdangerous.URLSafeTimedSerializer(os.environ['ITSD_KEY'],
salt='loginemail')
def send_login_email(email):
q = 'SELECT id, email FROM users WHERE lower(email) = lower(%s)'
user = fetchone(q, email)
if not user:
l('failed_pw_reset_request', email=email)
return False
body = _JINJA.get_template('email/login_email.txt')
key = _LOGIN_EMAIL_ITSD.dumps(user.id)
url = 'http://{}/user/login/{}/'.format(_WEB_HOST, key)
body = body.render(url=url)
msg = {
"personalizations": [
def generate_token(email):
    """Serialize ``email`` into a signed, timestamped URL-safe token.

    The serializer is keyed with the app's ``SECRET_KEY`` config value; the
    optional ``DANGEROUS_SALT`` config value is handed to ``dumps`` as its
    second positional argument.
    """
    signer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
    salt = current_app.config.get('DANGEROUS_SALT')
    return signer.dumps(email, salt)
)
field_name = _get_config(
token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
message='A field name is required to use CSRF.'
)
time_limit = _get_config(
time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
)
if not data:
raise ValidationError('The CSRF token is missing.')
if field_name not in session:
raise ValidationError('The CSRF session token is missing.')
s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')
try:
token = s.loads(data, max_age=time_limit)
except SignatureExpired:
raise ValidationError('The CSRF token has expired.')
except BadData:
raise ValidationError('The CSRF token is invalid.')
if not safe_str_cmp(session[field_name], token):
raise ValidationError('The CSRF tokens do not match.')
def login(self, **kwargs):
    """Authenticate the configured user and issue a signed session cookie.

    Keyword Args:
        email: must equal ``USER_MAIL``; defaults to ``''``.
        password: plaintext password; only its first 1024 characters are
            used. Defaults to ``''``.

    Returns:
        The result of ``self.create_cookie(auth, SESSION_MAX_AGE)`` when the
        credentials match, otherwise ``None``.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if kwargs.get('email', '') != USER_MAIL:
        return None
    # Limit password length to 1024:
    password = kwargs.get('password', '')[0:1024].encode()
    # NOTE(review): n=2 is a very low scrypt cost factor. It is left as-is
    # because changing it would invalidate the stored USER_HASH.
    password_hash = scrypt(password, salt=USER_SALT, n=2, r=8, p=1)
    # Compare in constant time so response timing does not reveal how much
    # of the derived hash matched.
    try:
        hash_matches = hmac.compare_digest(password_hash, USER_HASH)
    except TypeError:
        # compare_digest rejects operand types it does not support; keep
        # the previous equality semantics for those instead of raising.
        hash_matches = password_hash == USER_HASH
    if not hash_matches:
        return None
    auth = URLSafeTimedSerializer(SECRET_KEY).dumps({'email': USER_MAIL})
    return self.create_cookie(auth, SESSION_MAX_AGE)
def get_signing_serializer(self, app):
    """Create the ``URLSafeTimedSerializer`` that signs data for ``app``.

    Returns ``None`` if ``app.secret_key`` is falsy. Otherwise the
    serializer is keyed with ``app.secret_key`` and configured with this
    object's ``salt`` and ``serializer``; ``key_derivation`` and
    ``digest_method`` are forwarded through ``signer_kwargs``.
    """
    if not app.secret_key:
        return None

    # Options forwarded to the underlying signer.
    signer_settings = {
        'key_derivation': self.key_derivation,
        'digest_method': self.digest_method,
    }
    return URLSafeTimedSerializer(
        app.secret_key,
        salt=self.salt,
        serializer=self.serializer,
        signer_kwargs=signer_settings,
    )
def _gen_reset_otk(self, user):
    """Create the signed one-time key (OTK) for a password reset.

    Args:
        user: mapping providing ``'name'`` and ``'password'`` entries.

    Returns:
        The output of a ``URLSafeTimedSerializer`` (keyed with
        ``self.config.reset_secret``, with ``'password-recovery'`` as its
        second positional argument) dumping the tuple
        ``(user['name'], user['password'][-4:])``.
    """
    otk_serializer = itsdangerous.URLSafeTimedSerializer(
        self.config.reset_secret,
        'password-recovery',
    )
    # The last four characters of the stored password value (presumably the
    # password hash -- confirm against the user model) are embedded so the
    # key stops matching once the password changes. The snippet is included
    # as-is; no additional hashing is done here.
    name = user['name']
    password_tail = user['password'][-4:]
    return otk_serializer.dumps((name, password_tail))