#!/usr/bin/env python3
"""Fetch Gemini URLs and archive the raw responses as WARC/1.0 records.

URLs are read from stdin, one per line. For every URL (and for up to three
redirects followed from it) one WARC ``response`` record is written to
stdout; a single ``warcinfo`` record is written first. Errors for a given
URL are reported on stderr and do not stop the run.
"""
from sys import stdin, stdout, stderr
import asyncio
import datetime
import hashlib
import socket
import ssl
import traceback
import urllib.parse
import uuid

# based on:
# https://tildegit.org/solderpunk/gemini-demo-1/src/branch/master/gemini-demo.py
# https://tildegit.org/solderpunk/AV-98/src/branch/master/src/av98/client.py

# TODO ciphers etc

SIZE_LIMIT = 4 * 1024 * 1024  # 4MB seems reasonable
TIME_LIMIT = 45  # seconds for each request
MAX_REDIRECTS = 3  # requests made per input URL, redirects included
# <STATUS><SPACE><META>: two status digits, one space, up to 1024 bytes of meta
MAX_HEADER_LENGTH = 2 + 1 + 1024

outf = stdout.buffer

_GEMINI_PREFIX = "gemini://"
_HTTP_PREFIX = "http://"


# originally taken from gemini-demo
def absolutise_url(base: str, relative: str) -> str:
    """Resolve *relative* against *base* and return the absolute URL.

    A reference that already carries a scheme is returned unchanged.
    Otherwise it is joined onto *base*. ``urllib.parse.urljoin`` only
    resolves references for schemes it knows, so a ``gemini://`` base is
    temporarily presented as ``http://``. Only the leading scheme prefix is
    swapped, so URLs embedded in a path or query string are left untouched.
    """
    if urllib.parse.urlsplit(relative).scheme:
        return relative

    is_gemini = base[:len(_GEMINI_PREFIX)].lower() == _GEMINI_PREFIX
    if is_gemini:
        base = _HTTP_PREFIX + base[len(_GEMINI_PREFIX):]
    joined = urllib.parse.urljoin(base, relative)
    if is_gemini and joined.startswith(_HTTP_PREFIX):
        joined = _GEMINI_PREFIX + joined[len(_HTTP_PREFIX):]
    return joined


def header(k: str, v: object) -> None:
    """Write one ``k: v`` WARC header line to the output stream.

    Raises:
        ValueError: if the value contains CR or LF, which would allow extra
            header lines to be injected into the record.
    """
    v = str(v)
    if '\r' in v or '\n' in v:
        raise ValueError(f"line break in value of header {k!r}")
    outf.write(f"{k}: {v}\r\n".encode('utf-8'))


def _warc_date() -> str:
    """Return the current UTC time in the form ``YYYY-MM-DDThh:mm:ssZ``."""
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    return now.strftime('%Y-%m-%dT%H:%M:%SZ')


def _record_id() -> str:
    """Return a fresh, globally unique WARC-Record-ID value."""
    return f"<urn:uuid:{uuid.uuid4()}>"


def warcinfo() -> None:
    """Write the ``warcinfo`` record describing this crawler run."""
    payload = (
        "software: garcon (a very early version thereof)\r\n"
        f"hostname: {socket.gethostname()}\r\n"
    ).encode('utf-8')

    outf.write(b'WARC/1.0\r\n')
    header("WARC-Type", "warcinfo")
    header("WARC-Date", _warc_date())
    header("WARC-Record-ID", _record_id())
    header("Content-Length", len(payload))
    header("Content-Type", "application/warc-fields")
    outf.write(b'\r\n')
    outf.write(payload)
    outf.write(b'\r\n\r\n')


async def request_raw(host: str, port: int, url: str) -> bytearray:
    """Send *url* to the Gemini server at *host*:*port* and archive the reply.

    The server certificate is deliberately not verified; its SHA-256
    fingerprint is recorded in the WARC record instead. At most SIZE_LIMIT
    bytes are read and at most TIME_LIMIT seconds are spent reading; when
    either limit cuts the response short the record is marked with
    ``WARC-Truncated``.

    Returns:
        The raw response bytes (header line and body) that were archived.

    Raises:
        ValueError: if *url* contains CR or LF.
        TimeoutError: if connecting (including the TLS handshake) takes
            longer than TIME_LIMIT seconds.
    """
    if '\r' in url or '\n' in url:
        raise ValueError("line break in URL")

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # Taken before connecting so the record carries the capture start time.
    capture_date = _warc_date()

    async with asyncio.timeout(TIME_LIMIT):
        reader, writer = await asyncio.open_connection(host, port, ssl=context)

    truncated = None
    payload = bytearray()
    try:
        writer.write((url + '\r\n').encode("UTF-8"))
        peername = writer.transport.get_extra_info('peername')
        # True -> the certificate as DER bytes, used for the fingerprint
        cert = writer.transport.get_extra_info('ssl_object').getpeercert(True)

        try:
            async with asyncio.timeout(TIME_LIMIT):
                while len(payload) < SIZE_LIMIT:
                    chunk = await reader.read(SIZE_LIMIT - len(payload))
                    if not chunk:
                        break
                    payload += chunk
                # Any further byte means the response exceeded SIZE_LIMIT.
                if await reader.read(1):
                    truncated = 'length'
        except TimeoutError:
            truncated = 'time'
    finally:
        # Always release the connection, even when reading failed.
        writer.close()

    # warctools doesn't like WARC/1.1
    outf.write(b'WARC/1.0\r\n')
    # mandatory
    header("WARC-Type", "response")
    header("WARC-Date", capture_date)
    header("WARC-Record-ID", _record_id())
    header("Content-Length", len(payload))
    # optional
    header("WARC-Payload-Digest", 'sha256:' + hashlib.sha256(payload).hexdigest())
    header("WARC-IP-Address", peername[0])
    header("WARC-Target-URI", url)
    header("Content-Type", "application/gemini; msgtype=response")  # as in mozz-archiver
    if truncated:
        header("WARC-Truncated", truncated)
    # my extensions
    header("X-Server-Fingerprint", 'sha256:' + hashlib.sha256(cert).hexdigest())
    outf.write(b'\r\n')
    outf.write(payload)
    outf.write(b'\r\n\r\n')

    # TODO check for close_notify
    return payload


async def request_url(url: str) -> bytearray:
    """Fetch and archive a single ``gemini://`` URL (default port 1965).

    Raises:
        ValueError: if *url* is not a gemini URL or names no host.
    """
    p = urllib.parse.urlparse(url)
    if p.scheme != 'gemini':
        raise ValueError(f"not a gemini URL: {url!r}")
    if not p.hostname:
        raise ValueError(f"no host in URL: {url!r}")
    return await request_raw(p.hostname, p.port or 1965, url)


def _redirect_target(response: bytes):
    """Return the redirect target named by a 3x response, or None.

    None is returned when the header line is over-long, is not a 3x status,
    or names no target.
    """
    status_line = response.split(b'\r\n')[0]
    if len(status_line) > MAX_HEADER_LENGTH:
        return None
    if not status_line.startswith(b'3'):
        return None
    fields = status_line.split(b' ', 2)
    if len(fields) < 2 or not fields[1]:
        return None
    return fields[1].decode('utf-8')


async def request_url_loop(url: str) -> None:
    """Fetch *url*, following redirects, archiving every response on the way."""
    # only MAX_REDIRECTS requests are made, so detecting loops isn't really
    # necessary
    for _ in range(MAX_REDIRECTS):
        response = await request_url(url)
        target = _redirect_target(response)
        if target is None:
            break
        url = absolutise_url(url, target)


def main() -> None:
    """Archive every URL listed on stdin, one per line."""
    warcinfo()
    outf.flush()
    for line in stdin:
        url = line.rstrip('\r\n')
        if not url:
            continue
        try:
            asyncio.run(request_url_loop(url))
            outf.flush()
        except Exception:
            # Report and carry on with the next URL; Ctrl-C still aborts.
            print(traceback.format_exc(), file=stderr)


if __name__ == '__main__':
    main()