[spp] Proposal for adding port for external management tools

Yasufumi Ogawa ogawa.yasufumi at lab.ntt.co.jp
Thu Jun 29 11:19:37 CEST 2017


Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>

diff --git a/src/spp.py b/src/spp.py
index b937b5a..54d7c67 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -2,14 +2,72 @@
  """Soft Patch Panel"""

  from __future__ import print_function
-from Queue import Queue
+from Queue import Queue, Empty
  from thread import start_new_thread
  from threading import Thread
  import cmd
-import getopt
+import argparse
  import select
  import socket
  import sys
+import re
+#import pdb; pdb.set_trace()
+
+import SocketServer
+import readline
+import threading
+import json
+
+logger = None
+
+# Uncomment if you use logger
+#from logging import getLogger,StreamHandler,Formatter,DEBUG
+#logger = getLogger(__name__)
+#handler = StreamHandler()
+#handler.setLevel(DEBUG)
+#formatter = Formatter('%(asctime)s - [%(name)s] - [%(levelname)s] - 
%(message)s')
+#handler.setFormatter(formatter)
+#logger.setLevel(DEBUG)
+#logger.addHandler(handler)
+
+
+CMD_OK = "OK"
+CMD_NG = "NG"
+CMD_NOTREADY = "NOTREADY"
+CMD_ERROR = "ERROR"
+
+RCMD_EXECUTE_QUEUE = Queue()
+RCMD_RESULT_QUEUE = Queue()
+REMOTE_COMMAND = "RCMD"
+
+class CmdRequestHandler(SocketServer.BaseRequestHandler):
+    """Request handler for getting message from remote entities"""
+
+    CMD = None  # contains a instance of Shell class
+
+    def handle(self):
+        self.data = self.request.recv(1024).strip()
+        cur_thread = threading.currentThread()
+        print(cur_thread.getName())
+        print(self.client_address[0])
+        print(self.data)
+        if CmdRequestHandler.CMD is not None:
+            RCMD_EXECUTE_QUEUE.put(REMOTE_COMMAND)
+            CmdRequestHandler.CMD.onecmd(self.data)
+            ret = RCMD_RESULT_QUEUE.get()
+            if (ret is not None):
+                if logger != None:
+                    logger.debug("ret:%s" % ret)
+                self.request.send(ret)
+            else:
+                if logger != None:
+                    logger.debug("ret is none")
+                self.request.send("")
+        else:
+            if logger != None:
+                logger.debug("CMD is None")
+            self.request.send("")
+

  class GrowingList(list):
      """GrowingList"""
@@ -19,6 +77,7 @@ class GrowingList(list):
              self.extend([None]*(index + 1 - len(self)))
          list.__setitem__(self, index, value)

+# Maximum num of sock queues for secondaries
  MAX_SECONDARY = 16

  # init
@@ -53,19 +112,20 @@ def connectionthread(name, client_id, conn, m2s, s2m):
          except KeyError:
              break
          except Exception, excep:
-            print (str(excep))
+            print(excep, ",Error while sending msg in connectionthread()!")
              break

          #Receiving from secondary
          try:
              data = conn.recv(1024) # 1024 stands for bytes of data to 
be received
              if data:
-                s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data 
+ "}")
+                #s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + 
data + "}")
+                s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
              else:
                  s2m.put("closing:" + str(conn))
                  break
          except Exception, excep:
-            print (str(excep))
+            print(excep, ",Error while receiving msg in 
connectionthread()!")
              break

      SECONDARY_LIST.remove(client_id)
@@ -144,7 +204,7 @@ def acceptthread(sock, main2sec, sec2main):
                                sec2main[client_id], ))
              SECONDARY_COUNT += 1
      except Exception, excep:
-        print (str(excep))
+        print(excep, ", Error in acceptthread()!")
          sock.close()

  def command_primary(command):
@@ -152,24 +212,42 @@ def command_primary(command):

      if PRIMARY:
          MAIN2PRIMARY.put(command)
-        print (PRIMARY2MAIN.get(True))
+        recv = PRIMARY2MAIN.get(True)
+        print (recv)
+        return CMD_OK, recv
      else:
-        print ("primary not started")
+        recv = "primary not started"
+        print (recv)
+        return CMD_NOTREADY, recv

  def command_secondary(sec_id, command):
      """Send command to secondary process with sec_id"""

      if sec_id in SECONDARY_LIST:
          MAIN2SEC[sec_id].put(command)
-        print (SEC2MAIN[sec_id].get(True))
+        recv = SEC2MAIN[sec_id].get(True)
+        print (recv)
+        return CMD_OK, recv
      else:
-        print ("secondary id %d not exist" % sec_id)
+        message = "secondary id %d not exist" % sec_id
+        print(message)
+        return CMD_NOTREADY, message
+
+def get_status():
+    secondary = []
+    for i in SECONDARY_LIST:
+        secondary.append("%d" % i)
+    stat = {
+        "primary": "%d" % PRIMARY,
+        "secondary": secondary
+        }
+    return stat

  def print_status():
      """Display information about connected clients"""

      print ("Soft Patch Panel Status :")
-    print ("primary: %d" % PRIMARY)
+    print ("primary: %d" % PRIMARY)  # "primary: 1" if PRIMA == True
      print ("secondary count: %d" % len(SECONDARY_LIST))
      for i in SECONDARY_LIST:
          print ("Connected secondary id: %d" % i)
@@ -199,20 +277,21 @@ def primarythread(sock, main2primary, primary2main):
              except KeyError:
                  break
              except Exception, excep:
-                print (str(excep))
+                print(excep, ", Error while sending msg in 
primarythread()!")
                  break

              #Receiving from primary
              try:
                  data = conn.recv(1024) # 1024 stands for bytes of data 
to be received
                  if data:
-                    primary2main.put("recv:" + str(addr) + ":" + "{" + 
data + "}")
+                    #primary2main.put("recv:" + str(addr) + ":" + "{" + 
data + "}")
+                    primary2main.put("recv:%s:{%s}" % (str(addr), data))
                  else:
                      primary2main.put("closing:" + str(addr))
                      conn.close()
                      break
              except Exception, excep:
-                print (str(excep))
+                print(excep, ", Error while receiving msg in 
primarythread()!")
                  break

      print ("primary communication thread end")
@@ -259,6 +338,14 @@ def check_sec_cmds(cmds):

      return valid

+def clean_sec_cmd(cmdstr):
+    """remove unwanted spaces to avoid invalid command error"""
+
+    tmparg = re.sub(r'\s+', " ", cmdstr)
+    res = re.sub(r'\s?;\s?', ";", tmparg)
+    return res
+
+
  class Shell(cmd.Cmd):
      """SPP command prompt"""

@@ -266,17 +353,19 @@ class Shell(cmd.Cmd):
      prompt = 'spp > '
      recorded_file = None

-    COMMANDS = ['status', 'add', 'patch', 'ring', 'vhost',
-                'reset', 'exit', 'forward', 'stop', 'clear']
+    PRI_CMDS = ['status', 'exit', 'clear']
+    SEC_CMDS = ['status', 'exit', 'forward', 'stop', 'add', 'patch', 'del']
+    SEC_SUBCMDS = ['vhost', 'ring']
+    BYE_CMDS = ['sec', 'all']

      def complete_pri(self, text, line, begidx, endidx):
          """Completion for primary process commands"""

          if not text:
-            completions = self.COMMANDS[:]
+            completions = self.PRI_CMDS[:]
          else:
              completions = [p
-                           for p in self.COMMANDS
+                           for p in self.PRI_CMDS
                             if p.startswith(text)
                            ]
          return completions
@@ -284,68 +373,152 @@ class Shell(cmd.Cmd):
      def complete_sec(self, text, line, begidx, endidx):
          """Completion for secondary process commands"""

+        try:
+            cleaned_line = clean_sec_cmd(line)
+            if len(cleaned_line.split()) == 1:
+                completions = [str(i)+";" for i in SECONDARY_LIST]
+            elif len(cleaned_line.split()) == 2:
+                if not (";" in cleaned_line):
+                    tmplist = [str(i) for i in SECONDARY_LIST]
+                    completions = [p+";"
+                            for p in tmplist
+                            if p.startswith(text)
+                            ]
+                elif cleaned_line[-1] == ";":
+                    completions = self.SEC_CMDS[:]
+                else:
+                    seccmd = cleaned_line.split(";")[1]
+                    if cleaned_line[-1] != " ":
+                        completions = [p
+                                for p in self.SEC_CMDS
+                                if p.startswith(seccmd)
+                                ]
+                    elif ("add" in seccmd) or ("del" in seccmd):
+                        completions = self.SEC_SUBCMDS[:]
+                    else:
+                        completions = []
+            elif len(cleaned_line.split()) == 3:
+                subcmd = cleaned_line.split()[-1]
+                if ("add" == subcmd) or ("del" == subcmd):
+                    completions = self.SEC_SUBCMDS[:]
+                else:
+                    if cleaned_line[-1] == " ":
+                        completions = []
+                    else:
+                        completions = [p
+                                for p in self.SEC_SUBCMDS
+                                if p.startswith(subcmd)
+                                ]
+            else:
+                completions = []
+            return completions
+        except Exception, e:
+            print(len(cleaned_line.split()))
+            print(e)
+
+    def complete_bye(self, text, line, begidx, endidx):
+        """Completion for bye commands"""
+
          if not text:
-            completions = self.COMMANDS[:]
+            completions = self.BYE_CMDS[:]
          else:
              completions = [p
-                           for p in self.COMMANDS
+                           for p in self.BYE_CMDS
                             if p.startswith(text)
                            ]
          return completions

+    def response(self, result, message):
+        """Enqueue message from other than CLI"""
+        try:
+            rcmd = RCMD_EXECUTE_QUEUE.get(False)
+        except Empty:
+            return
+
+        if (rcmd == REMOTE_COMMAND):
+            param = result + '\n' + message
+            RCMD_RESULT_QUEUE.put(param)
+        else:
+            if logger != None:
+                logger.debug("unknown remote command = %s" % rcmd)
+
      def do_status(self, _):
          """Display Soft Patch Panel Status"""

          print_status()
+        stat = get_status()
+        self.response(CMD_OK, json.dumps(stat))

      def do_pri(self, command):
          """Send command to primary process"""

-        if command and command in self.COMMANDS:
-            command_primary(command)
+        if command and command in self.PRI_CMDS:
+            result, message = command_primary(command)
+            self.response(result, message)
          else:
-            print ("primary invalid command")
+            message = "primary invalid command"
+            print(message)
+            self.response(CMD_ERROR, message)

      def do_sec(self, arg):
          """Send command to secondary process"""

-        cmds = arg.split(';')
+        # remove unwanted spaces to avoid invalid command error
+        tmparg = clean_sec_cmd(arg)
+        cmds = tmparg.split(';')
          if len(cmds) < 2:
-            print ("error")
+            message = "error"
+            print(message)
+            self.response(CMD_ERROR, message)
          elif str.isdigit(cmds[0]):
              sec_id = int(cmds[0])
              if check_sec_cmds(cmds[1]):
-                command_secondary(sec_id, cmds[1])
+                result, message = command_secondary(sec_id, cmds[1])
+                self.response(result, message)
              else:
-                print ("invalid cmd")
+                message = "invalid cmd"
+                print(message)
+                self.response(CMD_ERROR, message)
          else:
              print (cmds[0])
              print ("first %s" % cmds[1])
+            self.response(CMD_ERROR, "invalid format")

-    def do_record(self, arg):
+    def do_record(self, fname):
          """Save future commands to filename:  RECORD filename.cmd"""

-        self.recorded_file = open(arg, 'w')
+        if fname == '':
+            print("Record file is required!")
+        else:
+            self.recorded_file = open(fname, 'w')
+            self.response(CMD_OK, "record")

-    def do_playback(self, arg):
+    def do_playback(self, fname):
          """Playback commands from a file:  PLAYBACK filename.cmd"""
-
-        self.close()
-        try:
-            with open(arg) as recorded_file:
-                lines = []
-                for line in recorded_file:
-                    if line.strip().startswith("#"):
-                        continue
-                    lines.append(line)
-                self.cmdqueue.extend(lines)
-        except IOError:
-            print ("Error: File does not exist.")
+
+        if fname == '':
+            print("Record file is required!")
+        else:
+            self.close()
+            try:
+                with open(fname) as recorded_file:
+                    lines = []
+                    for line in recorded_file:
+                        if line.strip().startswith("#"):
+                            continue
+                        lines.append(line)
+                    self.cmdqueue.extend(lines)
+                    self.response(CMD_OK, "playback")
+            except IOError:
+                message = "Error: File does not exist."
+                print(message)
+                self.response(CMD_NG, message)

      def precmd(self, line):
          line = line.lower()
-        if self.recorded_file and 'playback' not in line:
-            print(line, file=self.recorded_file)
+        if self.recorded_file:
+            if not (('playback' in line) or ('bye' in line)):
+                print(line, file=self.recorded_file)
          return line

      def close(self):
@@ -370,30 +543,42 @@ class Shell(cmd.Cmd):
              self.close()
              return True

+
  def main(argv):
      """main"""

-    # Defining server address and port
-    host = ''  #'localhost' or '127.0.0.1' or '' are all same
-
-    try:
-        opts, _ = getopt.getopt(argv, "p:s:h", ["help", "primary = ", 
"secondary"])
-    except getopt.GetoptError:
-        print ('spp.py -p <primary__port_number> -s 
<secondary_port_number>')
-        sys.exit(2)
-    for opt, arg in opts:
-        if opt in ("-h", "--help"):
-            print ('spp.py -p <primary__port_number> -s 
<secondary_port_number>')
-            sys.exit()
-        elif opt in ("-p", "--primary"):
-            primary_port = int(arg)
-            print ("primary port : %d" % primary_port)
-        elif opt in ("-s", "--secondary"):
-            secondary_port = int(arg)
-            print ('secondary port : %d' % secondary_port)
+    parser = argparse.ArgumentParser(description="SPP Controller")
+
+    parser.add_argument(
+            "-p", "--pri-port",
+            type=int, default=5555,
+            help="primary port number")
+    parser.add_argument(
+            "-s", "--sec-port",
+            type=int, default=6666,
+            help="secondary port number")
+    parser.add_argument(
+            "-m", "--mng-port",
+            type=int, default=7777,
+            help="management port number")
+    parser.add_argument(
+            "-ip", "--ipaddr",
+            type=str, default='', #'localhost' or '127.0.0.1' or '' are 
all same
+            help="IP address")
+    args = parser.parse_args()
+
+    host = args.ipaddr
+    primary_port = args.pri_port
+    secondary_port = args.sec_port
+    management_port = args.mng_port
+
+    print("primary port : %d" % primary_port)
+    print('secondary port : %d' % secondary_port)
+    print('management port : %d' % management_port)

      #Creating primary socket object
      primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

      #Binding primary socket to a address. bind() takes tuple of host 
and port.
      primary_sock.bind((host, primary_port))
@@ -408,6 +593,7 @@ def main(argv):

      #Creating secondary socket object
      secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

      #Binding secondary socket to a address. bind() takes tuple of host 
and port.
      secondary_sock.bind((host, secondary_port))
@@ -419,13 +605,31 @@ def main(argv):
      start_new_thread(acceptthread, (secondary_sock, MAIN2SEC, SEC2MAIN))

      shell = Shell()
+
+    # Run request handler as a TCP server thread
+    SocketServer.ThreadingTCPServer.allow_reuse_address = True
+    CmdRequestHandler.CMD = shell
+    command_server = SocketServer.ThreadingTCPServer((host, 
management_port),CmdRequestHandler)
+
+    t = threading.Thread(target=command_server.serve_forever)
+    t.setDaemon(True)
+    t.start()
+
      shell.cmdloop()
      shell = None

-    primary_sock.shutdown(socket.SHUT_RDWR)
-    primary_sock.close()
-    secondary_sock.shutdown(socket.SHUT_RDWR)
-    secondary_sock.close()
+    try:
+        primary_sock.shutdown(socket.SHUT_RDWR)
+        primary_sock.close()
+    except socket.error, excep:
+        print(excep, ", Error while closing primary_sock in main()!")
+
+    try:
+        secondary_sock.shutdown(socket.SHUT_RDWR)
+        secondary_sock.close()
+    except socket.error, excep:
+        print(excep, ", Error while closing secondary_sock in main()!")
+

  if __name__ == "__main__":
      main(sys.argv[1:])



More information about the spp mailing list