"""Fetch Gemini URLs listed on stdin and write WARC/1.0 records to stdout.

One URL is read per input line. Every response (including each hop of a
redirect chain) is written as a WARC ``response`` record, preceded by a single
``warcinfo`` record. Errors are reported as tracebacks on stderr and do not
stop the run.
"""
from sys import stdin, stdout, stderr
import datetime
import hashlib
import socket
import ssl
import urllib.parse
import uuid
import traceback

# based on:
# https://tildegit.org/solderpunk/gemini-demo-1/src/branch/master/gemini-demo.py
# https://tildegit.org/solderpunk/AV-98/src/branch/master/src/av98/client.py

# TODO ciphers etc

SIZE_LIMIT = 4 * 1024 * 1024  # 4MB seems reasonable
MAX_REDIRECTS = 3
# Longest legal response header line: 2 status digits + 1 space + 1024 meta.
MAX_HEADER_LENGTH = 2 + 1 + 1024
DEFAULT_PORT = 1965

# All WARC output goes to stdout as raw bytes; nothing else may be printed
# there, or the archive is corrupted.
outf = stdout.buffer


# directly stolen from gemini-demo
def absolutise_url(base, relative):
    """Resolve *relative* against *base* and return an absolute gemini URL.

    A *relative* that already contains ``://`` is returned unchanged.
    """
    if "://" not in relative:
        # urljoin only resolves relative references for schemes it knows
        # about, so temporarily disguise gemini:// as http://.
        base = base.replace("gemini://", "http://")
        relative = urllib.parse.urljoin(base, relative)
        relative = relative.replace("http://", "gemini://")
    return relative


def header(k, v):
    """Write one ``k: v`` WARC header line to the output stream.

    Raises:
        ValueError: if the value contains CR or LF, which would allow an
            extra header line to be injected into the record.
    """
    v = str(v)
    # A real exception rather than `assert`, so the check survives `python -O`.
    if '\r' in v or '\n' in v:
        raise ValueError(f"line break in value of WARC header {k!r}")
    outf.write(f"{k}: {v}\r\n".encode('utf-8'))


def _warc_date():
    """Return the current UTC time in the ``YYYY-MM-DDThh:mm:ssZ`` form."""
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    return now.strftime('%Y-%m-%dT%H:%M:%SZ')


def _record_id():
    """Return a fresh, globally unique WARC-Record-ID value."""
    return f"<urn:uuid:{uuid.uuid4()}>"


def warcinfo():
    """Write the leading ``warcinfo`` record describing this crawl."""
    payload = (
        "software: garcon (a very early version thereof)\r\n"
        f"hostname: {socket.gethostname()}\r\n"
    ).encode('utf-8')

    outf.write(b'WARC/1.0\r\n')
    header("WARC-Type", "warcinfo")
    header("WARC-Date", _warc_date())
    header("WARC-Record-ID", _record_id())
    header("Content-Length", len(payload))
    header("Content-Type", "application/warc-fields")
    outf.write(b'\r\n')
    outf.write(payload)
    outf.write(b'\r\n\r\n')


def request_raw(host, port, url):
    """Send *url* to ``host:port`` over TLS and archive the raw response.

    The server certificate is deliberately not verified; its SHA-256
    fingerprint is recorded in the WARC record instead. At most SIZE_LIMIT
    bytes of the response are kept; longer responses are marked with
    ``WARC-Truncated: length``.

    Returns:
        The (possibly truncated) raw response bytes, header line included.

    Raises:
        ValueError: if *url* contains a line break.
        OSError: on connection or TLS failures (ssl.SSLError is a subclass).
    """
    if '\r' in url or '\n' in url:
        raise ValueError("line break in request URL")

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # `with` guarantees the socket is closed even if the exchange fails.
    with socket.create_connection((host, port)) as plain_sock:
        with context.wrap_socket(plain_sock, server_hostname=host) as s:
            s.sendall((url + '\r\n').encode("UTF-8"))
            peername = s.getpeername()
            cert = s.getpeercert(True)
            with s.makefile("rb") as fp:
                payload = fp.read(SIZE_LIMIT)
                # One extra byte is enough to know the server had more to
                # say; reading everything would defeat the size limit.
                truncated = fp.read(1) != b''
    # TODO check for close_notify

    # Compute everything that can fail before the first write, so an error
    # never leaves a half-written record in the output.
    payload_digest = 'sha256:' + hashlib.sha256(payload).hexdigest()
    fingerprint = None
    if cert is not None:
        fingerprint = 'sha256:' + hashlib.sha256(cert).hexdigest()

    # warctools doesn't like WARC/1.1
    outf.write(b'WARC/1.0\r\n')
    # mandatory
    header("WARC-Type", "response")
    header("WARC-Date", _warc_date())
    header("WARC-Record-ID", _record_id())
    header("Content-Length", len(payload))
    # optional
    header("WARC-Payload-Digest", payload_digest)
    header("WARC-IP-Address", peername[0])
    header("WARC-Target-URI", url)
    header("Content-Type", "application/gemini; msgtype=response")  # as in mozz-archiver
    if truncated:
        header("WARC-Truncated", "length")
    # my extensions
    if fingerprint is not None:
        header("X-Server-Fingerprint", fingerprint)
    outf.write(b'\r\n')
    # The body must follow the headers: Content-Length above promises
    # exactly len(payload) bytes.
    outf.write(payload)
    outf.write(b'\r\n\r\n')

    return payload


def request_url(url):
    """Fetch and archive a single ``gemini://`` URL; return the raw response.

    Raises:
        ValueError: if *url* is not a gemini URL or names no host.
    """
    p = urllib.parse.urlparse(url)
    if p.scheme != 'gemini':
        raise ValueError(f"not a gemini URL: {url!r}")
    if not p.hostname:
        # create_connection((None, port)) would quietly connect to localhost.
        raise ValueError(f"no host in URL: {url!r}")
    return request_raw(p.hostname, p.port or DEFAULT_PORT, url)


def _redirect_target(response):
    """Return the redirect target named by *response*, or None.

    None is returned when the response is not a 3x redirect or its header
    line is longer than the protocol allows.

    Raises:
        ValueError: if a redirect header carries no target URL.
        UnicodeDecodeError: if the target is not valid UTF-8.
    """
    # partition() stops at the first CRLF instead of splitting the whole body.
    header_line = response.partition(b'\r\n')[0]
    if len(header_line) > MAX_HEADER_LENGTH:
        return None
    if not header_line.startswith(b'3'):
        return None
    fields = header_line.split(b' ', 2)
    if len(fields) < 2 or not fields[1]:
        raise ValueError(f"redirect without a target: {header_line!r}")
    return fields[1].decode('utf-8')


def request_url_loop(url):
    """Fetch *url*, following redirects; every hop is archived.

    At most MAX_REDIRECTS requests are made in total.
    """
    # i only allow 3 redirects, so detecting loops isn't really necessary
    for _ in range(MAX_REDIRECTS):
        response = request_url(url)
        target = _redirect_target(response)
        if target is None:
            break
        url = absolutise_url(url, target)


def main():
    """Archive every URL read from stdin, one per line."""
    warcinfo()
    outf.flush()
    for line in stdin:
        url = line.rstrip('\r\n')
        if not url:
            continue
        try:
            request_url_loop(url)
        except Exception:
            # Best effort: report the failure and carry on with the next URL.
            # Ctrl-C and SystemExit are deliberately not caught.
            print(traceback.format_exc(), file=stderr)
        finally:
            outf.flush()


if __name__ == '__main__':
    main()