Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init_pycurl(debug=False):
    """
    Provide a pycurl instance with the basic configuration used for requests.

    The handle requests TLS 1.2, has peer and host name verification switched
    off, uses a 10 second total timeout, enables the in-memory cookie engine
    and identifies itself as 'APIFuzzer'.

    :param debug: when truthy, libcurl verbose output is enabled
    :return: pycurl instance
    """
    _curl = pycurl.Curl()
    # SSLVERSION is the option that selects the protocol version. The
    # SSLVERSION_* constants are not SSL_OPTIONS flags: SSL_OPTIONS is a
    # bitmask, so passing a version constant to it toggles unrelated TLS
    # behaviours and leaves the protocol version unpinned.
    _curl.setopt(pycurl.SSLVERSION, pycurl.SSLVERSION_TLSv1_2)
    # Certificate and host name checks are deliberately disabled.
    _curl.setopt(pycurl.SSL_VERIFYPEER, False)
    _curl.setopt(pycurl.SSL_VERIFYHOST, False)
    _curl.setopt(pycurl.VERBOSE, debug)
    _curl.setopt(pycurl.TIMEOUT, 10)
    # An empty COOKIEFILE turns the cookie engine on without reading a file.
    _curl.setopt(pycurl.COOKIEFILE, "")
    _curl.setopt(pycurl.USERAGENT, 'APIFuzzer')
    return _curl
# Submit the Instagram login form with pycurl and keep the response body in curlData.
# The form body is built by plain concatenation of the CSRF token and the credentials.
# NOTE(review): the values are not URL-encoded here; confirm the caller escapes
# characters such as '&' and '=' in username/password before this point.
postdata = 'csrfmiddlewaretoken='+csrfmiddlewaretoken[0]+'&username='+username+'&password='+password
# In-memory buffer that collects the response body via WRITEFUNCTION below.
buf = cStringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, "https://instagram.com"+postaction[0])
# Cookies are read from and written back to the same file.
c.setopt(pycurl.COOKIEFILE, "pycookie.txt")
c.setopt(pycurl.COOKIEJAR, "pycookie.txt")
c.setopt(pycurl.WRITEFUNCTION, buf.write)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.ENCODING, "")
# NOTE(review): certificate and host name verification are disabled on a
# request that carries the account password.
c.setopt(pycurl.SSL_VERIFYPEER, 0)
c.setopt(pycurl.SSL_VERIFYHOST, 0)
c.setopt(pycurl.REFERER, "https://instagram.com/accounts/login/?next=/oauth/authorize/%3Fclient_id%3D"+clientid[0]+"%26redirect_uri%3Dhttp%3A//web.stagram.com/%26response_type%3Dcode%26scope%3Dlikes%2Bcomments%2Brelationships")
# Assemble a randomized User-Agent from the browsers and operatingsystems sequences.
useragent = random.choice(browsers) + str(random.randrange(1,9)) + "." + str(random.randrange(0,50)) + " (" + random.choice(operatingsystems) + "; " + random.choice(operatingsystems) + "; rv:" + str(random.randrange(1,9)) + "." + str(random.randrange(1,9)) + "." + str(random.randrange(1,9)) + "." + str(random.randrange(1,9)) + ")"
c.setopt(pycurl.USERAGENT, useragent)
# Send postdata as the body of a POST request.
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, postdata)
c.setopt(pycurl.POSTFIELDSIZE, len(postdata))
#c.setopt(pycurl.VERBOSE, True)
c.perform()
# Copy the collected body out before the buffer is closed.
curlData = buf.getvalue()
buf.close()
# Disabled client-certificate / key setup, kept commented out.
# cert_file = os.path.expanduser(acertregexp.group(2))
# self.set_option(pycurl.SSL_VERIFYHOST, 0)
# self.set_option(pycurl.SSL_VERIFYPEER, 1)
# self.set_option(pycurl.SSH_PUBLIC_KEYFILE, key_file)
# self.set_option(pycurl.CAINFO, cert_file)
# self.set_option(pycurl.SSLCERT, cert_file)
# self.set_option(pycurl.SSLCERTTYPE, 'p12')
# self.set_option(pycurl.SSLCERTPASSWD, '1234')
# self.set_option(pycurl.SSLKEY, key_file)
# self.set_option(pycurl.SSLKEYPASSWD, '1234')
# for file in (key_file, cert_file):
# if not os.path.exists(file):
# print "\n[E] File '%s' doesn't exist\n" % file
# return
# Host name and certificate verification are both switched off.
self.set_option(pycurl.SSL_VERIFYHOST, 0)
self.set_option(pycurl.SSL_VERIFYPEER, 0)
# Rewind the header buffer and reset the payload.
self.header.seek(0,0)
self.payload = ""
# The loop runs self.retries times and sleeps self.delay at the start of each pass.
for count in range(0, self.retries):
time.sleep(self.delay)
# With dropcookie set, the 'ALL' cookie-list command is issued and an empty
# 'Set-Cookie: ' header is appended to the fake request headers.
if self.dropcookie:
self.set_option(pycurl.COOKIELIST, 'ALL')
nocookie = ['Set-Cookie: ', '']
self.set_option(pycurl.HTTPHEADER, self.fakeheaders + nocookie)
try:
self.handle.perform()
# NOTE(review): the bare except returns None for any failure, including
# KeyboardInterrupt/SystemExit, and hides the cause from the caller.
except:
return
return self.payload
'X-IG-Capabilities: 3QI=',
'Content-type: application/x-www-form-urlencoded; charset=UTF-8',
'Cookie2: $Version=1',
'Accept-Language: en-US'
]
# Configure a fresh handle for Constants.API_URL + endpoint; the response is
# written into `buffer`, which is created outside the lines shown here.
ch = pycurl.Curl()
ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
ch.setopt(pycurl.USERAGENT, self.userAgent)
ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
ch.setopt(pycurl.FOLLOWLOCATION, True)
# HEADER=True asks libcurl to pass the response headers to the write callback
# as well; header_len below is presumably used to split them off again.
ch.setopt(pycurl.HEADER, True)
ch.setopt(pycurl.HTTPHEADER, headers)
ch.setopt(pycurl.VERBOSE, False)
# TLS verification follows the instance's verifyPeer / verifyHost settings.
ch.setopt(pycurl.SSL_VERIFYPEER, self.verifyPeer)
ch.setopt(pycurl.SSL_VERIFYHOST, self.verifyHost)
# Cookies are loaded from and saved to a per-username file under IGDataPath.
ch.setopt(pycurl.COOKIEFILE, self.parent.IGDataPath + self.parent.username + '-cookies.dat')
ch.setopt(pycurl.COOKIEJAR, self.parent.IGDataPath + self.parent.username + '-cookies.dat')
# A truthy `post` switches the request to POST with `post` as the body.
if post:
ch.setopt(pycurl.POST, True)
ch.setopt(pycurl.POSTFIELDS, post)
# Optional proxy host and proxy credentials taken from the parent object.
if self.parent.proxy:
ch.setopt(pycurl.PROXY, self.parent.proxyHost)
if self.parent.proxyAuth:
ch.setopt(pycurl.PROXYUSERPWD, self.parent.proxyAuth)
ch.perform()
# Decode the raw response as UTF-8 and read the size of the header part.
resp = buffer.getvalue().decode("utf-8")
header_len = ch.getinfo(pycurl.HEADER_SIZE)
# Create self.c and configure it for one request with the given verb and URL.
# NOTE(review): only the first part of this method is visible here; the use of
# resp_body_file and of the formatted headers is not shown.
def _init_curl(self, verb, url, headers,
resp_body_file=None):
self.c = pycurl.Curl()
# Extra attributes stored on the handle, initialised empty.
self.c.resp_header_buf = None
self.c.resp_body_buf = None
self.c.resp_body_file = None
# Route libcurl debug output to self._curl_log and enable verbose mode.
self.c.setopt(pycurl.DEBUGFUNCTION, self._curl_log)
self.c.setopt(pycurl.VERBOSE, 1)
# Follow redirects, at most 10 of them.
self.c.setopt(pycurl.FOLLOWLOCATION, 1)
self.c.setopt(pycurl.MAXREDIRS, 10)
# NOTE(review): host name and certificate verification are disabled.
self.c.setopt(pycurl.SSL_VERIFYHOST, 0)
self.c.setopt(pycurl.SSL_VERIFYPEER, 0)
#self.c.setopt(pycurl.CONNECTTIMEOUT, 100)
#self.c.setopt(pycurl.TIMEOUT, 60*60*3)
# Clear any custom request method before selecting the one for this verb.
self.c.unsetopt(pycurl.CUSTOMREQUEST)
# Map the verb onto the matching libcurl option; any other verb raises KeyError.
if verb == 'GET' : self.c.setopt(pycurl.HTTPGET, True)
elif verb == 'PUT' : self.c.setopt(pycurl.UPLOAD , True)
elif verb == 'POST' : self.c.setopt(pycurl.POST , True)
elif verb == 'HEAD' : self.c.setopt(pycurl.NOBODY, True)
elif verb == 'DELETE' : self.c.setopt(pycurl.CUSTOMREQUEST, 'DELETE')
else: raise KeyError('unknown verb ' + verb)
self.c.setopt(pycurl.URL, url)
# Turn the header mapping into a list of 'Name: value' strings.
if headers:
headers = ['%s: %s'%(k, v) for (k,v) in headers.items()]
# NOSIGNAL is the inverse of use_signal.
curl.setopt(pycurl.NOSIGNAL, not use_signal)
# Whether to verify remote peer's CN
if verify_hostname:
# curl_easy_setopt(3): "When CURLOPT_SSL_VERIFYHOST is 2, that
# certificate must indicate that the server is the server to which you
# meant to connect, or the connection fails. [...] When the value is 1,
# the certificate must contain a Common Name field, but it doesn't matter
# what name it says. [...]"
curl.setopt(pycurl.SSL_VERIFYHOST, 2)
else:
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
# Peer verification is enabled only when a CA file, a CA path or the default
# curl CA bundle was requested.
if cafile or capath or use_curl_cabundle:
# Require certificates to be checked
curl.setopt(pycurl.SSL_VERIFYPEER, True)
if cafile:
curl.setopt(pycurl.CAINFO, str(cafile))
if capath:
curl.setopt(pycurl.CAPATH, str(capath))
# Not changing anything for using default CA bundle
else:
# Disable SSL certificate verification
curl.setopt(pycurl.SSL_VERIFYPEER, False)
# A proxy is configured only when one was passed in.
if proxy is not None:
curl.setopt(pycurl.PROXY, str(proxy))
# Timeouts
if connect_timeout is not None:
curl.setopt(pycurl.CONNECTTIMEOUT, connect_timeout)
if timeout is not None:
def _curl(url,ciphers,poc):
try:
import pycurl, tempfile
# out_temp = tempfile.TemporaryFile(mode='w+')
# fileno = out_temp.fileno()
c = pycurl.Curl()
c.setopt(c.URL, url + poc)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.SSL_CIPHER_LIST,ciphers)
c.setopt(pycurl.SSL_VERIFYPEER, 0)
c.setopt(pycurl.CONNECTTIMEOUT, 5)
c.setopt(pycurl.TIMEOUT, 5)
c.setopt(pycurl.SSL_VERIFYHOST, 0)
c.setopt(pycurl.PROXY, "127.0.0.1")
c.setopt(pycurl.PROXYPORT, 7999)
c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
with tempfile.NamedTemporaryFile() as fp:
c.setopt(pycurl.WRITEHEADER, fp)
c.setopt(pycurl.WRITEDATA, fp)
c.perform()
# out_temp.seek(0)
# rt = out_temp.read()
return c.getinfo(pycurl.HTTP_CODE)
except Exception as e:
pass
Download a file with curl streaming it directly to disk
"""
# Client key/certificate and CA directory come from the instance helpers.
ckey, cert = self.getKeyCert()
capath = self.getCAPath()
import pycurl
# Response headers are collected in memory; the body is written to fileName.
hbuf = BytesIO()
with open(fileName, "wb") as fp:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.WRITEDATA, fp)
curl.setopt(pycurl.HEADERFUNCTION, hbuf.write)
# The peer certificate is verified only when a CA path is available.
if capath:
curl.setopt(pycurl.CAPATH, capath)
curl.setopt(pycurl.SSL_VERIFYPEER, True)
else:
curl.setopt(pycurl.SSL_VERIFYPEER, False)
# Optional client key and certificate.
if ckey:
curl.setopt(pycurl.SSLKEY, ckey)
if cert:
curl.setopt(pycurl.SSLCERT, cert)
curl.setopt(pycurl.FOLLOWLOCATION, 1)
# NOTE(review): if perform() raises, curl.close() below is not reached.
curl.perform()
curl.close()
# Parse the collected headers; any status outside 200-299 raises RuntimeError.
header = ResponseHeader(hbuf.getvalue())
if header.status < 200 or header.status >= 300:
raise RuntimeError('Reading %s failed with code %s' % (url, header.status))
return fileName, header
# Accept gzip/deflate responses only when the request asks for decompression.
if request.decompress_response:
curl.setopt(pycurl.ENCODING, "gzip,deflate")
else:
curl.setopt(pycurl.ENCODING, "none")
# A proxy is used only when both host and port are set; otherwise the proxy
# and its credentials are cleared on the handle.
if request.proxy_host and request.proxy_port:
curl.setopt(pycurl.PROXY, request.proxy_host)
curl.setopt(pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username:
credentials = '%s:%s' % (request.proxy_username,
request.proxy_password)
curl.setopt(pycurl.PROXYUSERPWD, credentials)
else:
curl.setopt(pycurl.PROXY, '')
curl.unsetopt(pycurl.PROXYUSERPWD)
# Certificate and host name checks are enabled or disabled together.
if request.validate_cert:
curl.setopt(pycurl.SSL_VERIFYPEER, 1)
curl.setopt(pycurl.SSL_VERIFYHOST, 2)
else:
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
if request.ca_certs is not None:
curl.setopt(pycurl.CAINFO, request.ca_certs)
else:
# There is no way to restore pycurl.CAINFO to its default value
# (Using unsetopt makes it reject all certificates).
# I don't see any way to read the default value from python so it
# can be restored later. We'll have to just leave CAINFO untouched
# if no ca_certs file was specified, and require that if any
# request uses a custom ca_certs file, they all must.
pass
if request.allow_ipv6 is False:
def _set_common(self, c):
    """
    Apply the options shared by every request to the curl handle ``c``.

    Redirects are followed (up to 255, with automatic Referer), the accepted
    encodings come from ``self._accept_encoding``, the connect timeout is
    30 seconds and the total timeout 300 seconds, and signals are disabled.
    ``self._proxy`` is used when set. For https URLs the certificate check
    is switched off.

    :param c: pycurl handle to configure
    """
    c.setopt(pycurl.FOLLOWLOCATION, 1)
    c.setopt(pycurl.AUTOREFERER, 1)
    c.setopt(pycurl.ENCODING, self._accept_encoding)
    c.setopt(pycurl.MAXREDIRS, 255)
    c.setopt(pycurl.CONNECTTIMEOUT, 30)
    c.setopt(pycurl.TIMEOUT, 300)
    c.setopt(pycurl.NOSIGNAL, 1)
    if self._proxy:
        c.setopt(pycurl.PROXY, self._proxy)
    if self._url.scheme == 'https':
        # The previous hard-coded value 3 selected SSLv3, which is obsolete,
        # insecure and refused by current TLS stacks. Let libcurl negotiate
        # the best protocol version it shares with the server instead.
        c.setopt(pycurl.SSLVERSION, pycurl.SSLVERSION_DEFAULT)
        # NOTE(review): peer certificate verification stays disabled, as before.
        c.setopt(pycurl.SSL_VERIFYPEER, 0)