[spp] [PATCH 02/13] controller: move connection threads
ogawa.yasufumi at lab.ntt.co.jp
ogawa.yasufumi at lab.ntt.co.jp
Tue Mar 6 11:50:44 CET 2018
From: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>
There are three classes in 'spp.py' for managing connection as threads.
This update is for separating them from main file to improve
maintainability.
Signed-off-by: Yasufumi Ogawa <ogawa.yasufumi at lab.ntt.co.jp>
---
src/controller/conn_thread.py | 259 ++++++++++++++++++++++++++++++++++++++++++
src/controller/spp.py | 243 +--------------------------------------
2 files changed, 261 insertions(+), 241 deletions(-)
create mode 100644 src/controller/conn_thread.py
diff --git a/src/controller/conn_thread.py b/src/controller/conn_thread.py
new file mode 100644
index 0000000..7ba3b00
--- /dev/null
+++ b/src/controller/conn_thread.py
@@ -0,0 +1,259 @@
+from Queue import Queue
+import select
+import socket
+import spp_common
+import threading
+import traceback
+
+# Turn true if activate logger to debug remote command.
+logger = None
+
+if logger is True:
+ import logging
+ logger = logging.getLogger(__name__)
+ handler = logging.StreamHandler()
+ handler.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(
+ '%(asctime)s,[%(filename)s][%(name)s][%(levelname)s]%(message)s')
+ handler.setFormatter(formatter)
+ logger.setLevel(logging.DEBUG)
+ logger.addHandler(handler)
+
+
+class ConnectionThread(threading.Thread):
+ """Manage connection between controller and secondary"""
+
+ def __init__(self, client_id, conn):
+ super(ConnectionThread, self).__init__()
+ self.daemon = True
+
+ self.client_id = client_id
+ self.conn = conn
+ self.stop_event = threading.Event()
+ self.conn_opened = False
+
+ def stop(self):
+ self.stop_event.set()
+
+ def run(self):
+ cmd_str = 'hello'
+
+ # infinite loop so that function do not terminate and thread do not
+ # end.
+ while True:
+ try:
+ _, _, _ = select.select(
+ [self.conn, ], [self.conn, ], [], 5)
+ except select.error:
+ break
+
+ # Sending message to connected secondary
+ try:
+ cmd_str = spp_common.MAIN2SEC[self.client_id].get(True)
+ self.conn.send(cmd_str) # send only takes string
+ except KeyError:
+ break
+ except Exception as excep:
+ print(excep, ",Error while sending msg in connectionthread()!")
+ break
+
+ # Receiving from secondary
+ try:
+ # 1024 stands for bytes of data to be received
+ data = self.conn.recv(1024)
+ if data:
+ spp_common.SEC2MAIN[self.client_id].put(
+ "recv:%s:{%s}" % (str(self.conn.fileno()), data))
+ else:
+ spp_common.SEC2MAIN[self.client_id].put(
+ "closing:" + str(self.conn))
+ break
+ except Exception as excep:
+ print(
+ excep, ",Error while receiving msg in connectionthread()!")
+ break
+
+ spp_common.SECONDARY_LIST.remove(self.client_id)
+ self.conn.close()
+
+
+class AcceptThread(threading.Thread):
+ """Manage connection"""
+
+ def __init__(self, host, port):
+ super(AcceptThread, self).__init__()
+ self.daemon = True
+
+ # Creating secondary socket object
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ # Binding secondary socket to a address. bind() takes tuple of host
+ # and port.
+ self.sock.bind((host, port))
+
+ # Listening secondary at the address
+ self.sock.listen(spp_common.MAX_SECONDARY)
+
+ self.stop_event = threading.Event()
+ self.sock_opened = False
+
+ def getclientid(self, conn):
+ """Get client_id from client"""
+
+ try:
+ conn.send("_get_client_id")
+ except KeyError:
+ return -1
+
+ data = conn.recv(1024)
+ if data is None:
+ return -1
+
+ if logger is not None:
+ logger.debug("data: %s" % data)
+ client_id = int(data.strip('\0'))
+
+ if client_id < 0 or client_id > spp_common.MAX_SECONDARY:
+ logger.debug("Failed to get client_id: %d" % client_id)
+ return -1
+
+ found = 0
+ for i in spp_common.SECONDARY_LIST:
+ if client_id == i:
+ found = 1
+ break
+
+ if found == 0:
+ return client_id
+
+ # client_id in use, find a free one
+ free_client_id = -1
+ for i in range(spp_common.MAX_SECONDARY):
+ found = -1
+ for j in spp_common.SECONDARY_LIST:
+ if i == j:
+ found = i
+ break
+ if found == -1:
+ free_client_id = i
+ break
+
+ if logger is not None:
+ logger.debug("Found free_client_id: %d" % free_client_id)
+
+ if free_client_id < 0:
+ return -1
+
+ conn.send("_set_client_id %u" % free_client_id)
+ data = conn.recv(1024)
+
+ return free_client_id
+
+ def stop(self):
+ if self.sock_opened is True:
+ try:
+ self.sock.shutdown(socket.SHUT_RDWR)
+ except socket.error as excep:
+ print(excep, ", Error while closing sock in AcceptThread!")
+ traceback.print_exc()
+ self.sock.close()
+ self.stop_event.set()
+
+ def run(self):
+ try:
+ while True:
+ # Accepting incoming connections
+ conn, _ = self.sock.accept()
+
+ client_id = self.getclientid(conn)
+ if client_id < 0:
+ break
+
+ # Creating new thread.
+ # Calling secondarythread function for this function and
+ # passing conn as argument.
+ spp_common.SECONDARY_LIST.append(client_id)
+ spp_common.MAIN2SEC[client_id] = Queue()
+ spp_common.SEC2MAIN[client_id] = Queue()
+ connection_thread = ConnectionThread(client_id, conn)
+ connection_thread.daemon = True
+ connection_thread.start()
+
+ spp_common.SECONDARY_COUNT += 1
+ except Exception as excep:
+ print(excep, ", Error in AcceptThread!")
+ traceback.print_exc()
+ self.sock_opened = False
+ self.sock.close()
+
+
+class PrimaryThread(threading.Thread):
+
+ def __init__(self, host, port):
+ super(PrimaryThread, self).__init__()
+ self.daemon = True
+
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # Binding primary socket to a address. bind() takes tuple of host
+ # and port.
+ self.sock.bind((host, port))
+
+ # Listening primary at the address
+ self.sock.listen(1) # 5 denotes the number of clients can queue
+
+ self.stop_event = threading.Event()
+ self.sock_opened = False
+
+ def stop(self):
+ if self.sock_opened is True:
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+ self.stop_event.set()
+
+ def run(self):
+ cmd_str = ''
+
+ while True:
+ # waiting for connection
+ spp_common.PRIMARY = False
+ conn, addr = self.sock.accept()
+ spp_common.PRIMARY = True
+
+ while conn:
+ try:
+ _, _, _ = select.select([conn, ], [conn, ], [], 5)
+ except select.error:
+ break
+
+ self.sock_opened = True
+ # Sending message to connected primary
+ try:
+ cmd_str = spp_common.MAIN2PRIMARY.get(True)
+ conn.send(cmd_str) # send only takes string
+ except KeyError:
+ break
+ except Exception as excep:
+ print(
+ excep,
+ ", Error while sending msg in primarythread()!")
+ break
+
+ # Receiving from primary
+ try:
+ # 1024 stands for bytes of data to be received
+ data = conn.recv(1024)
+ if data:
+ spp_common.PRIMARY2MAIN.put(
+ "recv:%s:{%s}" % (str(addr), data))
+ else:
+ spp_common.PRIMARY2MAIN.put("closing:" + str(addr))
+ conn.close()
+ self.sock_opened = False
+ break
+ except Exception as excep:
+ print(
+ excep,
+ ", Error while receiving msg in primarythread()!")
+ break
diff --git a/src/controller/spp.py b/src/controller/spp.py
index 0515193..d0d7bc9 100644
--- a/src/controller/spp.py
+++ b/src/controller/spp.py
@@ -4,8 +4,8 @@
from __future__ import print_function
import argparse
-from Queue import Queue
-import select
+from conn_thread import AcceptThread
+from conn_thread import PrimaryThread
from shell import Shell
import socket
import SocketServer
@@ -58,245 +58,6 @@ class CmdRequestHandler(SocketServer.BaseRequestHandler):
self.request.send("")
-class ConnectionThread(threading.Thread):
- """Manage connection between controller and secondary"""
-
- def __init__(self, client_id, conn):
- super(ConnectionThread, self).__init__()
- self.daemon = True
-
- self.client_id = client_id
- self.conn = conn
- self.stop_event = threading.Event()
- self.conn_opened = False
-
- def stop(self):
- self.stop_event.set()
-
- def run(self):
- cmd_str = 'hello'
-
- # infinite loop so that function do not terminate and thread do not
- # end.
- while True:
- try:
- _, _, _ = select.select(
- [self.conn, ], [self.conn, ], [], 5)
- except select.error:
- break
-
- # Sending message to connected secondary
- try:
- cmd_str = spp_common.MAIN2SEC[self.client_id].get(True)
- self.conn.send(cmd_str) # send only takes string
- except KeyError:
- break
- except Exception as excep:
- print(excep, ",Error while sending msg in connectionthread()!")
- break
-
- # Receiving from secondary
- try:
- # 1024 stands for bytes of data to be received
- data = self.conn.recv(1024)
- if data:
- spp_common.SEC2MAIN[self.client_id].put(
- "recv:%s:{%s}" % (str(self.conn.fileno()), data))
- else:
- spp_common.SEC2MAIN[self.client_id].put(
- "closing:" + str(self.conn))
- break
- except Exception as excep:
- print(
- excep, ",Error while receiving msg in connectionthread()!")
- break
-
- spp_common.SECONDARY_LIST.remove(self.client_id)
- self.conn.close()
-
-
-class AcceptThread(threading.Thread):
- """Manage connection"""
-
- def __init__(self, host, port):
- super(AcceptThread, self).__init__()
- self.daemon = True
-
- # Creating secondary socket object
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- # Binding secondary socket to a address. bind() takes tuple of host
- # and port.
- self.sock.bind((host, port))
-
- # Listening secondary at the address
- self.sock.listen(spp_common.MAX_SECONDARY)
-
- self.stop_event = threading.Event()
- self.sock_opened = False
-
- def getclientid(self, conn):
- """Get client_id from client"""
-
- try:
- conn.send("_get_client_id")
- except KeyError:
- return -1
-
- data = conn.recv(1024)
- if data is None:
- return -1
-
- if logger is not None:
- logger.debug("data: %s" % data)
- client_id = int(data.strip('\0'))
-
- if client_id < 0 or client_id > spp_common.MAX_SECONDARY:
- logger.debug("Failed to get client_id: %d" % client_id)
- return -1
-
- found = 0
- for i in spp_common.SECONDARY_LIST:
- if client_id == i:
- found = 1
- break
-
- if found == 0:
- return client_id
-
- # client_id in use, find a free one
- free_client_id = -1
- for i in range(spp_common.MAX_SECONDARY):
- found = -1
- for j in spp_common.SECONDARY_LIST:
- if i == j:
- found = i
- break
- if found == -1:
- free_client_id = i
- break
-
- if logger is not None:
- logger.debug("Found free_client_id: %d" % free_client_id)
-
- if free_client_id < 0:
- return -1
-
- conn.send("_set_client_id %u" % free_client_id)
- data = conn.recv(1024)
-
- return free_client_id
-
- def stop(self):
- if self.sock_opened is True:
- try:
- self.sock.shutdown(socket.SHUT_RDWR)
- except socket.error as excep:
- print(excep, ", Error while closing sock in AcceptThread!")
- traceback.print_exc()
- self.sock.close()
- self.stop_event.set()
-
- def run(self):
- try:
- while True:
- # Accepting incoming connections
- conn, _ = self.sock.accept()
-
- client_id = self.getclientid(conn)
- if client_id < 0:
- break
-
- # Creating new thread.
- # Calling secondarythread function for this function and
- # passing conn as argument.
- spp_common.SECONDARY_LIST.append(client_id)
- spp_common.MAIN2SEC[client_id] = Queue()
- spp_common.SEC2MAIN[client_id] = Queue()
- connection_thread = ConnectionThread(client_id, conn)
- connection_thread.daemon = True
- connection_thread.start()
-
- spp_common.SECONDARY_COUNT += 1
- except Exception as excep:
- print(excep, ", Error in AcceptThread!")
- traceback.print_exc()
- self.sock_opened = False
- self.sock.close()
-
-
-class PrimaryThread(threading.Thread):
-
- def __init__(self, host, port):
- super(PrimaryThread, self).__init__()
- self.daemon = True
-
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- # Binding primary socket to a address. bind() takes tuple of host
- # and port.
- self.sock.bind((host, port))
-
- # Listening primary at the address
- self.sock.listen(1) # 5 denotes the number of clients can queue
-
- self.stop_event = threading.Event()
- self.sock_opened = False
-
- def stop(self):
- if self.sock_opened is True:
- self.sock.shutdown(socket.SHUT_RDWR)
- self.sock.close()
- self.stop_event.set()
-
- def run(self):
- cmd_str = ''
-
- while True:
- # waiting for connection
- spp_common.PRIMARY = False
- conn, addr = self.sock.accept()
- spp_common.PRIMARY = True
-
- while conn:
- try:
- _, _, _ = select.select([conn, ], [conn, ], [], 5)
- except select.error:
- break
-
- self.sock_opened = True
- # Sending message to connected primary
- try:
- cmd_str = spp_common.MAIN2PRIMARY.get(True)
- conn.send(cmd_str) # send only takes string
- except KeyError:
- break
- except Exception as excep:
- print(
- excep,
- ", Error while sending msg in primarythread()!")
- break
-
- # Receiving from primary
- try:
- # 1024 stands for bytes of data to be received
- data = conn.recv(1024)
- if data:
- spp_common.PRIMARY2MAIN.put(
- "recv:%s:{%s}" % (str(addr), data))
- else:
- spp_common.PRIMARY2MAIN.put("closing:" + str(addr))
- conn.close()
- self.sock_opened = False
- break
- except Exception as excep:
- print(
- excep,
- ", Error while receiving msg in primarythread()!")
- break
-
-
def main(argv):
"""main"""
--
2.13.1
More information about the spp
mailing list