#!/usr/bin/env python3 import asyncio import logging import os import re import signal import subprocess import sys import time import slixmpp opts = { "muc": "room@conference.example.com", "nick": "botname", "jid": "botname@example.com", "resource": "resource", "password": "password", "connect": "xmpp.example.org:5222", } class Hptoad: def __init__(self, opts): self.client = slixmpp.ClientXMPP("%s/%s" % (opts["jid"], opts["resource"]), opts["password"]) self.client.register_plugin("xep_0199") # XMPP Ping. self.client.register_plugin("xep_0045") # XMPP MUC. self.muc_obj = self.client.plugin["xep_0045"] self.logger = logging.getLogger(self.__class__.__name__) self.logger.addHandler(logging.NullHandler()) self.logger.setLevel(logging.DEBUG) self.jid = opts["jid"] self.connect_host = opts["connect"] self.muc = opts["muc"] self.pure_bot_nick = opts["nick"] self.bot_nick = self.pure_bot_nick def register_handlers(self): self.client.add_event_handler("failed_all_auth", self.on_failed_all_auth) self.client.add_event_handler("session_start", self.on_session_start) self.client.add_event_handler("session_end", self.on_session_end) self.client.add_event_handler("message", self.on_message) self.client.add_event_handler("muc::%s::presence" % self.muc, self.on_muc_presence) def connect(self): # Reset the nick. self.bot_nick = self.pure_bot_nick if self.connect_host: connect = self.connect_host.split(":", 1) if len(connect) != 2 or not connect[1].isdigit(): self.logger.critical("Conn: Connection server format is " + "invalid, should be example.org:5222") sys.exit(1) else: connect = () self.client.connect(connect) def join_muc(self): self.muc_obj.join_muc(self.muc, self.bot_nick) def log_exception(self, ex): self.logger.error("%s: %s" % (type(ex).__name__, str(ex))) def log_message_event(self, event): self.logger.debug("&{{jabber:client message} %s %s %s %s %s { }}" % (event["from"], event["id"], event["to"], event["type"], event["body"])) def is_muc_admin(self, muc, nick): if nick not in self.muc_obj.rooms[self.muc]: return False affiliation = self.muc_obj.get_jid_property(muc, nick, "affiliation") return True if affiliation in ("admin", "owner") else False _trim_regexp = re.compile("(`|\\$|\\.\\.)") _quote_regexp = re.compile("(\"|')") @classmethod def trim(cls, s): result = cls._trim_regexp.sub("", s) result = cls._quote_regexp.sub("“", result).strip() return result # letter(ASCII or cyrillic), number, underscore only. _cmd_validator_regexp = re.compile("^!(\\w|\\p{Cyrillic})*$") @asyncio.coroutine def prep_extern_cmd(self, body, nick, dir_path, is_admin=False): cmd = body.split(" ", 1) cmd[0] = cmd[0].strip() is_admin = "true" if is_admin else "false" if not self._cmd_validator_regexp.match(cmd[0]): return None, "Bad command \"%s\"" % cmd[0] path = os.path.join(dir_path, self.trim(cmd[0][1:])) if not os.access(path, os.F_OK): return None, "\"%s\" does not exist" % path if not os.path.isfile(path): return None, "\"%s\" is not a file" % path if not os.access(path, os.R_OK | os.X_OK): return None, "\"%s\" is not readable or executable" % path proc_args = [path, self.trim(nick), is_admin] if len(cmd) > 1: proc_args.append(self.trim(cmd[1])) return proc_args, None @asyncio.coroutine def extern_cmd(self, body, nick, from_id, dir_path, is_admin=False): reply = "" err = None cmd, prep_err = yield from self.prep_extern_cmd(body, nick, dir_path, is_admin=is_admin) if prep_err: reply = "%s: WAT" % nick err = "Command: %s" % prep_err return reply, err try: proc = yield from \ asyncio.create_subprocess_exec(*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) cmd_reply, cmd_err = yield from proc.communicate() except subprocess.CalledProcessError as e: reply = "%s: WAT" % nick err = "Execute: %s" % str(e) return reply, err if cmd_err and len(cmd_err.strip()) > 0: err = "Process: %s" % cmd_err.strip() if cmd_reply and len(cmd_reply.strip()) > 0: reply = cmd_reply.decode().strip() return reply, err @asyncio.coroutine def handle_cmd(self, body, nick, from_id, is_admin=False): reply = "" err = None if body == "!megakick": # Incomplete megakick. reply = "%s: WAT" % nick elif body.startswith("!megakick "): # Megakick. victim = body.split("!megakick ", 1)[1] is_bot_admin = self.is_muc_admin(self.muc, self.bot_nick) is_victim_admin = self.is_muc_admin(self.muc, victim) if is_admin and victim != self.bot_nick: if is_bot_admin and not is_victim_admin and \ victim in self.muc_obj.rooms[self.muc]: self.muc_obj.set_role(self.muc, victim, "none") else: reply = "%s: Can't megakick %s." % (nick, victim) else: reply = "%s: GTFO" % nick elif body.startswith("!"): # Any external command. reply, err = yield from self.extern_cmd(body, nick, from_id, "plugins", is_admin=is_admin) return reply, err @asyncio.coroutine def handle_self_message(self, body, nick, from_id): if body.startswith("!"): msg, err = yield from self.handle_cmd(body, nick, from_id, is_admin=True) else: msg = body.strip() if msg and len(msg) > 0: self.client.send_message(mto=self.muc, mbody=msg, mtype="groupchat") @asyncio.coroutine def handle_muc_message(self, body, nick, from_id): is_admin = self.is_muc_admin(self.muc, nick) reply = "" err = None # Has to be redone with the current bot nick. call_regexp = re.compile("^%s[:,]" % self.bot_nick) if body.startswith("!"): # Any external command. reply, err = yield from self.handle_cmd(body, nick, from_id, is_admin=is_admin) elif call_regexp.match(body): # Chat. cmd_body = call_regexp.sub("!answer", body) reply, err = yield from self.extern_cmd(cmd_body, nick, from_id, "chat", is_admin=is_admin) if err: self.logger.error(err) if is_admin: self.client.send_message(mto=from_id, mbody=err, mtype="chat") if reply: self.client.send_message(mto=self.muc, mbody=reply, mtype="groupchat") def on_failed_all_auth(self, event): self.logger.critical("Auth: Could not connect to the server, or " + "password mismatch!") sys.exit(1) def on_session_start(self, event): self.client.send_presence(pstatus="is there some food in this world?", ppriority=12) self.client.get_roster() self.join_muc() def on_session_end(self, event): time.sleep(2.0) self.connect() @asyncio.coroutine def on_message(self, event): try: if not event["type"] in ("chat", "normal", "groupchat"): return self.log_message_event(event) body = event["body"] from_id = event["from"] if event["type"] == "groupchat": nick = event["mucnick"] if nick != self.bot_nick: yield from self.handle_muc_message(body, nick, from_id) elif event["from"].bare == self.jid: # Use resource as a nickname with self messages. nick = from_id.resource yield from self.handle_self_message(body, nick, from_id) except Exception as e: self.log_exception(e) def on_muc_presence(self, event): try: typ = event["muc"]["type"] from_id = event["from"] nick = event["muc"]["nick"] if not typ: typ = event["type"] if not nick: nick = self.muc_obj.get_nick(self.muc, from_id) if typ == "error": if event["error"]["code"] == "409": self.bot_nick = self.bot_nick + "_" self.join_muc() elif typ == "unavailable": if nick == self.bot_nick: self.bot_nick = self.pure_bot_nick time.sleep(0.5) self.join_muc() except Exception as e: self.log_exception(e) def run(self): self.register_handlers() self.connect() self.client.process(forever=True) if __name__ == "__main__": signal.signal(signal.SIGINT, signal.SIG_DFL) logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y/%m/%d %H:%M:%S") if os.path.isfile(sys.argv[0]) and os.path.dirname(sys.argv[0]): os.chdir(os.path.dirname(sys.argv[0])) hptoad = Hptoad(opts) while True: hptoad.run() logging.error("Unknown: WTF am I doing here?") time.sleep(0.5)