Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.key = key
self.ota = ota
self.iv = None
def setup_iv(self, iv=None):
    """Store the IV, initialise the cipher state and return ``self``.

    Args:
        iv: IV bytes to use. When ``None``, ``IV_LENGTH`` random bytes are
            drawn from ``os.urandom``.

    Returns:
        This instance, so the call can be chained.
    """
    if iv is None:
        iv = os.urandom(self.IV_LENGTH)
    self.iv = iv
    self.setup()
    return self
def decrypt(self, s):
    """Return ``s`` run through the ``decrypt`` method of ``self.cipher``."""
    backend = self.cipher
    return backend.decrypt(s)
def encrypt(self, s):
    """Return ``s`` run through the ``encrypt`` method of ``self.cipher``."""
    backend = self.cipher
    return backend.encrypt(s)
@classmethod
def name(cls):
    """Derive the cipher's textual name from the class name.

    The ``_Cipher`` marker is removed, remaining underscores become
    hyphens, and the result is lower-cased.
    """
    stem = cls.__name__.replace('_Cipher', '')
    return stem.replace('_', '-').lower()
class AEADCipher(BaseCipher):
PACKET_LIMIT = 16*1024-1
def setup_iv(self, iv=None):
    """Store the IV, derive the session subkey and reset the AEAD state.

    The subkey is derived HKDF-style with HMAC-SHA1: the IV keys an HMAC
    over the current ``self.key``, and the result keys a chain of HMACs
    over ``previous_block + b'ss-subkey' + counter_byte`` until
    ``KEY_LENGTH`` bytes are available. ``self.key`` is overwritten with
    the derived subkey.

    Args:
        iv: IV/salt bytes. When ``None``, ``IV_LENGTH`` random bytes are
            drawn from ``os.urandom``.

    Returns:
        This instance, matching the base-class ``setup_iv`` so that calls
        can be chained regardless of the cipher family.
    """
    self.iv = os.urandom(self.IV_LENGTH) if iv is None else iv
    randkey = hmac.new(self.iv, self.key, hashlib.sha1).digest()
    # Ceiling division: how many digest-sized blocks cover KEY_LENGTH bytes.
    blocks_needed = (self.KEY_LENGTH + len(randkey) - 1) // len(randkey)
    okm = bytearray()
    output_block = b''
    for counter in range(1, blocks_needed + 1):
        # Each block chains the previous block, a fixed label and a
        # 1-based counter byte.
        output_block = hmac.new(randkey, output_block + b'ss-subkey' + bytes([counter]), hashlib.sha1).digest()
        okm.extend(output_block)
    self.key = bytes(okm[:self.KEY_LENGTH])
    self._nonce = 0
    self._buffer = bytearray()
    self._declen = None
    self.setup()
    return self
@property
await reader_remote.read_until(b'\x00\x05\x00\x00')
header = (await reader_remote.read_n(1))[0]
await reader_remote.read_n(6 if header == 1 else (18 if header == 4 else (await reader_remote.read_n(1))[0]+2))
def udp_parse(self, data, **kw):
    """Try to parse ``data`` as a datagram carrying an address header.

    The datagram must start with three zero bytes followed by an
    address-type byte of 1, 3 or 4; the address itself is decoded by
    ``socks_address``. (Presumably this is the SOCKS5 UDP request header;
    confirm against the protocol class this method belongs to.)

    Args:
        data: Raw datagram bytes.
        **kw: Ignored.

    Returns:
        ``(host_name, port, payload)`` when the header matches, otherwise
        ``None``.
    """
    reader = io.BytesIO(data)
    if reader.read(3) != b'\x00\x00\x00':
        return None
    atyp = reader.read(1)
    # A datagram that ends right after the prefix has no address-type byte.
    # Report it as "not this protocol" instead of raising IndexError.
    if not atyp or atyp[0] not in (1, 3, 4):
        return None
    host_name, port = socks_address(reader, atyp[0])
    return host_name, port, reader.read()
def udp_connect(self, rauth, host_name, port, data, **kw):
    """Wrap ``data`` in a datagram addressed to ``host_name:port``.

    The result is a fixed 4-byte prefix, the length-prefixed encoded host
    name (via ``packstr``), the port as two big-endian bytes, then
    ``data``. ``rauth`` and ``**kw`` are not used.
    """
    prefix = b'\x00\x00\x00\x03'
    host_field = packstr(host_name.encode())
    port_field = port.to_bytes(2, 'big')
    return prefix + host_field + port_field + data
class HTTP(BaseProtocol):
def correct_header(self, header, **kw):
    """Tell whether ``header`` looks like the start of a request handled here.

    Returns ``header`` itself when it is falsy (empty or ``None``),
    otherwise the result of ``header.isalpha()``.
    """
    if not header:
        return header
    return header.isalpha()
async def parse(self, header, reader, writer, auth, authtable, httpget=None, **kw):
lines = header + await reader.read_until(b'\r\n\r\n')
headers = lines[:-4].decode().split('\r\n')
method, path, ver = HTTP_LINE.match(headers.pop(0)).groups()
lines = '\r\n'.join(i for i in headers if not i.startswith('Proxy-'))
headers = dict(i.split(': ', 1) for i in headers if ': ' in i)
url = urllib.parse.urlparse(path)
if method == 'GET' and not url.hostname and httpget:
for path, text in httpget.items():
if url.path == path:
authtable.set_authed()
if type(text) is str:
text = (text % dict(host=headers["Host"])).encode()
writer.write(f'{ver} 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nCache-Control: max-age=900\r\nContent-Length: {len(text)}\r\n\r\n'.encode() + text)
parser.add_argument('-v', dest='v', action='count', help='print verbose output')
parser.add_argument('--ssl', dest='sslfile', help='certfile[,keyfile] if server listen in ssl mode')
parser.add_argument('--pac', help='http PAC path')
parser.add_argument('--get', dest='gets', default=[], action='append', help='http custom {path,file}')
parser.add_argument('--auth', dest='authtime', type=int, default=86400*30, help='re-auth time interval for same ip (default: 86400*30)')
parser.add_argument('--sys', action='store_true', help='change system proxy setting (mac, windows)')
parser.add_argument('--reuse', dest='ruport', action='store_true', help='set SO_REUSEPORT (Linux only)')
parser.add_argument('--daemon', dest='daemon', action='store_true', help='run as a daemon (Linux only)')
parser.add_argument('--test', help='test this url for all remote proxies and exit')
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}')
args = parser.parse_args()
if args.test:
asyncio.get_event_loop().run_until_complete(test_url(args.test, args.rserver))
return
if not args.listen and not args.ulisten:
args.listen.append(ProxyURI.compile_relay('http+socks4+socks5://:8080/'))
args.httpget = {}
if args.pac:
pactext = 'function FindProxyForURL(u,h){' + (f'var b=/^(:?{args.block.__self__.pattern})$/i;if(b.test(h))return "";' if args.block else '')
for i, option in enumerate(args.rserver):
pactext += (f'var m{i}=/^(:?{option.match.__self__.pattern})$/i;if(m{i}.test(h))' if option.match else '') + 'return "PROXY %(host)s";'
args.httpget[args.pac] = pactext+'return "DIRECT";}'
args.httpget[args.pac+'/all'] = 'function FindProxyForURL(u,h){return "PROXY %(host)s";}'
args.httpget[args.pac+'/none'] = 'function FindProxyForURL(u,h){return "DIRECT";}'
for gets in args.gets:
path, filename = gets.split(',', 1)
with open(filename, 'rb') as f:
args.httpget[path] = f.read()
if args.sslfile:
sslfile = args.sslfile.split(',')
for option in args.listen:
if option.sslclient:
next_iv = bytearray(self.iv)
while 1:
data = self.cipher.encrypt(next_iv)
del next_iv[:segment_byte]
for i in range(segment_byte):
next_iv.append((yield data[i]))
def core_bit(self, segment_bit):
    """Generate keystream bits for a feedback segment of ``segment_bit`` bits.

    The shift register starts as the IV read as a big-endian integer. For
    every segment the register is encrypted, shifted left by
    ``segment_bit`` (truncated to ``IV_LENGTH`` bytes), and then one
    keystream bit is yielded per position, most significant bit first.
    The value sent back into the generator for each yielded bit is OR-ed
    into the freshly vacated low bits of the register.

    NOTE(review): the final statements of this method were corrupted in
    the source (text between ``<`` and ``>`` was lost); the shift/mask and
    the per-bit loop are reconstructed from the surviving tokens. Confirm
    against the upstream implementation.

    Args:
        segment_bit: Segment size in bits.

    Yields:
        ``0`` or ``1``, the next keystream bit. Expects the feedback bit
        to be supplied through ``send``.
    """
    next_iv = int.from_bytes(self.iv, 'big')
    mask = (1 << self.IV_LENGTH*8) - 1
    while 1:
        data = self.cipher.encrypt(next_iv)
        next_iv = next_iv<<segment_bit & mask
        for i in range(segment_bit):
            next_iv |= (yield data[i//8]>>(7-i%8)&1)<<(segment_bit-1-i)
class CFB8Cipher(CFBCipher):
    """``CFBCipher`` configured with an 8-bit segment size."""

    SEGMENT_SIZE = 8
class CFB1Cipher(CFBCipher):
    """``CFBCipher`` configured with a 1-bit segment size."""

    SEGMENT_SIZE = 1
class CTRCipher(StreamCipher):
    """Stream cipher whose keystream is the encryption of a running counter."""

    def setup(self):
        """Create the keystream generator and the underlying cipher object."""
        self.stream = self.core()
        self.cipher = self.CIPHER.new(self.key)

    def core(self):
        """Yield keystream bytes produced by encrypting successive counters.

        The counter starts at the IV read as a big-endian integer and
        wraps to zero after reaching the largest ``IV_LENGTH``-byte value.
        """
        counter = int.from_bytes(self.iv, 'big')
        while True:
            yield from self.cipher.encrypt(counter)
            if counter >= (1 << (self.IV_LENGTH * 8)) - 1:
                counter = 0
            else:
                counter += 1
class OFBCipher(CTRCipher):
del next_iv[:segment_byte]
for i in range(segment_byte):
next_iv.append((yield data[i]))
def core_bit(self, segment_bit):
    """Generate keystream bits for a feedback segment of ``segment_bit`` bits.

    The shift register starts as the IV read as a big-endian integer. For
    every segment the register is encrypted, shifted left by
    ``segment_bit`` (truncated to ``IV_LENGTH`` bytes), and then one
    keystream bit is yielded per position, most significant bit first.
    The value sent back into the generator for each yielded bit is OR-ed
    into the freshly vacated low bits of the register.

    NOTE(review): the final statements of this method were corrupted in
    the source (text between ``<`` and ``>`` was lost); the shift/mask and
    the per-bit loop are reconstructed from the surviving tokens. Confirm
    against the upstream implementation.

    Args:
        segment_bit: Segment size in bits.

    Yields:
        ``0`` or ``1``, the next keystream bit. Expects the feedback bit
        to be supplied through ``send``.
    """
    next_iv = int.from_bytes(self.iv, 'big')
    mask = (1 << self.IV_LENGTH*8) - 1
    while 1:
        data = self.cipher.encrypt(next_iv)
        next_iv = next_iv<<segment_bit & mask
        for i in range(segment_bit):
            next_iv |= (yield data[i//8]>>(7-i%8)&1)<<(segment_bit-1-i)
class CFB8Cipher(CFBCipher):
    """``CFBCipher`` configured with an 8-bit segment size."""

    SEGMENT_SIZE = 8
class CFB1Cipher(CFBCipher):
    """``CFBCipher`` configured with a 1-bit segment size."""

    SEGMENT_SIZE = 1
class CTRCipher(StreamCipher):
    """Stream cipher whose keystream is the encryption of a running counter."""

    def setup(self):
        """Create the keystream generator and the underlying cipher object."""
        self.stream = self.core()
        self.cipher = self.CIPHER.new(self.key)

    def core(self):
        """Yield keystream bytes produced by encrypting successive counters.

        The counter starts at the IV read as a big-endian integer and
        wraps to zero after reaching the largest ``IV_LENGTH``-byte value.
        """
        counter = int.from_bytes(self.iv, 'big')
        while True:
            yield from self.cipher.encrypt(counter)
            if counter >= (1 << (self.IV_LENGTH * 8)) - 1:
                counter = 0
            else:
                counter += 1
class OFBCipher(CTRCipher):
def core(self):
data = self.iv
while 1:
class BasePlugin(object):
    """Base class for plugins; every hook is a no-op here."""

    async def init_client_data(self, reader, writer, cipher):
        """Hook taking ``reader``, ``writer`` and ``cipher``; does nothing."""
        return None

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Hook taking ``reader``, ``writer``, ``cipher`` and ``raddr``; does nothing."""
        return None

    def add_cipher(self, cipher):
        """Hook taking a ``cipher``; does nothing."""
        return None

    @classmethod
    def name(cls):
        """Derive the plugin name from the class name.

        ``_Plugin`` is removed, double underscores become dots, and the
        result is lower-cased.
        """
        trimmed = cls.__name__.replace('_Plugin', '')
        return trimmed.replace('__', '.').lower()
class Plain_Plugin(BasePlugin):
    """Plugin that keeps every ``BasePlugin`` hook unchanged."""
class Origin_Plugin(BasePlugin):
    """Plugin that keeps every ``BasePlugin`` hook unchanged."""
class Http_Simple_Plugin(BasePlugin):
    """Plugin that dresses the start of a connection up as an HTTP exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Consume the peer's HTTP-looking request and answer with a canned 200.

        Reads up to the blank line, takes the second space-separated token
        of the request, drops its first character and all ``%`` signs,
        hex-decodes the remainder and pushes the decoded bytes back to the
        front of the reader's buffer. Then writes a fixed ``200 OK``
        response header. ``cipher`` is not used.
        """
        buf = await reader.read_until(b'\r\n\r\n')
        data = buf.split(b' ')[:2]
        data = bytes.fromhex(data[1][1:].replace(b'%',b'').decode())
        reader._buffer[0:0] = data
        # The Date header is stamped "GMT", so take the time in UTC rather
        # than in the machine's local zone.
        now = datetime.datetime.now(datetime.timezone.utc)
        writer.write(b'HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nContent-Encoding: gzip\r\nContent-Type: text/html\r\nDate: ' + now.strftime('%a, %d %b %Y %H:%M:%S GMT').encode() + b'\r\nServer: nginx\r\nVary: Accept-Encoding\r\n\r\n')

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request for ``raddr`` and discard the reply header.

        ``cipher`` is not used.
        """
        writer.write(f'GET / HTTP/1.1\r\nHost: {raddr}\r\nUser-Agent: curl\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\n\r\n'.encode())
        await reader.read_until(b'\r\n\r\n')
TIMESTAMP_TOLERANCE = 5 * 60
class Tls1__2_Ticket_Auth_Plugin(BasePlugin):
pass
class Http_Simple_Plugin(BasePlugin):
    """Plugin that dresses the start of a connection up as an HTTP exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Consume the peer's HTTP-looking request and answer with a canned 200.

        Reads up to the blank line, takes the second space-separated token
        of the request, drops its first character and all ``%`` signs,
        hex-decodes the remainder and pushes the decoded bytes back to the
        front of the reader's buffer. Then writes a fixed ``200 OK``
        response header. ``cipher`` is not used.
        """
        buf = await reader.read_until(b'\r\n\r\n')
        data = buf.split(b' ')[:2]
        data = bytes.fromhex(data[1][1:].replace(b'%',b'').decode())
        reader._buffer[0:0] = data
        # The Date header is stamped "GMT", so take the time in UTC rather
        # than in the machine's local zone.
        now = datetime.datetime.now(datetime.timezone.utc)
        writer.write(b'HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nContent-Encoding: gzip\r\nContent-Type: text/html\r\nDate: ' + now.strftime('%a, %d %b %Y %H:%M:%S GMT').encode() + b'\r\nServer: nginx\r\nVary: Accept-Encoding\r\n\r\n')

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request for ``raddr`` and discard the reply header.

        ``cipher`` is not used.
        """
        writer.write(f'GET / HTTP/1.1\r\nHost: {raddr}\r\nUser-Agent: curl\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\n\r\n'.encode())
        await reader.read_until(b'\r\n\r\n')
TIMESTAMP_TOLERANCE = 5 * 60
class Tls1__2_Ticket_Auth_Plugin(BasePlugin):
CACHE = collections.deque(maxlen = 100)
async def init_client_data(self, reader, writer, cipher):
    """Validate the peer's TLS-looking hello and write the matching reply.

    Reads a record that must begin with ``16 03 01``, checks fixed header
    bytes, rejects a 22-byte id already present in ``self.CACHE``,
    verifies a 10-byte truncated HMAC-SHA1 over that id (keyed with the
    cipher key plus the session id) and checks that the embedded 4-byte
    timestamp is within ``TIMESTAMP_TOLERANCE`` seconds of now. On
    success a reply made of TLS-shaped records is written.

    Raises:
        AssertionError: when any of the checks fails. The checks are
            explicit ``raise`` statements rather than ``assert`` so they
            (and the reads they depend on) are not stripped by
            ``python -O``.
    """
    def require(ok, reason):
        if not ok:
            raise AssertionError(reason)

    def addhmac(s):
        # Append the first 10 bytes of HMAC-SHA1(key + sessionid, s).
        return s + hmac.new(key+sessionid, s, hashlib.sha1).digest()[:10]

    key = cipher.cipher(cipher.key).key
    record_prefix = await reader.read_n(3)
    require(record_prefix == b'\x16\x03\x01', 'unexpected record prefix')
    header = await reader.read_n(toint(await reader.read_n(2)))
    require(header[:2] == b'\x01\x00', 'unexpected handshake type')
    require(header[4:6] == b'\x03\x03', 'unexpected version bytes')
    cacheid = header[6:28]
    sessionid = header[39:39+header[38]]
    require(cacheid not in self.CACHE, 'handshake id already seen')
    # NOTE(review): the id is recorded before its HMAC is verified, as in
    # the original; consider whether unauthenticated ids should be cached.
    self.CACHE.append(cacheid)
    utc_time = int(time.time())
    expected_mac = hmac.new(key+sessionid, cacheid, hashlib.sha1).digest()[:10]
    # Constant-time comparison so the check does not leak how many
    # leading bytes matched.
    require(hmac.compare_digest(expected_mac, header[28:38]), 'handshake HMAC mismatch')
    require(abs(toint(header[6:10]) - utc_time) < TIMESTAMP_TOLERANCE, 'timestamp outside tolerance')
    hello = b"\x16\x03\x03" + packstr(b"\x02\x00" + packstr(b'\x03\x03' + addhmac(utc_time.to_bytes(4, 'big') + os.urandom(18)) + b'\x20' + sessionid + b'\xc0\x2f\x00\x00\x05\xff\x01\x00\x01\x00'))
    # Roughly one reply in nine carries an extra random-length record.
    extra = b"\x16\x03\x03" + packstr(b"\x04\x00" + packstr(os.urandom(random.randrange(164)*2+64))) if random.randint(0, 8) < 1 else b''
    tail = b"\x14\x03\x03\x00\x01\x01\x16\x03\x03" + packstr(os.urandom(random.choice((32, 40))))
    # The last 10 random bytes are replaced by an HMAC over everything
    # that precedes them.
    writer.write(addhmac((hello + extra + tail)[:-10]))
pass
async def init_server_data(self, reader, writer, cipher, raddr):
    """Hook taking ``reader``, ``writer``, ``cipher`` and ``raddr``; does nothing."""
    return None
def add_cipher(self, cipher):
    """Hook taking a ``cipher``; does nothing."""
    return None
@classmethod
def name(cls):
    """Derive the plugin name from the class name.

    ``_Plugin`` is removed, double underscores become dots, and the
    result is lower-cased.
    """
    trimmed = cls.__name__.replace('_Plugin', '')
    return trimmed.replace('__', '.').lower()
class Plain_Plugin(BasePlugin):
    """Plugin that keeps every ``BasePlugin`` hook unchanged."""
class Origin_Plugin(BasePlugin):
    """Plugin that keeps every ``BasePlugin`` hook unchanged."""
class Http_Simple_Plugin(BasePlugin):
    """Plugin that dresses the start of a connection up as an HTTP exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Consume the peer's HTTP-looking request and answer with a canned 200.

        Reads up to the blank line, takes the second space-separated token
        of the request, drops its first character and all ``%`` signs,
        hex-decodes the remainder and pushes the decoded bytes back to the
        front of the reader's buffer. Then writes a fixed ``200 OK``
        response header. ``cipher`` is not used.
        """
        buf = await reader.read_until(b'\r\n\r\n')
        data = buf.split(b' ')[:2]
        data = bytes.fromhex(data[1][1:].replace(b'%',b'').decode())
        reader._buffer[0:0] = data
        # The Date header is stamped "GMT", so take the time in UTC rather
        # than in the machine's local zone.
        now = datetime.datetime.now(datetime.timezone.utc)
        writer.write(b'HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nContent-Encoding: gzip\r\nContent-Type: text/html\r\nDate: ' + now.strftime('%a, %d %b %Y %H:%M:%S GMT').encode() + b'\r\nServer: nginx\r\nVary: Accept-Encoding\r\n\r\n')

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request for ``raddr`` and discard the reply header.

        ``cipher`` is not used.
        """
        writer.write(f'GET / HTTP/1.1\r\nHost: {raddr}\r\nUser-Agent: curl\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\n\r\n'.encode())
        await reader.read_until(b'\r\n\r\n')
TIMESTAMP_TOLERANCE = 5 * 60
class Tls1__2_Ticket_Auth_Plugin(BasePlugin):
CACHE = collections.deque(maxlen = 100)
async def init_client_data(self, reader, writer, cipher):
key = cipher.cipher(cipher.key).key
def packstr(s, n=2):
    """Return ``s`` prefixed with its length as an ``n``-byte big-endian integer."""
    length_prefix = len(s).to_bytes(n, 'big')
    return length_prefix + s
def toint(s, o='big'):
    """Decode the bytes ``s`` as an unsigned integer using byte order ``o``."""
    return int.from_bytes(s, byteorder=o)
class BasePlugin(object):
    """Base class for plugins; every hook is a no-op here."""

    async def init_client_data(self, reader, writer, cipher):
        """Hook taking ``reader``, ``writer`` and ``cipher``; does nothing."""
        return None

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Hook taking ``reader``, ``writer``, ``cipher`` and ``raddr``; does nothing."""
        return None

    def add_cipher(self, cipher):
        """Hook taking a ``cipher``; does nothing."""
        return None

    @classmethod
    def name(cls):
        """Derive the plugin name from the class name.

        ``_Plugin`` is removed, double underscores become dots, and the
        result is lower-cased.
        """
        trimmed = cls.__name__.replace('_Plugin', '')
        return trimmed.replace('__', '.').lower()
class Plain_Plugin(BasePlugin):
    """Plugin that keeps every ``BasePlugin`` hook unchanged."""
class Origin_Plugin(BasePlugin):
    """Plugin that keeps every ``BasePlugin`` hook unchanged."""
class Http_Simple_Plugin(BasePlugin):
    """Plugin that dresses the start of a connection up as an HTTP exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Consume the peer's HTTP-looking request and answer with a canned 200.

        Reads up to the blank line, takes the second space-separated token
        of the request, drops its first character and all ``%`` signs,
        hex-decodes the remainder and pushes the decoded bytes back to the
        front of the reader's buffer. Then writes a fixed ``200 OK``
        response header. ``cipher`` is not used.
        """
        buf = await reader.read_until(b'\r\n\r\n')
        data = buf.split(b' ')[:2]
        data = bytes.fromhex(data[1][1:].replace(b'%',b'').decode())
        reader._buffer[0:0] = data
        # The Date header is stamped "GMT", so take the time in UTC rather
        # than in the machine's local zone.
        now = datetime.datetime.now(datetime.timezone.utc)
        writer.write(b'HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nContent-Encoding: gzip\r\nContent-Type: text/html\r\nDate: ' + now.strftime('%a, %d %b %Y %H:%M:%S GMT').encode() + b'\r\nServer: nginx\r\nVary: Accept-Encoding\r\n\r\n')

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request for ``raddr`` and discard the reply header.

        ``cipher`` is not used.
        """
        writer.write(f'GET / HTTP/1.1\r\nHost: {raddr}\r\nUser-Agent: curl\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\n\r\n'.encode())
        await reader.read_until(b'\r\n\r\n')
class Salsa20_Cipher(StreamCipher):
    """Salsa20 keystream generator using a 32-byte key and an 8-byte IV."""

    KEY_LENGTH = 32
    IV_LENGTH = 8

    def core(self):
        """Yield keystream bytes, one 64-byte block at a time.

        The 16-word state interleaves the constant ``expand 32-byte k``
        with the two key halves and the IV zero-padded to 16 bytes.
        After each block, words 8 and 9 are advanced as a two-word
        counter (word 8 is the low word).
        """
        layout = b'expa' + self.key[:16] + b'nd 3' + self.iv.ljust(16, b'\x00') + b'2-by' + self.key[16:] + b'te k'
        state = list(struct.unpack('<16I', layout))
        while True:
            work = state[:]
            for a, b, c, d in ORDERS_SALSA20:
                work[a] ^= ROL(work[b]+work[c], 7)
                work[d] ^= ROL(work[a]+work[b], 9)
                work[c] ^= ROL(work[d]+work[a], 13)
                work[b] ^= ROL(work[c]+work[d], 18)
            # Output block = working state added word-wise (mod 2**32) to
            # the input state.
            yield from struct.pack('<16I', *((mixed + initial) & 0xffffffff for mixed, initial in zip(work, state)))
            if state[8] == 0xffffffff:
                state[8:10] = (0, state[9]+1)
            else:
                state[8:10] = (state[8]+1, state[9])
class CFBCipher(StreamCipher):
def setup(self):
segment_bit = getattr(self, 'SEGMENT_SIZE', self.IV_LENGTH*8)
self.bit_mode = segment_bit % 8 != 0
self.stream = self.core_bit(segment_bit) if self.bit_mode else self.core(segment_bit//8)
self.last = None
self.cipher = self.CIPHER.new(self.key)
def process(self, s, inv=False):
r = bytearray()
for i in s:
if self.bit_mode:
j = 0
for k in range(7,-1,-1):
ibit = i>>k & 1
jbit = ibit^self.stream.send(self.last)
j |= jbit<