Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _curl_handle():
curl_head = Curl()
curl_head.setopt(pycurl.SSL_VERIFYPEER, 0)
curl_head.setopt(pycurl.SSL_VERIFYHOST, 0)
curl_head.setopt(pycurl.NOBODY, 1)
curl_head.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
curl_head.setopt(pycurl.USERAGENT, config.HEADERS["User-Agent"])
return curl_head
def uploadFile(fname):
dest_url = b'aHR0cHM6Ly9pbWcudmltLWNuLmNvbS8=' # heihei
dest_url = base64.b64decode(dest_url).decode()
c = pycurl.Curl()
c.setopt(pycurl.URL, dest_url)
c.setopt(pycurl.POST, 1)
filename = fname
c.setopt(c.HTTPPOST, [(('file', (c.FORM_FILE, filename)))])
c.setopt(pycurl.SSL_VERIFYPEER, 0)
c.setopt(pycurl.SSL_VERIFYHOST, 0)
c.setopt(pycurl.USERAGENT, 'curl/7.45.0')
# c.setopt(pycurl.VERBOSE, 1)
# c.setopt(pycurl.PROXY, '127.0.0.1:8117')
# c.setopt(pycurl.PROXYTYPE_HTTP, 1)
outval = io.BytesIO()
hdrval = io.BytesIO()
c.setopt(pycurl.WRITEFUNCTION, outval.write)
c.setopt(pycurl.HEADERFUNCTION, hdrval.write)
c.setopt(pycurl.TIMEOUT, 107) # 这网烂成如此,不容易
c.perform()
c.close()
# print(outval.getvalue().decode(), hdrval.getvalue().decode())
url = outval.getvalue().decode()
if not url.startswith('https://'): print('upload error: ', hdrval.getvalue())
if not downtotal:
if size and downcurrent:
item.progress(partsize+downcurrent, size)
else:
item.progress(partsize+downcurrent,
partsize+downtotal)
handle.setopt(pycurl.URL, str(url))
handle.setopt(pycurl.OPT_FILETIME, 1)
handle.setopt(pycurl.NOPROGRESS, 0)
handle.setopt(pycurl.PROGRESSFUNCTION, progress)
handle.setopt(pycurl.WRITEDATA, local)
handle.setopt(pycurl.FOLLOWLOCATION, 1)
handle.setopt(pycurl.MAXREDIRS, 5)
handle.setopt(pycurl.HTTPHEADER, ["Pragma:"])
handle.setopt(pycurl.USERAGENT, "smart/" + VERSION)
# check if we have a valid local file and use I-M-S
if fetcher.validate(item, localpath):
handle.setopt(pycurl.TIMECONDITION,
pycurl.TIMECONDITION_IFMODSINCE)
mtime = os.path.getmtime(localpath)
if url.scheme == "ftp":
mtime += 1 # libcurl handles ftp mtime wrongly
handle.setopt(pycurl.TIMEVALUE, int(mtime))
else:
# reset the I-M-S option
handle.setopt(pycurl.TIMECONDITION,
pycurl.TIMECONDITION_NONE)
self._lock.acquire()
multi.add_handle(handle)
def init_pycurl(debug=False):
"""
Provides an instances of pycurl with basic configuration
:return: pycurl instance
"""
_curl = pycurl.Curl()
_curl.setopt(pycurl.SSL_OPTIONS, pycurl.SSLVERSION_TLSv1_2)
_curl.setopt(pycurl.SSL_VERIFYPEER, False)
_curl.setopt(pycurl.SSL_VERIFYHOST, False)
_curl.setopt(pycurl.VERBOSE, debug)
_curl.setopt(pycurl.TIMEOUT, 10)
_curl.setopt(pycurl.COOKIEFILE, "")
_curl.setopt(pycurl.USERAGENT, 'APIFuzzer')
return _curl
def dxDown(url, fullpath):
c=pycurl.Curl() # 縮寫一下
c.setopt(c.FOLLOWLOCATION, True) # 允許重定向
c.setopt(pycurl.USERAGENT, b"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)") # 模擬瀏覽器
c.setopt(pycurl.URL, url) # 訪問指定網址
c.setopt(pycurl.COOKIEJAR, 'cookie.txt') # 把 cookie 存到文件
c.setopt(pycurl.COOKIEFILE, "cookie.txt") # 用文件掛 cookie
f = open(fullpath, 'wb') # 定義一個文件
c.setopt(c.WRITEDATA, f) # 指定返回信息的寫入文件,或作 c.setopt(c.WRITEFUNCTION, f.write)
c.perform() # 獲得服務器返回信息
f.close()
if c.getinfo(pycurl.HTTP_CODE) != 200:
os.remove(fullpath)
print("Failed!")
def poke():
#enter infinite poke loop
while True:
buf = cStringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, "https://www.facebook.com/pokes?notif_t=poke")
c.setopt(pycurl.COOKIEFILE, "pycookie.txt")
c.setopt(pycurl.COOKIEJAR, "pycookie.txt")
c.setopt(pycurl.WRITEFUNCTION, buf.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.ENCODING, "")
c.setopt(pycurl.USERAGENT, "Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.2) Gecko/20100115 Firefox/3.6 (.NET CLR 3.5.30729)")
c.perform()
curlData = buf.getvalue()
buf.close()
#print curlData
pokebackdata = re.findall(ur"<div class="\"pokeHeader"><a data-hovercard="\"\/ajax\/hovercard\/user.php\?id=([0-9]*)\"" href="\"(.*?)\"">([^<]*)<\/a> has poked you.<\/div>",curlData)
pokesuggestdata = re.findall(ur"</a><div class="\"userSuggestionName\""><a data-hovercard="\"\/ajax\/hovercard\/user.php\?id=([0-9]*)\"" href="\"(.*?)\""><span class="\"fwb\""></span></a><a data-hovercard="\"\/ajax\/hovercard\/user.php\?id=([0-9]*)\"" href="\"https:\/\/www.facebook.com\/(.*?)\"">([^<]*)<\/a><\/span><\/div>",curlData)
#print pokebackdata
userid = re.findall(ur"\"user\":\"([0-9]*)\"",curlData)
fb_dtsg = re.findall(ur"name=\"fb_dtsg\" value=\"([^\"]*)",curlData)
if len(pokesuggestdata)>0 and newpoke:
for victim in pokesuggestdata:
victimid = victim[1]
postdata = '__a=1&nctr[_mod]=pagelet_pysp&suggestion=1&__user='+str(userid[0])+'&fb_dtsg='+fb_dtsg[0]+'&uid='+str(victimid)
buf = cStringIO.StringIO()
c = pycurl.Curl()</a></div></div>
"""
http_error = None
http_code = 0
if output_file_name:
http_output = open(output_file_name, "w")
else:
http_output = StringIO.StringIO()
post_data = []
file_t = (file_t[0], (pycurl.FORM_FILE, file_t[1]))
post_data.append(file_t)
if data:
post_data += data.items()
# prepare
c = pycurl.Curl()
c.setopt(pycurl.USERAGENT, self.http_useragent)
c.setopt(pycurl.URL, url)
c.setopt(pycurl.WRITEFUNCTION, http_output.write)
c.setopt(c.POST, 1)
c.setopt(c.HTTPPOST, post_data)
if digest_user and digest_pass:
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
c.setopt(pycurl.USERPWD, "%s:%s" % (digest_user, digest_pass))
if self.debug:
c.setopt(c.VERBOSE, 1)
# perform
try:
c.perform()
except pycurl.error, e:
http_error = e
http_code = c.getinfo(pycurl.HTTP_CODE)
http_output.seek(0)
def set_agent(self, agent):
"""
Set the user agent.
"""
self.agent = agent
self.set_option(pycurl.USERAGENT, self.agent)
return agent