From ea30e6ff34f6c5356e42c23a3b0c74fbbaa17771 Mon Sep 17 00:00:00 2001 From: Pawky Laguish Date: Tue, 26 Nov 2024 21:16:05 +0000 Subject: wip --- .gitignore | 145 +++++++++++++++++++++++++ LICENSE.md | 35 ++++++ README.txt | 3 + bot.py | 45 ++++++++ commands.py | 323 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ config.py | 81 ++++++++++++++ requirements.txt | 1 + soundcloud.py | 129 ++++++++++++++++++++++ spotify.py | 117 ++++++++++++++++++++ stuff.py | 291 +++++++++++++++++++++++++++++++++++++++++++++++++ util.py | 95 ++++++++++++++++ youtube.py | 174 ++++++++++++++++++++++++++++++ 12 files changed, 1439 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE.md create mode 100644 README.txt create mode 100755 bot.py create mode 100644 commands.py create mode 100644 config.py create mode 100644 requirements.txt create mode 100755 soundcloud.py create mode 100755 spotify.py create mode 100644 stuff.py create mode 100644 util.py create mode 100755 youtube.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f05327d --- /dev/null +++ b/.gitignore @@ -0,0 +1,145 @@ +venv +pass.txt +local_config.py +log.txt + +# ---> Python +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..7b69af9 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,35 @@ +# Blue Oak Model License + +Version 1.0.0 + +## Purpose + +This license gives everyone as much permission to work with this software as possible, while protecting contributors from liability. + +## Acceptance + +In order to receive this license, you must agree to its rules. The rules of this license are both obligations under that agreement and conditions to your license. You must not do anything with this software that triggers a rule that you cannot or will not follow. + +## Copyright + +Each contributor licenses you to do everything with this software that would otherwise infringe that contributor's copyright in it. + +## Notices + +You must ensure that everyone who gets a copy of any part of this software from you, with or without changes, also gets the text of this license or a link to . + +## Excuse + +If anyone notifies you in writing that you have not complied with [Notices](#notices), you can keep your license by taking all practical steps to comply within 30 days after the notice. If you do not do so, your license ends immediately. + +## Patent + +Each contributor licenses you to do everything with this software that would otherwise infringe any patent claims they can license or become able to license. + +## Reliability + +No contributor can revoke this license. + +## No Liability + +***As far as the law allows, this software comes as is, without any warranty or condition, and no contributor will be liable to anyone for any damages related to this software or this license, under any kind of legal claim.*** diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..ac4778c --- /dev/null +++ b/README.txt @@ -0,0 +1,3 @@ +code is formatted with "black" +#remember to install the libraries, and you probably should use venvs anyway +pip --require-venv install -r requirements.txt diff --git a/bot.py b/bot.py new file mode 100755 index 0000000..83cc0ac --- /dev/null +++ b/bot.py @@ -0,0 +1,45 @@ +#!./venv/bin/python3 +#!/usr/bin/env python3 +import ircstates, socket, ssl + +from config import * +from stuff import stuff +from util import Util +from time import sleep + + +class bot: + server = ircstates.Server(config.server.name) + host = config.server.host + port = config.server.port + + if __name__ == __module__ == "__main__": + + @classmethod + def __init__(self): + if config.server.ssl: + with socket.create_connection((self.host, self.port)) as sock_raw: + ctx = ssl.create_default_context() + with ctx.wrap_socket(sock_raw, server_hostname=self.host) as sock: + try: + util = Util(config, sock) + stuff(self, sock) + except KeyboardInterrupt: + util.quit("^C") + except ircstates.server.ServerDisconnectedException: + sleep(3) + self.__init__() + else: + with socket.create_connection((self.host, self.port)) as sock: + try: + util = Util(config, sock) + stuff(self, sock) + except KeyboardInterrupt: + util.quit("^C") + except ircstates.server.ServerDisconnectedException: + sleep(3) + self.__init__() + + +print("starting bot...") +bot() diff --git a/commands.py b/commands.py new file mode 100644 index 0000000..0740916 --- /dev/null +++ b/commands.py @@ -0,0 +1,323 @@ +from functools import wraps +from irctokens import build + +import importlib +import sys +import random + + +class Command: + def __init__(self, config): + self.config = config + self.commands = [] + + def mesg(self, msg, t=None): + self.util.mesg(msg, t) + + def notice(self, msg): + self.util.notice(msg) + + def action(self, msg): + self.util.action(msg) + + def send(self, msg): + self.util.send(msg) + + def err_perm(self, level="admin"): + self.mesg(f"Error: insufficient privileges, you lack {level} access") + + def adm(func, *args, **kwargs): + """decorator for admin commands""" + global adm_cmds + try: + if func.__name__ not in adm_cmds and func.__name__ != "_": + adm_cmds.append(func.__name__) + except NameError: + adm_cmds = [] + if func.__name__ not in adm_cmds and func.__name__ != "_": + adm_cmds.append(func.__name__) + + @wraps(func) + def _(self, *args, **kwargs): + if func.__name__ == "help": + self.admin_commands = adm_cmds + if not self.admin: + self.err_perm() + else: + return func( + self, + self.prefix, + self.line, + self.pm, + self._line, + self.admin, + self.mesg, + ) + + return _ + + def cmd(func, *args, **kwargs): + """decorator for user commands""" + global cmds + try: + if func.__name__ not in cmds and func.__name__ != "_": + cmds.append(func.__name__) + except NameError: + cmds = [] + if func.__name__ not in cmds and func.__name__ != "_": + cmds.append(func.__name__) + + @wraps(func) + def _(self, *args, **kwargs): + if func.__name__ == "help": + self.commands = cmds + if func.__name__ not in self.config.cmd.disabled: + return func( + self, + self.prefix, + self.line, + self.pm, + self._line, + self.admin, + self.mesg, + ) + + return _ + + def internal(func, *args, **kwargs): + """decorator for commands like ctcp which are for internal use, but use normal args template""" + global cmds + if func.__name__ in cmds: + cmds.remove(func.__name__) + return func + + def preq_cmd(self): # command prequisites / triggers + cmd = self.line + needs_prefix = True + # self.mesg(f"attempting command: {cmd}") + if cmd == "help" or cmd.startswith("help "): + command = "help" + elif cmd.startswith("quit"): + command = "quit" + elif cmd.startswith("echo "): + command = "echo" + elif cmd.startswith("roll ") or cmd == "roll": + command = "dice" + elif cmd.startswith("pick ") or cmd.startswith("choose "): + command = "choose" + elif cmd.startswith("yt ") or self.YouTube.match_urls(self.YouTube, cmd) != []: + command = "yt" + needs_prefix = False + elif cmd.startswith("w ") or cmd.startswith("weather "): + command = "weather" + elif cmd.startswith("me "): + command = "me" + elif cmd == "crapdate" or cmd.startswith("crapdate "): + command = "crapdate" + elif cmd == "dbg" or cmd.startswith("dbg "): + command = "dbg" + elif cmd == "dbg2" or cmd.startswith("dbg2 "): + command = "dbg2" + elif cmd == "version" or cmd == "ver": + command = "version" + elif cmd.startswith("\x01") or self.is_ctcp: + command = "ctcp" + else: + # self.mesg(cmd) + return + if command not in self.config.cmd.disabled: + if needs_prefix == False: + eval(f"self.{command}()") + elif not (self.prefix == None and self.pm == False): + eval(f"self.{command}()") + + # else: + # self.mesg("this ain't a valid commanderoonie, you twat") + + def getversion(self): + with open(self.config.self.gitdir + ".git/logs/HEAD") as f: + for l in f: + pass + return l.split()[1] + + @internal + @cmd + def ctcp(self, prefix, cmd, pm, line, admin, mesg): + """CTCP responses""" + notice = self.notice + ctcp = cmd[1:] + ctcp_upper = ctcp.upper() + if not ctcp.endswith("\x01"): + ctcp = ctcp + "\x01" + if ctcp_upper.startswith("PING"): + ctcp = ( + "\x01PING" + + ("" if 1 == len(ctcp.split(" ")) else " ") + + " ".join(ctcp.split(" ")[1:]) + ) + print(ctcp) + self.notice(ctcp) + if ctcp_upper.startswith("SOURCE"): + self.notice("\x01SOURCE " + self.config.self.source + "\x01") + elif ctcp_upper.startswith("VERSION"): + self.notice(f"\x01VERSION {self.getversion()}\x01") + elif ctcp_upper.startswith("FINGER"): + self.notice( + f"\x01FINGER {self.config.self.nick} version {self.getversion()} ({self.config.self.source})\x01" + ) + elif ctcp_upper.startswith("USERINFO"): + self.notice("\x01USERINFO pawky's crude IRC bot\x01") + elif ctcp_upper.startswith("CLIENTINFO"): + self.notice("\x01CLIENTINFO USERINFO PING SOURCE FINGER VERSION\x01") + + @adm + def quit(self, prefix, cmd, pm, line, admin, mesg): + if admin and (cmd == "q" or cmd == "quit"): + self.util.quit() + elif admin and (cmd.startswith("q ") or cmd.startswith("quit ")): + self.util.quit(cmd.split(" ", 1)[1]) + + @adm + def crapdate(self, prefix, cmd, pm, line, admin, mesg): + """hacky and crappy update command, don't use it, lol""" + args = cmd.split()[1:] + if not args: + args = [""] + popen = __import__("os").popen + # mesg(args) + if args[0] in ["log", "list"]: + if len(args) == 1: + args = args + ["", 3] + elif len(args) < 3: + args = args + ["3"] + for i in ( + popen(f"git log --pretty=oneline --abbrev-commit {args[1]}") + .read() + .split("\n", int(args[2])) + ): + mesg(i) + else: + mesg(popen("git pull").read()) + mesg(popen("git status|tr '\\n' ' '").read()) + + @adm + def dbg(self, prefix, cmd, pm, line, admin, mesg): + """temporary debug command, subject to change A LOT""" + mesg(dir()) + + @cmd + def yt(self, prefix, cmd, pm, line, admin, mesg): + """youtube""" + YouTube = self.YouTube + if cmd.startswith("yt "): + cmd = cmd[3:] + try: + YouTube.premature_optimization = self.config.cmd.yt_premature_opt + except AttributeError: + pass + print(f" YT premature_optimization={YouTube.premature_optimization}") + urls = YouTube.match_urls(YouTube, cmd) + yt_failed = False + for video in urls: + if yt_failed == True: + yt_failed = False + break + try: + a, yt_failed = YouTube.yt(YouTube, video) + except Exception as e: + a = e + yt_failed = True + mesg(a) + + @cmd + def echo(self, prefix, cmd, pm, line, admin, mesg): + """simple echo command | "echo ABC..." """ + mesg("\x7f" + cmd.split(" ", 1)[1]) + + @cmd + def choose(self, prefix, cmd, pm, line, admin, mesg): + """simple random choice command | "choose A B C..." """ + mesg("I choose: " + str(random.choice(cmd.split(" ", 1)[1].split(" ")))) + + @cmd + def dice(self, prefix, cmd, pm, line, admin, mesg): + """simple dice command | "roll [N[d[M]]]" where N is number of dice, and M is number of faces""" + cmd = cmd.split(" ", 1)[1] + amount, faces = 1, 6 + try: + amount = 0 + int(cmd.split("d", 1)[0]) + faces = 0 + int(cmd.split("d", 1)[1]) + except: + pass + if not str(amount).isnumeric() or amount > 100: + amount = 1 + if not str(faces).isnumeric() or faces > 100: + faces = 6 + mesg( + f"rolling {amount}d{faces}: " + + str([random.choice([i for i in range(faces)]) for n in range(amount)]) + ) + + @cmd + def version(self, prefix, cmd, pm, line, admin, mesg): + """version""" + mesg( + f"{self.config.self.nick} version {self.getversion()} ({self.config.self.source})" + ) + + @adm + def dbg2(self, prefix, cmd, pm, line, admin, mesg): + """version""" + with open(self.config.self.gitdir + ".git/logs/HEAD") as f: + for l in f: + pass + mesg("version " + l.split()[1]) + + @cmd + def me(self, prefix, cmd, pm, line, admin, mesg): + """simple /me command""" + self.action(cmd.split(" ", 1)[1]) + + @cmd + def help(self, prefix, cmd, pm, line, admin, mesg): + global adm_cmds + global cmds + disabled_commands = self.config.cmd.disabled + admin_commands, commands = [], [] + for i in ["exec", "eval", "reload"]: + if i not in adm_cmds: + adm_cmds.append(i) + for i in adm_cmds: + if i not in disabled_commands: + admin_commands.append(i) + for i in cmds: + if i not in admin_commands and i not in disabled_commands: + commands.append(i) + prefixes = '"' + '", "'.join(self.config.cmd.prefixes) + '"' + admin_commands = ", ".join(admin_commands) + try: + topic = cmd.split(" ", 1)[1] + except IndexError: + topic = None + try: + self_nick = self.self_nick + except IndexError: + self_nick = None + + abs_topics = {"prefixes": f'available prefixes are {prefixes} or "{self_nick}"'} + if topic == None: + mesg(f"available topics: " + ", ".join(list(abs_topics.keys()))) + mesg(f"available commands: " + ", ".join(commands)) + if admin: + mesg(f"admin commands: {admin_commands}") + else: + try: + mesg(f"{topic}: " + eval(f"self.{topic}.__doc__")) + except (TypeError, AttributeError) as e: + # mesg(str( e.__class__.__name__ )) + if topic in abs_topics: + mesg(f"{topic}: " + abs_topics[topic]) + else: + mesg(f'no help available for "{topic}"...') + except Exception as e: + mesg(str(e.__class__) + " " + str(e)) diff --git a/config.py b/config.py new file mode 100644 index 0000000..27cf5c4 --- /dev/null +++ b/config.py @@ -0,0 +1,81 @@ +import importlib, sys + +# default config file, copy the contents into local_config.py and modify +if __name__ == "local_config": + config = importlib.reload(sys.modules["config"]).config +else: + class config: # dummy, so the local config can simply be a copy of this template + ... + + +class config(config): + class self: + nick = "pawbot" + username = "pawky_bot" + # you should probably indicate yourself to be the owner of the bot, in username, or realname, or both + realname = "pawky's bot" + source = "/home/pawky/git/sotdbot" # so far only used for ctcp response + gitdir = "./" # where is the bot git dir? currently only used for version (git commit hash, assuming latest commit == latest version) + + class server: + name = "town" + host = "localhost" + port = 6667 + ssl = False + nickserv_auth = False + nickserv_mask = ( + "NickServ!NickServ@localhost" # the mask you receive from server + ) + nickserv_squery = False # squery seems to only be a thing on ngircd + nickserv_path = "NickServ@localhost" # the mask you actually send commands to + # get password from secret file + nickserv_pass = open("pass.txt", "r").read().strip() if nickserv_auth else "" + nickserv_recover = "RECOVER" # I recall it being GHOST on some networks? + channel = "#bots" + autojoin = [] + blacklisted_channels = [] + + class admin: + # ircv3 account-tag based admin + accounts = ["pawky", "sotdboat"] + # hostmask-based admin, if at all possible, you should try to use a vhost or reverse dns, or identd, to prevent fakery + hostmasks = ["pawky!pawky@localhost", "*!pawky@localhost"] + + class cmd: + # right now, single-character prefixes only (plus bot's own nick) + prefixes = ["!"] + # disabled commands, won't run via normal means...probably + disabled = [] + # admin-only override, + # useful for testing broken commands which should still be normal-user accessible + # commands which should only ever be used by admins, should be designated as such in code, not through here (e.g. exit) + admin_only = [] + ignored_nicks = [] + # try to read youtube page only up to tag, maybe it's faster? + # premature optimization is the root of all evil + yt_premature_opt = True + + capabilities = [ # what capabilities shall we request? + "message-tags", # needed for account-tag! + "account-tag", # account tag allows us to identify people without needing custom login! + "multi-prefix", # perhaps eventually useful for detecting people's status, such as +v AND +o ? not currently needed + "batch", # we wouldn't want to trigger on historic message playback (only usage of it I've seen) + "away-notify", # no functionality deals with away status yet, but could be interesting + "account-notify", # I don't remember why I try requesting this lol, I think it's to do with account-tag + "chghost", # uh, same, I think? + ] + +# cap-notify draft/account-registration draft/channel-rename draft/persistence draft/read-marker echo-message ergo.chat/nope extended-join extended-monitor invite-notify labeled-response sasl=PLAIN,EXTERNAL server-time setname userhost-in-names znc.in/self-message +# message-tags multi-prefix account-tag batch away-notify account-notify chghost + +# you should remove the following lines if you're editing local_config.py +if __name__ == "config": + try: + config = importlib.reload(sys.modules["local_config"]).config + except ModuleNotFoundError: + print("\x1b[31m!!! you should probably set up local config !!!\x1b[0m") + except KeyError: + try: + from local_config import config + except ModuleNotFoundError: + print("\x1b[31m!!! you should probably set up local config !!!\x1b[0m") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8376b04 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +ircstates ~=0.11.9 diff --git a/soundcloud.py b/soundcloud.py new file mode 100755 index 0000000..40c20fd --- /dev/null +++ b/soundcloud.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +from html.parser import HTMLParser +from urllib.request import urlopen + + +class SoundCloud: + video_type = "" + + def mesg(self, msg, t=None): + self.util.mesg(msg, t) + + def match_urls(self, str): + r = [ + i + for i in str.split() + if "https://soundcloud.com" in i + ] + r = list(dict.fromkeys(r)) + n = 0 + for i in r: + if not i.startswith("http"): + r.pop(n) + n += 1 + + return r + + class parseprop(HTMLParser): + def __init__(self): + print("soundcloud parse init") + HTMLParser.__init__(self) + self.itemprops_list = ["name", "duration", "byArtist"] + self.h = {} + self.readartist=False + + def handle_starttag(self, tag, attrs): + print("yo",tag,attrs) + if (tag != "meta" and tag != "link" and tag != "div") or ( + ( + [i for i in attrs if "itemprop" in i] == [] + and ("name", "title") not in attrs + ) + or (tag == "meta" and ("itemprop", "name") in attrs and self.readartist!=True) + ): + print("skip",tag,attrs) + return + # print(self,tag,attrs) + for k, v in attrs: + if k == "itemprop": + if v not in self.itemprops_list: + print("skipprop",tag,attrs) + return + x = [v] + if tag == "link" and v == "name": + x = ["channelName"] + elif k == "content": + #if attrs[0][1] == "interactionCount": + # v = int(v) + x += [v] + elif k == "name" and v == "title": + x = [v] + else: + return + print({x[0]: x[1]}) + self.h.update({x[0]: x[1]}) + # print(x[0],"=",x[1]) + + def fmt_dur(dur): + h, m, s = 0, 0, 0 + dur=dur.removeprefix("PT").removesuffix("S") + h,m = dur.split("H") + m,s = dur.removeprefix(f"{h}H").split("M") + #s = int(m[1][:-1]) + #m = int(m[0]) + h,m,s=int(h),int(m),int(s) + if m >= 60: + h = m // 60 + m = round((m / 60 - h) * 60) + return f"{h}h {m}m {s}s" + elif h == 0 and m == 0 and s == 0: + return "LIVE" + elif m == 0 and s != 0: + return f"{s}s" + elif s == 0: + return f"{m}m" + else: + return f"{m}m {s}s" + + def yt(self, url): + # self.util.mesg("dbg hello") + url = url.rstrip("\x01") + p = self.parseprop() + # use premature optimization? it should be SLIGHTLY faster, but can sometimes fail + data = b"" + if False: #self.premature_optimization: + url_h = urlopen(url) + # appears on approximately line 21 or 22, so we read 24 lines to be safe (23-25 should be license comment) + # I tried to read byte amounts but it's hard to make sure no invalid utf8 bytes happen due to partial reads + for i in range(24): + data += url_h.readline() + url_h.close() + data = data.decode() # bytes to utf-8 + if ( + data.find('meta itemprop="duration"') == -1 + or data.find('meta itemprop="name"') == -1 + ): # acts as both fallback for optimization, and in case optimization's turned off + # just read all of the html + data = urlopen(url).read().decode() + # print(f"\x1b[31m my data is: {data}\x1b[0m") + p.feed(data) + if p.h == {}: + irc_string = "[\x0304SoundCloud\x03] \x0307ERROR:\x0308 got no data from server! \x0315(check your URL for typos!)\x03" + ansi_string = "[\x1b[31mSoundCloud\x1b[0m] \x1b[33;2mERROR:\x1b[33;1m got no data from server! \x1b[37;2m(check your URL for typos!)\x1b[0m" + print(ansi_string) + return irc_string, True + y = p.h + print(y) + y.update(duration=self.fmt_dur(y["duration"])) + #irc_string = f"[\x0303SoundCloud\x03] \x02{y['title']}\x02 ({y['duration']}) uploaded by \x1d{y['channelName']}\x1d on {y['uploadDate']}, {y['interactionCount']:,} views" + #ansi_string = f"[\x1b[32mSoundCloud\x1b[0m] \x1b[1m{y['title']}\x1b[0m ({y['duration']}) uploaded by \x1b[03m{y['channelName']}\x1b[0m on {y['uploadDate']}, {y['interactionCount']:,} views" + irc_string="dummy";ansi_string="dummy" + print(y) + print(ansi_string) + return irc_string, False + + +if __name__ == "__main__": + import sys + + SoundCloud.yt(SoundCloud, sys.argv[1]) diff --git a/spotify.py b/spotify.py new file mode 100755 index 0000000..8d062d7 --- /dev/null +++ b/spotify.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +from html.parser import HTMLParser +from urllib.request import urlopen +import json + +class Spotify: + + def __init__(self): + self.ldjson=None + + def mesg(self, msg, t=None): + self.util.mesg(msg, t) + + def match_urls(self, str): + r = [ + i + for i in str.split() + if "https://open.spotify.com/" in i + ] + r = list(dict.fromkeys(r)) + n = 0 + for i in r: + if not i.startswith("http"): + r.pop(n) + n += 1 + + return r + + class parseprop(HTMLParser): + def __init__(self): + print("spotify parse init") + HTMLParser.__init__(self) + self.ldjson=False + + def handle_starttag(self, tag, attrs): + if (tag == "script" and ('type', 'application/ld+json') in attrs): + self.ldjson=True + else: self.ldjson=False + + def handle_endtag(self,tag): self.ldjson=False + + def handle_data(self,data): + if self.ldjson: + Spotify.ldjson=data + return + + def fmt_dur(dur): + h, m, s = 0, 0, 0 + m = dur[2:].split("M") + s = int(m[1][:-1]) + m = int(m[0]) + if m >= 60: + h = m // 60 + m = round((m / 60 - h) * 60) + return f"{h}h {m}m {s}s" + elif h == 0 and m == 0 and s == 0: + return "LIVE" + elif m == 0 and s != 0: + return f"{s}s" + elif s == 0: + return f"{m}m" + else: + return f"{m}m {s}s" + + def spotify(self, url): + # self.util.mesg("dbg hello") + url = url.rstrip("\x01") + p = self.parseprop() + # use premature optimization? it should be SLIGHTLY faster + if self.premature_optimization: + url_h, data = urlopen(url), b"" + # appears on approximately line 21 or 22, so we read 24 lines to be safe (23-25 should be license comment) + # I tried to read byte amounts but it's hard to make sure no invalid utf8 bytes happen due to partial reads + for i in range(24): + data += url_h.readline() + data = data.decode() # bytes to utf-8 + url_h.close() + else: + # just read all of the html + data = urlopen(url).read().decode() + # print(f"\x1b[31m my data is: {data}\x1b[0m") + p.feed(data) + # irc_string = "[\x0304Youtube\x03] \x0307ERROR:\x0308 got no data from server! \x0315(check your URL for typos!)\x03" + # ansi_string = "[\x1b[31mYoutube\x1b[0m] \x1b[33;2mERROR:\x1b[33;1m got no data from server! \x1b[37;2m(check your URL for typos!)\x1b[0m" + # print(ansi_string) + # return irc_string, True + #irc_string = f"[\x0303Youtube\x03] \x02{y['title']}\x02 ({y['duration']}) uploaded by \x1d{y['channelName']}\x1d on {y['uploadDate']}, {y['interactionCount']:,} views" + #ansi_string = f"[\x1b[32mYoutube\x1b[0m] \x1b[1m{y['title']}\x1b[0m ({y['duration']}) uploaded by \x1b[03m{y['channelName']}\x1b[0m on {y['uploadDate']}, {y['interactionCount']:,} views" + #print(ansi_string) + data=(json.loads(Spotify.ldjson)) + #'@type': 'MusicRecording', '@id': 'https://open.spotify.com/track/7hoyhyjcZvOCC9Tv9JgRhr', 'url': 'https://open.spotify.com/track/7hoyhyjcZvOCC9Tv9JgRhr', 'name': 'YO HO (PEG THE POOPDECK)', 'description': 'Listen to YO HO (PEG THE POOPDECK) on Spotify. Song · TheDooo · 2023', 'datePublished': '2023-08-30' + #Listen to The Evolution of Tears on Spotify · Album · The Gentle Men · 2021 · 10 songs + type=data["@type"] + id=data["@id"] + name=data["name"] + date=data["datePublished"] + artists=data["description"] + artists=artists.removeprefix(f'Listen to {name} on Spotify') + artists=artists.removeprefix('.').strip() + artists=artists.removeprefix('· ') + if artists.startswith("Song · "): artists=artists.removeprefix("Song · ") + elif artists.startswith("Album · "): + artists=artists.removeprefix("Album · ") + #removes the "10 songs" part + artists=artists[::-1].split(" · ",1)[1][::-1] + artists=artists.removesuffix(f" · {date[:4]}") + #print(type,id,name,"|"+artists+"|",date) + print(("Song: " if type=="MusicRecording" else "Album: " if type=="MusicAlbum" else f"Unknown type ({type}): ")+'"'+name+'"'+" by "+'"'+artists+'"'+" released on "+date) + irc_string="dummy" + return irc_string, False + + +if __name__ == "__main__": + import sys + + Spotify.premature_optimization = False + Spotify.spotify(Spotify, sys.argv[1]) diff --git a/stuff.py b/stuff.py new file mode 100644 index 0000000..299555b --- /dev/null +++ b/stuff.py @@ -0,0 +1,291 @@ +import irctokens +from config import config as Config +from util import Util +from commands import Command +from youtube import YouTube +import sys, importlib, time + + +def stuff(bot, sock): + config = Config + util = Util(config, sock) + command = Command(config) + server = bot.server + send = util.send + + def mesg(msg: str, t=None): + util.mesg(msg, t) + + # mesg=util.mesg + server_caps = [] + wanted_caps = config.capabilities + chan = config.server.channel # main channel + autojoin_done = False + is_ctcp = False + is_pm = False + mode = "init" + nick_override = False + + def auth( + nick=config.self.nick, + passwd=config.server.nickserv_pass, + auth="nickserv", + nickserv_auth=config.server.nickserv_auth, + ): + # TODO: handle auth that isn't nickserv + if auth.lower() == "nickserv": # line.source.split("!")[0] + # TODO: on most network can probably do "PRIVMSG NickServ@services. :help" + # TODO: support actually checking the nickserv mask properly + if nickserv_auth: + nick_override = True + if config.server.nickserv_squery: + util.send( + irctokens.build( + "SQUERY", ["NickServ", f"IDENTIFY {nick} {passwd}"] + ).format() + ) + util.send( + irctokens.build( + "SQUERY", + ["NickServ", f"{config.server.nickserv_recover} {nick}"], + ).format() + ) + else: + util.mesg(f"IDENTIFY {nick} {passwd}", config.server.nickserv_path) + util.mesg( + f"{config.server.nickserv_recover} {nick}", + config.server.nickserv_path, + ) + # attempt to re-nick just in case + send(irctokens.build("NICK", [config.self.nick]).format()) + + def configure(): + config = importlib.reload(sys.modules["config"]).config + util = Util(config, sock) + command = importlib.reload(sys.modules["commands"]).Command(config) + command.YouTube = importlib.reload(sys.modules["youtube"]).YouTube + command.util = util + command.YouTube.util = util + prefixes = config.cmd.prefixes + admin_accounts = config.admin.accounts + admin_users = config.admin.hostmasks + admin_only = config.cmd.admin_only + return ( + command, + command.util, + config, + util, + prefixes, + admin_accounts, + admin_users, + admin_only, + ) + + ( + command, + command.util, + config, + util, + prefixes, + admin_accounts, + admin_users, + admin_only, + ) = configure() + send = util.send + + send(irctokens.build("NICK", [config.self.nick]).format()) + send( + irctokens.build( + "USER", [config.self.username, "0", "*", config.self.realname] + ).format() + ) + while True: + self_nick = server.nickname + recv_data = sock.recv(1024) + recv_lines = server.recv(recv_data) + + for line in recv_lines: + try: + server.parse_tokens(line) + except IndexError: + print("\x1b[31;1mNGIRCD SUCKS\x1b[0m ", line) + continue + stri = line.format() + for k, v in util.dict.items(): + stri = stri.replace(k, v) + print(f"{time.strftime('%H:%M:%S')} < {stri}") + del stri + + if line.command == "PING": + send(f"PONG :{line.params[0]}") + + if mode == "init": + if line.command == "NOTICE" and line.source != None: + if util.maskmatch(line.source, config.server.nickserv_mask): + if config.server.nickserv_auth == True: + auth() + if line.command == "433": # 433 is ERR_NICKNAMEINUSE + util.nick(config.self.nick + "_") + if ( + line.command == "376" or line.command == "422" + ): # 376 is RPL_ENDOFMOTD and 422 is ERR_NOMOTD + if config.server.nickserv_auth == True: + auth() + send(irctokens.build("CAP", ["LS", "302"]).format()) + elif line.command == "CAP" and line.params[1] == "LS": + if server_caps == []: + server_caps = line.params[2].split() + caps = [value for value in wanted_caps if value in server_caps] + # single-send is more efficient + # BUT a single invalid cap == no caps enabled at all!!! + send(irctokens.build("CAP", ["REQ", " ".join(caps)]).format()) + # for i in caps: + # send(irctokens.build("CAP", ["REQ",i]).format()) + send(irctokens.build("CAP", ["END"]).format()) + mode = "boot" + elif mode == "boot": + send(irctokens.build("JOIN", [chan]).format()) + mode = "normal" + elif mode == "normal" and autojoin_done == False: + try: + for channel in config.server.autojoin: + send("JOIN " + channel) + time.sleep(0.25) + except Exception: + True + autojoin_done = True + elif mode == "normal": + if line.command == "433" and nick_override != False: + mesg("nick in use!", chan) + util.nick(config.self.nick) + nick_override = False + else: + nick_override = False + if line.command == "INVITE": + ( + send(irctokens.build("JOIN", [line.params[1]]).format()) + if line.params[1] not in config.server.blacklisted_channels + else None + ) + elif line.command == "PRIVMSG": + if line.tags == None or "batch" not in line.tags: + is_pm = False + is_ctcp = False + target = line.params[0] + if target == self_nick: + target = line.source.split("!")[0] + is_pm = True + util.target = target + command.util.target = target + cmd = line.params[1] + + if cmd != "" and ( + is_pm + or cmd.startswith(self_nick) + or cmd[0] in prefixes + or "https://" in cmd + or "http://" in cmd + ): + # TODO: allow ignoring hostmasks + if line.hostmask.nickname in config.cmd.ignored_nicks: + continue + try: + # if message in a channel, remove prefixes + if is_pm: + command.prefix = None + else: + if cmd[0] in prefixes: + cmd = cmd.replace(cmd[0], "", 1) + command.prefix = cmd[0] + elif cmd.startswith(self_nick + ":"): + cmd = cmd.replace(self_nick + ":", "", 1) + command.prefix = self_nick + elif cmd.startswith(self_nick): + cmd = cmd.replace(self_nick, "", 1) + command.prefix = self_nick + else: + command.prefix = None + except IndexError: + continue # skip to next command + cmd = "echo IndexError or something" + try: + prefix = command.prefix or None + except UnboundLocalError or AttributeError: + prefix = None + command.prefix = prefix + cmd = cmd.strip() + try: + is_adm = ( + line.tags["account"] in admin_accounts + or line.source in admin_users + ) + except ( + KeyError, + TypeError, + ): # either no account tag, or no tags at all + is_adm = line.source in admin_users + + # update command module's info dynamically for line info + command.util.target = target + command._line = line + command.is_ctcp = is_ctcp + command.pm = is_pm + command.line = cmd + command.admin = is_adm + command.config = config + command.self_nick = self_nick + command.prefix = prefix + + if is_adm and cmd.startswith("reload"): + ( + command, + command.util, + config, + util, + prefixes, + admin_accounts, + admin_users, + admin_only, + ) = configure() + util.target = target + send = util.send + command._line = line + command.pm = is_pm + command.line = cmd + command.admin = is_adm + command.config = config + command.self_nick = self_nick + command.prefix = prefix + command.util = util + command.util.target = target + mesg("reloaded") + elif ( + cmd.startswith("eval ") + and "eval" not in config.cmd.disabled + ): + if is_adm: + try: + result = eval( + cmd[len("eval ") :].strip() or "None" + ) + except Exception as e: + mesg("Error: " + str(e)) + else: + mesg("Error: you're not authorized to eval") + elif ( + cmd.startswith("exec ") + and "exec" not in config.cmd.disabled + ): + if is_adm: + try: + result = exec( + cmd[len("exec ") :].strip() or "None" + ) + except Exception as e: + mesg("Error: " + str(e)) + else: + mesg("Error: you're not authorized to exec") + + # handle normal commands + else: + command.preq_cmd() diff --git a/util.py b/util.py new file mode 100644 index 0000000..8538ef3 --- /dev/null +++ b/util.py @@ -0,0 +1,95 @@ +import irctokens +import time +from fnmatch import fnmatchcase + + +class Util: + def __init__(self, config, sock): + self.sock = sock + self.config = config + self.target = "" + self.dict = { + "\x00": "\\x00", + "\x01": "\\x01", + "\x02": "\\x02", + "\x03": "\\x03", + "\x04": "\\x04", + "\x05": "\\x05", + "\x06": "\\x06", + "\x07": "\\x07", + "\x08": "\\x08", + "\x09": "\\x09", + "\x0a": "\\x0a", + "\x0b": "\\x0b", + "\x0c": "\\x0c", + "\x0d": "\\x0d", + "\x0e": "\\x0e", + "\x0f": "\\x0f", + "\x10": "\\x10", + "\x11": "\\x11", + "\x12": "\\x12", + "\x13": "\\x13", + "\x14": "\\x14", + "\x15": "\\x15", + "\x16": "\\x16", + "\x17": "\\x17", + "\x18": "\\x18", + "\x19": "\\x19", + "\x1a": "\\x1a", + "\x1b": "\\x1b", + "\x1c": "\\x1c", + "\x1d": "\\x1d", + "\x1e": "\\x1e", + "\x1f": "\\x1f", + "\x7f": "\\x7f", + } + + def send(self, raw: str): + stri = raw + for k, v in self.dict.items(): + stri = stri.replace(k, v) + print(f"{time.strftime('%H:%M:%S')} > {stri}") + self.sock.sendall(f"{raw}\r\n".encode("utf8")) + + def quit(self, msg=None): + if msg != None: + self.send("QUIT :" + msg) + else: + self.send("QUIT") + + def nick(self, nick=None): + if nick == None: + self.send("NICK " + self.config.self.nick) + else: + self.send("NICK " + nick) + + def _m(self, msg: str, t=None): + if t == None: + t = self.target + msg = str(msg).partition("\n")[0] + if len(msg) >= 460: + msg = msg[:460] + self.mesg("message too long!") + return t, msg + + def mesg(self, msg: str, t=None): + t, msg = self._m(msg, t) + self.send(irctokens.build("PRIVMSG", [t, str(msg)]).format()) + + def action(self, msg: str, t=None): + t, msg = self._m(msg, t) + self.send( + irctokens.build("PRIVMSG", [t, "\x01ACTION " + str(msg) + "\x01"]).format() + ) + + def notice(self, msg: str, t=None): + t, msg = self._m(msg, t) + self.send(irctokens.build("NOTICE", [t, str(msg)]).format()) + + def maskmatch(self, string: str, hostmask: str): + """DOES NOT HANDLE CASEMAPPING (yet), just dumb case-sensitive match, only ? and * are special""" + print("string is", string, "and hostmask is", hostmask) + pat = "[[]".join( + [x.replace("]", "[]]") for x in hostmask.split("[")] + ) # escape all [ and ] into [[] and []] + return fnmatchcase(string, pat) diff --git a/youtube.py b/youtube.py new file mode 100755 index 0000000..475a36d --- /dev/null +++ b/youtube.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +from html.parser import HTMLParser +from urllib.request import urlopen + + +class YouTube: + video_type = "" + + def mesg(self, msg, t=None): + self.util.mesg(msg, t) + + def match_urls(self, str): + r = [ + i + for i in str.split() + if "https://youtu.be/" in i + or "https://www.youtube.com/watch?v=" in i + or "https://m.youtube.com/watch?v=" in i + or "https://youtube.com/watch?v=" in i + or "https://www.youtube.com/embed/" in i + or "https://www.youtube-nocookie.com/embed/" in i + or "https://music.youtube.com/watch?v=" in i + or "https://youtube.com/shorts/" in i + or "https://www.youtube.com/shorts/" in i + or "https://www.youtube.com/clip/" in i + or "https://youtube.com/clip/" in i + ] + r = list(dict.fromkeys(r)) + n = 0 + for i in r: + if not i.startswith("http"): + r.pop(n) + n += 1 + + return r + + def is_embed(str): + return str.startswith("https://www.youtube.com/embed/") or str.startswith( + "https://www.youtube-nocookie.com/embed/" + ) + + def is_ytmusic(str): + return str.startswith("https://music.youtube.com/watch?v=") + + def is_ytshorts(str): + return str.startswith("https://youtube.com/shorts/") or str.startswith( + "https://www.youtube.com/shorts/" + ) + + def is_clip(str): + return str.startswith("https://youtube.com/clip/") or str.startswith( + "https://www.youtube.com/clip/" + ) + + class parseprop(HTMLParser): + def __init__(self): + print("yt parse init") + HTMLParser.__init__(self) + self.itemprops_list = ["name", "duration", "uploadDate", "interactionCount"] + self.h = {} + if YouTube.video_type == "clip": + self.itemprops_list += ["description"] + print("it is a clip!") + + def handle_starttag(self, tag, attrs): + if (tag != "meta" and tag != "link") or ( + ( + [i for i in attrs if "itemprop" in i] == [] + and ("name", "title") not in attrs + ) + or (tag == "meta" and ("itemprop", "name") in attrs) + ): + return + # print(self,tag,attrs) + for k, v in attrs: + if k == "itemprop": + if v not in self.itemprops_list: + return + x = [v] + if tag == "link" and v == "name": + x = ["channelName"] + elif k == "content": + if attrs[0][1] == "interactionCount": + v = int(v) + x += [v] + elif k == "name" and v == "title": + x = [v] + else: + return + self.h.update({x[0]: x[1]}) + # print(x[0],"=",x[1]) + + def fmt_dur(dur): + h, m, s = 0, 0, 0 + m = dur[2:].split("M") + s = int(m[1][:-1]) + m = int(m[0]) + if m >= 60: + h = m // 60 + m = round((m / 60 - h) * 60) + return f"{h}h {m}m {s}s" + elif h == 0 and m == 0 and s == 0: + return "LIVE" + elif m == 0 and s != 0: + return f"{s}s" + elif s == 0: + return f"{m}m" + else: + return f"{m}m {s}s" + + def yt(self, url): + # self.util.mesg("dbg hello") + url = url.rstrip("\x01") + self.video_type = ( + "clip" + if self.is_clip(url) + else "shorts" + if self.is_ytshorts(url) + else "music" + if self.is_ytmusic(url) + else "embed" + if self.is_embed(url) + else "video" + ) + video_type = self.video_type + if video_type == "embed": + videoId = url.split("/")[4] + url = f"https://www.youtube.com/watch?v={videoId}" + elif video_type == "music": + for i in url.split("?")[1].split("&"): + if i[0:2] == "v=": + videoId = i[2:] + url = f"https://www.youtube.com/watch?v={videoId}" + elif video_type == "shorts": + videoId = url.split("?")[0].split("/")[-1] + url = f"https://www.youtube.com/watch?v={videoId}" + p = self.parseprop() + # use premature optimization? it should be SLIGHTLY faster, but can sometimes fail + data = b"" + if self.premature_optimization: + url_h = urlopen(url) + # appears on approximately line 21 or 22, so we read 24 lines to be safe (23-25 should be license comment) + # I tried to read byte amounts but it's hard to make sure no invalid utf8 bytes happen due to partial reads + for i in range(24): + data += url_h.readline() + url_h.close() + data = data.decode() # bytes to utf-8 + if ( + data.find('meta itemprop="duration"') == -1 + or data.find('meta itemprop="name"') == -1 + ): # acts as both fallback for optimization, and in case optimization's turned off + # just read all of the html + data = urlopen(url).read().decode() + # print(f"\x1b[31m my data is: {data}\x1b[0m") + p.feed(data) + if p.h == {}: + irc_string = "[\x0304Youtube\x03] \x0307ERROR:\x0308 got no data from server! \x0315(check your URL for typos!)\x03" + ansi_string = "[\x1b[31mYoutube\x1b[0m] \x1b[33;2mERROR:\x1b[33;1m got no data from server! \x1b[37;2m(check your URL for typos!)\x1b[0m" + print(ansi_string) + return irc_string, True + y = p.h + print(y) + y.update(duration=self.fmt_dur(y["duration"])) + irc_string = f"[\x0303Youtube\x03] \x02{y['title']}\x02 ({y['duration']}) uploaded by \x1d{y['channelName']}\x1d on {y['uploadDate']}, {y['interactionCount']:,} views" + ansi_string = f"[\x1b[32mYoutube\x1b[0m] \x1b[1m{y['title']}\x1b[0m ({y['duration']}) uploaded by \x1b[03m{y['channelName']}\x1b[0m on {y['uploadDate']}, {y['interactionCount']:,} views" + print(ansi_string) + return irc_string, False + + +if __name__ == "__main__": + import sys + + YouTube.premature_optimization = False + YouTube.yt(YouTube, sys.argv[1]) -- cgit 1.4.1-2-gfad0