# Part of rabbitears See LICENSE for permissions
# Copyright (C) 2022 Matt Arnold
import base64
import socket
import sys
import time

import irctokens


def create_sasl_plain_auth(username, password):
    """Return the base64-encoded SASL PLAIN payload for the given credentials.

    The payload is ``username NUL username NUL password`` encoded as UTF-8 and
    then base64; the result is returned as a ``str``.
    """
    # per KICL this is the valid way to do it
    auth_string = f"{username}\x00{username}\x00{password}"
    auth_bytes = auth_string.encode("utf-8")
    base64_auth = base64.b64encode(auth_bytes).decode("utf-8")
    return base64_auth


def encode_action_message(m):
    """Wrap *m* in the ``\\x01ACTION ...\\x01`` framing used by send_action."""
    return f"\x01ACTION {m}\x01"


class IRCBadMessage(Exception):
    """Raised when a line cannot be broken into an IRC command."""


class IRCError(Exception):
    """Raised for connection-level failures and server ERROR messages."""


def printred(s):
    """Print *s* wrapped in the ANSI bold-red escape sequence."""
    t = f"\033[1;31m {s} \033[0m\n"
    print(t)


def parsemsg(s):
    """Break a message from an IRC server into its prefix, command, and arguments.

    A trailing CR/LF is removed before parsing so that it does not end up
    inside the last argument.

    Returns:
        A ``(prefix, command, args)`` tuple. ``prefix`` is ``""`` when the
        line does not start with ``:``.

    Raises:
        IRCBadMessage: if the line is empty, consists only of a prefix, or
            contains no command.
    """
    line = s.rstrip("\r\n") if s else ""
    if not line:
        raise IRCBadMessage("Empty line.")

    prefix = ""
    if line[0] == ":":
        # partition() never raises, unlike unpacking the result of split()
        # when the line holds a prefix and nothing else.
        prefix, sep, line = line[1:].partition(" ")
        if not sep:
            raise IRCBadMessage("Prefix without a command.")

    if " :" in line:
        line, trailing = line.split(" :", 1)
        args = line.split()
        args.append(trailing)
    else:
        args = line.split()

    if not args:
        raise IRCBadMessage("Missing command.")
    command = args.pop(0)
    return prefix, command, args


LINEEND = "\r\n"


class IRCBot:
    """Minimal IRC client wrapped around an already-created socket object."""

    irc = None
    # Bytes received from the server that have not yet formed a complete line.
    # bytes is immutable, so this class-level default is never shared by
    # mutation; instances rebind it.
    _recv_buffer = b""

    def __init__(self, sock, config=None):
        """Store the socket and optional config dict; no network I/O happens here."""
        self.irc = sock
        self.connected = False
        self.config = config
        self._recv_buffer = b""

    def send_cmd(self, line):
        """Send one IRC command.

        Args:
            line: the command without the CRLF, either as an already formatted
                ``str`` or as an object exposing ``format()`` (such as the
                value returned by ``irctokens.build``).

        Returns:
            ``str()`` of the encoded bytes that were sent (i.e. their
            ``b'...'`` representation), kept for backward compatibility.

        Raises:
            IRCError: if the bot is not connected.
        """
        if not self.connected:
            raise IRCError("Not Connected")
        # A plain str must be sent verbatim: str.format() would raise or mangle
        # the text whenever it contains "{" or "}", which are legal in nicks.
        text = line if isinstance(line, str) else line.format()
        encmsg = bytes(text + LINEEND, "UTF-8")
        # NOTE(review): this echo also shows AUTHENTICATE payloads on stdout.
        printred(encmsg)
        # sendall() keeps writing until the whole buffer is out; a short
        # write from send() is normal on a socket and not an error.
        self.irc.sendall(encmsg)
        return str(encmsg)

    def on_welcome(self, *args, **kwargs):
        """Handle numeric 001: tweak NickServ, join the channel and announce."""
        channel = self.config["channel"]
        nickserv_msg = irctokens.build("NICKSERV", ["SET", "autoreplay-lines", "0"])
        self.send_cmd(nickserv_msg)
        joinmsg = irctokens.build("JOIN", [channel])
        self.send_cmd(joinmsg)
        self.send_action("Hops in", channel)

    def send_privmsg(self, dst, msg):
        """Send *msg* to *dst* as a PRIVMSG."""
        msg = irctokens.build("PRIVMSG", [dst, msg])
        self.send_cmd(msg)

    def send_quit(self, quitmsg):
        """Send QUIT with *quitmsg* as the quit message."""
        msg = irctokens.build("QUIT", [quitmsg])
        print(msg)
        self.send_cmd(msg)

    def send_action(self, action_msg, dst):
        """Send *action_msg* to *dst* as an ACTION-framed PRIVMSG.

        Note the argument order is the reverse of send_privmsg.
        """
        em = encode_action_message(action_msg)
        ctcpmsg = irctokens.build("PRIVMSG", [dst, em])
        self.send_cmd(ctcpmsg)

    def connect(self, server, port, channel, botnick, botnickpass):
        """Connect the socket, then send the SASL PLAIN and registration commands.

        The connection parameters are stored in ``self.config`` (created when
        it is ``None``) under the keys ``hostname``, ``port``, ``nick``,
        ``channel`` and ``nickpass``.
        """
        if self.config is None:
            self.config = {}
        self.config["hostname"] = server
        self.config["port"] = port
        self.config["nick"] = botnick
        self.config["channel"] = channel
        self.config["nickpass"] = botnickpass

        print("Connecting to: " + server)
        self.irc.connect((self.config["hostname"], self.config["port"]))
        self.connected = True

        # NOTE(review): every command below is sent without reading the
        # server's replies in between -- confirm the target server accepts a
        # SASL exchange pipelined like this.

        # perform SASL
        caps_msg = irctokens.build("CAP", ["REQ", "sasl"])
        printred(caps_msg)
        self.send_cmd(caps_msg)

        # Perform user registration
        cap = irctokens.build("CAP", ["LS", "302"])
        self.send_cmd(cap)

        # The password is only ever sent inside the SASL token; it is no
        # longer printed to stdout in a PASS message that was never sent.
        token = create_sasl_plain_auth(botnick, botnickpass)
        usermsg = irctokens.build("USER", [botnick, "0", "*", "frog"])
        auth = irctokens.build("AUTHENTICATE", ["PLAIN"])
        self.send_cmd(auth)
        auth_st3 = irctokens.build("AUTHENTICATE", [token])
        self.send_cmd(auth_st3)
        cmd = irctokens.build("CAP", ["END"])
        self.send_cmd(cmd)

        print(usermsg)
        nickmsg = irctokens.build("NICK", [botnick]).format()
        print(nickmsg)
        self.send_cmd(nickmsg)
        self.send_cmd(usermsg)

    def _read_line(self):
        """Return the next non-empty server line, decoded, without its line ending.

        Raises:
            IRCError: if the peer closed the connection.
        """
        while True:
            raw, sep, rest = self._recv_buffer.partition(b"\n")
            if sep:
                self._recv_buffer = rest
                # Undecodable bytes are replaced rather than raising, since
                # the bytes come straight from the network.
                line = raw.rstrip(b"\r").decode("UTF-8", errors="replace")
                if line:
                    return line
                continue
            chunk = self.irc.recv(4096)
            if not chunk:
                # recv() returning b"" means the peer closed the connection.
                self.connected = False
                raise IRCError("Connection closed")
            self._recv_buffer += chunk

    def get_response(self):
        """Read one message from the server, react to it and return it parsed.

        Received bytes are buffered, so each call handles exactly one line
        even when a single recv() delivers several lines or only part of one.

        Handled commands: ``001`` calls on_welcome, ``PING`` is answered with
        a PONG, ``ERROR`` raises IRCError.

        Returns:
            The ``(prefix, command, args)`` tuple produced by parsemsg.

        Raises:
            IRCError: on a server ERROR message or a closed connection.
            IRCBadMessage: if the line cannot be parsed.
        """
        line = self._read_line()
        msg = parsemsg(line)
        nwmsg = irctokens.tokenise(line)
        printred(nwmsg.command)
        if nwmsg.command == "001":
            self.on_welcome(nwmsg.params)
        elif nwmsg.command == "ERROR":
            raise IRCError(str(nwmsg.params[0]) if nwmsg.params else "ERROR")
        elif nwmsg.command == "PING":
            print("Sending pong")
            # Building the reply with irctokens keeps a token containing
            # spaces as a single trailing parameter.
            pong = irctokens.build("PONG", nwmsg.params[:1]).format()
            self.irc.sendall(bytes(pong + LINEEND, "UTF-8"))
        return msg