Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class BasePlugin:
    """Base class for plugins; every hook is a no-op unless a subclass overrides it."""

    async def init_client_data(self, reader, writer, cipher):
        """Hook taking a reader, a writer and a cipher; does nothing here."""

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Hook taking a reader, a writer, a cipher and ``raddr``; does nothing here."""

    def add_cipher(self, cipher):
        """Hook taking a cipher object; does nothing here."""

    @classmethod
    def name(cls):
        """Return the plugin name derived from the class name.

        ``_Plugin`` is removed, every ``__`` becomes ``.`` and the result is
        lower-cased, e.g. ``Tls1__2_Ticket_Auth_Plugin`` -> ``tls1.2_ticket_auth``.
        """
        return cls.__name__.replace('_Plugin', '').replace('__', '.').lower()
class Plain_Plugin(BasePlugin):
    """Plugin with no members of its own; every hook is inherited from ``BasePlugin``."""


class Origin_Plugin(BasePlugin):
    """Plugin with no members of its own; every hook is inherited from ``BasePlugin``."""
class Http_Simple_Plugin(BasePlugin):
    """Open a connection with a plain HTTP request/response exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Read the peer's HTTP request head and reply with a fixed 200 response.

        The second space-separated token of the request (the target) is taken
        without its first character, ``%`` signs are dropped, and the remaining
        hex digits are decoded. The decoded bytes are inserted at the front of
        ``reader._buffer`` so that later reads return them first.

        Raises:
            IndexError: if the request head has no second token.
            ValueError: if the target is not valid hex after removing ``%``.
        """
        head = await reader.read_until(b'\r\n\r\n')
        target = head.split(b' ')[1]
        payload = bytes.fromhex(target[1:].replace(b'%', b'').decode())
        reader._buffer[0:0] = payload
        # The header value is labelled GMT, so it must be formatted from UTC
        # rather than from the machine's local clock.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        stamp = now_utc.strftime('%a, %d %b %Y %H:%M:%S GMT').encode()
        writer.write(
            b'HTTP/1.1 200 OK\r\n'
            b'Connection: keep-alive\r\n'
            b'Content-Encoding: gzip\r\n'
            b'Content-Type: text/html\r\n'
            b'Date: ' + stamp + b'\r\n'
            b'Server: nginx\r\n'
            b'Vary: Accept-Encoding\r\n'
            b'\r\n'
        )

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request naming ``raddr`` as Host, then discard the reply head."""
        request = (
            'GET / HTTP/1.1\r\n'
            f'Host: {raddr}\r\n'
            'User-Agent: curl\r\n'
            'Accept-Encoding: gzip, deflate\r\n'
            'Connection: keep-alive\r\n'
            '\r\n'
        )
        writer.write(request.encode())
        # Everything up to and including the blank line is read and ignored.
        await reader.read_until(b'\r\n\r\n')
# Largest accepted gap, in seconds, between a peer-supplied timestamp and int(time.time()).
TIMESTAMP_TOLERANCE = 60 * 5
class Tls1__2_Ticket_Auth_Plugin(BasePlugin):
    """Subclass with no members of its own here; every hook is inherited from ``BasePlugin``."""
class Http_Simple_Plugin(BasePlugin):
    """Open a connection with a plain HTTP request/response exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Read the peer's HTTP request head and reply with a fixed 200 response.

        The second space-separated token of the request (the target) is taken
        without its first character, ``%`` signs are dropped, and the remaining
        hex digits are decoded. The decoded bytes are inserted at the front of
        ``reader._buffer`` so that later reads return them first.

        Raises:
            IndexError: if the request head has no second token.
            ValueError: if the target is not valid hex after removing ``%``.
        """
        head = await reader.read_until(b'\r\n\r\n')
        target = head.split(b' ')[1]
        payload = bytes.fromhex(target[1:].replace(b'%', b'').decode())
        reader._buffer[0:0] = payload
        # The header value is labelled GMT, so it must be formatted from UTC
        # rather than from the machine's local clock.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        stamp = now_utc.strftime('%a, %d %b %Y %H:%M:%S GMT').encode()
        writer.write(
            b'HTTP/1.1 200 OK\r\n'
            b'Connection: keep-alive\r\n'
            b'Content-Encoding: gzip\r\n'
            b'Content-Type: text/html\r\n'
            b'Date: ' + stamp + b'\r\n'
            b'Server: nginx\r\n'
            b'Vary: Accept-Encoding\r\n'
            b'\r\n'
        )

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request naming ``raddr`` as Host, then discard the reply head."""
        request = (
            'GET / HTTP/1.1\r\n'
            f'Host: {raddr}\r\n'
            'User-Agent: curl\r\n'
            'Accept-Encoding: gzip, deflate\r\n'
            'Connection: keep-alive\r\n'
            '\r\n'
        )
        writer.write(request.encode())
        # Everything up to and including the blank line is read and ignored.
        await reader.read_until(b'\r\n\r\n')
# Largest accepted gap, in seconds, between a peer-supplied timestamp and int(time.time()).
TIMESTAMP_TOLERANCE = 60 * 5
class Tls1__2_Ticket_Auth_Plugin(BasePlugin):
    """Verify a peer hello framed like a TLS record and answer with matching records."""

    # The 100 most recently accepted 22-byte hello values (header[6:28]).
    # A value that is still in here is refused as a repeat.
    CACHE = collections.deque(maxlen = 100)

    async def init_client_data(self, reader, writer, cipher):
        """Validate the peer's hello and write the reply records to ``writer``.

        The hello carries a 4-byte big-endian timestamp followed by 18 more
        bytes (together the 22-byte cache id), a 10-byte truncated HMAC-SHA1
        tag over that id, and a length-prefixed session id. The HMAC key is
        the cipher key concatenated with the session id.

        Raises:
            AssertionError: if the framing, tag, timestamp or replay check fails.
        """
        def require(ok, reason):
            # Raised explicitly: a bare ``assert`` is removed under ``python -O``,
            # which would silently switch this authentication off.
            if not ok:
                raise AssertionError(reason)

        key = cipher.cipher(cipher.key).key
        require(await reader.read_n(3) == b'\x16\x03\x01', 'unexpected record prefix')
        header = await reader.read_n(toint(await reader.read_n(2)))
        require(header[:2] == b'\x01\x00', 'unexpected hello prefix')
        require(header[4:6] == b'\x03\x03', 'unexpected version bytes')
        # header[38] holds the session id length, so it has to exist.
        require(len(header) > 38, 'hello too short')
        cacheid = header[6:28]
        sessionid = header[39:39+header[38]]

        def tag(blob):
            return hmac.new(key+sessionid, blob, hashlib.sha1).digest()[:10]

        def addhmac(blob):
            return blob + tag(blob)

        utc_time = int(time.time())
        # Constant-time comparison keeps the tag check from leaking timing.
        require(hmac.compare_digest(tag(cacheid), bytes(header[28:38])), 'bad hello tag')
        require(abs(toint(header[6:10]) - utc_time) < TIMESTAMP_TOLERANCE, 'timestamp out of range')
        # The replay cache is consulted and filled only after the tag has been
        # verified, so unauthenticated peers cannot push genuine ids out of it.
        require(cacheid not in self.CACHE, 'repeated hello')
        self.CACHE.append(cacheid)

        hello_reply = b"\x16\x03\x03" + packstr(b"\x02\x00" + packstr(
            b'\x03\x03' + addhmac(utc_time.to_bytes(4, 'big') + os.urandom(18))
            + b'\x20' + sessionid + b'\xc0\x2f\x00\x00\x05\xff\x01\x00\x01\x00'))
        # About one reply in nine carries an extra record of random bytes.
        if random.randint(0, 8) < 1:
            extra = b"\x16\x03\x03" + packstr(b"\x04\x00" + packstr(os.urandom(random.randrange(164)*2+64)))
        else:
            extra = b''
        tail = b"\x14\x03\x03\x00\x01\x01\x16\x03\x03" + packstr(os.urandom(random.choice((32, 40))))
        # The last 10 random bytes are replaced by a tag over everything before them.
        writer.write(addhmac((hello_reply + extra + tail)[:-10]))

    async def init_server_data(self, reader, writer, cipher, raddr):
        """No-op override."""

    def add_cipher(self, cipher):
        """No-op override."""

    @classmethod
    def name(cls):
        """Return the class name without ``_Plugin``, with ``__`` as ``.``, lower-cased."""
        return cls.__name__.replace('_Plugin', '').replace('__', '.').lower()
class Plain_Plugin(BasePlugin):
    """Plugin with no members of its own; every hook is inherited from ``BasePlugin``."""


class Origin_Plugin(BasePlugin):
    """Plugin with no members of its own; every hook is inherited from ``BasePlugin``."""
class Http_Simple_Plugin(BasePlugin):
    """Open a connection with a plain HTTP request/response exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Read the peer's HTTP request head and reply with a fixed 200 response.

        The second space-separated token of the request (the target) is taken
        without its first character, ``%`` signs are dropped, and the remaining
        hex digits are decoded. The decoded bytes are inserted at the front of
        ``reader._buffer`` so that later reads return them first.

        Raises:
            IndexError: if the request head has no second token.
            ValueError: if the target is not valid hex after removing ``%``.
        """
        head = await reader.read_until(b'\r\n\r\n')
        target = head.split(b' ')[1]
        payload = bytes.fromhex(target[1:].replace(b'%', b'').decode())
        reader._buffer[0:0] = payload
        # The header value is labelled GMT, so it must be formatted from UTC
        # rather than from the machine's local clock.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        stamp = now_utc.strftime('%a, %d %b %Y %H:%M:%S GMT').encode()
        writer.write(
            b'HTTP/1.1 200 OK\r\n'
            b'Connection: keep-alive\r\n'
            b'Content-Encoding: gzip\r\n'
            b'Content-Type: text/html\r\n'
            b'Date: ' + stamp + b'\r\n'
            b'Server: nginx\r\n'
            b'Vary: Accept-Encoding\r\n'
            b'\r\n'
        )

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request naming ``raddr`` as Host, then discard the reply head."""
        request = (
            'GET / HTTP/1.1\r\n'
            f'Host: {raddr}\r\n'
            'User-Agent: curl\r\n'
            'Accept-Encoding: gzip, deflate\r\n'
            'Connection: keep-alive\r\n'
            '\r\n'
        )
        writer.write(request.encode())
        # Everything up to and including the blank line is read and ignored.
        await reader.read_until(b'\r\n\r\n')
# Tolerance window of five minutes, expressed in seconds.
TIMESTAMP_TOLERANCE = 60 * 5
class Tls1__2_Ticket_Auth_Plugin(BasePlugin):
    """Plugin holding a bounded cache and a client-side initialisation hook."""

    # Bounded to 100 entries; the oldest entry is dropped when a new one is appended.
    CACHE = collections.deque(maxlen = 100)

    async def init_client_data(self, reader, writer, cipher):
        """Derive the key object from ``cipher``.

        NOTE(review): the body stops after the key derivation and ``key`` is
        never used; the rest of this method appears to be missing -- confirm
        against the complete implementation before relying on it.
        """
        key = cipher.cipher(cipher.key).key
def packstr(s, n=2):
    """Return ``s`` prefixed with its length as an ``n``-byte big-endian integer."""
    prefix = len(s).to_bytes(n, 'big')
    return prefix + s


def toint(s, o='big'):
    """Interpret the bytes ``s`` as an unsigned integer using byte order ``o``."""
    return int.from_bytes(s, o)
class BasePlugin:
    """Base class for plugins; every hook is a no-op unless a subclass overrides it."""

    async def init_client_data(self, reader, writer, cipher):
        """Hook taking a reader, a writer and a cipher; does nothing here."""

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Hook taking a reader, a writer, a cipher and ``raddr``; does nothing here."""

    def add_cipher(self, cipher):
        """Hook taking a cipher object; does nothing here."""

    @classmethod
    def name(cls):
        """Return the plugin name derived from the class name.

        ``_Plugin`` is removed, every ``__`` becomes ``.`` and the result is
        lower-cased, e.g. ``Tls1__2_Ticket_Auth_Plugin`` -> ``tls1.2_ticket_auth``.
        """
        return cls.__name__.replace('_Plugin', '').replace('__', '.').lower()
class Plain_Plugin(BasePlugin):
    """Plugin with no members of its own; every hook is inherited from ``BasePlugin``."""


class Origin_Plugin(BasePlugin):
    """Plugin with no members of its own; every hook is inherited from ``BasePlugin``."""
class Http_Simple_Plugin(BasePlugin):
    """Open a connection with a plain HTTP request/response exchange."""

    async def init_client_data(self, reader, writer, cipher):
        """Read the peer's HTTP request head and reply with a fixed 200 response.

        The second space-separated token of the request (the target) is taken
        without its first character, ``%`` signs are dropped, and the remaining
        hex digits are decoded. The decoded bytes are inserted at the front of
        ``reader._buffer`` so that later reads return them first.

        Raises:
            IndexError: if the request head has no second token.
            ValueError: if the target is not valid hex after removing ``%``.
        """
        head = await reader.read_until(b'\r\n\r\n')
        target = head.split(b' ')[1]
        payload = bytes.fromhex(target[1:].replace(b'%', b'').decode())
        reader._buffer[0:0] = payload
        # The header value is labelled GMT, so it must be formatted from UTC
        # rather than from the machine's local clock.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        stamp = now_utc.strftime('%a, %d %b %Y %H:%M:%S GMT').encode()
        writer.write(
            b'HTTP/1.1 200 OK\r\n'
            b'Connection: keep-alive\r\n'
            b'Content-Encoding: gzip\r\n'
            b'Content-Type: text/html\r\n'
            b'Date: ' + stamp + b'\r\n'
            b'Server: nginx\r\n'
            b'Vary: Accept-Encoding\r\n'
            b'\r\n'
        )

    async def init_server_data(self, reader, writer, cipher, raddr):
        """Send a fixed ``GET /`` request naming ``raddr`` as Host, then discard the reply head."""
        request = (
            'GET / HTTP/1.1\r\n'
            f'Host: {raddr}\r\n'
            'User-Agent: curl\r\n'
            'Accept-Encoding: gzip, deflate\r\n'
            'Connection: keep-alive\r\n'
            '\r\n'
        )
        writer.write(request.encode())
        # Everything up to and including the blank line is read and ignored.
        await reader.read_until(b'\r\n\r\n')
data = bytes([len(rnd_data)+1]) + rnd_data + s
data = (len(data)+6).to_bytes(2, 'big') + data
crc = (-1 - binascii.crc32(data)) & 0xffffffff
return data + crc.to_bytes(4, 'little')
def encrypt(s):
    """Cut ``s`` into pieces of at most 8100 bytes, ``pack`` each one, and concatenate.

    An empty ``s`` yields ``b''`` without calling ``pack``.
    """
    step = 8100
    # Slice by offset and join once, instead of re-copying the remainder of
    # ``s`` and the growing result on every iteration.
    return b''.join(pack(s[start:start + step]) for start in range(0, len(s), step))
cipher.pdecrypt = decrypt
cipher.pencrypt = encrypt
class Verify_Deflate_Plugin(BasePlugin):
def add_cipher(self, cipher):
self.buf = bytearray()
def decrypt(s):
    """Buffer ``s`` and return the inflated payload of every complete frame.

    A frame begins with a 2-byte big-endian length that counts the whole
    frame, length field included. The remainder is a zlib stream whose first
    two bytes were removed; ``b'x\\x9c'`` is put back before inflating.
    Bytes belonging to an unfinished frame stay in ``self.buf``.

    Raises:
        zlib.error: if a frame's body does not inflate.
    """
    self.buf.extend(s)
    pieces = []
    while len(self.buf) >= 2:
        frame_len = int.from_bytes(self.buf[:2], 'big')
        if len(self.buf) < frame_len:
            # The rest of this frame has not arrived yet.
            break
        pieces.append(zlib.decompress(b'x\x9c' + self.buf[2:frame_len]))
        del self.buf[:frame_len]
    return b''.join(pieces)
def pack(s):
    """Compress ``s`` with zlib and frame the result.

    The frame is a 2-byte big-endian length followed by the compressed stream
    without its first two bytes. Because two bytes are dropped and two are
    added, the length value equals the size of the whole frame.

    Raises:
        OverflowError: if the compressed data is longer than 65535 bytes.
    """
    packed = zlib.compress(s)
    return len(packed).to_bytes(2, 'big') + packed[2:]
def encrypt(s):
return ret
def pack(s):
    """Return ``s`` prefixed with the bytes ``17 03 03`` and the output of ``packstr(s)``."""
    return b'\x17\x03\x03' + packstr(s)
def encrypt(s):
    """Split ``s`` into randomly sized pieces, ``pack`` each one, and concatenate.

    While more than 2048 bytes remain, a piece of ``random.randrange(4096) + 100``
    bytes (capped at what is left) is cut off. Whatever remains afterwards is
    packed as the final piece. An empty ``s`` yields ``b''``.
    """
    pieces = []
    pos, total = 0, len(s)
    # Walk through ``s`` by offset so the remainder is not re-copied each round.
    while total - pos > 2048:
        size = min(random.randrange(4096) + 100, total - pos)
        pieces.append(pack(s[pos:pos + size]))
        pos += size
    if pos < total:
        pieces.append(pack(s[pos:]))
    return b''.join(pieces)
cipher.pdecrypt2 = decrypt
cipher.pencrypt2 = encrypt
class Verify_Simple_Plugin(BasePlugin):
def add_cipher(self, cipher):
self.buf = bytearray()
def decrypt(s):
    """Buffer ``s`` and return the payload of every complete, CRC-checked frame.

    Frame layout as read here: a 2-byte big-endian length covering the whole
    frame, then one byte ``p``, then ``p - 1`` bytes that are skipped, then
    the payload, then a 4-byte little-endian checksum. The checksum is the
    bitwise complement of CRC-32 over everything before it. Bytes of an
    unfinished frame stay in ``self.buf``.

    Raises:
        AssertionError: if a frame declares an impossible length or its
            checksum does not match.
    """
    self.buf.extend(s)
    pieces = []
    while len(self.buf) >= 2:
        frame_len = int.from_bytes(self.buf[:2], 'big')
        if len(self.buf) < frame_len:
            break
        # Smallest possible frame is 2 (length) + 1 (p) + 4 (checksum) bytes.
        # Rejecting shorter values avoids indexing past the buffer and a
        # zero-length frame that would never shrink it.
        if frame_len < 7:
            raise AssertionError('frame length too small')
        payload = self.buf[2+self.buf[2]:frame_len-4]
        expected = (-1 - binascii.crc32(self.buf[:frame_len-4])) & 0xffffffff
        # Raised explicitly so the integrity check survives ``python -O``.
        if int.from_bytes(self.buf[frame_len-4:frame_len], 'little') != expected:
            raise AssertionError('checksum mismatch')
        pieces.append(payload)
        del self.buf[:frame_len]
    return b''.join(pieces)
def pack(s):