merging CMDConnect and CursesConnect into the single Connect function

This commit is contained in:
ivan 2022-06-14 12:33:32 +03:00
parent abb2fcfa4c
commit 37b3926880
2 changed files with 116 additions and 135 deletions

View File

@ -13,7 +13,7 @@ def main():
long_description='SSH connection and aliases manager with curses and command line interface', long_description='SSH connection and aliases manager with curses and command line interface',
long_description_content_type='text/x-rst', long_description_content_type='text/x-rst',
license='DWTWL 2.55', license='DWTWL 2.55',
version='1.09.3', version='1.09.4',
py_modules=['sshch'], py_modules=['sshch'],
scripts=['sshch/sshch'], scripts=['sshch/sshch'],
keywords='sshch ssh aliases curses manager', keywords='sshch ssh aliases curses manager',

View File

@ -22,7 +22,7 @@ from curses import textpad, panel
from threading import Thread from threading import Thread
# https://gitlab.com/zlax/sshch # https://gitlab.com/zlax/sshch
version = "1.09.3" version = "1.09.4"
# expand groups by default # expand groups by default
expand_default = True expand_default = True
# path to conf dir and file, default: ~/.config/sshch.conf # path to conf dir and file, default: ~/.config/sshch.conf
@ -93,6 +93,41 @@ def ConvertPassword(password):
return password_string return password_string
def Connect(aliases, command=False, threading=False, screen=False):
if screen:
curses.endwin()
groups = []
connectaliases = []
for a in conf.sections():
if conf.has_option(a, "group"):
groups.append(a)
for alias in aliases:
if alias in groups:
group_aliases = GroupChildAliases(alias)
for ga in group_aliases:
connectaliases.append(ga)
else:
connectaliases.append(alias)
connectaliases = setSeq(connectaliases)
threads = {}
for alias in connectaliases:
if conf.has_section(alias):
print("Connecting to " + alias + "...")
if threading:
threads[alias] = Thread(target=ConnectAlias, args=(alias,
command, True))
threads[alias].start()
else:
ConnectAlias(alias, command)
if not threading:
print("... " + alias + " session finished.")
else:
print("error: '" + alias + "' alias does not exists")
if screen:
print("Press 'enter' to continue.")
screen.getch()
def ConnectAlias(alias, command=False, threading=False): def ConnectAlias(alias, command=False, threading=False):
exec_string = "" exec_string = ""
if conf.has_option(alias, "password"): if conf.has_option(alias, "password"):
@ -155,8 +190,8 @@ def CMDGroup(group):
def CMDEdit(alias): def CMDEdit(alias):
if conf.has_section(alias): if conf.has_section(alias):
if conf.has_option(alias, "exec_string"): if conf.has_option(alias, "exec_string"):
prompt_edit = ("".join(["Enter connection string for existing alias ", prompt_edit = ("".join(["Enter connection string for existing ",
"(example: ssh user@somehost.com):\n"])) "alias (example: ssh user@somehost.com):\n"]))
string = "" string = ""
while string == "": while string == "":
string = input(prompt_edit) string = input(prompt_edit)
@ -184,7 +219,8 @@ def CMDPassword(alias):
print("Can't set password for group.") print("Can't set password for group.")
else: else:
if conf.has_section(alias): if conf.has_section(alias):
prompt_pass = ("[UNSAFE] Enter password for sshpass (Ctrl+C - cancel, blank - clear password):\n") prompt_pass = ("[UNSAFE] Enter password for sshpass (Ctrl+C" +
" - cancel, blank - clear password):\n")
string = "" string = ""
string = getpass(prompt_pass) string = getpass(prompt_pass)
SetPassword(alias, string) SetPassword(alias, string)
@ -194,7 +230,8 @@ def CMDPassword(alias):
def CMDRemove(alias): def CMDRemove(alias):
if conf.has_section(alias): if conf.has_section(alias):
prompt_remove = ("Type 'yes' if you sure to remove '" + alias + "' alias or group: ") prompt_remove = ("Type 'yes' if you sure to remove '" + alias +
"' alias or group: ")
string = input(prompt_remove) string = input(prompt_remove)
if string == "yes": if string == "yes":
RemoveAliases([alias]) RemoveAliases([alias])
@ -204,35 +241,6 @@ def CMDRemove(alias):
print("error: '" + alias + "' alias or group does not exists.") print("error: '" + alias + "' alias or group does not exists.")
def CMDConnect(aliases, command=False, threading=False):
groups = []
connectaliases = []
for a in conf.sections():
if conf.has_option(a, "group"):
groups.append(a)
for alias in aliases:
if alias in groups:
group_aliases = GroupChildAliases(alias)
for ga in group_aliases:
connectaliases.append(ga)
else:
connectaliases.append(alias)
connectaliases = setSeq(connectaliases)
threads = {}
for alias in connectaliases:
if conf.has_section(alias):
print("Connecting to " + alias + "...")
if threading:
threads[alias] = Thread(target=ConnectAlias, args=(alias, command, True))
threads[alias].start()
else:
ConnectAlias(alias, command)
if not threading:
print("... " + alias + " session finished.")
else:
print("error: '" + alias + "' alias does not exists")
def CMDList(option, opt, value, parser): def CMDList(option, opt, value, parser):
print(' '.join(str(p) for p in conf.sections())) print(' '.join(str(p) for p in conf.sections()))
@ -292,7 +300,8 @@ def GroupChildAliases(group):
return setSeq(childaliases) return setSeq(childaliases)
def GroupTreeRecursion(level, group, treelist, resultalias, resultstring, expandlist, previousgroups): def GroupTreeRecursion(level, group, treelist, resultalias, resultstring,
expandlist, previousgroups):
if group in previousgroups: if group in previousgroups:
return resultalias, resultstring return resultalias, resultstring
previousgroups.append(group) previousgroups.append(group)
@ -339,35 +348,6 @@ def CMDFullList(option, opt, value, parser):
print (p) print (p)
def CursesConnect(screen, aliases, command=False, threading=False):
curses.endwin()
groups = []
connectaliases = []
for a in conf.sections():
if conf.has_option(a, "group"):
groups.append(a)
for alias in aliases:
if alias in groups:
group_aliases = GroupChildAliases(alias)
for ga in group_aliases:
connectaliases.append(ga)
else:
connectaliases.append(alias)
connectaliases = setSeq(connectaliases)
threads = {}
for alias in connectaliases:
print("Connecting to " + alias + "...")
if threading:
threads[alias] = Thread(target=ConnectAlias, args=(alias, command, True))
threads[alias].start()
else:
ConnectAlias(alias, command)
if not threading:
print("... " + alias + " session finished.")
print("Press 'enter' to continue.")
screen.getch()
def CursesExit(error=False): def CursesExit(error=False):
curses.endwin() curses.endwin()
if error: if error:
@ -450,19 +430,20 @@ def CMDOptions():
def format_epilog(self, formatter): def format_epilog(self, formatter):
return self.epilog return self.epilog
usage = "usage: %prog [options] [aliases]" usage = "usage: %prog [options] [aliases]"
progname = path.basename(__file__) progname = path.basename(__file__)
epilog = ("".join(["Examples:\n ", epilog = ("".join(["Examples:\n ",
progname, " existingalias\n ", progname, " existingalias\n ",
progname, " -a newremoteserver\n ", progname, " -a newremoteserver\n ",
progname, " --edit=newremoteserver -p newremoteserver\n ", progname, " --edit=newremoteserver -p newremoteserver\n ",
progname, ' -c "ls -l" newremoteserver\n ', progname, ' -c "ls -l" newremoteserver\n ',
progname, " -c reboot existingalias newremoteserver\n", progname, " -c reboot existingalias newremoteserver\n",
"Examples of connection string:\n ", "Examples of connection string:\n ",
"ssh user@somehost.com\n ", "ssh user@somehost.com\n ",
"ssh gates@8.8.8.8 -p 667\n ", "ssh gates@8.8.8.8 -p 667\n ",
"ssh root@somehost.com -t tmux a\n", "ssh root@somehost.com -t tmux a\n",
"Also, you can edit the config file manually: ", conf_file, "\n"])) "Also, you can edit the config file manually: ", conf_file, "\n"]))
opts = FormatedParser(usage=usage, version="%prog " + version, opts = FormatedParser(usage=usage, version="%prog " + version,
epilog=epilog) epilog=epilog)
opts.add_option('-l', '--list', action="callback", opts.add_option('-l', '--list', action="callback",
@ -509,32 +490,32 @@ def CMDOptions():
if options.keep: if options.keep:
HoldConnection(options.keep) HoldConnection(options.keep)
if options.thread: if options.thread:
CMDConnect(alias, options.thread, True) Connect(alias, options.thread, True)
elif alias: elif alias:
CMDConnect(alias, options.command) Connect(alias, options.command)
# curses template from: https://stackoverflow.com/a/30828805/6224462 # curses template from: https://stackoverflow.com/a/30828805/6224462
def CursesMain(): def CursesMain():
help_screen = ("".join([" Press:\n", help_screen = ("".join([" Press:\n",
" 'z'/'x', 'w'/'s' or arrows - navigation\n", " 'z'/'x', 'w'/'s' or arrows - navigation\n",
" 'a'/'F2' - add new alias (without spaces)\n", " 'a'/'F2' - add new alias (without spaces)\n",
" 'g'/'F5' - add new group (spaces will be stripped)\n", " 'g'/'F5' - add new group (spaces will be stripped)\n",
" 'e'/'F4' - edit existing alias/group\n", " 'e'/'F4' - edit existing alias/group\n",
" 'p'/'F6' - set alias's password for sshpass [UNSAFE]\n", " 'p'/'F6' - set alias's password for sshpass [UNSAFE]\n",
" 'space'/'insert' - select\n", " 'space'/'insert' - select\n",
" 'r'/'F8' - remove selected alias/aliases\n", " 'r'/'F8' - remove selected alias/aliases\n",
" 'c'/'F3' - command execution for alias (group/aliases - in turn)\n", " 'c'/'F3' - command execution for alias (group/aliases - in turn)\n",
" 't'/'F11' - parallel command execution for aliases and groups\n", " 't'/'F11' - parallel command execution for aliases and groups\n",
" (ssh key authentication or set password required)\n", " (ssh key authentication or set password required)\n",
" 'k'/'F7' - hold connection with selected alias\n", " 'k'/'F7' - hold connection with selected alias\n",
" 'enter'/'F9' - connect to selected alias/aliases,\n", " 'enter'/'F9' - connect to selected alias/aliases,\n",
" expand/collapse group\n", " expand/collapse group\n",
" 'q'/'F10' - quit\n", " 'q'/'F10' - quit\n",
" Run program with '--help' option to view command line help.\n", " Run program with '--help' option to view command line help.\n",
" Also, you can edit the config file manually:\n", " Also, you can edit the config file manually:\n",
" ", conf_file])) " ", conf_file]))
if expand_default == True: if expand_default == True:
groups = [] groups = []
for a in conf.sections(): for a in conf.sections():
@ -597,8 +578,8 @@ def CursesMain():
'A') or key_pressed == curses.KEY_F2: 'A') or key_pressed == curses.KEY_F2:
curses.curs_set(1) curses.curs_set(1)
new_alias_textpad = CursesTextpad(screen, 1, width - 8, new_alias_textpad = CursesTextpad(screen, 1, width - 8,
(height // 2) - 1, 4, "Enter new alias:", "", (height // 2) - 1, 4, "Enter new alias:", "",
normal_text, highlight_text) normal_text, highlight_text)
add_alias = new_alias_textpad.edit(CursesTextpadConfirm) add_alias = new_alias_textpad.edit(CursesTextpadConfirm)
if not add_alias == "": if not add_alias == "":
add_alias = add_alias.split()[0].strip() add_alias = add_alias.split()[0].strip()
@ -608,9 +589,9 @@ def CursesMain():
add_string = "" add_string = ""
while add_string == "": while add_string == "":
string_textpad = CursesTextpad(screen, 3, string_textpad = CursesTextpad(screen, 3,
width - 8, (height // 2) - 1, 4, width - 8, (height // 2) - 1, 4,
"Enter full execution string:", "Enter full execution string:",
"ssh ", normal_text, highlight_text) "ssh ", normal_text, highlight_text)
add_string = string_textpad.edit( add_string = string_textpad.edit(
CursesTextpadConfirm) CursesTextpadConfirm)
SetAliasString(add_alias, SetAliasString(add_alias,
@ -631,8 +612,8 @@ def CursesMain():
'G') or key_pressed == curses.KEY_F5: 'G') or key_pressed == curses.KEY_F5:
curses.curs_set(1) curses.curs_set(1)
new_group_textpad = CursesTextpad(screen, 1, width - 8, new_group_textpad = CursesTextpad(screen, 1, width - 8,
(height // 2) - 1, 4, "Enter group name (without spaces):", "", (height // 2) - 1, 4, "Enter group name (without spaces):", "",
normal_text, highlight_text) normal_text, highlight_text)
add_group = new_group_textpad.edit(CursesTextpadConfirm) add_group = new_group_textpad.edit(CursesTextpadConfirm)
if not add_group == "": if not add_group == "":
add_group = add_group.split()[0].strip() add_group = add_group.split()[0].strip()
@ -642,9 +623,9 @@ def CursesMain():
add_string = "" add_string = ""
while add_string.rstrip() == "": while add_string.rstrip() == "":
string_textpad = CursesTextpad(screen, 3, string_textpad = CursesTextpad(screen, 3,
width - 8, (height // 2) - 1, 4, width - 8, (height // 2) - 1, 4,
"Enter aliases for new group (example: alias1 alias2):", "Enter aliases for new group (example: alias1 alias2):",
"", normal_text, highlight_text) "", normal_text, highlight_text)
add_string = string_textpad.edit( add_string = string_textpad.edit(
CursesTextpadConfirm) CursesTextpadConfirm)
SetGroupString(add_group, SetGroupString(add_group,
@ -673,11 +654,11 @@ def CursesMain():
if strings[position - 1] in groups: if strings[position - 1] in groups:
while edit_string.rstrip() == "": while edit_string.rstrip() == "":
string_textpad = CursesTextpad(screen, 3, width - 8, string_textpad = CursesTextpad(screen, 3, width - 8,
(height // 2) - 1, 4, "Enter new aliases for existing group:", (height // 2) - 1, 4, "Enter new aliases for existing group:",
(conf.get(strings[position - 1], (conf.get(strings[position - 1],
"group") if conf.has_option(strings[position - 1], "group") if conf.has_option(strings[position - 1],
"group") else ""), "group") else ""),
normal_text, highlight_text) normal_text, highlight_text)
edit_string = string_textpad.edit(CursesTextpadConfirm) edit_string = string_textpad.edit(CursesTextpadConfirm)
SetGroupString(strings[position - 1], SetGroupString(strings[position - 1],
edit_string.replace("\n", "").rstrip()) edit_string.replace("\n", "").rstrip())
@ -690,11 +671,11 @@ def CursesMain():
else: else:
while edit_string.rstrip() == "": while edit_string.rstrip() == "":
string_textpad = CursesTextpad(screen, 3, width - 8, string_textpad = CursesTextpad(screen, 3, width - 8,
(height // 2) - 1, 4, "Enter new execution string:", (height // 2) - 1, 4, "Enter new execution string:",
(conf.get(strings[position - 1].split()[0].strip(), (conf.get(strings[position - 1].split()[0].strip(),
"exec_string") if conf.has_option(strings[position - 1].split()[0].strip(), "exec_string") if conf.has_option(strings[position - 1].split()[0].strip(),
"exec_string") else ""), "exec_string") else ""),
normal_text, highlight_text) normal_text, highlight_text)
edit_string = string_textpad.edit(CursesTextpadConfirm) edit_string = string_textpad.edit(CursesTextpadConfirm)
SetAliasString(strings[position - 1].split()[0].strip(), SetAliasString(strings[position - 1].split()[0].strip(),
edit_string.replace("\n", "").rstrip()) edit_string.replace("\n", "").rstrip())
@ -710,9 +691,9 @@ def CursesMain():
if not strings[position - 1].split()[0].strip() in groups: if not strings[position - 1].split()[0].strip() in groups:
set_password = "" set_password = ""
set_password = CursesPanel(screen, 4, width - 6, set_password = CursesPanel(screen, 4, width - 6,
(height // 2) - 1, 3, (height // 2) - 1, 3,
" Enter user password for sshpass and press 'enter':\n>", " Enter user password for sshpass and press 'enter':\n>",
normal_text, highlight_text, "password") normal_text, highlight_text, "password")
SetPassword(strings[position - 1].split()[0].strip(), set_password) SetPassword(strings[position - 1].split()[0].strip(), set_password)
strings = GetTreeList(False, expanded) strings = GetTreeList(False, expanded)
stringsfull = GetTreeList(True, expanded) stringsfull = GetTreeList(True, expanded)
@ -729,11 +710,11 @@ def CursesMain():
str(len(selected)), " selected aliases? (y/N)"])) str(len(selected)), " selected aliases? (y/N)"]))
else: else:
remove_confirm = ("".join(["Are you sure to remove '", remove_confirm = ("".join(["Are you sure to remove '",
strings[position - 1].split()[0].strip(), "' alias? (y/N)"])) strings[position - 1].split()[0].strip(), "' alias? (y/N)"]))
selected.append(strings[position - 1].split()[0].strip()) selected.append(strings[position - 1].split()[0].strip())
remove_result = CursesPanel(screen, 4, width - 6, remove_result = CursesPanel(screen, 4, width - 6,
(height // 2) - 1, 3, remove_confirm, normal_text, (height // 2) - 1, 3, remove_confirm, normal_text,
highlight_text, "remove") highlight_text, "remove")
if remove_result == "confirm": if remove_result == "confirm":
RemoveAliases(selected) RemoveAliases(selected)
strings = GetTreeList(False, expanded) strings = GetTreeList(False, expanded)
@ -754,14 +735,14 @@ def CursesMain():
selected.append(strings[position - 1].split()[0].strip()) selected.append(strings[position - 1].split()[0].strip())
curses.curs_set(1) curses.curs_set(1)
command_textpad = CursesTextpad(screen, 3, width - 8, command_textpad = CursesTextpad(screen, 3, width - 8,
(height // 2) - 1, 4, (height // 2) - 1, 4,
"".join([ "".join([
"Enter specific command to execute with selected ", "Enter specific command to execute with selected ",
"alias/aliases:"] "alias/aliases:"]
), "", normal_text, highlight_text) ), "", normal_text, highlight_text)
command_string = command_textpad.edit(CursesTextpadConfirm) command_string = command_textpad.edit(CursesTextpadConfirm)
CursesConnect(screen, selected, Connect(selected, command_string.replace("\n", "").rstrip(),
command_string.replace("\n", "").rstrip()) False, screen)
curses.curs_set(0) curses.curs_set(0)
if (key_pressed == ord('t') or key_pressed == ord( if (key_pressed == ord('t') or key_pressed == ord(
'T') or key_pressed == curses.KEY_F11) and row_num != 0: 'T') or key_pressed == curses.KEY_F11) and row_num != 0:
@ -773,14 +754,14 @@ def CursesMain():
selected.append(strings[position - 1].split()[0].strip()) selected.append(strings[position - 1].split()[0].strip())
curses.curs_set(1) curses.curs_set(1)
command_textpad = CursesTextpad(screen, 3, width - 8, command_textpad = CursesTextpad(screen, 3, width - 8,
(height // 2) - 1, 4, (height // 2) - 1, 4,
"".join([ "".join([
"Enter specific command to execute with selected ", "Enter specific command to execute with selected ",
"alias/aliases:"] "alias/aliases:"]
), "", normal_text, highlight_text) ), "", normal_text, highlight_text)
command_string = command_textpad.edit(CursesTextpadConfirm) command_string = command_textpad.edit(CursesTextpadConfirm)
CursesConnect(screen, selected, Connect(selected, command_string.replace("\n", "").rstrip(),
command_string.replace("\n", "").rstrip(),True) True, screen)
curses.curs_set(0) curses.curs_set(0)
if (key_pressed == ord('k') or key_pressed == ord('K') or if (key_pressed == ord('k') or key_pressed == ord('K') or
key_pressed == (curses.KEY_F7)) and row_num != 0: key_pressed == (curses.KEY_F7)) and row_num != 0:
@ -815,7 +796,7 @@ def CursesMain():
selected.append(strings[i - 1]) selected.append(strings[i - 1])
if not len(selected) > 0: if not len(selected) > 0:
selected.append(strings[position - 1].split()[0].strip()) selected.append(strings[position - 1].split()[0].strip())
CursesConnect(screen, selected) Connect(selected, False, False, screen)
selected_strings = [" " for i in range(0, row_num + 1)] selected_strings = [" " for i in range(0, row_num + 1)]
if (key_pressed == 32 or key_pressed == ( if (key_pressed == 32 or key_pressed == (
curses.KEY_IC)) and row_num != 0: curses.KEY_IC)) and row_num != 0: