Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
from . import Opml, Feed
from . import HackerNews, Reddit
from tornado import gen, queues
from tornado.ioloop import IOLoop
from concurrent.futures import ThreadPoolExecutor
from motor.motor_tornado import MotorClient
# Two-thread executor; blocking callables are submitted to it from coroutines.
thread_pool = ThreadPoolExecutor(max_workers=2)

# Tornado queues shared at module level (producers/consumers are not
# visible in this part of the file).
feeds = queues.Queue()
entries = queues.Queue()

# MongoDB client built from the MONGO_ENTRIES environment variable; the
# variable is mandatory, so a missing value raises KeyError at import time.
client = MotorClient(os.environ['MONGO_ENTRIES'])
db = client['entries-by-votes']

# Taken from the ENTRIES_AGE environment variable, falling back to 14.
# NOTE(review): the unit is not stated here -- presumably days; confirm.
ENTRIES_AGE = float(os.getenv('ENTRIES_AGE', 14))
@gen.coroutine
def do_insert_entry(entry):
    """Upsert *entry* into ``db.entries``, matching on its ``link`` field.

    The document whose ``link`` equals ``entry['link']`` gets every field
    of *entry* applied via ``$set``; when no document matches, a new one
    is inserted (``upsert=True``).

    Raises:
        KeyError: if *entry* has no ``'link'`` key.
    """
    selector = {'link': entry['link']}
    changes = {'$set': entry}
    yield db.entries.update_one(selector, changes, upsert=True)
@gen.coroutine
def get_feeds():
    """Run ``Opml().get_feeds`` on ``thread_pool`` and return its result.

    The call is submitted to the executor and the coroutine yields on the
    resulting future, so the calling thread is not blocked while it runs.
    """
    pending = thread_pool.submit(Opml().get_feeds)
    loaded = yield pending
    return loaded
def __init__(self):
    """Attach a MongoDB database handle and initialise the application.

    ``self.db`` is set to the ``Settings.DB_NAME`` database of a
    ``MotorClient`` created from ``Settings.MONGO_HOST`` and
    ``Settings.MONGO_PORT``. ``tornado.web.Application.__init__`` is then
    called with ``urlpatterns`` as the handlers and the template path,
    static path and debug flag taken from ``Settings``.
    """
    routes = urlpatterns

    mongo = MotorClient(Settings.MONGO_HOST, Settings.MONGO_PORT)
    self.db = mongo[Settings.DB_NAME]

    app_options = {
        'template_path': Settings.TEMPLATE_PATH,
        'static_path': Settings.STATIC_PATH,
        'debug': Settings.DEBUG,
    }
    tornado.web.Application.__init__(self, handlers=routes, **app_options)
def __init__(self, name: str, connection_uri: str, logSizeBytes: int=100*1024*1024) -> None:
    """Connect to MongoDB and request the capped ``committed`` collection.

    Args:
        name: Stored as ``self.name`` and used as the suffix of the
            database name (``hopps_<name>``).
        connection_uri: Passed to ``motor.motor_tornado.MotorClient``.
        logSizeBytes: ``size`` of the capped ``committed`` collection;
            defaults to 100 * 1024 * 1024.

    The value returned by ``create_collection`` is stored in
    ``self.__ready`` without being awaited here.
    """
    self.name = name
    self.conn = motor.motor_tornado.MotorClient(connection_uri)

    database = self.conn['hopps_{}'.format(name)]
    self.db = database
    self.commit_collection = None
    self.security_collection = database['security']

    # Keep the pending result so it can be waited on elsewhere.
    self.__ready = database.create_collection(
        'committed', capped=True, size=logSizeBytes
    )
""" 初始化mongodb连接
"""
# NOTE(review): the enclosing ``def`` line is not part of this excerpt;
# username, password, host and dbname are presumably its parameters -- confirm.
# Build the MongoDB URI: with credentials when both are truthy, without otherwise.
if username and password:
# Escape each piece before it is interpolated into the URI.
# NOTE(review): assuming quote_plus is urllib.parse.quote_plus, escaping host
# also escapes ':' and ',' -- confirm host never carries a port or host list.
username = quote_plus(username)
password = quote_plus(password)
host = quote_plus(host)
uri = 'mongodb://{username}:{password}@{host}/{dbname}'.format(
username=username,
password=password,
host=host,
dbname=dbname,
)
else:
# No (complete) credentials: host and dbname are used as given, unescaped.
uri = "mongodb://{host}/{dbname}".format(host=host, dbname=dbname)
mongo_client = motor.motor_tornado.MotorClient(uri)
# Publish the client through the module-level MONGO_CONN global.
global MONGO_CONN
MONGO_CONN = mongo_client
logger.info('create mongodb connection pool.')
# Load the local settings file. If it cannot be opened or parsed, continue
# with empty settings.
try:
    # The context manager closes the file handle even when parsing fails.
    with open("./local/settings.json") as f:
        settings = json.load(f)
except (OSError, ValueError):
    # OSError: the file is missing or unreadable.
    # ValueError: invalid JSON (json.JSONDecodeError) or undecodable text.
    settings = {}

# Basic system start-up settings.
# NOTE: "system_port" is required -- with empty settings this lookup raises
# KeyError, exactly as before.
define("port", default=settings["system_port"], help="run on the given port", type=int)

# MongoDB connection settings. Every entry under "db" creates a client, but
# each iteration overwrites the previous one, so only the last entry's
# client and database remain bound afterwards.
client_motor = db_motor = None
settings_db = settings.get("db", {})
for connection_name, setting_db in settings_db.items():
    client_motor = MotorClient(setting_db["host"], setting_db["port"])
    # TODO: sort out access to the individual databases
    db_motor = client_motor[setting_db["name"]]

# Redis connection settings ("redis" with "host" and "port" is required).
client_redis = tredis.RedisClient(host=settings["redis"]["host"], port=settings["redis"]["port"])

# SSL settings: optional. When the "ssl"/"certificates" keys are absent the
# context is left as None.
try:
    ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_ctx.load_cert_chain(settings["ssl"]["certificates"][0], settings["ssl"]["certificates"][1])
except KeyError:
    ssl_ctx = None
# Application settings - available inside handlers via self.settings
system_settings = {
"client_motor": db_motor,