[dpdk-dev,v5] vhost: allow for many vhost user ports

Message ID 1481729401-27546-1-git-send-email-jan.wickbom@ericsson.com (mailing list archive)
State Superseded, archived
Delegated to: Yuanhan Liu
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel compilation success Compilation OK

Commit Message

Jan Wickbom Dec. 14, 2016, 3:30 p.m. UTC
  Currently select() is used to monitor file descriptors for vhostuser
ports. This limits the number of ports possible to create since the
fd number is used as index in the fd_set and we have seen fds > 1023.
This patch changes select() to poll(). This way we can keep an
packed (pollfd) array for the fds, e.g. as many fds as the size of
the array.

Also see:
http://dpdk.org/ml/archives/dev/2016-April/037024.html

Signed-off-by: Jan Wickbom <jan.wickbom@ericsson.com>
Reported-by: Patrik Andersson <patrik.r.andersson@ericsson.com>
---

v5:
* pack of arrays moved after poll loop

v4:
* fdset_del can not shrink the array. Took care of that in sveral places
* Moved rwfds[] into struct fdset

v3:
* removed unnecessary include
* removed fdset_fill, made it functionally part of poll loop

v2:
* removed unnecessary casts
* static array replacing allocated memory

 lib/librte_vhost/fd_man.c | 234 ++++++++++++++++++++++++++--------------------
 lib/librte_vhost/fd_man.h |   2 +
 2 files changed, 135 insertions(+), 101 deletions(-)
  

Patch

diff --git a/lib/librte_vhost/fd_man.c b/lib/librte_vhost/fd_man.c
index 2d3eeb7..c7c2b35 100644
--- a/lib/librte_vhost/fd_man.c
+++ b/lib/librte_vhost/fd_man.c
@@ -35,93 +35,111 @@ 
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/socket.h>
-#include <sys/select.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <string.h>
 
 #include <rte_common.h>
 #include <rte_log.h>
 
 #include "fd_man.h"
 
+#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
+
 /**
- * Returns the index in the fdset for a given fd.
- * If fd is -1, it means to search for a free entry.
+ * Adjusts the highest index populated in the array of fds
  * @return
- *   index for the fd, or -1 if fd isn't in the fdset.
+ *   The new size of fdset.
  */
 static int
-fdset_find_fd(struct fdset *pfdset, int fd)
+fdset_shrink(struct fdset *pfdset)
 {
-	int i;
+	int idx;
 
-	if (pfdset == NULL)
-		return -1;
+	/* Remove all empty spots in the end */
 
-	for (i = 0; i < MAX_FDS && pfdset->fd[i].fd != fd; i++)
+	for (idx = pfdset->num - 1;
+	     idx >= 0 && pfdset->fd[idx].fd == -1;
+	     idx--)
 		;
 
-	return i ==  MAX_FDS ? -1 : i;
+	pfdset->num = idx + 1;
+
+	return pfdset->num;
 }
 
-static int
-fdset_find_free_slot(struct fdset *pfdset)
+/**
+ *  Moves the fd from last slot to specified slot, including
+ *  corresponding pollfd
+ */
+static void
+fdset_move_last(struct fdset *pfdset, int idx)
 {
-	return fdset_find_fd(pfdset, -1);
+	int last_idx = pfdset->num - 1;
+
+	if (idx < last_idx) {
+		pfdset->fd[idx] = pfdset->fd[last_idx];
+		pfdset->fd[last_idx].fd = -1;
+
+		pfdset->rwfds[idx] = pfdset->rwfds[last_idx];
+		pfdset->rwfds[last_idx].revents = 0;
+	}
 }
 
-static int
-fdset_add_fd(struct fdset  *pfdset, int idx, int fd,
-	fd_cb rcb, fd_cb wcb, void *dat)
-{
-	struct fdentry *pfdentry;
 
-	if (pfdset == NULL || idx >= MAX_FDS || fd >= FD_SETSIZE)
-		return -1;
+/**
+ *  Packs the two arrays in the fdset structure
+ */
+static void
+fdset_pack(struct fdset *pfdset)
+{
+	int idx;
+	int num = pfdset->num;
 
-	pfdentry = &pfdset->fd[idx];
-	pfdentry->fd = fd;
-	pfdentry->rcb = rcb;
-	pfdentry->wcb = wcb;
-	pfdentry->dat = dat;
+	for (idx = 0; idx < num; idx++) {
+		if (pfdset->fd[idx].fd == -1) {
+			num = fdset_shrink(pfdset);
+			fdset_move_last(pfdset, idx);
+		}
+	}
 
-	return 0;
+	fdset_shrink(pfdset);
 }
 
 /**
- * Fill the read/write fd_set with the fds in the fdset.
+ * Returns the index in the fdset for a given fd.
+ * If fd is -1, it means to search for a free entry.
  * @return
- *  the maximum fds filled in the read/write fd_set.
+ *   index for the fd, or -1 if fd isn't in the fdset.
  */
 static int
-fdset_fill(fd_set *rfset, fd_set *wfset, struct fdset *pfdset)
+fdset_find_fd(struct fdset *pfdset, int fd)
 {
-	struct fdentry *pfdentry;
-	int i, maxfds = -1;
-	int num = MAX_FDS;
+	int i;
 
-	if (pfdset == NULL)
-		return -1;
+	for (i = 0; i < pfdset->num && pfdset->fd[i].fd != fd; i++)
+		;
 
-	for (i = 0; i < num; i++) {
-		pfdentry = &pfdset->fd[i];
-		if (pfdentry->fd != -1) {
-			int added = 0;
-			if (pfdentry->rcb && rfset) {
-				FD_SET(pfdentry->fd, rfset);
-				added = 1;
-			}
-			if (pfdentry->wcb && wfset) {
-				FD_SET(pfdentry->fd, wfset);
-				added = 1;
-			}
-			if (added)
-				maxfds = pfdentry->fd < maxfds ?
-					maxfds : pfdentry->fd;
-		}
-	}
-	return maxfds;
+	return i == pfdset->num ? -1 : i;
+}
+
+static void
+fdset_add_fd(struct fdset *pfdset, int idx, int fd,
+	fd_cb rcb, fd_cb wcb, void *dat)
+{
+	struct fdentry *pfdentry = &pfdset->fd[idx];
+	struct pollfd *pfd = &pfdset->rwfds[idx];
+
+	pfdentry->fd = fd;
+	pfdentry->rcb = rcb;
+	pfdentry->wcb = wcb;
+	pfdentry->dat = dat;
+
+	pfd->fd = fd;
+	pfd->events = rcb ? POLLIN : 0;
+	pfd->events |= wcb ? POLLOUT : 0;
+	pfd->revents = 0;
 }
 
 void
@@ -132,11 +150,15 @@ 
 	if (pfdset == NULL)
 		return;
 
+	pthread_mutex_init(&pfdset->fd_mutex, NULL);
+
 	for (i = 0; i < MAX_FDS; i++) {
 		pfdset->fd[i].fd = -1;
 		pfdset->fd[i].dat = NULL;
 	}
 	pfdset->num = 0;
+
+	memset(pfdset->rwfds, 0, sizeof(pfdset->rwfds));
 }
 
 /**
@@ -152,14 +174,14 @@ 
 
 	pthread_mutex_lock(&pfdset->fd_mutex);
 
-	/* Find a free slot in the list. */
-	i = fdset_find_free_slot(pfdset);
-	if (i == -1 || fdset_add_fd(pfdset, i, fd, rcb, wcb, dat) < 0) {
+	i = pfdset->num < MAX_FDS ? pfdset->num++ : -1;
+
+	if (i == -1) {
 		pthread_mutex_unlock(&pfdset->fd_mutex);
 		return -2;
 	}
 
-	pfdset->num++;
+	fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
 
 	pthread_mutex_unlock(&pfdset->fd_mutex);
 
@@ -189,7 +211,6 @@ 
 			pfdset->fd[i].fd = -1;
 			pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
 			pfdset->fd[i].dat = NULL;
-			pfdset->num--;
 			i = -1;
 		}
 		pthread_mutex_unlock(&pfdset->fd_mutex);
@@ -198,24 +219,6 @@ 
 	return dat;
 }
 
-/**
- *  Unregister the fd at the specified slot from the fdset.
- */
-static void
-fdset_del_slot(struct fdset *pfdset, int index)
-{
-	if (pfdset == NULL || index < 0 || index >= MAX_FDS)
-		return;
-
-	pthread_mutex_lock(&pfdset->fd_mutex);
-
-	pfdset->fd[index].fd = -1;
-	pfdset->fd[index].rcb = pfdset->fd[index].wcb = NULL;
-	pfdset->fd[index].dat = NULL;
-	pfdset->num--;
-
-	pthread_mutex_unlock(&pfdset->fd_mutex);
-}
 
 /**
  * This functions runs in infinite blocking loop until there is no fd in
@@ -229,55 +232,74 @@ 
 void
 fdset_event_dispatch(struct fdset *pfdset)
 {
-	fd_set rfds, wfds;
-	int i, maxfds;
+	int i;
+	struct pollfd *pfd;
 	struct fdentry *pfdentry;
-	int num = MAX_FDS;
 	fd_cb rcb, wcb;
 	void *dat;
-	int fd;
+	int fd, numfds;
 	int remove1, remove2;
-	int ret;
 
 	if (pfdset == NULL)
 		return;
 
 	while (1) {
-		struct timeval tv;
-		tv.tv_sec = 1;
-		tv.tv_usec = 0;
-		FD_ZERO(&rfds);
-		FD_ZERO(&wfds);
-		pthread_mutex_lock(&pfdset->fd_mutex);
-
-		maxfds = fdset_fill(&rfds, &wfds, pfdset);
-
-		pthread_mutex_unlock(&pfdset->fd_mutex);
+		int has_holes;
 
 		/*
-		 * When select is blocked, other threads might unregister
+		 * When poll is blocked, other threads might unregister
 		 * listenfds from and register new listenfds into fdset.
-		 * When select returns, the entries for listenfds in the fdset
+		 * When poll returns, the entries for listenfds in the fdset
 		 * might have been updated. It is ok if there is unwanted call
 		 * for new listenfds.
 		 */
-		ret = select(maxfds + 1, &rfds, &wfds, NULL, &tv);
-		if (ret <= 0)
-			continue;
+		pthread_mutex_lock(&pfdset->fd_mutex);
 
-		for (i = 0; i < num; i++) {
-			remove1 = remove2 = 0;
+		numfds = pfdset->num;
+
+		pthread_mutex_unlock(&pfdset->fd_mutex);
+
+		poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
+
+		has_holes = 0;
+		for (i = 0; i < numfds; i++) {
 			pthread_mutex_lock(&pfdset->fd_mutex);
+
 			pfdentry = &pfdset->fd[i];
 			fd = pfdentry->fd;
+			pfd = &pfdset->rwfds[i];
+
+			if (fd < 0) {
+				/* Removed during poll, care later */
+
+				has_holes = 1;
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				continue;
+			}
+
+			if (!pfd->revents) {
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				continue;
+			}
+
+			/* Valid fd, and at least one revent ... */
+
+			remove1 = remove2 = 0;
+
 			rcb = pfdentry->rcb;
 			wcb = pfdentry->wcb;
 			dat = pfdentry->dat;
 			pfdentry->busy = 1;
+
 			pthread_mutex_unlock(&pfdset->fd_mutex);
-			if (fd >= 0 && FD_ISSET(fd, &rfds) && rcb)
+
+			if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
 				rcb(fd, dat, &remove1);
-			if (fd >= 0 && FD_ISSET(fd, &wfds) && wcb)
+			if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
 				wcb(fd, dat, &remove2);
 			pfdentry->busy = 0;
 			/*
@@ -292,8 +314,18 @@ 
 			 * listen fd in another thread, we couldn't call
 			 * fd_set_del.
 			 */
-			if (remove1 || remove2)
-				fdset_del_slot(pfdset, i);
+			if (remove1 || remove2) {
+				pfdentry->fd = -1;
+				has_holes = 1;
+			}
+		}
+
+		if (has_holes) {
+			pthread_mutex_lock(&pfdset->fd_mutex);
+
+			fdset_pack(pfdset);
+
+			pthread_mutex_unlock(&pfdset->fd_mutex);
 		}
 	}
 }
diff --git a/lib/librte_vhost/fd_man.h b/lib/librte_vhost/fd_man.h
index bd66ed1..d319cac 100644
--- a/lib/librte_vhost/fd_man.h
+++ b/lib/librte_vhost/fd_man.h
@@ -35,6 +35,7 @@ 
 #define _FD_MAN_H_
 #include <stdint.h>
 #include <pthread.h>
+#include <poll.h>
 
 #define MAX_FDS 1024
 
@@ -49,6 +50,7 @@  struct fdentry {
 };
 
 struct fdset {
+	struct pollfd rwfds[MAX_FDS];
 	struct fdentry fd[MAX_FDS];
 	pthread_mutex_t fd_mutex;
 	int num;	/* current fd number of this fdset */