Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this is the tail of a method whose header is outside this
# view; it relies on self.bucket already being set.
# Assemble the bucket policy JSON by string concatenation. Its single
# statement allows every OSS action ("oss:*") for principal OSS_PAYER_UID.
policy_text = ''
policy_text += '{'
policy_text += '"Version":"1",'
policy_text += '"Statement":[{'
policy_text += '"Action":["oss:*"],'
policy_text += '"Effect":"Allow",'
policy_text += '"Principal":["{0}"],'.format(OSS_PAYER_UID)
# Two resources are listed: the bucket OSS_BUCKET itself and every key under it.
policy_text += '"Resource": ["acs:oss:*:*:{0}","acs:oss:*:*:{0}/*"]'.format(OSS_BUCKET)
policy_text += '}]}'
# Attach the policy to the bucket held in self.bucket.
self.bucket.put_bucket_policy(policy_text)
# Enable bucket request payment (payer value PAYER_REQUESTER) and require status 200.
result = self.bucket.put_bucket_request_payment(PAYER_REQUESTER)
self.assertEqual(result.status, 200)
# Second handle on the same bucket and endpoint, authenticated with the
# payer's credentials (OSS_PAYER_ID / OSS_PAYER_SECRET).
self.payer_bucket = oss2.Bucket(oss2.Auth(OSS_PAYER_ID, OSS_PAYER_SECRET), OSS_ENDPOINT, OSS_BUCKET)
def test_upload_sequenial(self):
    """Upload the same temp file twice with resumable_upload.

    The first upload uses six threads; afterwards the test asserts that the
    fetched object has no 'Content-MD5' response header. The second upload
    repeats the transfer with one thread and the 'sequential' request
    parameter set.
    """
    credentials = oss2.Auth(OSS_ID, OSS_SECRET)
    target = oss2.Bucket(
        credentials,
        "http://oss-cn-shanghai.aliyuncs.com",
        OSS_BUCKET + "-test-upload-sequential",
    )
    target.create_bucket()

    object_key = "test-upload_sequential"
    # Payload is ten times part_size and above multipart_threshold.
    payload = random_bytes(10 * 100 * 1024)
    local_path = self._prepare_temp_file(payload)
    # Sizing shared by both uploads.
    sizing = {'multipart_threshold': 200 * 1024, 'part_size': 100 * 1024}

    # Multi-threaded upload: the object is expected to carry no Content-MD5.
    oss2.resumable_upload(target, object_key, local_path, num_threads=6, **sizing)
    fetched = target.get_object(object_key)
    self.assertIsNone(fetched.resp.headers.get('Content-MD5'))

    # single thread in sequential mode.
    oss2.resumable_upload(target, object_key, local_path, num_threads=1,
                          params={'sequential': ''}, **sizing)
# NOTE(review): fragment — the enclosing function header and the body of the
# final retry loop are outside this view, and the original nesting has been
# lost; comments below describe each statement as it appears.
# Split the dotted name; its first component serves as both the module name
# and the default package name.
# NOTE(review): rebinding `object` shadows the builtin of the same name.
object = object.split('.')
dm_name = object[0]
package_name = object[0]
# When Files maps this module to a file name, derive the package name and
# the type (extension) from that file name instead.
if dm_name in Files:
project_file = Files[dm_name]
package_name = project_file.split('.')[0]
dm_type = project_file.split('.')[-1]
# More than two dot-separated parts: the type becomes everything after the
# first part (e.g. a multi-part extension) rather than only the last part.
if len(project_file.split('.')) >2:
dm_type = '.'.join(project_file.split('.')[1:])
package = '%s-%s.%s'% (package_name, version, dm_type)
project_path = '%s/%s/%s' % (dockerfile_path, dm_name, project_file)
# Presumably the OSS lookup below only runs when the file is missing locally.
if not os.path.exists(project_path):
try:
# Announce the download on the Redis list and in the flow log.
Redis.lpush(redis_key, '%s package download from oss ......' %package)
_flow_log('%s package download from oss ......' %package)
auth = oss2.Auth(oss_id, oss_key)
bucket = oss2.Bucket(auth, oss_url, 'ops')
# Stays None unless a matching object key is found in the scan below.
oss_project_path = None
try:
# Create the per-module directory under dockerfile_path if it is absent.
if not os.path.exists('%s/%s' %(dockerfile_path,dm_name)):
os.mkdir('%s/%s' %(dockerfile_path,dm_name))
# Scan the bucket's objects and stop at the first .war/.tar.gz/.jar key
# whose base name (with '_' replaced by '-') starts with package_name and
# contains version.
for obj in oss2.ObjectIterator(bucket):
if obj.key.endswith('.war') or obj.key.endswith('.tar.gz') or obj.key.endswith('.jar'):
obj_name = obj.key.split('/')[-1].replace('_','-')
if obj_name.startswith(package_name) and version in obj_name:
oss_project_path = obj.key
break
# Any exception is logged and not re-raised here.
except Exception as e:
logging.error(e)
if oss_project_path:
# Try the download up to 3 times
for i in range(3):
def connect_OSS(access_key_id,access_key_secret,bucket_name,endpoint):
    """Validate the connection arguments and return an oss2 Bucket handle.

    Args:
        access_key_id: Access key id passed to oss2.Auth.
        access_key_secret: Access key secret passed to oss2.Auth.
        bucket_name: Name of the bucket to open.
        endpoint: Endpoint passed to oss2.Bucket.

    Returns:
        The oss2.Bucket built from the arguments.

    Raises:
        AssertionError: If any argument contains '<' (presumably an unfilled
            template placeholder such as '<your key>').
    """
    # verify parameters
    for param in (access_key_id, access_key_secret, bucket_name, endpoint):
        # Raise explicitly instead of using `assert`: assert statements are
        # removed under `python -O`, which would silently disable this check.
        # The exception type and message are the same as before
        # (the message reads "please set the parameter: <value>").
        if '<' in param:
            raise AssertionError('请设置参数:' + param)
    # create Bucket object
    bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
    print("connect succeed: " + bucket_name)
    return bucket
def auth(self):
    """Build the OSS bucket client from self.cloud and store it in self.client.

    Reads the 'AccessKeyId', 'AccessKeySecret', 'Region' and 'Bucket' entries
    of self.cloud; the endpoint is the region with '.aliyuncs.com' appended.
    """
    settings = self.cloud
    credentials = oss2.Auth(settings['AccessKeyId'], settings['AccessKeySecret'])
    self.client = oss2.Bucket(
        credentials,
        settings['Region'] + '.aliyuncs.com',
        settings['Bucket'],
    )
def get_bucket(self, path):
    """Return an oss2.Bucket for the bucket that *path* refers to.

    The path is first passed through self.normalize_separate_char, then
    self.get_oss_bucket_name extracts the bucket name from the result. The
    handle uses self.access_id / self.access_key, self.endpoint and
    defaults.app_name.
    """
    bucket_name = self.get_oss_bucket_name(self.normalize_separate_char(path))
    credentials = oss2.Auth(self.access_id, self.access_key)
    return oss2.Bucket(
        credentials,
        self.endpoint,
        bucket_name,
        app_name=defaults.app_name,
    )
def from_crawler(cls, crawler):
    """Create an instance of cls and set VOssPipeline.BUCKET_STORE.

    The *crawler* argument is not read. The key id, key secret, endpoint and
    bucket name used for the shared oss2.Bucket are the literals below.

    Returns:
        The newly created cls instance.
    """
    instance = cls()
    import oss2
    # NOTE(review): hard-coded, placeholder-looking credentials and an empty
    # bucket name — presumably meant to be filled in before use; confirm.
    access_key_id = 'kkkkkkkkkkkkkkkkkkkkkkkk'
    access_key_secret = 'vvvvvvvvvvvvvvvvvvvvvvvvvvvvvv'
    endpoint = 'http://oss-cn-hangzhou.aliyuncs.com'
    bucket_name = ''
    credentials = oss2.Auth(access_key_id, access_key_secret)
    VOssPipeline.BUCKET_STORE = oss2.Bucket(credentials, endpoint, bucket_name)
    return instance
def process_item(self, item, spider):
def conn(self):
    """Create the oss2 Bucket handle and store it in self.bucket.

    Uses self.key / self.secret for authentication; self.region is passed in
    the endpoint position of oss2.Bucket, followed by self.bucket_name.
    """
    self.bucket = oss2.Bucket(
        oss2.Auth(self.key, self.secret),
        self.region,
        self.bucket_name,
    )
def ali_upload_file(self, source_file, filename):
    """Upload *source_file* to OSS under '<subusr_id>/<filename>'.

    Args:
        source_file: Content handed to bucket.put_object.
        filename: Object name; it is prefixed with self.subusr_id and '/'.

    Returns:
        self.qiniu_domain followed by the prefixed object key when the upload
        reports status 200, otherwise None.
    """
    url_prefix = self.qiniu_domain
    credentials = oss2.Auth(self.qiniu_access_key, self.qiniu_secret_key)
    # self.endpoint is used as the bucket's endpoint.
    bucket = oss2.Bucket(credentials, self.endpoint, self.qiniu_bucket_name)
    # Objects are stored under a per-sub-user prefix.
    object_key = '%s/' % self.subusr_id + filename
    response = bucket.put_object(object_key, source_file)  # upload
    if response.status != 200:
        return None
    return url_prefix + object_key