From aa3c2e49cc2d11fa491b2204fb2128e403cb1428 Mon Sep 17 00:00:00 2001
From: zlaxy
Date: Sun, 18 Jun 2017 17:31:35 +0300
Subject: [PATCH] html parser from XRevan86

---
Note: the two plugin sources below were revised after this patch was
exported, so the blob ids on their "index" lines describe the original
contents. The hunk headers and the diffstat match the revised contents.

 README.md              |  2 ++
 plugins/shell_cmds.py  | 92 ++++++++++++++++++++++++++++++++++++++++++
 plugins/title_fetch.py | 87 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 181 insertions(+)
 create mode 100644 plugins/shell_cmds.py
 create mode 100644 plugins/title_fetch.py

diff --git a/README.md b/README.md
index eb6cca6..53c2f85 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,5 @@
 # hptoad
 
 An MIT licensed XMPP bot written using Python 3 and slixmpp.
+
+Original project: https://gitlab.com/XRevan86/hptoad
diff --git a/plugins/shell_cmds.py b/plugins/shell_cmds.py
new file mode 100644
index 0000000..81f17c5
--- /dev/null
+++ b/plugins/shell_cmds.py
@@ -0,0 +1,92 @@
+# -*- python -*-
+"""Plugin that maps chat commands to executable files on disk."""
+import asyncio
+import os
+import re
+
+
+class Plugin:
+    """Run an executable named after the command and relay its output."""
+
+    _trim_regexp = re.compile("(`|\\$|\\.\\.)")
+    _quote_regexp = re.compile("(\"|')")
+
+    @classmethod
+    def _trim(cls, s):
+        """Return *s* without "`", "$" and "..", with quotes replaced.
+
+        Straight single and double quotes become a typographic quote and
+        surrounding whitespace is stripped.
+        """
+        result = cls._trim_regexp.sub("", s)
+        result = cls._quote_regexp.sub("“", result).strip()
+        return result
+
+    # Word characters only. For str patterns "\w" is Unicode-aware, so it
+    # already accepts Cyrillic letters. The former "\p{Cyrillic}" escape is
+    # not supported by the "re" module and is an error since Python 3.6.
+    _cmd_validator_regexp = re.compile(r"^\w*$")
+
+    @asyncio.coroutine
+    def _exec_cmd(self, cmd, body, nick, dir_path, is_admin):
+        """Run the executable *cmd* found in *dir_path*.
+
+        The process receives the trimmed *nick*, "true" or "false" for
+        *is_admin*, and the trimmed *body* as its arguments.
+
+        Returns a dict. "handled" is False when *cmd* is not a valid name
+        of an executable regular file in *dir_path*. Otherwise the dict may
+        carry "reply" (stripped stdout) and/or "error" (a failure
+        description or stripped stderr); a process that prints nothing
+        yields an empty dict.
+        """
+        is_admin = "true" if is_admin else "false"
+        path = os.path.join(dir_path, self._trim(cmd))
+
+        if not self._cmd_validator_regexp.match(cmd) or \
+           not os.access(path, os.F_OK | os.X_OK) or not os.path.isfile(path):
+            return {"handled": False}
+
+        if not os.access(path, os.R_OK):
+            return {"handled": True,
+                    "error": "\"%s\" is not readable" % path}
+
+        cmd = [path, self._trim(nick), is_admin, self._trim(body)]
+        try:
+            pipe = asyncio.subprocess.PIPE
+            proc = yield from asyncio.create_subprocess_exec(*cmd,
+                                                             stdout=pipe,
+                                                             stderr=pipe)
+            cmd_reply, cmd_error = yield from proc.communicate()
+        except OSError as e:
+            return {"handled": True,
+                    "error": "Execute: %s" % str(e)}
+
+        # communicate() returns bytes. Decode both streams so the error text
+        # is not rendered as a "b'...'" repr, and replace undecodable bytes
+        # instead of raising UnicodeDecodeError.
+        cmd_error = (cmd_error or b"").decode(errors="replace").strip()
+        cmd_reply = (cmd_reply or b"").decode(errors="replace").strip()
+
+        result = {}
+        if cmd_error:
+            result["error"] = "Process: %s" % cmd_error
+        if cmd_reply:
+            result["reply"] = cmd_reply
+        if result:
+            result["handled"] = True
+        return result
+
+    @asyncio.coroutine
+    def command(self, command, body, nick, from_id, is_admin):
+        """Run the executable named *command* from the "plugins" directory."""
+        result = yield from self._exec_cmd(command, body, nick,
+                                           "plugins", is_admin)
+        return result
+
+    @asyncio.coroutine
+    def question(self, body, nick, from_id, is_admin):
+        """Run the "answer" executable from the "chat" directory."""
+        result = yield from self._exec_cmd("answer", body, nick,
+                                           "chat", is_admin)
+        return result
diff --git a/plugins/title_fetch.py b/plugins/title_fetch.py
new file mode 100644
index 0000000..f4107af
--- /dev/null
+++ b/plugins/title_fetch.py
@@ -0,0 +1,87 @@
+# -*- python -*-
+"""Plugin that replies with the titles of web pages linked in chat."""
+import asyncio
+import bs4
+import functools
+import lxml
+import re
+import requests
+
+
+class Plugin:
+    """Announce the <title> of HTML/XML documents linked in a message."""
+
+    _html_regexp = re.compile(r"(https?://[^\"\s>]+)")
+    _mime_types = ("application/xhtml+xml", "application/xml",
+                   "text/html", "text/xml")
+    # Seconds to wait for the connection and for each read. Without a
+    # timeout requests can block a worker thread indefinitely.
+    _timeout = 10
+
+    @classmethod
+    def _fetch_title(cls, url):
+        """Download *url* and return its title, or "" if it has none.
+
+        This call blocks and is meant to run in an executor. Network and
+        parser errors propagate to the caller.
+        """
+        req = requests.get(url, stream=True, timeout=cls._timeout)
+        try:
+            # A response is not required to carry a Content-Type header.
+            content_type = req.headers.get("content-type", "").lower()
+            if not content_type.startswith(cls._mime_types):
+                return ""
+
+            # Handle a case when no charset is defined for text/html.
+            if content_type.startswith("text/") and \
+               "charset=" not in content_type:
+                req.encoding = None
+
+            if not req.encoding:
+                # With no detectable encoding iter_content() would yield
+                # bytes, so fall back to UTF-8.
+                req.encoding = req.apparent_encoding or "utf-8"
+
+            contents = title = ""
+            for chunk in req.iter_content(chunk_size=128,
+                                          decode_unicode=True):
+                contents += chunk
+                soup = bs4.BeautifulSoup(contents, "lxml")
+                if soup and soup.title:
+                    # Stop once another chunk no longer changes the title.
+                    if soup.title.string == title:
+                        break
+                    title = soup.title.string
+            return str(title) if title else ""
+        finally:
+            # A streamed response keeps its connection until it is closed.
+            req.close()
+
+    @asyncio.coroutine
+    def chat_message(self, body, nick, from_id, is_admin):
+        """Reply with a "Link: <title>" line for each URL found in *body*.
+
+        Returns a dict with "handled" (whether *body* contains a URL),
+        "reply" (the collected lines, possibly empty) and, if a fetch
+        failed, "error" describing the most recent failure.
+        """
+        urls = self._html_regexp.findall(body)
+        if not urls:
+            return {"handled": False}
+
+        loop = asyncio.get_event_loop()
+        result = {"handled": True}
+        lines = []
+        for url in urls:
+            try:
+                # The whole download runs in a worker thread so that a slow
+                # server cannot stall the event loop.
+                title = yield from loop.run_in_executor(
+                    None, functools.partial(self._fetch_title, url))
+            except Exception as e:
+                result["error"] = "Title fetch: %s" % str(e)
+            else:
+                if title:
+                    lines.append("Link: %s" % title)
+        result["reply"] = "\n".join(lines)
+        return result