--- /dev/null
+config.ini
--- /dev/null
+Freedom License v1 (2023年05月11日)
+
+全く無限的自由です。
+It's infinite freedom.
--- /dev/null
+# $TheSupernovaDuo$
+all:
+ @echo "Commands available"
+ @echo "=================="
+ @echo "deps -- fetch and install dependencies"
+ @echo "format -- format code using python-black"
+deps:
+ pip install --user -r requirements.txt
+format:
+ black main.py
--- /dev/null
+chen
+====
+
+XMPP bot to preview links and file contents. Shikigami of the Shikigami of the Gap Youkai
+Based on Angel[1], without the sed(1) and YT redirect features.
+
+Requirements
+------------
+
+* Python >= 3.7
+
+Run
+---
+
+* pip3 install --user -r requirements.txt
+* $EDITOR config.ini.default (save as config.ini)
+* python3 main.py
+
+
+[1]: https://wiki.kalli.st/Angel
--- /dev/null
+[chen]
+jid = chen@example.com
+password = b0TPA55W0rD
+nick = Chen
+autojoin = room1@muc.example.com room2@muc.example.com room3@muc.example.com
--- /dev/null
+import requests
+import bs4
+import random
+import configparser
+import re
+import io
+import os
+import mimetypes
+import asyncio
+from collections import defaultdict
+from slixmpp import ClientXMPP
+from urllib.parse import urlparse, parse_qs, urlunparse
+from pantomime import normalize_mimetype
+import ecgi
+
+parser = "html.parser"
+user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0"
+accept_lang = "en-US"
+data_limit = 786400000
+
+headers = {
+ "user-agent": user_agent,
+ "Accept-Language": accept_lang,
+ "Cache-Control": "no-cache",
+}
+block_list = (
+ "localhost",
+ "127.0.0.1",
+ "0.0.0.0",
+ "youtu.be",
+ "www.youtube.com",
+ "youtube.com",
+ "m.youtube.com",
+ "music.youtube.com",
+)
+req_list = (
+ "http://",
+ "https://",
+)
+html_files = (
+ "text/html",
+ "application/xhtml+xml",
+)
+
+
+class Lifo(list):
+ """
+ Limited size LIFO array to store messages and urls
+ """
+
+ def __init__(self, size):
+ super().__init__()
+ self.size = size
+
+ def add(self, item):
+ self.insert(0, item)
+ if len(self) > self.size:
+ self.pop()
+
+
+# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
+class ChenBot(ClientXMPP):
+ commands = {}
+ muc_commands = {}
+
+ messages = defaultdict(
+ lambda: {
+ "messages": Lifo(100),
+ "links": Lifo(10),
+ "previews": Lifo(10),
+ }
+ )
+
+ def get_urls(self, msg):
+ str_list = msg["body"].strip().split()
+ urls = [u for u in str_list if any(r in u for r in req_list)]
+ return urls
+
+ async def parse_uri(self, uri, sender, mtype):
+ """Parse a URI and send the result to the sender."""
+ netloc = uri.netloc
+ if netloc.split(":")[0] in block_list:
+ return
+ else:
+ await self.process_link(uri, sender, mtype)
+
+ async def process_link(self, uri, sender, mtype):
+ """Process a link and send the result to the sender."""
+ url = urlunparse(uri)
+ r = requests.get(url, stream=True, headers=headers, timeout=5)
+ if not r.ok:
+ return
+ ftype = normalize_mimetype(r.headers.get("content-type"))
+ if ftype in html_files:
+ data = ""
+ for i in r.iter_content(chunk_size=1024, decode_unicode=False):
+ data += i.decode("utf-8", errors="ignore")
+ if len(data) > data_limit or "</head>" in data.lower():
+ break
+ soup = bs4.BeautifulSoup(data, parser)
+ if title := soup.find("title"):
+ output = title.text.strip()
+ if output:
+ output = f"*{output}*" if ("\n" not in output) else output
+ if output in self.messages[sender]["previews"]:
+ return
+
+ self.messages[sender]["previews"].add(output)
+ if r.history:
+ self.send_message(mto=sender, mbody=r.url, mtype=mtype)
+ self.send_message(mto=sender, mbody=output, mtype=mtype)
+
+ else:
+ try:
+ length = 0
+ outfile = io.BytesIO()
+ for chunk in r.iter_content(
+ chunk_size=512,
+ decode_unicode=False,
+ ):
+ length += 512
+ if length >= data_limit:
+ return
+ outfile.write(chunk)
+
+ content_disposition = r.headers.get("content-disposition")
+ filename = None
+ if content_disposition:
+ _, params = ecgi.parse_header(content_disposition)
+ filename = params.get("filename")
+ else:
+ filename = os.path.basename(uri.path)
+
+ ext = os.path.splitext(filename)[1] if filename else ".txt"
+ fname = filename if filename else f"file{ext}"
+ await self.embed_file(url, sender, mtype, ftype, fname, outfile)
+ except Exception as e:
+ print(e)
+
+ async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
+ """Embed a file and send the result to the sender."""
+ furl = await self.plugin["xep_0363"].upload_file(
+ fname, content_type=ftype, input_file=outfile
+ )
+ message = self.make_message(sender)
+ message["body"] = furl
+ message["type"] = mtype
+ message["oob"]["url"] = furl
+ message.send()
+
+ async def parse_urls(self, msg, urls, sender, mtype):
+ body = msg["body"].lower()
+ if "nsfl" in body:
+ return
+ if "nsfw" in body:
+ return
+ if "#nospoil" in body:
+ return
+ for u in urls:
+ if u in self.messages[sender]["links"]:
+ continue
+ else:
+ self.messages[sender]["links"].add(u)
+
+ uri = urlparse(u)
+ await self.parse_uri(uri, sender, mtype)
+
+ def __init__(self, jid, password, nick, autojoin=None):
+ ClientXMPP.__init__(self, jid, password)
+ self.jid = jid
+ self.nick = nick or []
+ self.autojoin = autojoin or []
+ self.register_plugin("xep_0030")
+ self.register_plugin("xep_0060")
+ self.register_plugin("xep_0054")
+ self.register_plugin("xep_0045")
+ self.register_plugin("xep_0066")
+ self.register_plugin("xep_0084")
+ self.register_plugin("xep_0153")
+ self.register_plugin("xep_0363")
+
+ self.add_event_handler("session_start", self.session_start)
+ self.add_event_handler("message", self.message)
+ self.add_event_handler("groupchat_message", self.muc_message)
+ self.add_event_handler("disconnected", lambda _: self.connect())
+
+ async def session_start(self, event):
+ """Start the bot."""
+ self.send_presence()
+ await self.get_roster()
+ await self.update_info()
+ for channel in self.autojoin:
+ try:
+ self.plugin["xep_0045"].join_muc(channel, self.nick)
+ except Exception as e:
+ print(e)
+
+ async def update_info(self):
+ """Update the bot info."""
+ with open("avatar.png", "rb") as avatar_file:
+ avatar = avatar_file.read()
+
+ avatar_type = "image/png"
+ avatar_id = self.plugin["xep_0084"].generate_id(avatar)
+ avatar_bytes = len(avatar)
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
+ asyncio.gather(
+ self.plugin["xep_0153"].set_avatar(
+ avatar=avatar,
+ mtype=avatar_type,
+ )
+ )
+
+ info = {
+ "id": avatar_id,
+ "type": avatar_type,
+ "bytes": avatar_bytes,
+ }
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
+
+ vcard = self.plugin["xep_0054"].make_vcard()
+ vcard["URL"] = "https://git.chaotic.ninja/yakumo.izuru/chen"
+ vcard["DESC"] = "Shikigami of the Shikigami of the Gap Youkai"
+ vcard["NICKNAME"] = "Chen"
+ vcard["FN"] = "Chen"
+ asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
+
+ async def message(self, msg):
+ """Process a message."""
+ if msg["type"] in ("chat", "normal"):
+ mtype = "chat"
+ sender = msg["from"].bare
+ message = msg["body"]
+
+ ctx = message.strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception as e:
+ print(e)
+
+ async def muc_message(self, msg):
+ """Process a groupchat message."""
+ if msg["type"] in ("groupchat", "normal"):
+ mtype = "groupchat"
+ sender = msg["from"].bare
+ if msg["mucnick"] == self.nick:
+ return
+
+ ctx = msg["body"].strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception:
+ pass
+
+
+if __name__ == "__main__":
+ config = configparser.ConfigParser()
+ config.read("config.ini")
+ jid = config["chen"]["jid"]
+ password = config["chen"]["password"]
+ nick = config["chen"]["nick"]
+ autojoin = config["chen"]["autojoin"].split()
+ bot = ChenBot(jid, password, nick, autojoin=autojoin)
+
+ bot.connect()
+ bot.process(forever=True)
--- /dev/null
+requests
+slixmpp
+beautifulsoup4
+pantomime
+aiohttp
+git+https://git.chaotic.ninja/yakumo.izuru/ecgi#egg=ecgi
--- /dev/null
+#!/bin/sh
+# $TheSupernovaDuo$
+echo "Starting bot..."
+python-3.9 main.py 2>chen.log
--- /dev/null
+# $YakumoLabs$
+# vim: ft=systemd
+[Unit]
+Description=XMPP bot to preview links and file contents
+After=network.target
+After=prosody.service
+After=ejabberd.service
+
+[Service]
+Type=simple
+Restart=always
+RestartSec=10
+StartLimitBurst=5
+StartLimitInterval=100
+WorkingDirectory=%h/chen/
+ExecStart=/usr/bin/python main.py
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+# $YakumoLabs$
+# vim: ft=yaml
+#
+# Note: %h must be replaced by the user's home directory path
+cmd: /usr/bin/env python3 main.py
+cwd: %h/chen
--- /dev/null
+config.ini
--- /dev/null
+Freedom License v1 (2023年05月11日)
+
+全く無限的自由です。
+It's infinite freedom.
--- /dev/null
+# $TheSupernovaDuo$
+all:
+ @echo "Commands available"
+ @echo "=================="
+ @echo "deps -- fetch and install dependencies"
+ @echo "format -- format code using python-black"
+deps:
+ pip install --user -r requirements.txt
+format:
+ black main.py
--- /dev/null
+chen
+====
+
+XMPP bot to preview links and file contents. Shikigami of the Shikigami of the Gap Youkai
+Based on Angel[1], without the sed(1) and YT redirect features.
+
+Requirements
+------------
+
+* Python >= 3.7
+
+Run
+---
+
+* pip3 install --user -r requirements.txt
+* $EDITOR config.ini.default (save as config.ini)
+* python3 main.py
+
+
+[1]: https://wiki.kalli.st/Angel
--- /dev/null
+[chen]
+jid = chen@example.com
+password = b0TPA55W0rD
+nick = Chen
+autojoin = room1@muc.example.com room2@muc.example.com room3@muc.example.com
--- /dev/null
+import requests
+import bs4
+import random
+import configparser
+import re
+import io
+import os
+import mimetypes
+import asyncio
+from collections import defaultdict
+from slixmpp import ClientXMPP
+from urllib.parse import urlparse, parse_qs, urlunparse
+from pantomime import normalize_mimetype
+import ecgi
+
+parser = "html.parser"
+user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0"
+accept_lang = "en-US"
+data_limit = 786400000
+
+headers = {
+ "user-agent": user_agent,
+ "Accept-Language": accept_lang,
+ "Cache-Control": "no-cache",
+}
+block_list = (
+ "localhost",
+ "127.0.0.1",
+ "0.0.0.0",
+ "youtu.be",
+ "www.youtube.com",
+ "youtube.com",
+ "m.youtube.com",
+ "music.youtube.com",
+)
+req_list = (
+ "http://",
+ "https://",
+)
+html_files = (
+ "text/html",
+ "application/xhtml+xml",
+)
+
+
+class Lifo(list):
+ """
+ Limited size LIFO array to store messages and urls
+ """
+
+ def __init__(self, size):
+ super().__init__()
+ self.size = size
+
+ def add(self, item):
+ self.insert(0, item)
+ if len(self) > self.size:
+ self.pop()
+
+
+# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
+class ChenBot(ClientXMPP):
+ commands = {}
+ muc_commands = {}
+
+ messages = defaultdict(
+ lambda: {
+ "messages": Lifo(100),
+ "links": Lifo(10),
+ "previews": Lifo(10),
+ }
+ )
+
+ def get_urls(self, msg):
+ str_list = msg["body"].strip().split()
+ urls = [u for u in str_list if any(r in u for r in req_list)]
+ return urls
+
+ async def parse_uri(self, uri, sender, mtype):
+ """Parse a URI and send the result to the sender."""
+ netloc = uri.netloc
+ if netloc.split(":")[0] in block_list:
+ return
+ else:
+ await self.process_link(uri, sender, mtype)
+
+ async def process_link(self, uri, sender, mtype):
+ """Process a link and send the result to the sender."""
+ url = urlunparse(uri)
+ r = requests.get(url, stream=True, headers=headers, timeout=5)
+ if not r.ok:
+ return
+ ftype = normalize_mimetype(r.headers.get("content-type"))
+ if ftype in html_files:
+ data = ""
+ for i in r.iter_content(chunk_size=1024, decode_unicode=False):
+ data += i.decode("utf-8", errors="ignore")
+ if len(data) > data_limit or "</head>" in data.lower():
+ break
+ soup = bs4.BeautifulSoup(data, parser)
+ if title := soup.find("title"):
+ output = title.text.strip()
+ if output:
+ output = f"*{output}*" if ("\n" not in output) else output
+ if output in self.messages[sender]["previews"]:
+ return
+
+ self.messages[sender]["previews"].add(output)
+ if r.history:
+ self.send_message(mto=sender, mbody=r.url, mtype=mtype)
+ self.send_message(mto=sender, mbody=output, mtype=mtype)
+
+ else:
+ try:
+ length = 0
+ outfile = io.BytesIO()
+ for chunk in r.iter_content(
+ chunk_size=512,
+ decode_unicode=False,
+ ):
+ length += 512
+ if length >= data_limit:
+ return
+ outfile.write(chunk)
+
+ content_disposition = r.headers.get("content-disposition")
+ filename = None
+ if content_disposition:
+ _, params = ecgi.parse_header(content_disposition)
+ filename = params.get("filename")
+ else:
+ filename = os.path.basename(uri.path)
+
+ ext = os.path.splitext(filename)[1] if filename else ".txt"
+ fname = filename if filename else f"file{ext}"
+ await self.embed_file(url, sender, mtype, ftype, fname, outfile)
+ except Exception as e:
+ print(e)
+
+ async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
+ """Embed a file and send the result to the sender."""
+ furl = await self.plugin["xep_0363"].upload_file(
+ fname, content_type=ftype, input_file=outfile
+ )
+ message = self.make_message(sender)
+ message["body"] = furl
+ message["type"] = mtype
+ message["oob"]["url"] = furl
+ message.send()
+
+ async def parse_urls(self, msg, urls, sender, mtype):
+ body = msg["body"].lower()
+ if "nsfl" in body:
+ return
+ if "nsfw" in body:
+ return
+ if "#nospoil" in body:
+ return
+ for u in urls:
+ if u in self.messages[sender]["links"]:
+ continue
+ else:
+ self.messages[sender]["links"].add(u)
+
+ uri = urlparse(u)
+ await self.parse_uri(uri, sender, mtype)
+
+ def __init__(self, jid, password, nick, autojoin=None):
+ ClientXMPP.__init__(self, jid, password)
+ self.jid = jid
+ self.nick = nick or []
+ self.autojoin = autojoin or []
+ self.register_plugin("xep_0030")
+ self.register_plugin("xep_0060")
+ self.register_plugin("xep_0054")
+ self.register_plugin("xep_0045")
+ self.register_plugin("xep_0066")
+ self.register_plugin("xep_0084")
+ self.register_plugin("xep_0153")
+ self.register_plugin("xep_0363")
+
+ self.add_event_handler("session_start", self.session_start)
+ self.add_event_handler("message", self.message)
+ self.add_event_handler("groupchat_message", self.muc_message)
+ self.add_event_handler("disconnected", lambda _: self.connect())
+
+ async def session_start(self, event):
+ """Start the bot."""
+ self.send_presence()
+ await self.get_roster()
+ await self.update_info()
+ for channel in self.autojoin:
+ try:
+ self.plugin["xep_0045"].join_muc(channel, self.nick)
+ except Exception as e:
+ print(e)
+
+ async def update_info(self):
+ """Update the bot info."""
+ with open("avatar.png", "rb") as avatar_file:
+ avatar = avatar_file.read()
+
+ avatar_type = "image/png"
+ avatar_id = self.plugin["xep_0084"].generate_id(avatar)
+ avatar_bytes = len(avatar)
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
+ asyncio.gather(
+ self.plugin["xep_0153"].set_avatar(
+ avatar=avatar,
+ mtype=avatar_type,
+ )
+ )
+
+ info = {
+ "id": avatar_id,
+ "type": avatar_type,
+ "bytes": avatar_bytes,
+ }
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
+
+ vcard = self.plugin["xep_0054"].make_vcard()
+ vcard["URL"] = "https://git.chaotic.ninja/yakumo.izuru/chen"
+ vcard["DESC"] = "Shikigami of the Shikigami of the Gap Youkai"
+ vcard["NICKNAME"] = "Chen"
+ vcard["FN"] = "Chen"
+ asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
+
+ async def message(self, msg):
+ """Process a message."""
+ if msg["type"] in ("chat", "normal"):
+ mtype = "chat"
+ sender = msg["from"].bare
+ message = msg["body"]
+
+ ctx = message.strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception as e:
+ print(e)
+
+ async def muc_message(self, msg):
+ """Process a groupchat message."""
+ if msg["type"] in ("groupchat", "normal"):
+ mtype = "groupchat"
+ sender = msg["from"].bare
+ if msg["mucnick"] == self.nick:
+ return
+
+ ctx = msg["body"].strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception:
+ pass
+
+
+if __name__ == "__main__":
+ config = configparser.ConfigParser()
+ config.read("config.ini")
+ jid = config["chen"]["jid"]
+ password = config["chen"]["password"]
+ nick = config["chen"]["nick"]
+ autojoin = config["chen"]["autojoin"].split()
+ bot = ChenBot(jid, password, nick, autojoin=autojoin)
+
+ bot.connect()
+ bot.process(forever=True)
--- /dev/null
+requests
+slixmpp
+beautifulsoup4
+pantomime
+aiohttp
+git+https://git.chaotic.ninja/yakumo.izuru/ecgi#egg=ecgi
--- /dev/null
+#!/bin/sh
+# $TheSupernovaDuo$
+echo "Starting bot..."
+python-3.9 main.py 2>chen.log
--- /dev/null
+# $YakumoLabs$
+# vim: ft=systemd
+[Unit]
+Description=XMPP bot to preview links and file contents
+After=network.target
+After=prosody.service
+After=ejabberd.service
+
+[Service]
+Type=simple
+Restart=always
+RestartSec=10
+StartLimitBurst=5
+StartLimitInterval=100
+WorkingDirectory=%h/chen/
+ExecStart=/usr/bin/python main.py
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+# $YakumoLabs$
+# vim: ft=yaml
+#
+# Note: %h must be replaced by the user's home directory path
+cmd: /usr/bin/env python3 main.py
+cwd: %h/chen
--- /dev/null
+config.ini
--- /dev/null
+Freedom License v1 (2023年05月11日)
+
+全く無限的自由です。
+It's infinite freedom.
--- /dev/null
+# $TheSupernovaDuo$
+all:
+ @echo "Commands available"
+ @echo "=================="
+ @echo "deps -- fetch and install dependencies"
+ @echo "format -- format code using python-black"
+deps:
+ pip install --user -r requirements.txt
+format:
+ black main.py
--- /dev/null
+chen
+====
+
+XMPP bot to preview links and file contents. Shikigami of the Shikigami of the Gap Youkai
+Based on Angel[1], without the sed(1) and YT redirect features.
+
+Requirements
+------------
+
+* Python >= 3.7
+
+Run
+---
+
+* pip3 install --user -r requirements.txt
+* $EDITOR config.ini.default (save as config.ini)
+* python3 main.py
+
+
+[1]: https://wiki.kalli.st/Angel
--- /dev/null
+[chen]
+jid = chen@example.com
+password = b0TPA55W0rD
+nick = Chen
+autojoin = room1@muc.example.com room2@muc.example.com room3@muc.example.com
--- /dev/null
+import requests
+import bs4
+import random
+import configparser
+import re
+import io
+import os
+import mimetypes
+import asyncio
+from collections import defaultdict
+from slixmpp import ClientXMPP
+from urllib.parse import urlparse, parse_qs, urlunparse
+from pantomime import normalize_mimetype
+import ecgi
+
+parser = "html.parser"
+user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0"
+accept_lang = "en-US"
+data_limit = 786400000
+
+headers = {
+ "user-agent": user_agent,
+ "Accept-Language": accept_lang,
+ "Cache-Control": "no-cache",
+}
+block_list = (
+ "localhost",
+ "127.0.0.1",
+ "0.0.0.0",
+ "youtu.be",
+ "www.youtube.com",
+ "youtube.com",
+ "m.youtube.com",
+ "music.youtube.com",
+)
+req_list = (
+ "http://",
+ "https://",
+)
+html_files = (
+ "text/html",
+ "application/xhtml+xml",
+)
+
+
+class Lifo(list):
+ """
+ Limited size LIFO array to store messages and urls
+ """
+
+ def __init__(self, size):
+ super().__init__()
+ self.size = size
+
+ def add(self, item):
+ self.insert(0, item)
+ if len(self) > self.size:
+ self.pop()
+
+
+# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
+class ChenBot(ClientXMPP):
+ commands = {}
+ muc_commands = {}
+
+ messages = defaultdict(
+ lambda: {
+ "messages": Lifo(100),
+ "links": Lifo(10),
+ "previews": Lifo(10),
+ }
+ )
+
+ def get_urls(self, msg):
+ str_list = msg["body"].strip().split()
+ urls = [u for u in str_list if any(r in u for r in req_list)]
+ return urls
+
+ async def parse_uri(self, uri, sender, mtype):
+ """Parse a URI and send the result to the sender."""
+ netloc = uri.netloc
+ if netloc.split(":")[0] in block_list:
+ return
+ else:
+ await self.process_link(uri, sender, mtype)
+
+ async def process_link(self, uri, sender, mtype):
+ """Process a link and send the result to the sender."""
+ url = urlunparse(uri)
+ r = requests.get(url, stream=True, headers=headers, timeout=5)
+ if not r.ok:
+ return
+ ftype = normalize_mimetype(r.headers.get("content-type"))
+ if ftype in html_files:
+ data = ""
+ for i in r.iter_content(chunk_size=1024, decode_unicode=False):
+ data += i.decode("utf-8", errors="ignore")
+ if len(data) > data_limit or "</head>" in data.lower():
+ break
+ soup = bs4.BeautifulSoup(data, parser)
+ if title := soup.find("title"):
+ output = title.text.strip()
+ if output:
+ output = f"*{output}*" if ("\n" not in output) else output
+ if output in self.messages[sender]["previews"]:
+ return
+
+ self.messages[sender]["previews"].add(output)
+ if r.history:
+ self.send_message(mto=sender, mbody=r.url, mtype=mtype)
+ self.send_message(mto=sender, mbody=output, mtype=mtype)
+
+ else:
+ try:
+ length = 0
+ outfile = io.BytesIO()
+ for chunk in r.iter_content(
+ chunk_size=512,
+ decode_unicode=False,
+ ):
+ length += 512
+ if length >= data_limit:
+ return
+ outfile.write(chunk)
+
+ content_disposition = r.headers.get("content-disposition")
+ filename = None
+ if content_disposition:
+ _, params = ecgi.parse_header(content_disposition)
+ filename = params.get("filename")
+ else:
+ filename = os.path.basename(uri.path)
+
+ ext = os.path.splitext(filename)[1] if filename else ".txt"
+ fname = filename if filename else f"file{ext}"
+ await self.embed_file(url, sender, mtype, ftype, fname, outfile)
+ except Exception as e:
+ print(e)
+
+ async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
+ """Embed a file and send the result to the sender."""
+ furl = await self.plugin["xep_0363"].upload_file(
+ fname, content_type=ftype, input_file=outfile
+ )
+ message = self.make_message(sender)
+ message["body"] = furl
+ message["type"] = mtype
+ message["oob"]["url"] = furl
+ message.send()
+
+ async def parse_urls(self, msg, urls, sender, mtype):
+ body = msg["body"].lower()
+ if "nsfl" in body:
+ return
+ if "nsfw" in body:
+ return
+ if "#nospoil" in body:
+ return
+ for u in urls:
+ if u in self.messages[sender]["links"]:
+ continue
+ else:
+ self.messages[sender]["links"].add(u)
+
+ uri = urlparse(u)
+ await self.parse_uri(uri, sender, mtype)
+
+ def __init__(self, jid, password, nick, autojoin=None):
+ ClientXMPP.__init__(self, jid, password)
+ self.jid = jid
+ self.nick = nick or []
+ self.autojoin = autojoin or []
+ self.register_plugin("xep_0030")
+ self.register_plugin("xep_0060")
+ self.register_plugin("xep_0054")
+ self.register_plugin("xep_0045")
+ self.register_plugin("xep_0066")
+ self.register_plugin("xep_0084")
+ self.register_plugin("xep_0153")
+ self.register_plugin("xep_0363")
+
+ self.add_event_handler("session_start", self.session_start)
+ self.add_event_handler("message", self.message)
+ self.add_event_handler("groupchat_message", self.muc_message)
+ self.add_event_handler("disconnected", lambda _: self.connect())
+
+ async def session_start(self, event):
+ """Start the bot."""
+ self.send_presence()
+ await self.get_roster()
+ await self.update_info()
+ for channel in self.autojoin:
+ try:
+ self.plugin["xep_0045"].join_muc(channel, self.nick)
+ except Exception as e:
+ print(e)
+
+ async def update_info(self):
+ """Update the bot info."""
+ with open("avatar.png", "rb") as avatar_file:
+ avatar = avatar_file.read()
+
+ avatar_type = "image/png"
+ avatar_id = self.plugin["xep_0084"].generate_id(avatar)
+ avatar_bytes = len(avatar)
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
+ asyncio.gather(
+ self.plugin["xep_0153"].set_avatar(
+ avatar=avatar,
+ mtype=avatar_type,
+ )
+ )
+
+ info = {
+ "id": avatar_id,
+ "type": avatar_type,
+ "bytes": avatar_bytes,
+ }
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
+
+ vcard = self.plugin["xep_0054"].make_vcard()
+ vcard["URL"] = "https://git.chaotic.ninja/yakumo.izuru/chen"
+ vcard["DESC"] = "Shikigami of the Shikigami of the Gap Youkai"
+ vcard["NICKNAME"] = "Chen"
+ vcard["FN"] = "Chen"
+ asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
+
+ async def message(self, msg):
+ """Process a message."""
+ if msg["type"] in ("chat", "normal"):
+ mtype = "chat"
+ sender = msg["from"].bare
+ message = msg["body"]
+
+ ctx = message.strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception as e:
+ print(e)
+
+ async def muc_message(self, msg):
+ """Process a groupchat message."""
+ if msg["type"] in ("groupchat", "normal"):
+ mtype = "groupchat"
+ sender = msg["from"].bare
+ if msg["mucnick"] == self.nick:
+ return
+
+ ctx = msg["body"].strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception:
+ pass
+
+
+if __name__ == "__main__":
+ config = configparser.ConfigParser()
+ config.read("config.ini")
+ jid = config["chen"]["jid"]
+ password = config["chen"]["password"]
+ nick = config["chen"]["nick"]
+ autojoin = config["chen"]["autojoin"].split()
+ bot = ChenBot(jid, password, nick, autojoin=autojoin)
+
+ bot.connect()
+ bot.process(forever=True)
--- /dev/null
+requests
+slixmpp
+beautifulsoup4
+pantomime
+aiohttp
+git+https://git.chaotic.ninja/yakumo.izuru/ecgi#egg=ecgi
--- /dev/null
+#!/bin/sh
+# $TheSupernovaDuo$
+echo "Starting bot..."
+python-3.9 main.py 2>chen.log
--- /dev/null
+# $YakumoLabs$
+# vim: ft=systemd
+[Unit]
+Description=XMPP bot to preview links and file contents
+After=network.target
+After=prosody.service
+After=ejabberd.service
+
+[Service]
+Type=simple
+Restart=always
+RestartSec=10
+StartLimitBurst=5
+StartLimitInterval=100
+WorkingDirectory=%h/chen/
+ExecStart=/usr/bin/python main.py
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+# $YakumoLabs$
+# vim: ft=yaml
+#
+# Note: %h must be replaced by the user's home directory path
+cmd: /usr/bin/env python3 main.py
+cwd: %h/chen
--- /dev/null
+config.ini
--- /dev/null
+Freedom License v1 (2023年05月11日)
+
+全く無限的自由です。
+It's infinite freedom.
--- /dev/null
+# $TheSupernovaDuo$
+all:
+ @echo "Commands available"
+ @echo "=================="
+ @echo "deps -- fetch and install dependencies"
+ @echo "format -- format code using python-black"
+deps:
+ pip install --user -r requirements.txt
+format:
+ black main.py
--- /dev/null
+chen
+====
+
+XMPP bot to preview links and file contents. Shikigami of the Shikigami of the Gap Youkai
+Based on Angel[1], without the sed(1) and YT redirect features.
+
+Requirements
+------------
+
+* Python >= 3.7
+
+Run
+---
+
+* pip3 install --user -r requirements.txt
+* $EDITOR config.ini.default (save as config.ini)
+* python3 main.py
+
+
+[1]: https://wiki.kalli.st/Angel
--- /dev/null
+[chen]
+jid = chen@example.com
+password = b0TPA55W0rD
+nick = Chen
+autojoin = room1@muc.example.com room2@muc.example.com room3@muc.example.com
--- /dev/null
+import requests
+import bs4
+import random
+import configparser
+import re
+import io
+import os
+import mimetypes
+import asyncio
+from collections import defaultdict
+from slixmpp import ClientXMPP
+from urllib.parse import urlparse, parse_qs, urlunparse
+from pantomime import normalize_mimetype
+import ecgi
+
+parser = "html.parser"
+user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0"
+accept_lang = "en-US"
+data_limit = 786400000
+
+headers = {
+ "user-agent": user_agent,
+ "Accept-Language": accept_lang,
+ "Cache-Control": "no-cache",
+}
+block_list = (
+ "localhost",
+ "127.0.0.1",
+ "0.0.0.0",
+ "youtu.be",
+ "www.youtube.com",
+ "youtube.com",
+ "m.youtube.com",
+ "music.youtube.com",
+)
+req_list = (
+ "http://",
+ "https://",
+)
+html_files = (
+ "text/html",
+ "application/xhtml+xml",
+)
+
+
+class Lifo(list):
+ """
+ Limited size LIFO array to store messages and urls
+ """
+
+ def __init__(self, size):
+ super().__init__()
+ self.size = size
+
+ def add(self, item):
+ self.insert(0, item)
+ if len(self) > self.size:
+ self.pop()
+
+
+# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
+class ChenBot(ClientXMPP):
+ commands = {}
+ muc_commands = {}
+
+ messages = defaultdict(
+ lambda: {
+ "messages": Lifo(100),
+ "links": Lifo(10),
+ "previews": Lifo(10),
+ }
+ )
+
+ def get_urls(self, msg):
+ str_list = msg["body"].strip().split()
+ urls = [u for u in str_list if any(r in u for r in req_list)]
+ return urls
+
+ async def parse_uri(self, uri, sender, mtype):
+ """Parse a URI and send the result to the sender."""
+ netloc = uri.netloc
+ if netloc.split(":")[0] in block_list:
+ return
+ else:
+ await self.process_link(uri, sender, mtype)
+
+ async def process_link(self, uri, sender, mtype):
+ """Process a link and send the result to the sender."""
+ url = urlunparse(uri)
+ r = requests.get(url, stream=True, headers=headers, timeout=5)
+ if not r.ok:
+ return
+ ftype = normalize_mimetype(r.headers.get("content-type"))
+ if ftype in html_files:
+ data = ""
+ for i in r.iter_content(chunk_size=1024, decode_unicode=False):
+ data += i.decode("utf-8", errors="ignore")
+ if len(data) > data_limit or "</head>" in data.lower():
+ break
+ soup = bs4.BeautifulSoup(data, parser)
+ if title := soup.find("title"):
+ output = title.text.strip()
+ if output:
+ output = f"*{output}*" if ("\n" not in output) else output
+ if output in self.messages[sender]["previews"]:
+ return
+
+ self.messages[sender]["previews"].add(output)
+ if r.history:
+ self.send_message(mto=sender, mbody=r.url, mtype=mtype)
+ self.send_message(mto=sender, mbody=output, mtype=mtype)
+
+ else:
+ try:
+ length = 0
+ outfile = io.BytesIO()
+ for chunk in r.iter_content(
+ chunk_size=512,
+ decode_unicode=False,
+ ):
+ length += 512
+ if length >= data_limit:
+ return
+ outfile.write(chunk)
+
+ content_disposition = r.headers.get("content-disposition")
+ filename = None
+ if content_disposition:
+ _, params = ecgi.parse_header(content_disposition)
+ filename = params.get("filename")
+ else:
+ filename = os.path.basename(uri.path)
+
+ ext = os.path.splitext(filename)[1] if filename else ".txt"
+ fname = filename if filename else f"file{ext}"
+ await self.embed_file(url, sender, mtype, ftype, fname, outfile)
+ except Exception as e:
+ print(e)
+
+ async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
+ """Embed a file and send the result to the sender."""
+ furl = await self.plugin["xep_0363"].upload_file(
+ fname, content_type=ftype, input_file=outfile
+ )
+ message = self.make_message(sender)
+ message["body"] = furl
+ message["type"] = mtype
+ message["oob"]["url"] = furl
+ message.send()
+
+ async def parse_urls(self, msg, urls, sender, mtype):
+ body = msg["body"].lower()
+ if "nsfl" in body:
+ return
+ if "nsfw" in body:
+ return
+ if "#nospoil" in body:
+ return
+ for u in urls:
+ if u in self.messages[sender]["links"]:
+ continue
+ else:
+ self.messages[sender]["links"].add(u)
+
+ uri = urlparse(u)
+ await self.parse_uri(uri, sender, mtype)
+
+ def __init__(self, jid, password, nick, autojoin=None):
+ ClientXMPP.__init__(self, jid, password)
+ self.jid = jid
+ self.nick = nick or []
+ self.autojoin = autojoin or []
+ self.register_plugin("xep_0030")
+ self.register_plugin("xep_0060")
+ self.register_plugin("xep_0054")
+ self.register_plugin("xep_0045")
+ self.register_plugin("xep_0066")
+ self.register_plugin("xep_0084")
+ self.register_plugin("xep_0153")
+ self.register_plugin("xep_0363")
+
+ self.add_event_handler("session_start", self.session_start)
+ self.add_event_handler("message", self.message)
+ self.add_event_handler("groupchat_message", self.muc_message)
+ self.add_event_handler("disconnected", lambda _: self.connect())
+
+ async def session_start(self, event):
+ """Start the bot."""
+ self.send_presence()
+ await self.get_roster()
+ await self.update_info()
+ for channel in self.autojoin:
+ try:
+ self.plugin["xep_0045"].join_muc(channel, self.nick)
+ except Exception as e:
+ print(e)
+
+ async def update_info(self):
+ """Update the bot info."""
+ with open("avatar.png", "rb") as avatar_file:
+ avatar = avatar_file.read()
+
+ avatar_type = "image/png"
+ avatar_id = self.plugin["xep_0084"].generate_id(avatar)
+ avatar_bytes = len(avatar)
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
+ asyncio.gather(
+ self.plugin["xep_0153"].set_avatar(
+ avatar=avatar,
+ mtype=avatar_type,
+ )
+ )
+
+ info = {
+ "id": avatar_id,
+ "type": avatar_type,
+ "bytes": avatar_bytes,
+ }
+
+ asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
+
+ vcard = self.plugin["xep_0054"].make_vcard()
+ vcard["URL"] = "https://git.chaotic.ninja/yakumo.izuru/chen"
+ vcard["DESC"] = "Shikigami of the Shikigami of the Gap Youkai"
+ vcard["NICKNAME"] = "Chen"
+ vcard["FN"] = "Chen"
+ asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
+
+ async def message(self, msg):
+ """Process a message."""
+ if msg["type"] in ("chat", "normal"):
+ mtype = "chat"
+ sender = msg["from"].bare
+ message = msg["body"]
+
+ ctx = message.strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception as e:
+ print(e)
+
+ async def muc_message(self, msg):
+ """Process a groupchat message."""
+ if msg["type"] in ("groupchat", "normal"):
+ mtype = "groupchat"
+ sender = msg["from"].bare
+ if msg["mucnick"] == self.nick:
+ return
+
+ ctx = msg["body"].strip().split()
+
+ try:
+ if not msg["oob"]["url"]:
+ if urls := self.get_urls(msg):
+ await self.parse_urls(msg, urls, sender, mtype)
+ except Exception:
+ pass
+
+
+if __name__ == "__main__":
+ config = configparser.ConfigParser()
+ config.read("config.ini")
+ jid = config["chen"]["jid"]
+ password = config["chen"]["password"]
+ nick = config["chen"]["nick"]
+ autojoin = config["chen"]["autojoin"].split()
+ bot = ChenBot(jid, password, nick, autojoin=autojoin)
+
+ bot.connect()
+ bot.process(forever=True)
--- /dev/null
+requests
+slixmpp
+beautifulsoup4
+pantomime
+aiohttp
+git+https://git.chaotic.ninja/yakumo.izuru/ecgi#egg=ecgi
--- /dev/null
+#!/bin/sh
+# $TheSupernovaDuo$
+echo "Starting bot..."
+python-3.9 main.py 2>chen.log
--- /dev/null
+# $YakumoLabs$
+# vim: ft=systemd
+[Unit]
+Description=XMPP bot to preview links and file contents
+After=network.target
+After=prosody.service
+After=ejabberd.service
+
+[Service]
+Type=simple
+Restart=always
+RestartSec=10
+StartLimitBurst=5
+StartLimitInterval=100
+WorkingDirectory=%h/chen/
+ExecStart=/usr/bin/python main.py
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+# $YakumoLabs$
+# vim: ft=yaml
+#
+# Note: %h must be replaced by the user's home directory path
+cmd: /usr/bin/env python3 main.py
+cwd: %h/chen