Monday, 29 July 2019

Designing a reverse SOCKS relay in Python

I have the following situation:

WWW <- ServerBehindNAT(C) -> (S)MyApp(S) <- (C)BrowserUser

C == Client connection.

S == Server connection.

ServerBehindNAT represents a node that connects via SOCKS to MyApp on my server.

BrowserUser represents a client browser with SOCKS proxy set as IP of MyApp.

MyApp represents this script which accepts incoming connection from BrowserUser, accepts incoming connection from ServerBehindNAT, and sends requests from BrowserUser through to ServerBehindNAT.

WWW represents the external Internet/www.

I have created a multi-threaded SOCKS server which listens on 2 ports. What I'm having trouble designing is the part where it forwards requests from BrowserUser through to ServerBehindNAT:

(S)<--(S)

The first code I had:

import logging
import select
import socket
import struct
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler, BaseRequestHandler
import threading
import mysql.connector

logging.basicConfig(level=logging.DEBUG)
SOCKS_VERSION = 5


def sql_connect():
    """ database connection"""
    try:
        sql = mysql.connector.connect(host='localhost',
                                      database='prox_main232',
                                      user='prox_main551',
                                      password='H20gN!vsi#rJ')
        if sql.is_connected():
            print('connected to sql')
            return sql
    except Exception as e:
        print(e)
        return False


def sql_auth(username, password):
        sql  = sql_connect()
        cursor = sql.cursor()
        query = "select `id` from partners where username = %s and password = %s"
        params = (username, password)
        try:
            cursor.execute(query, params)
        except Exception as e:
            print("Error: ", e)


class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    pass


class SocksProxy(StreamRequestHandler):
    username = 'username'
    password = 'password'

    def handle(self):
        logging.info('Accepting connection from %s:%s' % self.client_address)

        # greeting header
        # read and unpack 2 bytes from a client
        header = self.connection.recv(2)
        version, nmethods = struct.unpack("!BB", header)

        # socks 5
        assert version == SOCKS_VERSION
        assert nmethods > 0

        # get available methods
        methods = self.get_available_methods(nmethods)

        # accept only USERNAME/PASSWORD auth
        if 2 not in set(methods):
            # close connection
            self.server.close_request(self.request)
            return

        # send welcome message
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 2))

        if not self.verify_credentials():
            return

        # request
        version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
        assert version == SOCKS_VERSION

        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = ord(self.connection.recv(1)[0])
            address = self.connection.recv(domain_length)

        port = struct.unpack('!H', self.connection.recv(2))[0]

        # reply
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                logging.info('Connected to %s %s' % (address, port))
            else:
                self.server.close_request(self.request)

            addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
            port = bind_address[1]
            reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, address_type,
                                addr, port)

        except Exception as err:
            logging.error(err)
            # return connection refused error
            reply = self.generate_failed_reply(address_type, 5)

        self.connection.sendall(reply)

        # establish data exchange
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(self.connection, remote)

        self.server.close_request(self.request)

    def get_available_methods(self, n):
        methods = []
        for i in range(n):
            methods.append(ord(self.connection.recv(1)))
        return methods

    def verify_credentials(self):
        version = ord(self.connection.recv(1))
        assert version == 1

        username_len = ord(self.connection.recv(1))
        username = self.connection.recv(username_len).decode('utf-8')

        password_len = ord(self.connection.recv(1))
        password = self.connection.recv(password_len).decode('utf-8')

        if username == self.username and password == self.password:
            # success, status = 0
            response = struct.pack("!BB", version, 0)
            self.connection.sendall(response)
            return True

        # failure, status != 0
        response = struct.pack("!BB", version, 0xFF)
        self.connection.sendall(response)
        self.server.close_request(self.request)
        return False

    def generate_failed_reply(self, address_type, error_number):
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)

    def exchange_loop(self, client, remote):
        while True:
            # wait until client or remote is available for read
            r, w, e = select.select([client, remote], [], [])

            if client in r:
                data = client.recv(4096)
                if remote.send(data) <= 0:
                    break

            if remote in r:
                data = remote.recv(4096)
                if client.send(data) <= 0:
                    break


if __name__ == '__main__':
    ####sql_connect() # connect to database
    print("Accept partner connections...")
    #with ThreadingTCPServer(('127.0.0.1', 9013), SocksProxy) as server:
    #    server.serve_forever()
    partner_server = ThreadingTCPServer(('127.0.0.1', 9013), SocksProxy)
    partner_thread = threading.Thread(target=partner_server.serve_forever)
    print("Accept app connections...")
    app_server = ThreadingTCPServer(('127.0.0.1', 9014), SocksProxy)
    app_thread = threading.Thread(target=app_server.serve_forever)

    for t in partner_thread, app_thread:
        t.start()
    for t in partner_thread, app_thread:
        t.join()

To conceptualize accepting inbound connection on both ports and sending request of one through the other (Making it a type of reverse proxy) is difficult to know the next step. I got the idea to communicate between two SOCKS server classes. I attempted that here:

import logging
import select
import socket
import struct
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler, BaseRequestHandler
import threading

logging.basicConfig(level=logging.DEBUG)
SOCKS_VERSION = 5



class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    pass


class NatServer(StreamRequestHandler):
    username = 'username'
    password = 'password'

    def __init__(self):
        NatServer.client = None
        NatServer.remote = None

    def handle(self):
        logging.info('Accepting app connection from %s:%s' % self.client_address)

        # greeting header
        # read and unpack 2 bytes from a client
        header = self.connection.recv(2)
        version, nmethods = struct.unpack("!BB", header)

        # socks 5
        assert version == SOCKS_VERSION
        assert nmethods > 0

        # get available methods
        methods = self.get_available_methods(nmethods)

        # accept only USERNAME/PASSWORD auth
        if 2 not in set(methods):
            # close connection
            self.server.close_request(self.request)
            return

        # send welcome message
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 2))

        if not self.verify_credentials():
            return

        # request
        version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
        assert version == SOCKS_VERSION

        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = ord(self.connection.recv(1)[0])
            address = self.connection.recv(domain_length)

        port = struct.unpack('!H', self.connection.recv(2))[0]

        # reply
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                logging.info('Connected to %s %s' % (address, port))
            else:
                self.server.close_request(self.request)

            addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
            port = bind_address[1]
            reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, address_type,
                                addr, port)

        except Exception as err:
            logging.error(err)
            # return connection refused error
            reply = self.generate_failed_reply(address_type, 5)

        self.connection.sendall(reply)

        # establish data exchange
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(self.connection, remote)

        self.server.close_request(self.request)

    def get_available_methods(self, n):
        methods = []
        for i in range(n):
            methods.append(ord(self.connection.recv(1)))
        return methods

    def verify_credentials(self):
        version = ord(self.connection.recv(1))
        assert version == 1

        username_len = ord(self.connection.recv(1))
        username = self.connection.recv(username_len).decode('utf-8')

        password_len = ord(self.connection.recv(1))
        password = self.connection.recv(password_len).decode('utf-8')

        if username == self.username and password == self.password:
            # success, status = 0
            response = struct.pack("!BB", version, 0)
            self.connection.sendall(response)
            return True

        # failure, status != 0
        response = struct.pack("!BB", version, 0xFF)
        self.connection.sendall(response)
        self.server.close_request(self.request)
        return False

    def generate_failed_reply(self, address_type, error_number):
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)

    def exchange_loop(self, client, remote):
        self.bridge_sockets(client, remote)
        while True:
            # wait until client or remote is available for read
            r, w, e = select.select([client, remote], [], [])
            if client in r:
                data = client.recv(4096)
                if remote.send(data) <= 0:
                    break
            if remote in r:
                data = remote.recv(4096)
                if client.send(data) <= 0:
                    break

    def bridge_sockets(self, client, remote):
        NatServer.client = client
        NatServer.remote = remote
        browser_server = ThreadingTCPServer(('127.0.0.1', 9013), BrowserServer)
        browser_thread = threading.Thread(target=partner_server.serve_forever)
        ''', kwargs={'app_client': client,
                     'app_remote': remote}'''
        browser_thread.start()
        #partner_thread.join()


class BrowserServer(StreamRequestHandler):
    username: "username"
    password: "password"

    def __init__(self):  # , app_client, app_remote
        self.app_client = NatServer.client
        self.app_remote = NatServer.remote

    def handle(self):
        logging.info('Accepting partner connection from %s:%s' % self.client_address)

        # greeting header
        # read and unpack 2 bytes from a client
        header = self.connection.recv(2)
        version, nmethods = struct.unpack("!BB", header)

        # socks 5
        assert version == SOCKS_VERSION
        assert nmethods > 0

        # get available methods
        methods = self.get_available_methods(nmethods)

        # accept only USERNAME/PASSWORD auth
        if 2 not in set(methods):
            # close connection
            self.server.close_request(self.request)
            return

        # send welcome message
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 2))

        if not self.verify_credentials():
            return

        # client request
        version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
        assert version == SOCKS_VERSION

        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = ord(self.connection.recv(1)[0])
            address = self.connection.recv(domain_length)

        port = struct.unpack('!H', self.connection.recv(2))[0]

        # reply
        try:
            if cmd == 1:  # CONNECT
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.connect((address, port))
                bind_address = remote.getsockname()
                logging.info('Connected to %s %s' % (address, port))
            else:
                self.server.close_request(self.request)

            addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
            port = bind_address[1]
            reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, address_type,
                                addr, port)

        except Exception as err:
            logging.error(err)
            # return connection refused error
            reply = self.generate_failed_reply(address_type, 5)

        self.connection.sendall(reply)

        # establish data exchange
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(self.connection, remote)

        self.server.close_request(self.request)

        def get_available_methods(self, n):
            methods = []
        for i in range(n):
            methods.append(ord(self.connection.recv(1)))
        return methods

    def verify_credentials(self):
        version = ord(self.connection.recv(1))
        assert version == 1

        username_len = ord(self.connection.recv(1))
        username = self.connection.recv(username_len).decode('utf-8')

        password_len = ord(self.connection.recv(1))
        password = self.connection.recv(password_len).decode('utf-8')

        if username == self.username and password == self.password:
            # success, status = 0
            response = struct.pack("!BB", version, 0)
            self.connection.sendall(response)
            return True

        # failure, status != 0
        response = struct.pack("!BB", version, 0xFF)
        self.connection.sendall(response)
        self.server.close_request(self.request)
        return False

    def generate_failed_reply(self, address_type, error_number):
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)

    def exchange_loop(self, client, remote):
        # app_<-partner_client
        app_client = self.app_client
        app_remote = self.app_remote
        while True:
            # wait until client or remote is available for read
            #r, w, e = select.select([client, remote], [], [])
            r, w, e = select.select([client, app_client], [], [])
            if app_client in r:
                data = app_client.recv(4096)
                if client.send(data) <= 0:
                    break

            if client in r:
                data = client.recv(4096)
                if app_client.send(data) <= 0:
                    break


if __name__ == '__main__':
    nat_server = ThreadingTCPServer(('127.0.0.1', 9014), NatServer)
    nat_thread = threading.Thread(target=app_server.serve_forever)

    nat_thread.start()
    nat_thread.join()

I'm not sure if that's the right idea but it gave me the error when I connected to BrowserServer port via a web browser:

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 36632)
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.7.3/lib/python3.7/socketserver.py", line 650, in process_request_thread
    self.finish_request(request, client_address)
  File "/home/user/.pyenv/versions/3.7.3/lib/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
TypeError: __init__() takes 1 positional argument but 4 were given

Just so that everyone is clear on what I'm trying to accomplish: MyApp will run on a server and open 2 ports. ServerBehindNAT will be running client code which will maintain a connection to MyApp. BrowserUser will use MyApp server IP as a SOCKS server in his browser. When BrowserUser visits a site, the request will go through MyApp to ServerBehindNAT, who will process the request and send responses back through MyApp.



from Designing a reverse SOCKS relay in Python

No comments:

Post a Comment