Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if IS_PYMONGO_3:
test_conn = connect(
'mongoenginetest', alias='test1',
host='mongodb://username2:password@localhost/mongoenginetest'
)
self.assertRaises(OperationFailure, test_conn.server_info)
else:
self.assertRaises(
MongoEngineConnectionError,
connect, 'mongoenginetest', alias='test1',
host='mongodb://username2:password@localhost/mongoenginetest'
)
self.assertRaises(MongoEngineConnectionError, get_db, 'test1')
# Authentication succeeds with "authSource"
authd_conn = connect(
'mongoenginetest', alias='test2',
host=('mongodb://username2:password@localhost/'
'mongoenginetest?authSource=admin')
)
db = get_db('test2')
self.assertTrue(isinstance(db, pymongo.database.Database))
self.assertEqual(db.name, 'mongoenginetest')
# Clear all users
authd_conn.admin.system.users.remove({})
def _connect(conn_settings):
"""Given a dict of connection settings, create a connection to
MongoDB by calling mongoengine.connect and return its result.
"""
db_name = conn_settings.pop('name')
return mongoengine.connect(db_name, **conn_settings)
from bson import ObjectId
from mongoengine import connect
from pymongo import MongoClient, DESCENDING
from config import MONGO_HOST, MONGO_PORT, MONGO_DB, MONGO_USERNAME, MONGO_PASSWORD
from utils import is_object_id
connect(db=MONGO_DB, host=MONGO_HOST, port=MONGO_PORT)
class DbManager(object):
__doc__ = """
Database Manager class for handling database CRUD actions.
"""
def __init__(self):
self.mongo = MongoClient(host=MONGO_HOST,
port=MONGO_PORT,
username=MONGO_USERNAME,
password=MONGO_PASSWORD,
connect=False)
self.db = self.mongo[MONGO_DB]
def save(self, col_name: str, item: dict, **kwargs) -> None:
import cgi
import datetime
from flask import Flask, render_template, url_for
from models import Feed
import mongoengine
import re
import sys
app = Flask(__name__)
app.config.from_object('config')
app.add_url_rule('/favicon.ico', redirect_to=url_for('static', filename='favicon.ico'))
mongoengine.connect(app.config['MONGODB_DB'], port=app.config['MONGODB_PORT'])
@app.template_filter()
def twitterize(s):
"""
Jinja filter replaces twitter usernames and hashtags with anchor tags
"""
s = re.sub(r'(\s+|\A)@([a-zA-Z0-9\-_]*)\b',r'\1<a href="http://twitter.com/\2">@\2</a>', s)
s = re.sub(r'(\s+|\A)#([a-zA-Z0-9\-_]*)\b',r'\1<a href="http://search.twitter.com/search?q=%23\2">#\2</a>', s)
return s
@app.template_filter()
def time_since(dt, default='just now'):
"""
Jinja filter replaces a date-time with an age string ('3 hours ago', '2 days ago', etc.)
import sys
import traceback
import boto
import mongoengine as me
import rmc.models as m
import rmc.shared.constants as c
import rmc.shared.secrets as secrets
EMAIL_SENDER = 'UW Flow '
me.connect(c.MONGO_DB_RMC)
conn = boto.connect_ses(
aws_access_key_id=secrets.AWS_KEY_ID,
aws_secret_access_key=secrets.AWS_SECRET_KEY)
def title_renderer():
return 'We\'re Excited to Announce Email Signup!'
def html_body_renderer():
email_body = \
"""<p>Hey there,</p>
<p>A while back, you expressed interest in using email to sign up for <a href="https://uwflow.com">Flow</a>. We've received a lot of interest from users like you, and we've finally added email sign-up!</p>
def connect():
global serializer, fernet
crypto_info = settings.load()['database']['crypto']
mongo_host = settings.load()['database']['mongo'].get('host', '127.0.0.1')
mongo_port = settings.load()['database']['mongo'].get('port', '27017')
serializer = URLSafeSerializer(crypto_info['key'])
fernet = Fernet(crypto_info['fernet'])
mongoengine.connect(name, host=mongo_host, port=mongo_port, tz_aware=True)
def run(self):
logger.info("Starting SampleGenerator...")
connect(f3c_global_config.db_name, host=f3c_global_config.db_host)
while 1:
jobs = self._get_active_jobs()
for job in jobs:
if job.mutation_engine != 'external':
samples_queue = "%s-%s" % (job.name, "samples")
maximum = job.maximum_samples
if not self.wq.queue_exists(samples_queue):
self.wq.create_queue(samples_queue)
if not self.wq.queue_is_full(samples_queue, maximum):
for i in range(self.wq.get_pending_elements(samples_queue, maximum)):
try:
self.create_sample(job)
except:
logger.error("Error creating sample: %s" % str(sys.exc_info()[1]))
raise
import mongoengine
from discgraph.models import *
mongodb_client = mongoengine.connect('discgraph')
def main():
"""
The main entry point of the program.
"""
print("Connecting to database {}:{}".format(DB_HOST, DB_PORT))
if DB_USER and DB_PASS:
connect(DB_NAME, host=DB_HOST, port=DB_PORT, username=DB_USER, password=DB_PASS)
else:
connect(DB_NAME, host=DB_HOST, port=DB_PORT)
print('Generating authentication schema...')
for username, password in CONFIG['users'].items():
user = create_user(username, password, username=='admin')
print('[+][Created User] {}:{}'.format(user.username, password))
print('')
for rolename, roleconfig in CONFIG['roles'].items():
role = create_role(rolename, roleconfig['allowed_api_calls'], roleconfig['users'])
print('[+][Created Role] {}:{}:{}'.format(
rolename,
','.join(role.users),
','.join(role.allowed_api_calls)))
print('')
for owner, allowed_api_calls in CONFIG['api_keys'].items():
api_key = create_api_key(owner, allowed_api_calls)
print('[+][Created API Key] {}:{}:{}'.format(
import mongoengine
mongoengine.connect('mumblr-example')
import os
PROJECT_PATH = os.path.abspath(os.path.dirname(__file__))
#MEDIA_ROOT = os.path.join(PROJECT_PATH, '..', 'mumblr', 'static')
SECRET_KEY = '$geoon8_ymg-k)!9wl3wloq4&30w$rhc1*zv%h6m_&nza(4)nk'
RECAPTCHA_PUBLIC_KEY = "6LfFgQoAAAAAABQTj4YjuPbccgKtZStoiWtr7E5k"
RECAPTCHA_PRIVATE_KEY = "6LfFgQoAAAAAAM-0SAUTe7WxZ-thnWFfSpoc7sfJ"