290 lines
9.7 KiB
Python
Executable File
290 lines
9.7 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import sleekxmpp
|
|
|
|
# Connection settings handed to Hptoad(); replace the placeholders below.
opts = dict(
    # Bare JID of the multi-user chat room to join.
    muc="room@conference.example.com",
    # Preferred nickname inside the room.
    nick="botname",
    # Bare JID of the bot account.
    jid="botname@example.com",
    # Resource appended to the JID when logging in.
    resource="resource",
    # Account password.
    password="password",
    # Server to connect to, written as "host:port".
    connect="xmpp.example.org:5222",
)
|
|
|
|
|
|
class Hptoad:
    """XMPP chat-room bot that maps "!" commands onto external programs.

    The bot joins a single multi-user chat (MUC) room.  Messages starting
    with "!" are run as executables from the "plugins" directory, messages
    addressed to the bot ("nick:" / "nick,") are passed to the "answer"
    executable in the "chat" directory, and "!megakick <nick>" lets room
    admins/owners remove an occupant.
    """

    # Backticks, dollar signs and ".." are removed from text passed to
    # external programs.
    _trim_regexp = re.compile("(`|\\$|\\.\\.)")
    # Straight quotes are replaced with a typographic quote by trim().
    _quote_regexp = re.compile("(\"|')")

    # letter(ASCII or cyrillic), number, underscore only.
    # Python's "re" module has no \p{...} classes, so the Cyrillic and
    # Cyrillic Supplement blocks (U+0400-U+052F) are listed explicitly.
    _cmd_validator_regexp = re.compile(u"^![0-9A-Za-z_\u0400-\u052F]*$")

    def __init__(self, opts):
        """Create the XMPP client and remember the room settings.

        opts is a mapping with the keys "jid", "resource", "password",
        "connect" ("host:port", may be empty), "muc" and "nick".
        """
        if sys.version_info.major < 3:
            sleekxmpp.util.misc_ops.setdefaultencoding("utf-8")

        self.client = sleekxmpp.ClientXMPP("%s/%s" % (opts["jid"],
                                                      opts["resource"]),
                                           opts["password"])
        self.client.register_plugin("xep_0199")  # XMPP Ping.
        self.client.register_plugin("xep_0045")  # XMPP MUC.
        self.muc_obj = self.client.plugin["xep_0045"]

        self.jid = opts["jid"]
        self.connect = opts["connect"]
        self.muc = opts["muc"]
        # The configured nick; bot_nick may get "_" suffixes on conflicts.
        self.pure_bot_nick = opts["nick"]
        self.bot_nick = self.pure_bot_nick

        self._handlers_registered = False

    def register_handlers(self):
        """Attach the event handlers to the client, at most once.

        run() may be called repeatedly on the same instance; registering
        again would add duplicate handlers, so repeated calls are no-ops.
        """
        if getattr(self, "_handlers_registered", False):
            return

        self.client.add_event_handler("session_start", self.on_session_start)
        self.client.add_event_handler("message", self.on_message,
                                      threaded=True)
        self.client.add_event_handler("muc::%s::presence" % self.muc,
                                      self.on_muc_presence)
        self._handlers_registered = True

    def join_muc(self):
        """Join the room as bot_nick, leaving it first if already joined."""
        if self.muc in self.muc_obj.getJoinedRooms():
            self.muc_obj.leaveMUC(self.muc, self.bot_nick,
                                  msg="Replaced by new connection")
        self.muc_obj.joinMUC(self.muc, self.bot_nick, wait=True)

    @classmethod
    def log_exception(cls, ex):
        """Log an exception as "<TypeName>: <message>" at error level."""
        logging.error("%s: %s", type(ex).__name__, ex)

    @classmethod
    def log_message_event(cls, event):
        """Log the from/id/to/type/body fields of a message at debug level."""
        logging.debug("&{{jabber:client message} %s %s %s %s %s { }}",
                      event["from"], event["id"], event["to"],
                      event["type"], event["body"])

    def is_muc_admin(self, muc, nick):
        """Return True if nick is in room muc with admin/owner affiliation.

        Returns False when the room is unknown or the nick is not in it.
        """
        if nick not in self.muc_obj.rooms.get(muc, ()):
            return False

        affiliation = self.muc_obj.getJidProperty(muc, nick, "affiliation")
        return affiliation in ("admin", "owner")

    @classmethod
    def trim(cls, s):
        """Sanitize s before it is handed to an external program.

        Backticks, "$" and ".." are removed, straight quotes are replaced
        with a typographic quote, and surrounding whitespace is stripped.
        """
        result = cls._trim_regexp.sub("", s)
        result = cls._quote_regexp.sub("“", result).strip()
        return result

    def prep_extern_cmd(self, body, nick, dir_path, is_admin=False):
        """Build the argument list for an external command.

        body is the raw message ("!name optional arguments"), dir_path is
        the directory holding the executables.

        Returns (proc_args, None) on success, where proc_args is
        [path, nick, "true"/"false", arguments], or (None, error_text)
        when the command name is invalid or the file cannot be run.
        """
        cmd = body.split(" ", 1)
        cmd[0] = cmd[0].strip()

        admin_flag = "true" if is_admin else "false"

        if not self._cmd_validator_regexp.match(cmd[0]):
            return None, "Bad command \"%s\"" % cmd[0]

        # Drop the leading "!" to get the executable's file name.
        path = os.path.join(dir_path, self.trim(cmd[0][1:]))

        if not os.access(path, os.F_OK):
            return None, "\"%s\" does not exist" % path
        if not os.path.isfile(path):
            return None, "\"%s\" is not a file" % path
        if not os.access(path, os.R_OK | os.X_OK):
            return None, "\"%s\" is not readable or executable" % path

        proc_args = [path, self.trim(nick), admin_flag]
        if len(cmd) > 1:
            proc_args.append(self.trim(cmd[1]))

        return proc_args, None

    def extern_cmd(self, body, nick, from_id, dir_path, is_admin=False):
        """Run the external command named in body and collect its output.

        Returns (reply, err): reply is the stripped stdout of the process
        (or "<nick>: WAT" when the command could not be prepared or
        started), err is a description of what went wrong or None.
        from_id is accepted for interface compatibility and is not used.
        """
        reply = ""
        err = None

        cmd, prep_err = self.prep_extern_cmd(body, nick, dir_path,
                                             is_admin=is_admin)
        if prep_err:
            return "%s: WAT" % nick, "Command: %s" % prep_err

        try:
            proc = subprocess.Popen(cmd,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    universal_newlines=True)
            cmd_reply, cmd_err = proc.communicate()
        except (OSError, ValueError, subprocess.CalledProcessError) as e:
            # Popen reports launch failures (bad format, permissions, ...)
            # as OSError and invalid arguments as ValueError.
            return "%s: WAT" % nick, "Execute: %s" % str(e)

        if cmd_err and cmd_err.strip():
            err = "Process: %s" % cmd_err.strip()
        if cmd_reply and cmd_reply.strip():
            reply = cmd_reply.strip()
        return reply, err

    def handle_cmd(self, body, nick, from_id, is_admin=False):
        """Handle a "!" command and return (reply, err).

        "!megakick <nick>" is handled internally; any other body starting
        with "!" is run from the "plugins" directory.
        """
        reply = ""
        err = None

        if body == "!megakick":  # Incomplete megakick.
            reply = "%s: WAT" % nick

        elif body.startswith("!megakick "):  # Megakick.
            victim = body.split("!megakick ", 1)[1]

            is_bot_admin = self.is_muc_admin(self.muc, self.bot_nick)
            is_victim_admin = self.is_muc_admin(self.muc, victim)

            if is_admin and victim != self.bot_nick:
                if is_bot_admin and not is_victim_admin and \
                        victim in self.muc_obj.rooms[self.muc]:
                    self.muc_obj.setRole(self.muc, victim, "none")
                else:
                    reply = "%s: Can't megakick %s." % (nick, victim)
            else:
                reply = "%s: GTFO" % nick

        elif body.startswith("!"):  # Any external command.
            reply, err = self.extern_cmd(body, nick, from_id, "plugins",
                                         is_admin=is_admin)

        return reply, err

    def handle_self_message(self, body, nick, from_id):
        """Handle a direct message sent from the bot's own bare JID.

        Commands are executed with admin rights; any other text is relayed
        to the room verbatim (stripped).
        """
        if body.startswith("!"):
            msg, err = self.handle_cmd(body, nick, from_id, is_admin=True)
            if err:
                logging.error(err)
        else:
            msg = body.strip()

        if msg:
            self.client.send_message(mto=self.muc, mbody=msg,
                                     mtype="groupchat")

    def handle_muc_message(self, body, nick, from_id):
        """Handle a room message: run commands or answer when addressed.

        Errors are logged and, for admins, also sent to from_id as a chat
        message; replies go to the room.
        """
        is_admin = self.is_muc_admin(self.muc, nick)

        reply = ""
        err = None

        # Has to be redone with the current bot nick.  The nick is escaped
        # so that regex metacharacters in it are matched literally.
        call_regexp = re.compile("^%s[:,]" % re.escape(self.bot_nick))

        if body.startswith("!"):  # Any external command.
            reply, err = self.handle_cmd(body, nick, from_id,
                                         is_admin=is_admin)

        elif call_regexp.match(body):  # Chat.
            cmd_body = call_regexp.sub("!answer", body)
            reply, err = self.extern_cmd(cmd_body, nick, from_id, "chat",
                                         is_admin=is_admin)

        if err:
            logging.error(err)
            if is_admin:
                self.client.send_message(mto=from_id, mbody=err, mtype="chat")
        if reply:
            self.client.send_message(mto=self.muc, mbody=reply,
                                     mtype="groupchat")

    def on_session_start(self, event):
        """Fetch the roster, announce presence and join the room."""
        self.client.get_roster()
        self.client.send_presence(pstatus="is there some food in this world?",
                                  ppriority=12)
        self.join_muc()

    def on_message(self, event):
        """Dispatch an incoming message; exceptions are logged, not raised."""
        try:
            if event["type"] not in ("chat", "normal", "groupchat"):
                return

            self.log_message_event(event)

            body = event["body"]
            from_id = event["from"]

            if event["type"] == "groupchat":
                nick = event["mucnick"]
                self.handle_muc_message(body, nick, from_id)

            elif event["from"].bare == self.jid:
                # Use resource as a nickname with self messages.
                nick = from_id.resource
                self.handle_self_message(body, nick, from_id)
        except Exception as e:
            self.log_exception(e)

    def on_muc_presence(self, event):
        """React to room presence: resolve nick conflicts and rejoin.

        On error code 409 an underscore is appended to the nick and the
        room is joined again.  When the bot's own nick becomes unavailable
        the configured nick is restored and the room is rejoined after a
        short pause.  Exceptions are logged, not raised.
        """
        try:
            typ = event["muc"]["type"]
            from_id = event["from"]
            nick = event["muc"]["nick"]

            if not typ:
                typ = event["type"]
            if not nick:
                nick = self.muc_obj.getNick(self.muc, from_id)

            if typ == "error":
                if event["error"]["code"] == "409":
                    self.bot_nick = self.bot_nick + "_"
                    self.join_muc()

            elif typ == "unavailable":
                if nick == self.bot_nick:
                    self.bot_nick = self.pure_bot_nick
                    time.sleep(0.5)
                    self.join_muc()
        except Exception as e:
            self.log_exception(e)

    def run(self):
        """Connect to the server and process events until the client stops.

        Exits the process with status 1 when the "connect" setting is
        malformed or the connection cannot be established.
        """
        # Reset the nick.
        self.bot_nick = self.pure_bot_nick

        if self.connect:
            parts = self.connect.split(":", 1)
            if len(parts) != 2 or not parts[1].isdigit():
                logging.critical("Conn: Connection server format is " +
                                 "invalid, should be example.org:5222")
                sys.exit(1)
            connect = (parts[0], int(parts[1]))
        else:
            connect = ()

        if self.client.connect(connect):
            self.register_handlers()
            self.client.process(block=True)
        else:
            logging.critical("Auth: Could not connect to server, or " +
                             "password mismatch!")
            sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Use the default SIGINT action instead of Python's KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    logging.basicConfig(
        datefmt="%Y/%m/%d %H:%M:%S",
        format="%(asctime)s %(message)s",
        level=logging.DEBUG,
    )
    # Silence sleekxmpp debug information.
    sleek_logger = logging.getLogger("sleekxmpp")
    sleek_logger.setLevel(logging.CRITICAL)

    # Switch to the script's own directory when it was started via a path
    # that has a directory component.
    script_path = sys.argv[0]
    script_dir = os.path.dirname(script_path)
    if script_dir and os.path.isfile(script_path):
        os.chdir(script_dir)

    hptoad = Hptoad(opts)
    # run() is not expected to return; if it does, log it and start over.
    while True:
        hptoad.run()
        logging.error("Unknown: WTF am I doing here?")
        time.sleep(0.5)
|