hptoad/plugins/shell_cmds.py

63 rader
2.2 KiB
Python

# -*- python -*-
import asyncio
import os
import re
class Plugin:
    """Chat plugin that answers commands and questions by running executables.

    A command named ``foo`` runs the executable file ``plugins/foo``; every
    question runs ``chat/answer``.  Both directories are relative paths, so
    they are resolved against the process's current working directory.

    The executable is invoked directly (no shell) with three arguments: the
    sanitized nick, the string ``"true"`` or ``"false"`` for the admin flag,
    and the sanitized message body.
    """

    # Shell-sensitive tokens removed from user input: backtick, dollar sign
    # and the parent-directory sequence "..".
    _trim_regexp = re.compile(r"`|\$|\.\.")
    # Single and double quotes are removed as well.
    _quote_regexp = re.compile("[\"']")

    @classmethod
    def _trim(cls, s):
        """Return *s* without backticks, ``$``, ``..``, quotes or outer spaces.

        The substitutions are repeated until the string stops changing,
        because deleting one token can join its neighbours into a new
        forbidden sequence (for example ``'."."'`` would otherwise come out
        as ``..``).  Every changing pass shortens the string, so the loop
        always terminates.
        """
        while True:
            cleaned = cls._quote_regexp.sub("", cls._trim_regexp.sub("", s))
            if cleaned == s:
                break
            s = cleaned
        return s.strip()

    # Word characters only: letters, digits and underscore.  For str patterns
    # in Python 3, \w is Unicode-aware and therefore already covers Cyrillic
    # letters; the re module has no \p{...} property classes.  \Z (rather
    # than $) refuses a trailing newline.
    _cmd_validator_regexp = re.compile(r"^\w*\Z")

    @staticmethod
    def _format_result(cmd_reply, cmd_error):
        """Build the result dict from a finished process's stdout and stderr.

        Args:
            cmd_reply: raw stdout bytes (or ``None``).
            cmd_error: raw stderr bytes (or ``None``).

        Returns:
            A dict with ``"reply"`` and/or ``"error"`` plus ``"handled": True``
            when the process printed something, or an empty dict when both
            streams were blank.
        """
        # errors="replace" keeps undecodable script output from raising.
        error_text = cmd_error.decode(errors="replace").strip() \
            if cmd_error else ""
        reply_text = cmd_reply.decode(errors="replace").strip() \
            if cmd_reply else ""

        result = {}
        if error_text:
            result["error"] = "Process: %s" % error_text
        if reply_text:
            result["reply"] = reply_text
        if result:
            result["handled"] = True
        return result

    async def _exec_cmd(self, cmd, body, nick, dir_path, is_admin):
        """Run the executable ``dir_path/cmd`` and collect its output.

        Args:
            cmd: command name; must consist of word characters only.
            body: message text, passed to the executable after sanitizing.
            nick: sender's nick, passed to the executable after sanitizing.
            dir_path: directory in which the executable is looked up.
            is_admin: truthy if the sender is an admin; passed on as the
                string ``"true"`` or ``"false"``.

        Returns:
            ``{"handled": False}`` if the name is invalid or no executable
            regular file exists at that path; a dict with ``"handled": True``
            and ``"error"`` if the file is unreadable or could not be
            started; otherwise the dict built by ``_format_result``.
        """
        # Validate the name first so that nothing but plain word characters
        # is ever joined onto dir_path.
        if not self._cmd_validator_regexp.match(cmd):
            return {"handled": False}

        path = os.path.join(dir_path, self._trim(cmd))
        if not os.access(path, os.F_OK | os.X_OK) or \
                not os.path.isfile(path):
            return {"handled": False}

        if not os.access(path, os.R_OK):
            return {"handled": True,
                    "error": "\"%s\" is not readable" % path}

        argv = [path,
                self._trim(nick),
                "true" if is_admin else "false",
                self._trim(body)]
        pipe = asyncio.subprocess.PIPE
        try:
            proc = await asyncio.create_subprocess_exec(*argv,
                                                        stdout=pipe,
                                                        stderr=pipe)
            cmd_reply, cmd_error = await proc.communicate()
        except OSError as e:
            return {"handled": True,
                    "error": "Execute: %s" % str(e)}

        return self._format_result(cmd_reply, cmd_error)

    async def command(self, command, body, nick, from_id, is_admin):
        """Handle a chat command by running ``plugins/<command>``.

        ``from_id`` is accepted for interface compatibility and is not used.
        Returns the result dict produced by ``_exec_cmd``.
        """
        return await self._exec_cmd(command, body, nick,
                                    "plugins", is_admin)

    async def question(self, body, nick, from_id, is_admin):
        """Handle a chat question by running ``chat/answer``.

        ``from_id`` is accepted for interface compatibility and is not used.
        Returns the result dict produced by ``_exec_cmd``.
        """
        return await self._exec_cmd("answer", body, nick,
                                    "chat", is_admin)