[spp] Proposal for adding port for external management tools
Yasufumi Ogawa
ogawa.yasufumi at lab.ntt.co.jp
Thu Jun 29 11:19:37 CEST 2017
Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>
diff --git a/src/spp.py b/src/spp.py
index b937b5a..54d7c67 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -2,14 +2,72 @@
"""Soft Patch Panel"""
from __future__ import print_function
-from Queue import Queue
+from Queue import Queue, Empty
from thread import start_new_thread
from threading import Thread
import cmd
-import getopt
+import argparse
import select
import socket
import sys
+import re
+#import pdb; pdb.set_trace()
+
+import SocketServer
+import readline
+import threading
+import json
+
+logger = None
+
+# Uncomment if you use logger
+#from logging import getLogger,StreamHandler,Formatter,DEBUG
+#logger = getLogger(__name__)
+#handler = StreamHandler()
+#handler.setLevel(DEBUG)
+#formatter = Formatter('%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
+#handler.setFormatter(formatter)
+#logger.setLevel(DEBUG)
+#logger.addHandler(handler)
+
+
+CMD_OK = "OK"
+CMD_NG = "NG"
+CMD_NOTREADY = "NOTREADY"
+CMD_ERROR = "ERROR"
+
+RCMD_EXECUTE_QUEUE = Queue()
+RCMD_RESULT_QUEUE = Queue()
+REMOTE_COMMAND = "RCMD"
+
+class CmdRequestHandler(SocketServer.BaseRequestHandler):
+ """Request handler for getting message from remote entities"""
+
+ CMD = None # contains an instance of Shell class
+
+ def handle(self):
+ self.data = self.request.recv(1024).strip()
+ cur_thread = threading.currentThread()
+ print(cur_thread.getName())
+ print(self.client_address[0])
+ print(self.data)
+ if CmdRequestHandler.CMD is not None:
+ RCMD_EXECUTE_QUEUE.put(REMOTE_COMMAND)
+ CmdRequestHandler.CMD.onecmd(self.data)
+ ret = RCMD_RESULT_QUEUE.get()
+ if (ret is not None):
+ if logger != None:
+ logger.debug("ret:%s" % ret)
+ self.request.send(ret)
+ else:
+ if logger != None:
+ logger.debug("ret is none")
+ self.request.send("")
+ else:
+ if logger != None:
+ logger.debug("CMD is None")
+ self.request.send("")
+
class GrowingList(list):
"""GrowingList"""
@@ -19,6 +77,7 @@ class GrowingList(list):
self.extend([None]*(index + 1 - len(self)))
list.__setitem__(self, index, value)
+# Maximum num of sock queues for secondaries
MAX_SECONDARY = 16
# init
@@ -53,19 +112,20 @@ def connectionthread(name, client_id, conn, m2s, s2m):
except KeyError:
break
except Exception, excep:
- print (str(excep))
+ print(excep, ",Error while sending msg in connectionthread()!")
break
#Receiving from secondary
try:
data = conn.recv(1024) # 1024 stands for bytes of data to be received
if data:
- s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data
+ "}")
+ #s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}")
+ s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
else:
s2m.put("closing:" + str(conn))
break
except Exception, excep:
- print (str(excep))
+ print(excep, ",Error while receiving msg in
connectionthread()!")
break
SECONDARY_LIST.remove(client_id)
@@ -144,7 +204,7 @@ def acceptthread(sock, main2sec, sec2main):
sec2main[client_id], ))
SECONDARY_COUNT += 1
except Exception, excep:
- print (str(excep))
+ print(excep, ", Error in acceptthread()!")
sock.close()
def command_primary(command):
@@ -152,24 +212,42 @@ def command_primary(command):
if PRIMARY:
MAIN2PRIMARY.put(command)
- print (PRIMARY2MAIN.get(True))
+ recv = PRIMARY2MAIN.get(True)
+ print (recv)
+ return CMD_OK, recv
else:
- print ("primary not started")
+ recv = "primary not started"
+ print (recv)
+ return CMD_NOTREADY, recv
def command_secondary(sec_id, command):
"""Send command to secondary process with sec_id"""
if sec_id in SECONDARY_LIST:
MAIN2SEC[sec_id].put(command)
- print (SEC2MAIN[sec_id].get(True))
+ recv = SEC2MAIN[sec_id].get(True)
+ print (recv)
+ return CMD_OK, recv
else:
- print ("secondary id %d not exist" % sec_id)
+ message = "secondary id %d not exist" % sec_id
+ print(message)
+ return CMD_NOTREADY, message
+
+def get_status():
+ secondary = []
+ for i in SECONDARY_LIST:
+ secondary.append("%d" % i)
+ stat = {
+ "primary": "%d" % PRIMARY,
+ "secondary": secondary
+ }
+ return stat
def print_status():
"""Display information about connected clients"""
print ("Soft Patch Panel Status :")
- print ("primary: %d" % PRIMARY)
+ print ("primary: %d" % PRIMARY) # "primary: 1" if PRIMARY == True
print ("secondary count: %d" % len(SECONDARY_LIST))
for i in SECONDARY_LIST:
print ("Connected secondary id: %d" % i)
@@ -199,20 +277,21 @@ def primarythread(sock, main2primary, primary2main):
except KeyError:
break
except Exception, excep:
- print (str(excep))
+ print(excep, ", Error while sending msg in
primarythread()!")
break
#Receiving from primary
try:
data = conn.recv(1024) # 1024 stands for bytes of data to be received
if data:
- primary2main.put("recv:" + str(addr) + ":" + "{" +
data + "}")
+ #primary2main.put("recv:" + str(addr) + ":" + "{" + data + "}")
+ primary2main.put("recv:%s:{%s}" % (str(addr), data))
else:
primary2main.put("closing:" + str(addr))
conn.close()
break
except Exception, excep:
- print (str(excep))
+ print(excep, ", Error while receiving msg in
primarythread()!")
break
print ("primary communication thread end")
@@ -259,6 +338,14 @@ def check_sec_cmds(cmds):
return valid
+def clean_sec_cmd(cmdstr):
+ """remove unwanted spaces to avoid invalid command error"""
+
+ tmparg = re.sub(r'\s+', " ", cmdstr)
+ res = re.sub(r'\s?;\s?', ";", tmparg)
+ return res
+
+
class Shell(cmd.Cmd):
"""SPP command prompt"""
@@ -266,17 +353,19 @@ class Shell(cmd.Cmd):
prompt = 'spp > '
recorded_file = None
- COMMANDS = ['status', 'add', 'patch', 'ring', 'vhost',
- 'reset', 'exit', 'forward', 'stop', 'clear']
+ PRI_CMDS = ['status', 'exit', 'clear']
+ SEC_CMDS = ['status', 'exit', 'forward', 'stop', 'add', 'patch', 'del']
+ SEC_SUBCMDS = ['vhost', 'ring']
+ BYE_CMDS = ['sec', 'all']
def complete_pri(self, text, line, begidx, endidx):
"""Completion for primary process commands"""
if not text:
- completions = self.COMMANDS[:]
+ completions = self.PRI_CMDS[:]
else:
completions = [p
- for p in self.COMMANDS
+ for p in self.PRI_CMDS
if p.startswith(text)
]
return completions
@@ -284,68 +373,152 @@ class Shell(cmd.Cmd):
def complete_sec(self, text, line, begidx, endidx):
"""Completion for secondary process commands"""
+ try:
+ cleaned_line = clean_sec_cmd(line)
+ if len(cleaned_line.split()) == 1:
+ completions = [str(i)+";" for i in SECONDARY_LIST]
+ elif len(cleaned_line.split()) == 2:
+ if not (";" in cleaned_line):
+ tmplist = [str(i) for i in SECONDARY_LIST]
+ completions = [p+";"
+ for p in tmplist
+ if p.startswith(text)
+ ]
+ elif cleaned_line[-1] == ";":
+ completions = self.SEC_CMDS[:]
+ else:
+ seccmd = cleaned_line.split(";")[1]
+ if cleaned_line[-1] != " ":
+ completions = [p
+ for p in self.SEC_CMDS
+ if p.startswith(seccmd)
+ ]
+ elif ("add" in seccmd) or ("del" in seccmd):
+ completions = self.SEC_SUBCMDS[:]
+ else:
+ completions = []
+ elif len(cleaned_line.split()) == 3:
+ subcmd = cleaned_line.split()[-1]
+ if ("add" == subcmd) or ("del" == subcmd):
+ completions = self.SEC_SUBCMDS[:]
+ else:
+ if cleaned_line[-1] == " ":
+ completions = []
+ else:
+ completions = [p
+ for p in self.SEC_SUBCMDS
+ if p.startswith(subcmd)
+ ]
+ else:
+ completions = []
+ return completions
+ except Exception, e:
+ print(len(cleaned_line.split()))
+ print(e)
+
+ def complete_bye(self, text, line, begidx, endidx):
+ """Completion for bye commands"""
+
if not text:
- completions = self.COMMANDS[:]
+ completions = self.BYE_CMDS[:]
else:
completions = [p
- for p in self.COMMANDS
+ for p in self.BYE_CMDS
if p.startswith(text)
]
return completions
+ def response(self, result, message):
+ """Enqueue message from other than CLI"""
+ try:
+ rcmd = RCMD_EXECUTE_QUEUE.get(False)
+ except Empty:
+ return
+
+ if (rcmd == REMOTE_COMMAND):
+ param = result + '\n' + message
+ RCMD_RESULT_QUEUE.put(param)
+ else:
+ if logger != None:
+ logger.debug("unknown remote command = %s" % rcmd)
+
def do_status(self, _):
"""Display Soft Patch Panel Status"""
print_status()
+ stat = get_status()
+ self.response(CMD_OK, json.dumps(stat))
def do_pri(self, command):
"""Send command to primary process"""
- if command and command in self.COMMANDS:
- command_primary(command)
+ if command and command in self.PRI_CMDS:
+ result, message = command_primary(command)
+ self.response(result, message)
else:
- print ("primary invalid command")
+ message = "primary invalid command"
+ print(message)
+ self.response(CMD_ERROR, message)
def do_sec(self, arg):
"""Send command to secondary process"""
- cmds = arg.split(';')
+ # remove unwanted spaces to avoid invalid command error
+ tmparg = clean_sec_cmd(arg)
+ cmds = tmparg.split(';')
if len(cmds) < 2:
- print ("error")
+ message = "error"
+ print(message)
+ self.response(CMD_ERROR, message)
elif str.isdigit(cmds[0]):
sec_id = int(cmds[0])
if check_sec_cmds(cmds[1]):
- command_secondary(sec_id, cmds[1])
+ result, message = command_secondary(sec_id, cmds[1])
+ self.response(result, message)
else:
- print ("invalid cmd")
+ message = "invalid cmd"
+ print(message)
+ self.response(CMD_ERROR, message)
else:
print (cmds[0])
print ("first %s" % cmds[1])
+ self.response(CMD_ERROR, "invalid format")
- def do_record(self, arg):
+ def do_record(self, fname):
"""Save future commands to filename: RECORD filename.cmd"""
- self.recorded_file = open(arg, 'w')
+ if fname == '':
+ print("Record file is required!")
+ else:
+ self.recorded_file = open(fname, 'w')
+ self.response(CMD_OK, "record")
- def do_playback(self, arg):
+ def do_playback(self, fname):
"""Playback commands from a file: PLAYBACK filename.cmd"""
-
- self.close()
- try:
- with open(arg) as recorded_file:
- lines = []
- for line in recorded_file:
- if line.strip().startswith("#"):
- continue
- lines.append(line)
- self.cmdqueue.extend(lines)
- except IOError:
- print ("Error: File does not exist.")
+
+ if fname == '':
+ print("Record file is required!")
+ else:
+ self.close()
+ try:
+ with open(fname) as recorded_file:
+ lines = []
+ for line in recorded_file:
+ if line.strip().startswith("#"):
+ continue
+ lines.append(line)
+ self.cmdqueue.extend(lines)
+ self.response(CMD_OK, "playback")
+ except IOError:
+ message = "Error: File does not exist."
+ print(message)
+ self.response(CMD_NG, message)
def precmd(self, line):
line = line.lower()
- if self.recorded_file and 'playback' not in line:
- print(line, file=self.recorded_file)
+ if self.recorded_file:
+ if not (('playback' in line) or ('bye' in line)):
+ print(line, file=self.recorded_file)
return line
def close(self):
@@ -370,30 +543,42 @@ class Shell(cmd.Cmd):
self.close()
return True
+
def main(argv):
"""main"""
- # Defining server address and port
- host = '' #'localhost' or '127.0.0.1' or '' are all same
-
- try:
- opts, _ = getopt.getopt(argv, "p:s:h", ["help", "primary = ",
"secondary"])
- except getopt.GetoptError:
- print ('spp.py -p <primary__port_number> -s
<secondary_port_number>')
- sys.exit(2)
- for opt, arg in opts:
- if opt in ("-h", "--help"):
- print ('spp.py -p <primary__port_number> -s
<secondary_port_number>')
- sys.exit()
- elif opt in ("-p", "--primary"):
- primary_port = int(arg)
- print ("primary port : %d" % primary_port)
- elif opt in ("-s", "--secondary"):
- secondary_port = int(arg)
- print ('secondary port : %d' % secondary_port)
+ parser = argparse.ArgumentParser(description="SPP Controller")
+
+ parser.add_argument(
+ "-p", "--pri-port",
+ type=int, default=5555,
+ help="primary port number")
+ parser.add_argument(
+ "-s", "--sec-port",
+ type=int, default=6666,
+ help="secondary port number")
+ parser.add_argument(
+ "-m", "--mng-port",
+ type=int, default=7777,
+ help="management port number")
+ parser.add_argument(
+ "-ip", "--ipaddr",
+ type=str, default='', #'localhost' or '127.0.0.1' or '' are all the same
+ help="IP address")
+ args = parser.parse_args()
+
+ host = args.ipaddr
+ primary_port = args.pri_port
+ secondary_port = args.sec_port
+ management_port = args.mng_port
+
+ print("primary port : %d" % primary_port)
+ print('secondary port : %d' % secondary_port)
+ print('management port : %d' % management_port)
#Creating primary socket object
primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ primary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#Binding primary socket to a address. bind() takes tuple of host and port.
primary_sock.bind((host, primary_port))
@@ -408,6 +593,7 @@ def main(argv):
#Creating secondary socket object
secondary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ secondary_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#Binding secondary socket to a address. bind() takes tuple of host and port.
secondary_sock.bind((host, secondary_port))
@@ -419,13 +605,31 @@ def main(argv):
start_new_thread(acceptthread, (secondary_sock, MAIN2SEC, SEC2MAIN))
shell = Shell()
+
+ # Run request handler as a TCP server thread
+ SocketServer.ThreadingTCPServer.allow_reuse_address = True
+ CmdRequestHandler.CMD = shell
+ command_server = SocketServer.ThreadingTCPServer((host,
management_port),CmdRequestHandler)
+
+ t = threading.Thread(target=command_server.serve_forever)
+ t.setDaemon(True)
+ t.start()
+
shell.cmdloop()
shell = None
- primary_sock.shutdown(socket.SHUT_RDWR)
- primary_sock.close()
- secondary_sock.shutdown(socket.SHUT_RDWR)
- secondary_sock.close()
+ try:
+ primary_sock.shutdown(socket.SHUT_RDWR)
+ primary_sock.close()
+ except socket.error, excep:
+ print(excep, ", Error while closing primary_sock in main()!")
+
+ try:
+ secondary_sock.shutdown(socket.SHUT_RDWR)
+ secondary_sock.close()
+ except socket.error, excep:
+ print(excep, ", Error while closing secondary_sock in main()!")
+
if __name__ == "__main__":
main(sys.argv[1:])
More information about the spp
mailing list