1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# Hellish Gemini daemon, in both behaviour and code quality.
#
# openssl req -newkey rsa:4096 -nodes -keyout key.pem -nodes -x509 -out cert.pem
# python3 helld.py cert.pem key.pem
import socket
import threading
import socketserver
import ssl
import sys
import time
import random
from urllib.parse import urlparse
handlers = []
def register(addr):
def dec(fn):
handlers.append((addr, fn))
return dec
@register('/normal')
def normal(s, _):
s.sendall(b'20 text/gemini\r\n# hello\nnothing strange going on here\n')
@register('/noclose')
def noclose(s, rh):
s.sendall(b'20 text/gemini\r\nquick before the connection is killed i need to tell you tha')
close(rh.request)
@register('/sleep1')
def sleep1(s, rh):
s.sendall(b'20 text/gemini\r\nabout to take a nap')
time.sleep(120)
@register('/sleep2')
def sleep(s, rh):
time.sleep(120)
s.sendall(b'20 text/gemini\r\ntook a nap')
@register('/128mb')
def lotsofdata(s, rh):
s.sendall(b'20 text/gemini\r\n')
for n in range(128):
s.sendall(b'lol ' * (1024 * 1024 // 4))
@register('/4mb')
def sensibleamountofdata(s, rh):
buffer = b'20 text/gemini\r\n' + b'lol ' * (1024 * 1024)
buffer = buffer[:4*1024*1024]
s.sendall(buffer)
@register('/over4mb')
def nonsensibleamountofdata(s, rh):
buffer = b'20 text/gemini\r\n' + b'lol ' * (1024 * 1024)
buffer = buffer[:4*1024*1024+1]
s.sendall(buffer)
@register('/loop')
def loop(s, rh):
s.sendall(f'30 /loop/{random.randint(0, 2**32)}\r\n'.encode('utf-8'))
@register('/index')
def index(s, rh):
s.sendall(b'20 text/gemini\r\n')
for addr, _ in handlers:
s.sendall(f'=> {addr}\n'.encode('utf-8'))
@register('')
def toindex(s, rh):
s.sendall(b'30 /index\r\n')
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
s = context.wrap_socket(self.request, server_side=True)
req = s.read().decode('utf-8').rstrip('\r\n')
print(req)
req = urlparse(req)
for addr, fn in handlers:
if req.path.startswith(addr):
fn(s, self)
break
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(sys.argv[1], sys.argv[2])
ThreadedTCPServer.allow_reuse_address = True
server = ThreadedTCPServer(('0.0.0.0', 1965), ThreadedTCPRequestHandler)
server.serve_forever()
|