From 37b39268802efbf4ece85e09b9b43aeebd0c7821 Mon Sep 17 00:00:00 2001 From: zlax Date: Tue, 14 Jun 2022 12:33:32 +0300 Subject: [PATCH] merging CMDConnect and CursesConnect into the single Connect function --- setup.py | 2 +- sshch/sshch | 249 ++++++++++++++++++++++++---------------------------- 2 files changed, 116 insertions(+), 135 deletions(-) diff --git a/setup.py b/setup.py index 1683540..cd7df86 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ def main(): long_description='SSH connection and aliases manager with curses and command line interface', long_description_content_type='text/x-rst', license='DWTWL 2.55', - version='1.09.3', + version='1.09.4', py_modules=['sshch'], scripts=['sshch/sshch'], keywords='sshch ssh aliases curses manager', diff --git a/sshch/sshch b/sshch/sshch index b77ea39..19fbb52 100755 --- a/sshch/sshch +++ b/sshch/sshch @@ -22,7 +22,7 @@ from curses import textpad, panel from threading import Thread # https://gitlab.com/zlax/sshch -version = "1.09.3" +version = "1.09.4" # expand groups by default expand_default = True # path to conf dir and file, default: ~/.config/sshch.conf @@ -93,6 +93,41 @@ def ConvertPassword(password): return password_string +def Connect(aliases, command=False, threading=False, screen=False): + if screen: + curses.endwin() + groups = [] + connectaliases = [] + for a in conf.sections(): + if conf.has_option(a, "group"): + groups.append(a) + for alias in aliases: + if alias in groups: + group_aliases = GroupChildAliases(alias) + for ga in group_aliases: + connectaliases.append(ga) + else: + connectaliases.append(alias) + connectaliases = setSeq(connectaliases) + threads = {} + for alias in connectaliases: + if conf.has_section(alias): + print("Connecting to " + alias + "...") + if threading: + threads[alias] = Thread(target=ConnectAlias, args=(alias, + command, True)) + threads[alias].start() + else: + ConnectAlias(alias, command) + if not threading: + print("... " + alias + " session finished.") + else: + print("error: '" + alias + "' alias does not exists") + if screen: + print("Press 'enter' to continue.") + screen.getch() + + def ConnectAlias(alias, command=False, threading=False): exec_string = "" if conf.has_option(alias, "password"): @@ -155,8 +190,8 @@ def CMDGroup(group): def CMDEdit(alias): if conf.has_section(alias): if conf.has_option(alias, "exec_string"): - prompt_edit = ("".join(["Enter connection string for existing alias ", - "(example: ssh user@somehost.com):\n"])) + prompt_edit = ("".join(["Enter connection string for existing ", + "alias (example: ssh user@somehost.com):\n"])) string = "" while string == "": string = input(prompt_edit) @@ -184,7 +219,8 @@ def CMDPassword(alias): print("Can't set password for group.") else: if conf.has_section(alias): - prompt_pass = ("[UNSAFE] Enter password for sshpass (Ctrl+C - cancel, blank - clear password):\n") + prompt_pass = ("[UNSAFE] Enter password for sshpass (Ctrl+C" + + " - cancel, blank - clear password):\n") string = "" string = getpass(prompt_pass) SetPassword(alias, string) @@ -194,7 +230,8 @@ def CMDPassword(alias): def CMDRemove(alias): if conf.has_section(alias): - prompt_remove = ("Type 'yes' if you sure to remove '" + alias + "' alias or group: ") + prompt_remove = ("Type 'yes' if you sure to remove '" + alias + + "' alias or group: ") string = input(prompt_remove) if string == "yes": RemoveAliases([alias]) @@ -204,35 +241,6 @@ def CMDRemove(alias): print("error: '" + alias + "' alias or group does not exists.") -def CMDConnect(aliases, command=False, threading=False): - groups = [] - connectaliases = [] - for a in conf.sections(): - if conf.has_option(a, "group"): - groups.append(a) - for alias in aliases: - if alias in groups: - group_aliases = GroupChildAliases(alias) - for ga in group_aliases: - connectaliases.append(ga) - else: - connectaliases.append(alias) - connectaliases = setSeq(connectaliases) - threads = {} - for alias in connectaliases: - if conf.has_section(alias): - print("Connecting to " + alias + "...") - if threading: - threads[alias] = Thread(target=ConnectAlias, args=(alias, command, True)) - threads[alias].start() - else: - ConnectAlias(alias, command) - if not threading: - print("... " + alias + " session finished.") - else: - print("error: '" + alias + "' alias does not exists") - - def CMDList(option, opt, value, parser): print(' '.join(str(p) for p in conf.sections())) @@ -292,7 +300,8 @@ def GroupChildAliases(group): return setSeq(childaliases) -def GroupTreeRecursion(level, group, treelist, resultalias, resultstring, expandlist, previousgroups): +def GroupTreeRecursion(level, group, treelist, resultalias, resultstring, + expandlist, previousgroups): if group in previousgroups: return resultalias, resultstring previousgroups.append(group) @@ -339,35 +348,6 @@ def CMDFullList(option, opt, value, parser): print (p) -def CursesConnect(screen, aliases, command=False, threading=False): - curses.endwin() - groups = [] - connectaliases = [] - for a in conf.sections(): - if conf.has_option(a, "group"): - groups.append(a) - for alias in aliases: - if alias in groups: - group_aliases = GroupChildAliases(alias) - for ga in group_aliases: - connectaliases.append(ga) - else: - connectaliases.append(alias) - connectaliases = setSeq(connectaliases) - threads = {} - for alias in connectaliases: - print("Connecting to " + alias + "...") - if threading: - threads[alias] = Thread(target=ConnectAlias, args=(alias, command, True)) - threads[alias].start() - else: - ConnectAlias(alias, command) - if not threading: - print("... " + alias + " session finished.") - print("Press 'enter' to continue.") - screen.getch() - - def CursesExit(error=False): curses.endwin() if error: @@ -450,19 +430,20 @@ def CMDOptions(): def format_epilog(self, formatter): return self.epilog + usage = "usage: %prog [options] [aliases]" progname = path.basename(__file__) epilog = ("".join(["Examples:\n ", - progname, " existingalias\n ", - progname, " -a newremoteserver\n ", - progname, " --edit=newremoteserver -p newremoteserver\n ", - progname, ' -c "ls -l" newremoteserver\n ', - progname, " -c reboot existingalias newremoteserver\n", - "Examples of connection string:\n ", - "ssh user@somehost.com\n ", - "ssh gates@8.8.8.8 -p 667\n ", - "ssh root@somehost.com -t tmux a\n", - "Also, you can edit the config file manually: ", conf_file, "\n"])) + progname, " existingalias\n ", + progname, " -a newremoteserver\n ", + progname, " --edit=newremoteserver -p newremoteserver\n ", + progname, ' -c "ls -l" newremoteserver\n ', + progname, " -c reboot existingalias newremoteserver\n", + "Examples of connection string:\n ", + "ssh user@somehost.com\n ", + "ssh gates@8.8.8.8 -p 667\n ", + "ssh root@somehost.com -t tmux a\n", + "Also, you can edit the config file manually: ", conf_file, "\n"])) opts = FormatedParser(usage=usage, version="%prog " + version, epilog=epilog) opts.add_option('-l', '--list', action="callback", @@ -509,32 +490,32 @@ def CMDOptions(): if options.keep: HoldConnection(options.keep) if options.thread: - CMDConnect(alias, options.thread, True) + Connect(alias, options.thread, True) elif alias: - CMDConnect(alias, options.command) + Connect(alias, options.command) # curses template from: https://stackoverflow.com/a/30828805/6224462 def CursesMain(): help_screen = ("".join([" Press:\n", - " 'z'/'x', 'w'/'s' or arrows - navigation\n", - " 'a'/'F2' - add new alias (without spaces)\n", - " 'g'/'F5' - add new group (spaces will be stripped)\n", - " 'e'/'F4' - edit existing alias/group\n", - " 'p'/'F6' - set alias's password for sshpass [UNSAFE]\n", - " 'space'/'insert' - select\n", - " 'r'/'F8' - remove selected alias/aliases\n", - " 'c'/'F3' - command execution for alias (group/aliases - in turn)\n", - " 't'/'F11' - parallel command execution for aliases and groups\n", - " (ssh key authentication or set password required)\n", - " 'k'/'F7' - hold connection with selected alias\n", - " 'enter'/'F9' - connect to selected alias/aliases,\n", - " expand/collapse group\n", - " 'q'/'F10' - quit\n", - " Run program with '--help' option to view command line help.\n", - " Also, you can edit the config file manually:\n", - " ", conf_file])) + " 'z'/'x', 'w'/'s' or arrows - navigation\n", + " 'a'/'F2' - add new alias (without spaces)\n", + " 'g'/'F5' - add new group (spaces will be stripped)\n", + " 'e'/'F4' - edit existing alias/group\n", + " 'p'/'F6' - set alias's password for sshpass [UNSAFE]\n", + " 'space'/'insert' - select\n", + " 'r'/'F8' - remove selected alias/aliases\n", + " 'c'/'F3' - command execution for alias (group/aliases - in turn)\n", + " 't'/'F11' - parallel command execution for aliases and groups\n", + " (ssh key authentication or set password required)\n", + " 'k'/'F7' - hold connection with selected alias\n", + " 'enter'/'F9' - connect to selected alias/aliases,\n", + " expand/collapse group\n", + " 'q'/'F10' - quit\n", + " Run program with '--help' option to view command line help.\n", + " Also, you can edit the config file manually:\n", + " ", conf_file])) if expand_default == True: groups = [] for a in conf.sections(): @@ -597,8 +578,8 @@ def CursesMain(): 'A') or key_pressed == curses.KEY_F2: curses.curs_set(1) new_alias_textpad = CursesTextpad(screen, 1, width - 8, - (height // 2) - 1, 4, "Enter new alias:", "", - normal_text, highlight_text) + (height // 2) - 1, 4, "Enter new alias:", "", + normal_text, highlight_text) add_alias = new_alias_textpad.edit(CursesTextpadConfirm) if not add_alias == "": add_alias = add_alias.split()[0].strip() @@ -608,9 +589,9 @@ def CursesMain(): add_string = "" while add_string == "": string_textpad = CursesTextpad(screen, 3, - width - 8, (height // 2) - 1, 4, - "Enter full execution string:", - "ssh ", normal_text, highlight_text) + width - 8, (height // 2) - 1, 4, + "Enter full execution string:", + "ssh ", normal_text, highlight_text) add_string = string_textpad.edit( CursesTextpadConfirm) SetAliasString(add_alias, @@ -631,8 +612,8 @@ def CursesMain(): 'G') or key_pressed == curses.KEY_F5: curses.curs_set(1) new_group_textpad = CursesTextpad(screen, 1, width - 8, - (height // 2) - 1, 4, "Enter group name (without spaces):", "", - normal_text, highlight_text) + (height // 2) - 1, 4, "Enter group name (without spaces):", "", + normal_text, highlight_text) add_group = new_group_textpad.edit(CursesTextpadConfirm) if not add_group == "": add_group = add_group.split()[0].strip() @@ -642,9 +623,9 @@ def CursesMain(): add_string = "" while add_string.rstrip() == "": string_textpad = CursesTextpad(screen, 3, - width - 8, (height // 2) - 1, 4, - "Enter aliases for new group (example: alias1 alias2):", - "", normal_text, highlight_text) + width - 8, (height // 2) - 1, 4, + "Enter aliases for new group (example: alias1 alias2):", + "", normal_text, highlight_text) add_string = string_textpad.edit( CursesTextpadConfirm) SetGroupString(add_group, @@ -673,11 +654,11 @@ def CursesMain(): if strings[position - 1] in groups: while edit_string.rstrip() == "": string_textpad = CursesTextpad(screen, 3, width - 8, - (height // 2) - 1, 4, "Enter new aliases for existing group:", - (conf.get(strings[position - 1], - "group") if conf.has_option(strings[position - 1], - "group") else ""), - normal_text, highlight_text) + (height // 2) - 1, 4, "Enter new aliases for existing group:", + (conf.get(strings[position - 1], + "group") if conf.has_option(strings[position - 1], + "group") else ""), + normal_text, highlight_text) edit_string = string_textpad.edit(CursesTextpadConfirm) SetGroupString(strings[position - 1], edit_string.replace("\n", "").rstrip()) @@ -690,11 +671,11 @@ def CursesMain(): else: while edit_string.rstrip() == "": string_textpad = CursesTextpad(screen, 3, width - 8, - (height // 2) - 1, 4, "Enter new execution string:", - (conf.get(strings[position - 1].split()[0].strip(), - "exec_string") if conf.has_option(strings[position - 1].split()[0].strip(), - "exec_string") else ""), - normal_text, highlight_text) + (height // 2) - 1, 4, "Enter new execution string:", + (conf.get(strings[position - 1].split()[0].strip(), + "exec_string") if conf.has_option(strings[position - 1].split()[0].strip(), + "exec_string") else ""), + normal_text, highlight_text) edit_string = string_textpad.edit(CursesTextpadConfirm) SetAliasString(strings[position - 1].split()[0].strip(), edit_string.replace("\n", "").rstrip()) @@ -710,9 +691,9 @@ def CursesMain(): if not strings[position - 1].split()[0].strip() in groups: set_password = "" set_password = CursesPanel(screen, 4, width - 6, - (height // 2) - 1, 3, - " Enter user password for sshpass and press 'enter':\n>", - normal_text, highlight_text, "password") + (height // 2) - 1, 3, + " Enter user password for sshpass and press 'enter':\n>", + normal_text, highlight_text, "password") SetPassword(strings[position - 1].split()[0].strip(), set_password) strings = GetTreeList(False, expanded) stringsfull = GetTreeList(True, expanded) @@ -729,11 +710,11 @@ def CursesMain(): str(len(selected)), " selected aliases? (y/N)"])) else: remove_confirm = ("".join(["Are you sure to remove '", - strings[position - 1].split()[0].strip(), "' alias? (y/N)"])) + strings[position - 1].split()[0].strip(), "' alias? (y/N)"])) selected.append(strings[position - 1].split()[0].strip()) remove_result = CursesPanel(screen, 4, width - 6, - (height // 2) - 1, 3, remove_confirm, normal_text, - highlight_text, "remove") + (height // 2) - 1, 3, remove_confirm, normal_text, + highlight_text, "remove") if remove_result == "confirm": RemoveAliases(selected) strings = GetTreeList(False, expanded) @@ -754,14 +735,14 @@ def CursesMain(): selected.append(strings[position - 1].split()[0].strip()) curses.curs_set(1) command_textpad = CursesTextpad(screen, 3, width - 8, - (height // 2) - 1, 4, - "".join([ - "Enter specific command to execute with selected ", - "alias/aliases:"] - ), "", normal_text, highlight_text) + (height // 2) - 1, 4, + "".join([ + "Enter specific command to execute with selected ", + "alias/aliases:"] + ), "", normal_text, highlight_text) command_string = command_textpad.edit(CursesTextpadConfirm) - CursesConnect(screen, selected, - command_string.replace("\n", "").rstrip()) + Connect(selected, command_string.replace("\n", "").rstrip(), + False, screen) curses.curs_set(0) if (key_pressed == ord('t') or key_pressed == ord( 'T') or key_pressed == curses.KEY_F11) and row_num != 0: @@ -773,14 +754,14 @@ def CursesMain(): selected.append(strings[position - 1].split()[0].strip()) curses.curs_set(1) command_textpad = CursesTextpad(screen, 3, width - 8, - (height // 2) - 1, 4, - "".join([ - "Enter specific command to execute with selected ", - "alias/aliases:"] - ), "", normal_text, highlight_text) + (height // 2) - 1, 4, + "".join([ + "Enter specific command to execute with selected ", + "alias/aliases:"] + ), "", normal_text, highlight_text) command_string = command_textpad.edit(CursesTextpadConfirm) - CursesConnect(screen, selected, - command_string.replace("\n", "").rstrip(),True) + Connect(selected, command_string.replace("\n", "").rstrip(), + True, screen) curses.curs_set(0) if (key_pressed == ord('k') or key_pressed == ord('K') or key_pressed == (curses.KEY_F7)) and row_num != 0: @@ -815,7 +796,7 @@ def CursesMain(): selected.append(strings[i - 1]) if not len(selected) > 0: selected.append(strings[position - 1].split()[0].strip()) - CursesConnect(screen, selected) + Connect(selected, False, False, screen) selected_strings = [" " for i in range(0, row_num + 1)] if (key_pressed == 32 or key_pressed == ( curses.KEY_IC)) and row_num != 0: