summary refs log tree commit diff
path: root/IRC.py
blob: 6df303ec9f225cc61c3e836b154a494d6d6acee1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# Part of rabbitears See LICENSE for permissions
# Copyright (C) 2022 Matt Arnold

import irctokens
import base64


def create_sasl_plain_auth(username, password):
    # per KICL this is the valid way to do it
    auth_string = f"{username}\x00{username}\x00{password}"
    auth_bytes = auth_string.encode("utf-8")
    base64_auth = base64.b64encode(auth_bytes).decode("utf-8")

    return base64_auth


def encode_action_message(m):
    return f"\x01ACTION {m}\x01"


class IRCBadMessage(Exception):
    pass


class IRCError(Exception):
    pass


def printred(s):
    t = f"\033[1;31m {s} \033[0m\n"
    print(t)


def parsemsg(s):
    """Breaks a message from an IRC server into its prefix, command, and arguments."""
    prefix = ""
    trailing = []
    if not s:
        raise IRCBadMessage("Empty line.")
    if s[0] == ":":
        prefix, s = s[1:].split(" ", 1)
    if s.find(" :") != -1:
        s, trailing = s.split(" :", 1)
        args = s.split()
        args.append(trailing)
    else:
        args = s.split()
    command = args.pop(0)
    return prefix, command, args


LINEEND = "\r\n"


class IRCBot:
    irc = None

    def __init__(self, sock, config=None):
        self.irc = sock
        self.connected = False
        self.config = config

    def send_cmd(self, line):
        """Send an IRC Command, takes an IRC command string without the CRLF
        Returns encoded msg on success raises IRCError on failure"""
        if not self.connected:
            raise IRCError("Not Connected")
        encmsg = bytes(line.format() + LINEEND, "UTF-8")
        printred(encmsg)
        expected = len(encmsg)
        if self.irc.send(encmsg) == expected:
            return str(encmsg)
        else:
            raise IRCError("Unexpected Send Length")

    def on_welcome(self, *args, **kwargs):
        authmsg = irctokens.build("NICKSERV", ["SET", "autoreplay-lines", "0"])
        self.send_cmd(authmsg)
        joinmsg = irctokens.build("JOIN", [self.config["channel"]])
        self.send_cmd(joinmsg)
        self.send_action("Hops in", self.config["channel"])

    def send_privmsg(self, dst, msg):
        msg = irctokens.build("PRIVMSG", [dst, msg])
        self.send_cmd(msg)

    def send_quit(self, quitmsg):
        msg = irctokens.build("QUIT", [quitmsg])
        print(msg)
        self.send_cmd(msg)

    def send_action(self, action_msg, dst):
        em = encode_action_message(action_msg)
        ctcpmsg = irctokens.build("PRIVMSG", [dst, em])
        self.send_cmd(ctcpmsg)

    def connect(self, server, port, channel, botnick, botnickpass):
        if self.config is None:
            self.config = {}
            self.config["hostname"] = server
            self.config["port"] = port
            self.config["nick"] = botnick
            self.config["channel"] = channel
            self.config["nickpass"] = botnickpass
        print("Connecting to: " + server)
        self.irc.connect((self.config["hostname"], self.config["port"]))
        self.connected = True
        # perform SASL
        caps_msg = irctokens.build("CAP", ["REQ", "sasl"])
        printred(caps_msg)
        self.send_cmd(caps_msg)

        # Perform user registration
        cap = irctokens.build("CAP", ["LS", "302"])
        self.send_cmd(cap)
        password_msg = irctokens.build("PASS", [self.config["nickpass"]])
        printred(password_msg)
        token = create_sasl_plain_auth(botnick, botnickpass)
        usermsg = irctokens.build("USER", [botnick, "0", "*", "frog"])
        auth = irctokens.build("AUTHENTICATE", ["PLAIN"])
        self.send_cmd(auth)
        # auth_st2 = irctokens.build("AUTHENTICATE", ["+"])
        # self.send_cmd(auth_st2)
        auth_st3 = irctokens.build("AUTHENTICATE", [token])
        self.send_cmd(auth_st3)
        cmd = irctokens.build("CAP", ["END"])
        self.send_cmd(cmd)
        print(usermsg)
        # s#elf.send_cmd(password_msg)
        nickmsg = irctokens.build("NICK", [botnick]).format()
        print(nickmsg)
        self.send_cmd(nickmsg)
        self.send_cmd(usermsg)

    def get_response(self):
        # Get the response
        resp = self.irc.recv(4096).decode("UTF-8")
        msg = parsemsg(resp)
        nwmsg = irctokens.tokenise(resp)
        printred(nwmsg.command)
        if nwmsg.command == "001":
            self.on_welcome(nwmsg.params)
        if nwmsg.command == "ERROR":
            raise IRCError(str(nwmsg.params[0]))
        if nwmsg.command == "PING":
            print("Sending pong")
            self.irc.send(bytes("PONG " + nwmsg.params[0] + LINEEND, "UTF-8"))

        return msg