Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Prepare unique names and two sample documents for a test run.

    Four UUIDs are fetched via ``cloudant.Account(self.uri).uuids(4)``;
    each one is prefixed with ``'a'`` and stored as a database or
    document name on the test case.
    """
    self.uri = 'http://localhost:5984'
    uuid_payload = cloudant.Account(self.uri).uuids(4).json()
    # database names must start with a letter
    prefixed = ['a' + raw_uuid for raw_uuid in uuid_payload['uuids']]
    self.db_name = prefixed[0]
    self.otherdb_name = prefixed[1]
    self.doc_name = prefixed[2]
    self.otherdoc_name = prefixed[3]
    # Sample document bodies stored on the test case.
    self.test_doc = {'herp': 'derp', 'name': 'Luke Skywalker'}
    self.test_otherdoc = {
        'derp': 'herp',
        'name': 'Larry, the Incorrigible Miscreant',
    }
def setUp(self):
    """Run the parent fixture, then build an async account, database and document.

    The account is created for ``self.uri`` with the ``async`` option
    enabled; the database and document handles are derived from it using
    ``self.db_name`` and ``self.doc_name``.
    """
    super(AsyncTest, self).setUp()
    # ``async`` is a reserved word from Python 3.7 on, so spelling it as a
    # literal keyword argument is a SyntaxError there. Unpacking a dict
    # passes the very same keyword and parses on every Python version.
    self.account = cloudant.Account(self.uri, **{'async': True})
    self.database = self.account.database(self.db_name)
    self.document = self.database.document(self.doc_name)
def testCloudant(self):
    """Check the URI of an Account constructed from the name 'garbados'."""
    expected_uri = "https://garbados.cloudant.com"
    garbados = cloudant.Account('garbados')
    assert garbados.uri == expected_uri
def testSync(self):
    """PUT and then DELETE ``self.db_name`` through a non-async account.

    ``raise_for_status()`` is called on both responses, so an error
    response from either request fails the test.
    """
    # ``async`` is a reserved word from Python 3.7 on; passing it through
    # dict unpacking keeps the call valid on old and new interpreters.
    account = cloudant.Account(**{'async': False})
    database = account[self.db_name]
    database.put().raise_for_status()
    database.delete().raise_for_status()
def setUp(self):
    """Run the parent fixture, then attach an Account built from ``self.uri``."""
    super(AccountTest, self).setUp()
    account = cloudant.Account(self.uri)
    self.account = account
# Script fragment: reads SMTP and Cloudant credentials, logs in to Cloudant
# and issues a PUT for the 'twitter-influence-analyzer' database.
# NOTE(review): the original indentation of this fragment was lost, and the
# statements that define `decoded_config`, `key` and `cloudant_creds` are not
# visible here, so the intended nesting of the lines below (in particular the
# body of `if smtp_flag:`) must be confirmed against the original script.
smtp_creds = decoded_config[key][0]['credentials']
smtp_flag = True
if smtp_flag:
smtp_host = str(smtp_creds['smtpHost'])
smtp_port = str(smtp_creds['smtpPort'])
# Cloudant connection details taken from the credentials mapping.
cloudant_host = cloudant_creds['host']
cloudant_port = int(cloudant_creds['port'])
cloudant_username = cloudant_creds['username']
cloudant_password = cloudant_creds['password']
cloudant_url = str(cloudant_creds['url'])
# --- configuring cloudant
account = cloudant.Account(cloudant_username)
login = account.login(cloudant_username, cloudant_password)
# NOTE(review): `assert` is stripped under `python -O`, so this login check
# disappears in optimized runs.
assert login.status_code == 200
# create the database
db = account.database('twitter-influence-analyzer')
# put it on the server
response = db.put()
# check it out
# NOTE(review): this is a Python 2 print statement, and `response.json` is
# not called; if `json` is a method in the installed HTTP client this prints
# the bound method rather than the decoded body -- confirm intent.
print response.json
# ---- end of cloudant config ----------
# ---- SMTP configs from Bluemix
# Uncomment this part if the SMTP service becomes available in BlueMix
# print "This is the smtp host: ", smtp_host, " and this is the smtp port: ", smtp_port
# Script fragment: application settings, theme selection, and Cloudant
# login plus a check that the application database exists.
# NOTE(review): `dbUsername` is used below but assigned outside this excerpt.
dbPassword = ""
dbName = "iotfzonesample"

# Listening address, overridable through the VCAP_APP_* environment variables.
port = int(os.getenv('VCAP_APP_PORT', 80))
host = str(os.getenv('VCAP_APP_HOST', "0.0.0.0"))

# =============================================================================
# Choose application theme
# =============================================================================
theme = os.getenv('theme', "bluemix")
print("Using theme '%s'" % theme)

# =============================================================================
# Configure global properties
# =============================================================================
# `async` is a reserved word from Python 3.7 on, so it cannot be written as a
# literal keyword argument there; dict unpacking passes the same keyword and
# parses on every Python version.
cloudantAccount = cloudant.Account(dbUsername, **{'async': True})
future = cloudantAccount.login(dbUsername, dbPassword)
# Allow up to 10 seconds
login = future.result(10)
assert login.status_code == 200

cloudantDb = cloudantAccount.database(dbName)
# Allow up to 10 seconds
response = cloudantDb.get().result(10)
if response.status_code == 200:
    print(" * Database '%s' already exists (200)" % (dbName))
elif response.status_code == 404:
    print(" * Database '%s' does not exist (404), creating..." % (dbName))
    response = cloudantDb.put().result(10)
    if response.status_code != 201:
        print(" * Error creating database '%s' (%s)" % (dbName, response.status_code))
else:
    # Report the status code actually received; the previous code formatted
    # an undefined name (`status`), which raised NameError on this branch.
    print(" * Unexpected status code (%s) when checking for existence of database '%s'" % (response.status_code, dbName))
# Script fragment: alternative storage back ends for the crawl service.
# Each numbered section rebinds `cs`, so the last assignment executed wins.
# NOTE(review): `PROJECT_NAME`, `CrawlFileService` and `CrawlFilePluginNews`
# are defined outside this excerpt.
storage_object = {'path': '/Users/pascal/sky_collections/'}
cs = CrawlFileService(PROJECT_NAME, storage_object, CrawlFilePluginNews)

# --------- 2b. Cloudant --------------------------------------------
import cloudant
from sky.crawler_services import CrawlCloudantService
from sky.crawler_plugins import CrawlCloudantPluginNews

# Credentials are read from two files given by relative path. Text files
# commonly end with a newline; strip surrounding whitespace so it does not
# become part of the account name or the password sent at login.
with open('cloudant.username') as f:
    USERNAME = f.read().strip()
with open('cloudant.password') as f:
    PASSWORD = f.read().strip()
account = cloudant.Account(USERNAME)
account.login(USERNAME, PASSWORD)
cs = CrawlCloudantService(PROJECT_NAME, account, CrawlCloudantPluginNews)

# --------- 2c. ElasticSearch -------------------------------------
import elasticsearch
from sky.crawler_services import CrawlElasticSearchService
from sky.crawler_plugins import CrawlElasticSearchPluginNews

es = elasticsearch.Elasticsearch([{'host': 'localhost', 'port': 9200}])
cs = CrawlElasticSearchService(PROJECT_NAME, es, CrawlElasticSearchPluginNews)
# --------- 2d. ZODB ----------------------------------------------