[spp] [PATCH 3/5] Add management port
ogawa.yasufumi at lab.ntt.co.jp
ogawa.yasufumi at lab.ntt.co.jp
Tue Jul 18 20:57:27 CEST 2017
From: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>
- Management port is a TCP port for exchanging SPP commands with external
functions.
- Replace the getopt-based argument parser with argparse so that it is easier
to understand and maintain.
- Add descriptions to exception messages to show where the error
occurred.
Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>
---
src/spp.py | 211 ++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 168 insertions(+), 43 deletions(-)
diff --git a/src/spp.py b/src/spp.py
index c383820..99165ed 100755
--- a/src/spp.py
+++ b/src/spp.py
@@ -2,17 +2,67 @@
"""Soft Patch Panel"""
from __future__ import print_function
-from Queue import Queue
+from Queue import Queue, Empty
from thread import start_new_thread
from threading import Thread
import cmd
-import getopt
+import argparse
import select
import socket
import sys
import re
#import pdb; pdb.set_trace()
+import SocketServer
+import readline
+import threading
+import json
+
+from logging import getLogger,StreamHandler,Formatter,DEBUG
+logger = getLogger(__name__)
+handler = StreamHandler()
+handler.setLevel(DEBUG)
+formatter = Formatter('%(asctime)s - [%(name)s] - [%(levelname)s] - %(message)s')
+handler.setFormatter(formatter)
+logger.setLevel(DEBUG)
+logger.addHandler(handler)
+
+
+CMD_OK = "OK"
+CMD_NG = "NG"
+CMD_NOTREADY = "NOTREADY"
+CMD_ERROR = "ERROR"
+
+RCMD_EXECUTE_QUEUE = Queue()
+RCMD_RESULT_QUEUE = Queue()
+REMOTE_COMMAND = "RCMD"
+
+class CmdRequestHandler(SocketServer.BaseRequestHandler):
+ """Request handler for getting message from remote entities"""
+
+ CMD = None # contains an instance of the Shell class
+
+ def handle(self):
+ self.data = self.request.recv(1024).strip()
+ cur_thread = threading.currentThread()
+ print(cur_thread.getName())
+ print(self.client_address[0])
+ print(self.data)
+ if CmdRequestHandler.CMD is not None:
+ RCMD_EXECUTE_QUEUE.put(REMOTE_COMMAND)
+ CmdRequestHandler.CMD.onecmd(self.data)
+ ret = RCMD_RESULT_QUEUE.get()
+ if (ret is not None):
+ logger.debug("ret:%s" % ret)
+ self.request.send(ret)
+ else:
+ logger.debug("ret is none")
+ self.request.send("")
+ else:
+ logger.debug("CMD is None")
+ self.request.send("")
+
+
class GrowingList(list):
"""GrowingList"""
@@ -21,6 +71,7 @@ class GrowingList(list):
self.extend([None]*(index + 1 - len(self)))
list.__setitem__(self, index, value)
+# Maximum num of sock queues for secondaries
MAX_SECONDARY = 16
# init
@@ -55,19 +106,20 @@ def connectionthread(name, client_id, conn, m2s, s2m):
except KeyError:
break
except Exception, excep:
- print (str(excep))
+ print(excep, ",Error while sending msg in connectionthread()!")
break
#Receiving from secondary
try:
data = conn.recv(1024) # 1024 stands for bytes of data to be received
if data:
- s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}")
+ #s2m.put("recv:" + str(conn.fileno()) + ":" + "{" + data + "}")
+ s2m.put("recv:%s:{%s}" % (str(conn.fileno()), data))
else:
s2m.put("closing:" + str(conn))
break
except Exception, excep:
- print (str(excep))
+ print(excep, ",Error while receiving msg in connectionthread()!")
break
SECONDARY_LIST.remove(client_id)
@@ -146,7 +198,7 @@ def acceptthread(sock, main2sec, sec2main):
sec2main[client_id], ))
SECONDARY_COUNT += 1
except Exception, excep:
- print (str(excep))
+ print(excep, ", Error in acceptthread()!")
sock.close()
def command_primary(command):
@@ -154,24 +206,42 @@ def command_primary(command):
if PRIMARY:
MAIN2PRIMARY.put(command)
- print (PRIMARY2MAIN.get(True))
+ recv = PRIMARY2MAIN.get(True)
+ print (recv)
+ return CMD_OK, recv
else:
- print ("primary not started")
+ recv = "primary not started"
+ print (recv)
+ return CMD_NOTREADY, recv
def command_secondary(sec_id, command):
"""Send command to secondary process with sec_id"""
if sec_id in SECONDARY_LIST:
MAIN2SEC[sec_id].put(command)
- print (SEC2MAIN[sec_id].get(True))
+ recv = SEC2MAIN[sec_id].get(True)
+ print (recv)
+ return CMD_OK, recv
else:
- print ("secondary id %d not exist" % sec_id)
+ message = "secondary id %d not exist" % sec_id
+ print(message)
+ return CMD_NOTREADY, message
+
+def get_status():
+ secondary = []
+ for i in SECONDARY_LIST:
+ secondary.append("%d" % i)
+ stat = {
+ "primary": "%d" % PRIMARY,
+ "secondary": secondary
+ }
+ return stat
def print_status():
"""Display information about connected clients"""
print ("Soft Patch Panel Status :")
- print ("primary: %d" % PRIMARY)
+ print ("primary: %d" % PRIMARY) # "primary: 1" if PRIMARY == True
print ("secondary count: %d" % len(SECONDARY_LIST))
for i in SECONDARY_LIST:
print ("Connected secondary id: %d" % i)
@@ -201,7 +271,7 @@ def primarythread(sock, main2primary, primary2main):
except KeyError:
break
except Exception, excep:
- print (str(excep))
+ print(excep, ", Error while sending msg in primarythread()!")
break
#Receiving from primary
@@ -214,7 +284,7 @@ def primarythread(sock, main2primary, primary2main):
conn.close()
break
except Exception, excep:
- print (str(excep))
+ print(excep, ", Error while receiving msg in primarythread()!")
break
print ("primary communication thread end")
@@ -268,6 +338,7 @@ class Shell(cmd.Cmd):
prompt = 'spp > '
recorded_file = None
+ # TODO: define pri_commands and sec_commands if they differ
COMMANDS = ['status', 'add', 'patch', 'ring', 'vhost',
'reset', 'exit', 'forward', 'stop', 'clear']
@@ -295,42 +366,67 @@ class Shell(cmd.Cmd):
]
return completions
+ def response(self, result, message):
+ """Enqueue message from other than CLI"""
+ try:
+ rcmd = RCMD_EXECUTE_QUEUE.get(False)
+ except Empty:
+ return
+
+ if (rcmd == REMOTE_COMMAND):
+ param = result + '\n' + message
+ RCMD_RESULT_QUEUE.put(param)
+ else:
+ logger.debug("unknown remote command = %s" % rcmd)
+
def do_status(self, _):
"""Display Soft Patch Panel Status"""
print_status()
+ stat = get_status()
+ self.response(CMD_OK, json.dumps(stat))
def do_pri(self, command):
"""Send command to primary process"""
if command and command in self.COMMANDS:
- command_primary(command)
+ result, message = command_primary(command)
+ self.response(result, message)
else:
- print ("primary invalid command")
+ message = "primary invalid command"
+ print(message)
+ self.response(CMD_ERROR, ret)
def do_sec(self, arg):
"""Send command to secondary process"""
- # remove unwanted space to avoid invalid command error
+ # remove unwanted spaces to avoid invalid command error
tmparg = re.sub(r'\s+', " ", arg)
tmparg = re.sub(r'\s?;\s?', ";", tmparg)
cmds = tmparg.split(';')
if len(cmds) < 2:
- print ("error")
+ message = "error"
+ print(message)
+ self.response(CMD_ERROR, message)
elif str.isdigit(cmds[0]):
sec_id = int(cmds[0])
if check_sec_cmds(cmds[1]):
- command_secondary(sec_id, cmds[1])
+ result, message = command_secondary(sec_id, cmds[1])
+ self.response(result, message)
else:
- print ("invalid cmd")
+ message = "invalid cmd"
+ print(message)
+ self.response(CMD_ERROR, message)
else:
print (cmds[0])
print ("first %s" % cmds[1])
+ self.response(CMD_ERROR, "invalid format")
def do_record(self, arg):
"""Save future commands to filename: RECORD filename.cmd"""
self.recorded_file = open(arg, 'w')
+ self.response(CMD_OK, "record")
def do_playback(self, arg):
"""Playback commands from a file: PLAYBACK filename.cmd"""
@@ -344,8 +440,11 @@ class Shell(cmd.Cmd):
continue
lines.append(line)
self.cmdqueue.extend(lines)
+ self.response(CMD_OK, "playback")
except IOError:
- print ("Error: File does not exist.")
+ message = "Error: File does not exist."
+ print(message)
+ self.response(CMD_NG, message)
def precmd(self, line):
line = line.lower()
@@ -379,25 +478,34 @@ class Shell(cmd.Cmd):
def main(argv):
"""main"""
- # Defining server address and port
- host = '' #'localhost' or '127.0.0.1' or '' are all same
-
- try:
- opts, _ = getopt.getopt(argv, "p:s:h", ["help", "primary = ", "secondary"])
- except getopt.GetoptError:
- print ('spp.py -p <primary__port_number> -s <secondary_port_number>')
- sys.exit(2)
-
- for opt, arg in opts:
- if opt in ("-h", "--help"):
- print ('spp.py -p <primary__port_number> -s <secondary_port_number>')
- sys.exit()
- elif opt in ("-p", "--primary"):
- primary_port = int(arg)
- print ("primary port : %d" % primary_port)
- elif opt in ("-s", "--secondary"):
- secondary_port = int(arg)
- print ('secondary port : %d' % secondary_port)
+ parser = argparse.ArgumentParser(description="SPP Controller")
+
+ parser.add_argument(
+ "-p", "--pri-port",
+ type=int, default=5555,
+ help="primary port number")
+ parser.add_argument(
+ "-s", "--sec-port",
+ type=int, default=6666,
+ help="secondary port number")
+ parser.add_argument(
+ "-m", "--mng-port",
+ type=int, default=7777,
+ help="management port number")
+ parser.add_argument(
+ "-ip", "--ipaddr",
+ type=str, default='', #'localhost' or '127.0.0.1' or '' are all same
+ help="IP address")
+ args = parser.parse_args()
+
+ host = args.ipaddr
+ primary_port = args.pri_port
+ secondary_port = args.sec_port
+ management_port = args.mng_port
+
+ print("primary port : %d" % primary_port)
+ print('secondary port : %d' % secondary_port)
+ print('management port : %d' % management_port)
#Creating primary socket object
primary_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -428,13 +536,30 @@ def main(argv):
start_new_thread(acceptthread, (secondary_sock, MAIN2SEC, SEC2MAIN))
shell = Shell()
+
+ # Run request handler as a TCP server thread
+ SocketServer.ThreadingTCPServer.allow_reuse_address = True
+ CmdRequestHandler.CMD = shell
+ command_server = SocketServer.ThreadingTCPServer((host, management_port),CmdRequestHandler)
+
+ t = threading.Thread(target=command_server.serve_forever)
+ t.setDaemon(True)
+ t.start()
+
shell.cmdloop()
shell = None
- primary_sock.shutdown(socket.SHUT_RDWR)
- primary_sock.close()
- secondary_sock.shutdown(socket.SHUT_RDWR)
- secondary_sock.close()
+ try:
+ primary_sock.shutdown(socket.SHUT_RDWR)
+ primary_sock.close()
+ except socket.error, excep:
+ print(excep, ", Error while closing primary_sock in main()!")
+
+ try:
+ secondary_sock.shutdown(socket.SHUT_RDWR)
+ secondary_sock.close()
+ except socket.error, excep:
+ print(excep, ", Error while closing primary_sock in main()!")
if __name__ == "__main__":
--
2.13.1
More information about the spp
mailing list