Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Refuse to continue on a bench that has not opted in to releases.
if not config.get('release_bench'):
    print('bench not configured to release')
    sys.exit(1)

global github_username, github_password

# Take each credential from the config; prompt interactively for whichever
# one is missing or empty.
github_username = config.get('github_username') or click.prompt('Username', type=str)
github_password = config.get('github_password') or getpass.getpass()

# Try the credentials against the GitHub user endpoint; raise_for_status()
# raises if the reply carries an HTTP error status.
r = requests.get(
    'https://api.github.com/user',
    auth=HTTPBasicAuth(github_username, github_password),
)
r.raise_for_status()
# Local path of the blob: blobdir plus the '/'-separated path converted to
# the platform's separator.
fullpath = blobdir + path.replace('/', os.sep)
# now PUT blob
if args.verbose:
    sprint("Uploading blob " + f)
response = doHttpPostPut(url, fullpath, True, headers)
# Compare against None explicitly. A requests.Response is falsy for every
# 4xx/5xx status, so a bare truth test would skip BOTH branches below and
# silently drop exactly the failures that need to be reported.
# NOTE(review): assumes doHttpPostPut returns a requests.Response (or None
# when no response was obtained) - confirm against its definition.
if response is not None and 200 <= response.status_code <= 250:
    if args.verbose:
        sprint("Uploaded " + path + " payload successfully")
    # makeBaseUri(True) is used for Fusion 4.0.1 compatibility but this requires us to make the link
    lurl = makeBaseUri(True) + "/links"
    # {"subject":"blob:lucid.googledrive-4.0.1.zip","object":"app:EnterpriseSearch","linkType":"inContextOf"}
    payload = {
        "subject": 'blob:' + blobId,
        "object": 'app:' + appName,
        "linkType": "inContextOf",
    }
    lresponse = requests.put(
        lurl,
        auth=requests.auth.HTTPBasicAuth(args.user, args.password),
        headers={"Content-Type": "application/json"},
        data=json.dumps(payload),
    )
    # Same accepted range as the upload check above, written without the
    # ambiguous and/or mix.
    if not 200 <= lresponse.status_code <= 250:
        eprint("Non OK response: " + str(lresponse.status_code) + " when linking Blob " + blobId + " to App " + appName)
elif response is not None:
    eprint("Non OK response: " + str(response.status_code) + " when processing " + f)
def add_function(file_name):
    """Upload ``functions/<file_name>`` to the ``/files/helper-files/`` endpoint.

    The file is sent as the multipart field ``up_file`` using HTTP basic
    auth.  ``url``, ``authuser`` and ``authpwd`` are read from the enclosing
    scope.  The HTTP status of the upload is not inspected, so the
    confirmation line is printed whether or not the server accepted it.

    Args:
        file_name: Name of the file inside the local ``functions/``
            directory; also used as the last path segment of the URL.
    """
    # Binary mode keeps the byte count consistent with what is sent, and the
    # context manager closes the handle even if the request raises.
    with open('functions/' + file_name, 'rb') as up_file:
        # NOTE(review): verify=False turns off TLS certificate verification.
        requests.post(
            url + '/files/helper-files/' + file_name,
            auth=HTTPBasicAuth(authuser, authpwd),
            headers={'Accept': 'application/json'},
            verify=False,
            files={'up_file': up_file},
        )
    # Parenthesised single argument prints identically on Python 2 and 3.
    print('loaded healthbot function ' + file_name)
def __authenticate(self, method, url, kwargs):
    """Invoke *method* on *url* with HTTP basic auth and keep the cookies.

    The credentials come from the ``foreman.user`` and ``foreman.password``
    entries of ``self.env.config`` and are stored under ``kwargs["auth"]``
    (the caller's dict is modified).  The cookies of the response are saved
    on the instance and the response itself is returned.
    """
    settings = self.env.config
    user = settings.get("foreman.user")
    secret = settings.get("foreman.password")
    kwargs["auth"] = HTTPBasicAuth(user, secret)

    reply = method(url, **kwargs)
    self.__cookies = reply.cookies
    return reply
def __call__(self, request):
    """Add an ``Authorization`` header to *request* and return it.

    A password-grant request is posted to ``<opsmgr url>/uaa/oauth/token``
    using the ``opsmgr`` entry of ``self.creds``.  When the reply status is
    OK the header becomes ``"<token_type> <access_token>"``; otherwise the
    request is handed to ``requests.auth.HTTPBasicAuth`` built from the same
    username and password.
    """
    opsmgr = self.creds.get('opsmgr')
    token_url = opsmgr.get('url') + '/uaa/oauth/token'
    username = opsmgr.get('username')
    password = opsmgr.get('password')

    form = dict(
        grant_type='password',
        client_id='opsman',
        client_secret='',
        username=username,
        password=password,
        response_type='token',
    )
    # NOTE(review): verify=False turns off TLS certificate verification.
    reply = requests.post(
        token_url,
        data=form,
        verify=False,
        headers={'Accept': 'application/json'},
    )

    if reply.status_code == requests.codes.ok:
        grant = reply.json()
        access_token = grant.get('access_token')
        token_type = grant.get('token_type')
        request.headers['Authorization'] = token_type + ' ' + access_token
        return request

    # Token endpoint refused or is unavailable: fall back to basic auth.
    return requests.auth.HTTPBasicAuth(username, password)(request)
def ips_rpc_fire(method, config, *parameters):
    """POST a JSON-RPC call for *method* and return the HTTP response.

    Args:
        method: Value placed in the payload's ``"method"`` field.
        config: Mapping read for ``jsonrpc``, ``webservice_url``,
            ``headers``, ``username`` and ``password``.
        *parameters: Positional values sent as the payload's ``"params"``.

    Returns:
        Whatever ``requests.post`` returns for the call.
    """
    rpc_call = dict(
        method=method,
        params=parameters,
        jsonrpc=config['jsonrpc'],
        id=0,
    )
    endpoint = config['webservice_url']
    request_headers = config['headers']
    credentials = HTTPBasicAuth(config['username'], config['password'])
    return requests.post(
        endpoint,
        headers=request_headers,
        auth=credentials,
        data=json.dumps(rpc_call),
    )
# Run with Python 3
import requests
# 1. Get your keys at https://stepik.org/oauth2/applications/ (client type = confidential,
# authorization grant type = client credentials)
client_id = "..."
client_secret = "..."

# 2. Get a token
# The client id/secret pair travels as HTTP basic auth on a
# client-credentials grant request; the reply's access token is kept in
# `token` for the API calls that follow.
auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
resp = requests.post(
    'https://stepik.org/oauth2/token/',
    auth=auth,
    data={'grant_type': 'client_credentials'},
)
token = resp.json()['access_token']
# 3. Call API (https://stepik.org/api/docs/) using this token.
# Example:
def get_user_id():
    """Return the ``id`` of the first entry under ``users`` in the API reply.

    Queries the ``stepics/1`` endpoint, sending the module-level ``token``
    as a Bearer credential.
    """
    api_url = 'https://stepik.org/api/stepics/1'  # should be stepic with "c"!
    bearer = {'Authorization': 'Bearer ' + token}
    reply = requests.get(api_url, headers=bearer).json()
    first_user = reply['users'][0]
    return first_user['id']
# Upload / download locations, all resolved under PROJECT_PATH.
_config.UPLOADED_PACKAGES_DEST = os.path.join(
    _config.PROJECT_PATH, 'static/upload/packages/'
)
_config.EXPORT_EXCEL_PATH = os.path.join(
    _config.PROJECT_PATH, 'static/download/export_excels'
)
_config.DOWNLOAD_TEMPLATE_EXCEL_DEST = os.path.join(
    _config.PROJECT_PATH, 'static/download/templates/'
)

# sqlalchemy config
# Resulting shape: postgresql://username:password@server:port/db
_config.SQLALCHEMY_DATABASE_URI = (
    f"postgresql://{_config.POSTGRES_USER}:{_config.POSTGRES_PASSWORD}"
    f"@{_config.POSTGRES_HOST}:{_config.POSTGRES_PORT}"
    f"/{_config.POSTGRES_DATABASE}"
)
_config.SQLALCHEMY_POOL_SIZE = 32
_config.SQLALCHEMY_TRACK_MODIFICATIONS = True

# emqx config
# Basic-auth credentials built from the EMQX application id and secret.
_config.EMQX_AUTH = HTTPBasicAuth(_config.EMQX_APP_ID, _config.EMQX_APP_SECRET)

# task schedule config
# All three endpoints are served by the TASK_SCHEDULER_NODE host.
_config.PUBLISH_TASK_URL = (
    f"http://{_config.TASK_SCHEDULER_NODE}/api/v1/publish_tasks"
)
_config.IMPORT_EXCEL_TASK_URL = (
    f"http://{_config.TASK_SCHEDULER_NODE}/api/v1/import_excels"
)
_config.EXPORT_EXCEL_TASK_URL = (
    f"http://{_config.TASK_SCHEDULER_NODE}/api/v1/export_excels"
)
def _create_project(self, project):
    """Create the database for *project* and an index on ``taskid``.

    The collection name comes from ``self._get_collection_name(project)``.
    After ``self.create_database`` runs, an index definition is posted to
    ``<base_url><collection>/_index``; the ``id`` field of the JSON reply
    is stored in ``self.index`` and ``self._list_project()`` is called.
    """
    collection_name = self._get_collection_name(project)
    self.create_database(collection_name)

    # create index
    index_url = self.base_url + collection_name + "/_index"
    index_spec = json.dumps({
        'index': {'fields': ['taskid']},
        'name': collection_name,
    })
    reply = requests.post(
        index_url,
        data=index_spec,
        headers={"Content-Type": "application/json"},
        auth=HTTPBasicAuth(self.username, self.password),
    )
    self.index = reply.json()['id']
    self._list_project()
def _compose_args(self):
args = {}
if not self.token or self.token == 'None':
args['auth'] = HTTPBasicAuth(self.user, self.password)
return args