1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
from sys import stdin, stdout, stderr
import datetime
import hashlib
import socket
import ssl
import urllib.parse
import uuid
import traceback
# based on:
# https://tildegit.org/solderpunk/gemini-demo-1/src/branch/master/gemini-demo.py
# https://tildegit.org/solderpunk/AV-98/src/branch/master/src/av98/client.py
# TODO ciphers etc
SIZE_LIMIT = 4 * 1024 * 1024 # 4MB seems reasonable
outf = stdout.buffer
# directly stolen from gemini-demo
def absolutise_url(base, relative):
    """Resolve ``relative`` against ``base`` and return an absolute URL.

    A reference that already carries its own scheme is returned untouched;
    anything else is joined onto ``base``.

    Args:
        base: URL of the document the reference came from.
        relative: The reference to resolve.

    Returns:
        The resolved URL as a string.

    Raises:
        ValueError: if ``urllib.parse`` rejects one of the URLs as malformed.
    """
    # Decide "is this absolute?" by parsing instead of searching for "://":
    # a relative reference such as "find?u=http://x/" contains "://" in its
    # query string but still has to be joined onto the base.
    if urllib.parse.urlsplit(relative).scheme:
        return relative

    # urljoin() only resolves references for schemes it knows about, so pose
    # as http for the join and restore the gemini scheme afterwards.  Only
    # the leading scheme is swapped, which leaves any "http://" or
    # "gemini://" text embedded later in the URL intact.
    gemini, http = "gemini://", "http://"
    swapped = base.startswith(gemini)
    if swapped:
        base = http + base[len(gemini):]
    joined = urllib.parse.urljoin(base, relative)
    if swapped and joined.startswith(http):
        joined = gemini + joined[len(http):]
    return joined
def header(k, v):
    """Write one ``k: v`` header line, CRLF-terminated, to the WARC output.

    Args:
        k: Header field name.
        v: Header value; converted with ``str()`` before writing.

    Raises:
        ValueError: if the stringified value contains a CR or LF.
    """
    text = str(v)
    # A line break inside a value would start a new header line (or end the
    # header block) in the output.  Raise explicitly rather than assert so
    # the check is still performed under ``python -O``.
    if '\r' in text or '\n' in text:
        raise ValueError(f"line break in value of header {k!r}")
    outf.write(f"{k}: {text}\r\n".encode('utf-8'))
def warcinfo():
    """Emit a ``warcinfo`` record naming the software and the local hostname.

    The record is written to ``outf``: version line, headers, a blank line,
    the ``application/warc-fields`` body, then two CRLFs.
    """
    fields = (
        "software: garcon (a very early version thereof)\r\n"
        f"hostname: {socket.gethostname()}\r\n"
    )
    body = fields.encode('utf-8')

    outf.write(b'WARC/1.0\r\n')
    # NOTE(review): isoformat() yields a "+00:00" offset and may include
    # microseconds; WARC/1.0 appears to specify YYYY-MM-DDThh:mm:ssZ --
    # confirm that downstream readers accept this form.
    record_headers = (
        ("WARC-Type", "warcinfo"),
        ("WARC-Date", datetime.datetime.now(tz=datetime.timezone.utc).isoformat()),
        ("WARC-Record-ID", f"<urn:uuid:{uuid.uuid4()}>"),
        ("Content-Length", len(body)),
        ("Content-Type", "application/warc-fields"),
    )
    for name, value in record_headers:
        header(name, value)

    # Blank line closes the header block; two CRLFs close the record.
    outf.write(b'\r\n' + body + b'\r\n\r\n')
def request_raw(host, port, url):
    """Fetch ``url`` from a Gemini server and append a WARC response record.

    Opens a TLS connection to ``host:port``, sends ``url`` as the request
    line, reads at most ``SIZE_LIMIT`` bytes of the reply, and writes a
    ``response`` record (headers plus the bytes read) to ``outf``.

    Args:
        host: Server to connect to; also used as the TLS server name.
        port: TCP port to connect to.
        url: URL sent verbatim as the request and recorded as the target URI.

    Returns:
        The raw response bytes that were read, capped at ``SIZE_LIMIT``.

    Raises:
        ValueError: if ``url`` contains a CR or LF.
        OSError: on connection or TLS failure (``ssl.SSLError`` included).
    """
    # Raise rather than assert so the check survives ``python -O``.  A line
    # break in the URL would end the request line early and would also break
    # the WARC-Target-URI header.
    if '\r' in url or '\n' in url:
        raise ValueError(f"line break in request URL: {url!r}")

    # Certificate verification is switched off; the SHA-256 of the server
    # certificate is stored in the record instead.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # The with-blocks close the file object and the socket on error paths too.
    with socket.create_connection((host, port)) as raw_sock:
        with context.wrap_socket(raw_sock, server_hostname=host) as tls:
            tls.sendall((url + '\r\n').encode("UTF-8"))
            peername = tls.getpeername()
            cert = tls.getpeercert(True)  # DER bytes, or None if none was sent
            with tls.makefile("rb") as fp:
                payload = fp.read(SIZE_LIMIT)
                # One extra byte is enough to learn whether the server had
                # more to send; the remainder is not buffered in memory.
                truncated = fp.read(1) != b''
    # TODO check for close_notify

    # Compute every header value before the first write so that a failure
    # here cannot leave a half-written record in the output.
    timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
    record_id = f"<urn:uuid:{uuid.uuid4()}>"
    payload_digest = 'sha256:' + hashlib.sha256(payload).hexdigest()
    fingerprint = None
    if cert is not None:
        fingerprint = 'sha256:' + hashlib.sha256(cert).hexdigest()

    # stdout is the WARC stream, so nothing may be print()ed to it from here.
    # warctools doesn't like WARC/1.1
    outf.write(b'WARC/1.0\r\n')
    # mandatory
    header("WARC-Type", "response")
    header("WARC-Date", timestamp)
    header("WARC-Record-ID", record_id)
    header("Content-Length", len(payload))
    # optional
    header("WARC-Payload-Digest", payload_digest)
    header("WARC-IP-Address", peername[0])
    header("WARC-Target-URI", url)
    header("Content-Type", "application/gemini; msgtype=response")  # as in mozz-archiver
    if truncated:
        header("WARC-Truncated", "length")
    # my extensions
    if fingerprint is not None:
        header("X-Server-Fingerprint", fingerprint)
    outf.write(b'\r\n')
    # Content-Length above declares len(payload) bytes, so the block has to
    # be present for the record to be well-formed.
    outf.write(payload)
    outf.write(b'\r\n\r\n')
    return payload
def request_url(url):
    """Fetch a ``gemini://`` URL and archive the response via ``request_raw``.

    Args:
        url: Absolute Gemini URL.  The port defaults to 1965 when absent.

    Returns:
        Whatever ``request_raw`` returns for the request.

    Raises:
        ValueError: if the scheme is not ``gemini``, the URL has no host,
            or ``urllib.parse`` rejects the port.
    """
    p = urllib.parse.urlparse(url)
    # Explicit raises instead of assert: the checks must hold under -O too.
    if p.scheme != 'gemini':
        raise ValueError(f"not a gemini URL: {url!r}")
    # Without this, a host of None would be handed to the socket layer,
    # which resolves it to the local machine.
    if not p.hostname:
        raise ValueError(f"no host in URL: {url!r}")
    return request_raw(p.hostname, p.port or 1965, url)
def request_url_loop(url):
    """Fetch ``url`` and follow redirects, archiving each response on the way.

    Every request goes through ``request_url``.  The chain stops at the first
    response whose status line does not start with ``3``, is over-long, or
    carries no redirect target.

    Args:
        url: Absolute Gemini URL to start from.

    Returns:
        None.
    """
    # At most three requests are made per starting URL, so a redirect cycle
    # ends on its own without explicit loop detection.
    for _ in range(3):
        response = request_url(url)
        # Only the first line matters; partition() avoids splitting a
        # multi-megabyte body at every CRLF.
        status_line = response.partition(b'\r\n')[0]
        # Length cap on the response header (presumably 2 status digits,
        # 1 separator and up to 1024 bytes of META -- confirm against spec).
        if len(status_line) > 2 + 1 + 1024:
            break
        if not status_line.startswith(b'3'):
            break
        # A 3x status without a target cannot be followed; stop instead of
        # failing on a missing field.
        fields = status_line.split()
        if len(fields) < 2:
            break
        url = absolutise_url(url, fields[1].decode('utf-8'))
if __name__ == '__main__':
    # Read one URL per line from stdin and write the WARC stream to stdout.
    warcinfo()
    outf.flush()
    for line in stdin:
        target = line.rstrip('\r\n')
        if not target:
            # Blank line: nothing to fetch.
            continue
        try:
            request_url_loop(target)
            outf.flush()
        except Exception:
            # Best effort: report the failure and move on to the next URL.
            # KeyboardInterrupt and SystemExit are deliberately not caught,
            # so the run can still be stopped.
            print(traceback.format_exc(), file=stderr)
|