Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
os.chdir(FILE_DIR)
import config # noqa, needs to be loaded from local path
GIT_PATH = 'git'
PATCH_PATH = 'patch'
RUN_ENV = {'PATH': FILE_DIR + "/bin", 'LANG': 'en_US.UTF-8', 'LD_LIBRARY_PATH': FILE_DIR + "/lib"}
app = Flask(__name__)
app.secret_key = config.app_secret_key
mwoauth = MWOAuth(consumer_key=config.oauth_key, consumer_secret=config.oauth_secret)
mwoauth.handshaker.user_agent = 'Gerrit-Patch-Uploader by valhallasw using MWOAuth - http://tools.wmflabs.org/gerrit-patch-uploader'
app.register_blueprint(mwoauth.bp)
cache = FileSystemCache('cache')
def get_projects():
projects = cache.get('projects')
if projects is None:
p = subprocess.Popen(['ssh', 'gerrit', 'gerrit ls-projects'], stdout=subprocess.PIPE, env=RUN_ENV)
stdout, stderr = p.communicate()
projects = stdout.decode("utf-8", "replace").strip().split("\n")
cache.set('projects', projects)
return projects
@app.route("/")
def index():
author = session.get('author', '')
return render_template('index.html', projects=get_projects(), username=mwoauth.get_current_user(),
from app.dao import services_dao, templates_dao
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
from app.models import (
INTERNATIONAL_SMS_TYPE, SMS_TYPE, EMAIL_TYPE, LETTER_TYPE,
KEY_TYPE_TEST, KEY_TYPE_TEAM, SCHEDULE_NOTIFICATIONS
)
from app.service.utils import service_allowed_to_send_to
from app.v2.errors import TooManyRequestsError, BadRequestError, RateLimitError
from app import redis_store
from app.notifications.process_notifications import create_content_for_notification
from app.utils import get_public_notify_type_text
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_letter_contact_dao import dao_get_letter_contact_by_id
daily_message_limit_cache = SimpleCache(default_timeout=15)
def check_service_over_api_rate_limit(service, api_key):
if current_app.config['API_RATE_LIMIT_ENABLED'] and current_app.config['REDIS_ENABLED']:
cache_key = rate_limit_cache_key(service.id, api_key.key_type)
rate_limit = service.rate_limit
interval = 60
if redis_store.exceeded_rate_limit(cache_key, rate_limit, interval):
current_app.logger.error("service {} has been rate limited for throughput".format(service.id))
raise RateLimitError(rate_limit, interval, api_key.key_type)
def check_service_over_daily_message_limit(key_type, service):
if key_type == KEY_TYPE_TEST:
return
def _clear_cache():
global cache
if not cache:
cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, 0)
return cache.clear()
from datetime import datetime
from db import DB
from flask import Flask
from flask import Response
from flask import abort
from flask import redirect
from flask import request
from json import JSONEncoder
from hashlib import sha256
from os import environ
from urllib.parse import urlparse, quote
from werkzeug.routing import BaseConverter
app = Flask(__name__)
ttl = float(environ.get('TTL', 0)) #zero means infinity to the FileSystemCache
cache = FileSystemCache('/tmp/clinvar-miner', threshold=1000000) if ttl >= 0 else NullCache()
cache.clear() #delete the cache when the webserver is restarted
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
clinvar_versions = DB().dates()
#it's necessary to double-escape slashes because WSGI decodes them before passing the URL to Flask
class SuperEscapedConverter(BaseConverter):
@staticmethod
def to_python(value):
return value.replace('%2F', '/')
@staticmethod
def to_url(value):
return quote(value).replace('/', '%252F')
BLOCKED_QUESTION_FRAGMENTS = (
'webcache.googleusercontent.com',
)
STAR_HEADER = u('\u2605')
ANSWER_HEADER = u('{2} Answer from {0} {2}\n{1}')
NO_ANSWER_MSG = '< no answer given >'
CACHE_EMPTY_VAL = "NULL"
CACHE_DIR = appdirs.user_cache_dir('howdoi')
CACHE_ENTRY_MAX = 128
if os.getenv('HOWDOI_DISABLE_CACHE'):
cache = NullCache() # works like an always empty cache
else:
cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, default_timeout=0)
howdoi_session = requests.session()
class BlockError(RuntimeError):
pass
def _random_int(width):
bres = os.urandom(width)
if sys.version < '3':
ires = int(bres.encode('hex'), 16)
else:
ires = int.from_bytes(bres, 'little')
return ires
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from json import dumps
from deprecated.installer.libs import ProvisionDocker
from deprecated.installer.libs import str2bool
from flask import Flask, render_template, request, Response
from cachelib import SimpleCache
app = Flask(__name__)
cache = SimpleCache()
@app.route("/", methods=["GET"])
def hello():
return render_template("index.html")
@app.route("/install", methods=["POST"])
def install():
data = dict(dns=request.form['dns'],
workers=int(request.form['workers']),
perfmeter=str2bool(request.form['perfmeter']),
perfgun=str2bool(request.form['perfgun']),
sast=str2bool(request.form['sast']),
dast=str2bool(request.form['dast']),
grafana_dashboards=not str2bool(request.form['grafana_dashboards']),
import importlib.util # Testing installed packages
import io #Camera preview
import logging
import os
import psutil
import re #RegEx. Used in Copy Files
from smbus2 import SMBus # For I2C
import struct
import subprocess
import sys
import time
import gphoto2 as gp
from cachelib import SimpleCache
cache = SimpleCache()
from werkzeug.security import check_password_hash
from flask import Flask, flash, render_template, request, redirect, url_for, make_response, abort
from flask_login import LoginManager, current_user, login_user, logout_user, login_required, UserMixin, login_url
app = Flask(__name__)
app.secret_key = b'### Paste the secret key here. See the Setup docs ###' #Cookie for session messages
app.jinja_env.lstrip_blocks = True
app.jinja_env.trim_blocks = True
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = ''
# ////////////////////////////////
import datetime
from cachelib import SimpleCache
from app.clients.sms import PollableSMSClient
from app.clients.sms.utils import timed
from saplivelink365 import Configuration, ApiClient, AuthorizationApi, SMSV20Api
# See https://livelink.sapmobileservices.com/documentation/guides/sms-channel/delivery_statuses/#body-description
sap_response_map = {
'SENT': 'sending',
'DELIVERED': 'delivered',
'RECEIVED': 'delivered',
'ERROR': 'failed',
}
auth_cache = SimpleCache()
def get_sap_responses(status):
return sap_response_map[status]
class SAPSMSClient(PollableSMSClient):
def __init__(self, client_id=None, client_secret=None, *args, **kwargs):
super(SAPSMSClient, self).__init__(*args, **kwargs)
self._client_id = client_id
self._client_secret = client_secret
def init_app(self, logger, notify_host, *args, **kwargs):
self.logger = logger
self.notify_host = notify_host
Notification = 'Notification'
UnsubscribeConfirmation = 'UnsubscribeConfirmation'
class InvalidMessageTypeException(Exception):
pass
def verify_message_type(message_type: str):
try:
SNSMessageType(message_type)
except ValueError:
raise InvalidMessageTypeException(f'{message_type} is not a valid message type.')
certificate_cache = SimpleCache()
def get_certificate(url):
res = certificate_cache.get(url)
if res is not None:
return res
res = requests.get(url).content
certificate_cache.set(url, res, timeout=60 * 60) # 60 minutes
return res
# 400 counts as a permanent failure so SNS will not retry.
# 500 counts as a failed delivery attempt so SNS will retry.
# See https://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html#DeliveryPolicies
@ses_callback_blueprint.route('/notifications/email/ses', methods=['POST'])
def sns_callback_handler():
)
BLOCKED_QUESTION_FRAGMENTS = (
'webcache.googleusercontent.com',
)
STAR_HEADER = u('\u2605')
ANSWER_HEADER = u('{2} Answer from {0} {2}\n{1}')
NO_ANSWER_MSG = '< no answer given >'
CACHE_EMPTY_VAL = "NULL"
CACHE_DIR = appdirs.user_cache_dir('howdoi')
CACHE_ENTRY_MAX = 128
if os.getenv('HOWDOI_DISABLE_CACHE'):
cache = NullCache() # works like an always empty cache
else:
cache = FileSystemCache(CACHE_DIR, CACHE_ENTRY_MAX, default_timeout=0)
howdoi_session = requests.session()
class BlockError(RuntimeError):
pass
def _random_int(width):
bres = os.urandom(width)
if sys.version < '3':
ires = int(bres.encode('hex'), 16)
else:
ires = int.from_bytes(bres, 'little')