Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
connector._resolve_host = make_mocked_coro(
[{'hostname': 'hostname', 'host': '127.0.0.1', 'port': 80,
'family': socket.AF_INET, 'proto': 0, 'flags': 0}])
tr, proto = mock.Mock(), mock.Mock()
tr.get_extra_info.return_value = None
self.loop.create_connection = make_mocked_coro((tr, proto))
req = ClientRequest(
'GET', URL('https://www.python.org'),
proxy=URL('http://proxy.example.com'),
loop=self.loop,
)
with self.assertRaisesRegex(OSError, "error message"):
self.loop.run_until_complete(connector._create_connection(
req, None, aiohttp.ClientTimeout()))
def session(self):
    """Create an ``aiohttp.ClientSession`` configured from this object.

    HTTP basic authentication is attached only when ``self.basic_auth`` is
    truthy; in that case it is read as a mapping with ``"username"`` and
    ``"password"`` keys. The session is created with ``self.headers`` and a
    total timeout of ``DEFAULT_REQUEST_TIMEOUT``.
    """
    # Build the authentication parameters (None means "no auth").
    auth = (
        aiohttp.BasicAuth(self.basic_auth["username"], self.basic_auth["password"])
        if self.basic_auth
        else None
    )
    return aiohttp.ClientSession(
        headers=self.headers,
        auth=auth,
        timeout=aiohttp.ClientTimeout(total=DEFAULT_REQUEST_TIMEOUT),
    )
def session(self) -> aiohttp.ClientSession:
    """Return a new client session carrying this object's headers and auth.

    ``self.basic_auth`` is optional: when it is falsy the session is created
    without authentication, otherwise its ``"username"`` and ``"password"``
    entries are wrapped in an ``aiohttp.BasicAuth``. Every session gets a
    total timeout of ``DEFAULT_REQUEST_TIMEOUT``.
    """
    # Default to no authentication; override only when credentials exist.
    auth = None
    if self.basic_auth:
        username = self.basic_auth["username"]
        password = self.basic_auth["password"]
        auth = aiohttp.BasicAuth(username, password)

    session_options = {
        "headers": self.headers,
        "auth": auth,
        "timeout": aiohttp.ClientTimeout(total=DEFAULT_REQUEST_TIMEOUT),
    }
    return aiohttp.ClientSession(**session_options)
def _prepare_timeout(
    value: typing.Optional[typing.Union[base.Integer, base.Float, aiohttp.ClientTimeout]]
) -> typing.Optional[aiohttp.ClientTimeout]:
    """Normalize a timeout argument to an ``aiohttp.ClientTimeout``.

    :param value: ``None``, an existing ``aiohttp.ClientTimeout``, or a number
        that is used as the ``total`` timeout.
    :return: ``value`` unchanged when it is ``None`` or already a
        ``ClientTimeout``; otherwise a new ``ClientTimeout(total=value)``.
    """
    needs_wrapping = value is not None and not isinstance(value, aiohttp.ClientTimeout)
    if needs_wrapping:
        return aiohttp.ClientTimeout(total=value)
    # None and ready-made ClientTimeout objects pass through untouched.
    return value
async def asyncRequest(url):
    """Fetch ``url`` with a GET request and return the raw response body.

    A fresh ``aiohttp.ClientSession`` is opened for the request, sending a
    user agent obtained from ``randomUserAgent()`` and an empty ``Cookie``
    header. The request is limited to a total of three minutes.

    Args:
        url: The address to request.

    Returns:
        The response body as returned by ``response.read()``, or ``None``
        when the connection fails with ``ClientConnectorError`` (an error
        message is printed in that case).

    Any exception other than ``ClientConnectorError`` is not handled here
    and propagates to the caller.
    """
    timeout = aiohttp.ClientTimeout(total=60 * 3)
    ua = {'user-agent': randomUserAgent(), 'Cookie': ''}
    async with aiohttp.ClientSession(headers=ua) as session:
        try:
            # Enter the request context manager directly; awaiting it first
            # is redundant and departs from the documented aiohttp idiom.
            async with session.get(url, timeout=timeout) as response:
                return await response.read()
        except aiohttp.client_exceptions.ClientConnectorError:
            print(Fore.RED + "\n[x] Error while fetching data from Amazon!")
            return None
from socialscan import __version__
class QueryError(Exception):
    """Exception type raised by platform checkers when a query fails."""
class PlatformChecker:
    # Base for platform-specific checkers: holds the shared request headers,
    # user-facing error messages and the aiohttp timeout configuration.

    # Headers identifying the tool (name + version) and the preferred languages.
    DEFAULT_HEADERS = {"User-agent": f"socialscan {__version__}", "Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8"}
    # Error message templates; the first one has a `{}` placeholder for the content type.
    UNEXPECTED_CONTENT_TYPE_ERROR_MESSAGE = "Unexpected content type {}. You might be sending too many requests. Use a proxy or wait before trying again."
    TOKEN_ERROR_MESSAGE = "Could not retrieve token. You might be sending too many requests. Use a proxy or wait before trying again."
    TOO_MANY_REQUEST_ERROR_MESSAGE = "Requests denied by platform due to excessive requests. Use a proxy or wait before trying again."
    # Used below as the `connect` value of the aiohttp client timeout.
    TIMEOUT_DURATION = 15
    client_timeout = aiohttp.ClientTimeout(connect=TIMEOUT_DURATION)

    # Subclasses can implement 3 methods depending on requirements: prerequest(), check_username() and check_email()
    # 1: Be as explicit as possible in handling all cases
    # 2: Do not include any queries that will lead to side-effects on users (e.g. submitting sign up forms)

    async def get_token(self):
        """
        Retrieve and return platform token using the `prerequest` method specified in the class

        Normal calls will not be able to take advantage of this as all tokens are retrieved concurrently
        This only applies to when tokens are retrieved before main queries with -c
        Adds 1-2s to overall running time but halves HTTP requests sent for bulk queries

        Raises:
            QueryError: if the prerequest was already sent but no token is stored.
        """
        # NOTE(review): only the failure path is visible in this excerpt; the
        # branches that return or fetch the token appear to be cut off - confirm
        # against the full source.
        if self.prerequest_sent:
            if self.token is None:
                # The prerequest already ran and produced nothing: report it.
                raise QueryError(self.TOKEN_ERROR_MESSAGE)
import aiohttp,asyncio
# Various timeout settings:
#initial_timeout=40
# If initial_timeout elapses and we still have not got through the front door
# of the Saimoe (世萌) site, give up on this vote.
#global_timeout=210
# After getting in successfully, if the whole flow is still unfinished once
# global_timeout has elapsed and the global retry budget (global_retry) is
# used up, give up on this vote.
# All of the timeout settings above seem to be pretty much pointless.
request_timeout=30 # Default timeout for a single HTTP request.
# You can temporarily override this setting at any time.
captcha_timeout=60 # Default timeout for fetching a single captcha.
# aiohttp timeout objects built from the two defaults above (total time limit).
timeoutConfig=aiohttp.ClientTimeout(total=request_timeout)
captchaTimeoutConfig=aiohttp.ClientTimeout(total=captcha_timeout)
from charaSelector import selector
def captcha_server_generator():
    """Yield the entries of ``captchaServers`` in order, forever.

    Each ``next()`` call on the generator produces one URL from
    ``captchaServers``; after the last entry it wraps around to the first.
    The list length is sampled once, when the generator first runs.
    """
    server_count = len(captchaServers)
    position = 0
    while True:
        yield captchaServers[position]
        # Step forward, wrapping back to the start after the last entry.
        position = position + 1 if position + 1 < server_count else 0


# Shared round-robin iterator: each next(csGen) gives the next captcha server URL.
csGen = captcha_server_generator()
from base64 import b64encode
from io import BytesIO
async def async_main():
    """Log out of the auth service and drop the locally stored token.

    Looks up the namespace of the ``auth`` service in the deploy config. If
    no token is stored for that namespace, prints ``Not logged in.`` and
    returns. Otherwise it POSTs to the auth service's logout endpoint,
    removes the token for that namespace from the token store, writes the
    store back and prints ``Logged out.``.

    The session is created with ``raise_for_status=True``, so an error
    response from the logout endpoint raises and the stored token is kept.
    """
    deploy_config = get_deploy_config()
    auth_ns = deploy_config.service_ns('auth')
    tokens = get_tokens()
    if auth_ns not in tokens:
        print('Not logged in.')
        return

    headers = service_auth_headers(deploy_config, 'auth')
    async with aiohttp.ClientSession(
            raise_for_status=True, timeout=aiohttp.ClientTimeout(total=60), headers=headers) as session:
        # Only the status matters; the response body is not read.
        async with session.post(deploy_config.url('auth', '/api/v1alpha/logout')):
            pass

    # auth_ns was already resolved above; the duplicate lookup was dropped.
    del tokens[auth_ns]
    tokens.write()
    print('Logged out.')
async def get_response(self, url: str) -> dict:
    """Get responses from twitch after checking rate limits

    Before every attempt this awaits ``self.oauth_check()``, fetches the
    request headers via ``self.get_header()`` and awaits
    ``self.wait_for_rate_limit_reset()``. The ``Ratelimit-Remaining`` and
    ``Ratelimit-Reset`` response headers, when present, are recorded on
    ``self.rate_limit_remaining`` and ``self.rate_limit_resets``.

    A 429 response triggers another attempt. The retry is a loop rather than
    a recursive call so the previous session and response are closed before
    the next attempt instead of piling up open sessions.

    Parameters
    ----------
    url: str
        The URL to request.

    Returns
    -------
    dict
        The decoded JSON body of the first non-429 response.
    """
    while True:
        await self.oauth_check()
        header = await self.get_header()
        await self.wait_for_rate_limit_reset()
        async with aiohttp.ClientSession() as session:
            # NOTE(review): total=None disables aiohttp's overall timeout, so
            # this request can wait indefinitely - confirm that is intended.
            async with session.get(
                url, headers=header, timeout=aiohttp.ClientTimeout(total=None)
            ) as resp:
                remaining = resp.headers.get("Ratelimit-Remaining")
                if remaining:
                    self.rate_limit_remaining = int(remaining)
                reset = resp.headers.get("Ratelimit-Reset")
                if reset:
                    self.rate_limit_resets.add(int(reset))
                if resp.status != 429:
                    return await resp.json()
        # Rate limited: session and response are closed; go around again.
        log.info("Trying again")