I went down a journey of "How much performance can I squeeze out of a Python web-server?" This lead me to AIOHTTP and uvloop. Still, I could see that AIOHTTP wasn't using my CPU to its full potential. I set out to use multiprocessing with AIOHTTP. I learned that there's a Linux kernel feature that allows multiple processes to share the same TCP port. This lead me to develop the following code (Which works wonderfully):
import asyncio
import os
import socket
import time
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)
def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if reuseport:
SO_REUSEPORT = 15
sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
sock.bind((host, port))
return sock
async def handle(request):
name = request.match_info.get('name', "Anonymous")
pid = os.getpid()
text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
time.time(), name, pid)
#time.sleep(5) # intentionally blocking sleep to simulate CPU load
return web.Response(text=text)
def start_server():
host = "127.0.0.1"
port=8000
reuseport = True
app = web.Application()
sock = mk_socket(host, port, reuseport=reuseport)
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
loop = asyncio.get_event_loop()
coro = loop.create_server(
protocol_factory=app.make_handler(),
sock=sock,
)
srv = loop.run_until_complete(coro)
loop.run_forever()
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
for i in range(0, CPU_COUNT):
executor.submit(start_server)
wrk benchmark of my site before applying this code:
Running 30s test @ http://127.0.0.1:8000/
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 54.33ms 6.54ms 273.24ms 89.95%
Req/Sec 608.68 115.97 2.27k 83.63%
218325 requests in 30.10s, 41.23MB read
Non-2xx or 3xx responses: 218325
Requests/sec: 7254.17
Transfer/sec: 1.37MB
wrk benchmark after:
Running 30s test @ http://127.0.0.1:8000/
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 15.96ms 7.27ms 97.29ms 84.78%
Req/Sec 2.11k 208.30 4.45k 75.50%
759290 requests in 30.08s, 153.51MB read
Requests/sec: 25242.39
Transfer/sec: 5.10MB
WoW! But there's a problem:
DeprecationWarning: Application.make_handler(...) is deprecated, use AppRunner API instead
protocol_factory=app.make_handler()
So I tried this:
import asyncio
import os
import socket
import time
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)
def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if reuseport:
SO_REUSEPORT = 15
sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
sock.bind((host, port))
return sock
async def handle(request):
name = request.match_info.get('name', "Anonymous")
pid = os.getpid()
text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
time.time(), name, pid)
#time.sleep(5) # intentionally blocking sleep to simulate CPU load
return web.Response(text=text)
async def start_server():
host = "127.0.0.1"
port=8000
reuseport = True
app = web.Application()
sock = mk_socket(host, port, reuseport=reuseport)
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
coro = loop.create_server(
protocol_factory=app.make_handler(),
sock=sock,
)
runner = web.AppRunner(app)
await runner.setup()
srv = web.TCPSite(runner, 'localhost', 8000)
await srv.start()
print('Server started at http://127.0.0.1:8000')
return coro, app, runner
async def finalize(srv, app, runner):
sock = srv.sockets[0]
app.loop.remove_reader(sock.fileno())
sock.close()
#await handler.finish_connections(1.0)
await runner.cleanup()
srv.close()
await srv.wait_closed()
await app.finish()
def init():
loop = asyncio.get_event_loop()
srv, app, runner = loop.run_until_complete(init)
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete((finalize(srv, app, runner)))
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
for i in range(0, CPU_COUNT):
executor.submit(init)
which is obviously incomplete becuase coro
isn't being used. I'm not sure where to integrate the socket with AppRunner. Answer should show original example modified to use App Runner.
from AIOHTTP - Application.make_handler(...) is deprecated - Adding Multiprocessing
No comments:
Post a Comment