[dpdk-dev,v4] vhost: allow for many vhost user ports

Message ID 1481635187-12624-1-git-send-email-jan.wickbom@ericsson.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel compilation success Compilation OK

Commit Message

Jan Wickbom Dec. 13, 2016, 1:19 p.m. UTC
  Currently select() is used to monitor file descriptors for vhostuser
ports. This limits the number of ports possible to create since the
fd number is used as index in the fd_set and we have seen fds > 1023.
This patch changes select() to poll(). This way we can keep an
packed (pollfd) array for the fds, e.g. as many fds as the size of
the array.

Also see:
http://dpdk.org/ml/archives/dev/2016-April/037024.html

Signed-off-by: Jan Wickbom <jan.wickbom@ericsson.com>
Reported-by: Patrik Andersson <patrik.r.andersson@ericsson.com>
---

v4:
* fdset_del can not shrink the array. Took care of that in sveral places
* Moved rwfds[] into struct fdset

v3:
* removed unnecessary include
* removed fdset_fill, made it functionally part of poll loop

v2:
* removed unnecessary casts
* static array replacing allocated memory

 lib/librte_vhost/fd_man.c | 226 ++++++++++++++++++++++++++--------------------
 lib/librte_vhost/fd_man.h |   4 +-
 2 files changed, 131 insertions(+), 99 deletions(-)
  

Comments

Yuanhan Liu Dec. 14, 2016, 3:25 a.m. UTC | #1
On Tue, Dec 13, 2016 at 02:19:47PM +0100, Jan Wickbom wrote:
> +
> +		poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
> +
> +		for (i = 0; i < numfds; ) {
>  			pthread_mutex_lock(&pfdset->fd_mutex);
> +
>  			pfdentry = &pfdset->fd[i];
>  			fd = pfdentry->fd;
> +			pfd = &pfdset->rwfds[i];
> +
> +			if (fd < 0) {
> +				/* Removed during poll */
> +				/* shrink first, last migth be deleted*/
> +
> +				fdset_shrink(pfdset);
> +				fdset_move_last(pfdset, i);
> +				fdset_shrink(pfdset);
> +
> +				pthread_mutex_unlock(&pfdset->fd_mutex);

This patch looks better, but as said, I will not do shrink in the
event processing loop: I would simply set a flag, something like
"need_shrink" here, and do the shrink outside this for loop.


> +			if (remove1 || remove2) {
> +				pthread_mutex_lock(&pfdset->fd_mutex);
> +
> +				/* shrink first, last migth be deleted*/
> +				fdset_shrink(pfdset);
> +				fdset_move_last(pfdset, i);
> +				fdset_shrink(pfdset);
> +
> +				pthread_mutex_unlock(&pfdset->fd_mutex);
> +
> +				continue;

Same here: just sets a flag.

> +			}
> +
> +			i++;
>  		}
> +
> +		/* fdset_del do not shrink, pack eventual remainings of array */
> +		pthread_mutex_lock(&pfdset->fd_mutex);
> +
> +		fdset_shrink(pfdset);
> +
> +		for ( ; i < pfdset->num; i++) {
> +			pfdentry = &pfdset->fd[i];
> +
> +			if (pfdentry->fd < 0) {
> +				fdset_move_last(pfdset, i);
> +				fdset_shrink(pfdset);
> +			}
> +		}

And yes, do the shrink here (when the shrink flag is set). But I would
simply call fdset_shrink() here (without the for loop), and let the
fdset_shrink() to handle the details: it could either be a swap with
last __valid__ entry, or simply a memmov.

That said, if you prefer to choose fdset_move_last(), fine, invoke it
inside fdset_shrink() then. Let fdset_shrink be able to remove an fd
in the middle.

> +
> +		pthread_mutex_unlock(&pfdset->fd_mutex);
>  	}
>  }
> diff --git a/lib/librte_vhost/fd_man.h b/lib/librte_vhost/fd_man.h
> index bd66ed1..03e7881 100644
> --- a/lib/librte_vhost/fd_man.h
> +++ b/lib/librte_vhost/fd_man.h
> @@ -35,6 +35,7 @@
>  #define _FD_MAN_H_
>  #include <stdint.h>
>  #include <pthread.h>
> +#include <poll.h>
>  
>  #define MAX_FDS 1024
>  
> @@ -49,9 +50,10 @@ struct fdentry {
>  };
>  
>  struct fdset {
> +	struct pollfd rwfds[MAX_FDS];
>  	struct fdentry fd[MAX_FDS];
>  	pthread_mutex_t fd_mutex;
> -	int num;	/* current fd number of this fdset */
> +	int num;	/* highest index occupied in fd array + 1 */

I don't see the comment change makes it more readable.

	--yliu
  

Patch

diff --git a/lib/librte_vhost/fd_man.c b/lib/librte_vhost/fd_man.c
index 2d3eeb7..d7c775d 100644
--- a/lib/librte_vhost/fd_man.c
+++ b/lib/librte_vhost/fd_man.c
@@ -35,16 +35,38 @@ 
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/socket.h>
-#include <sys/select.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <string.h>
 
 #include <rte_common.h>
 #include <rte_log.h>
 
 #include "fd_man.h"
 
+#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
+
+/**
+ * Adjusts the highest index populated in the array of fds
+ * @return
+ *   The new size of fdset.
+ */
+static int
+fdset_shrink(struct fdset *pfdset)
+{
+	int idx;
+
+	for (idx = pfdset->num - 1;
+	     idx >= 0 && pfdset->fd[idx].fd == -1;
+	     idx--)
+		;
+
+	pfdset->num = idx + 1;
+
+	return pfdset->num;
+}
+
 /**
  * Returns the index in the fdset for a given fd.
  * If fd is -1, it means to search for a free entry.
@@ -56,72 +78,28 @@ 
 {
 	int i;
 
-	if (pfdset == NULL)
-		return -1;
-
-	for (i = 0; i < MAX_FDS && pfdset->fd[i].fd != fd; i++)
+	for (i = 0; i < pfdset->num && pfdset->fd[i].fd != fd; i++)
 		;
 
-	return i ==  MAX_FDS ? -1 : i;
-}
-
-static int
-fdset_find_free_slot(struct fdset *pfdset)
-{
-	return fdset_find_fd(pfdset, -1);
+	return i == pfdset->num ? -1 : i;
 }
 
-static int
-fdset_add_fd(struct fdset  *pfdset, int idx, int fd,
+static void
+fdset_add_fd(struct fdset *pfdset, int idx, int fd,
 	fd_cb rcb, fd_cb wcb, void *dat)
 {
-	struct fdentry *pfdentry;
-
-	if (pfdset == NULL || idx >= MAX_FDS || fd >= FD_SETSIZE)
-		return -1;
+	struct fdentry *pfdentry = &pfdset->fd[idx];
+	struct pollfd *pfd = &pfdset->rwfds[idx];
 
-	pfdentry = &pfdset->fd[idx];
 	pfdentry->fd = fd;
 	pfdentry->rcb = rcb;
 	pfdentry->wcb = wcb;
 	pfdentry->dat = dat;
 
-	return 0;
-}
-
-/**
- * Fill the read/write fd_set with the fds in the fdset.
- * @return
- *  the maximum fds filled in the read/write fd_set.
- */
-static int
-fdset_fill(fd_set *rfset, fd_set *wfset, struct fdset *pfdset)
-{
-	struct fdentry *pfdentry;
-	int i, maxfds = -1;
-	int num = MAX_FDS;
-
-	if (pfdset == NULL)
-		return -1;
-
-	for (i = 0; i < num; i++) {
-		pfdentry = &pfdset->fd[i];
-		if (pfdentry->fd != -1) {
-			int added = 0;
-			if (pfdentry->rcb && rfset) {
-				FD_SET(pfdentry->fd, rfset);
-				added = 1;
-			}
-			if (pfdentry->wcb && wfset) {
-				FD_SET(pfdentry->fd, wfset);
-				added = 1;
-			}
-			if (added)
-				maxfds = pfdentry->fd < maxfds ?
-					maxfds : pfdentry->fd;
-		}
-	}
-	return maxfds;
+	pfd->fd = fd;
+	pfd->events = rcb ? POLLIN : 0;
+	pfd->events |= wcb ? POLLOUT : 0;
+	pfd->revents = 0;
 }
 
 void
@@ -132,11 +110,15 @@ 
 	if (pfdset == NULL)
 		return;
 
+	pthread_mutex_init(&pfdset->fd_mutex, NULL);
+
 	for (i = 0; i < MAX_FDS; i++) {
 		pfdset->fd[i].fd = -1;
 		pfdset->fd[i].dat = NULL;
 	}
 	pfdset->num = 0;
+
+	memset(pfdset->rwfds, 0, sizeof(pfdset->rwfds));
 }
 
 /**
@@ -152,14 +134,14 @@ 
 
 	pthread_mutex_lock(&pfdset->fd_mutex);
 
-	/* Find a free slot in the list. */
-	i = fdset_find_free_slot(pfdset);
-	if (i == -1 || fdset_add_fd(pfdset, i, fd, rcb, wcb, dat) < 0) {
+	i = pfdset->num < MAX_FDS ? pfdset->num++ : -1;
+
+	if (i == -1) {
 		pthread_mutex_unlock(&pfdset->fd_mutex);
 		return -2;
 	}
 
-	pfdset->num++;
+	fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
 
 	pthread_mutex_unlock(&pfdset->fd_mutex);
 
@@ -189,7 +171,6 @@ 
 			pfdset->fd[i].fd = -1;
 			pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
 			pfdset->fd[i].dat = NULL;
-			pfdset->num--;
 			i = -1;
 		}
 		pthread_mutex_unlock(&pfdset->fd_mutex);
@@ -198,25 +179,26 @@ 
 	return dat;
 }
 
+
 /**
- *  Unregister the fd at the specified slot from the fdset.
+ *  Moves the fd from last slot to specified slot, including
+ *  corresponding pollfd
  */
 static void
-fdset_del_slot(struct fdset *pfdset, int index)
+fdset_move_last(struct fdset *pfdset, int idx)
 {
-	if (pfdset == NULL || index < 0 || index >= MAX_FDS)
-		return;
-
-	pthread_mutex_lock(&pfdset->fd_mutex);
+	int last_idx = pfdset->num - 1;
 
-	pfdset->fd[index].fd = -1;
-	pfdset->fd[index].rcb = pfdset->fd[index].wcb = NULL;
-	pfdset->fd[index].dat = NULL;
-	pfdset->num--;
+	if (idx < last_idx) {
+		pfdset->fd[idx] = pfdset->fd[last_idx];
+		pfdset->fd[last_idx].fd = -1;
 
-	pthread_mutex_unlock(&pfdset->fd_mutex);
+		pfdset->rwfds[idx] = pfdset->rwfds[last_idx];
+		pfdset->rwfds[last_idx].revents = 0;
+	}
 }
 
+
 /**
  * This functions runs in infinite blocking loop until there is no fd in
  * pfdset. It calls corresponding r/w handler if there is event on the fd.
@@ -229,55 +211,75 @@ 
 void
 fdset_event_dispatch(struct fdset *pfdset)
 {
-	fd_set rfds, wfds;
-	int i, maxfds;
+	int i;
+	struct pollfd *pfd;
 	struct fdentry *pfdentry;
-	int num = MAX_FDS;
 	fd_cb rcb, wcb;
 	void *dat;
-	int fd;
+	int fd, numfds;
 	int remove1, remove2;
-	int ret;
 
 	if (pfdset == NULL)
 		return;
 
 	while (1) {
-		struct timeval tv;
-		tv.tv_sec = 1;
-		tv.tv_usec = 0;
-		FD_ZERO(&rfds);
-		FD_ZERO(&wfds);
-		pthread_mutex_lock(&pfdset->fd_mutex);
-
-		maxfds = fdset_fill(&rfds, &wfds, pfdset);
-
-		pthread_mutex_unlock(&pfdset->fd_mutex);
-
 		/*
-		 * When select is blocked, other threads might unregister
+		 * When poll is blocked, other threads might unregister
 		 * listenfds from and register new listenfds into fdset.
-		 * When select returns, the entries for listenfds in the fdset
+		 * When poll returns, the entries for listenfds in the fdset
 		 * might have been updated. It is ok if there is unwanted call
 		 * for new listenfds.
 		 */
-		ret = select(maxfds + 1, &rfds, &wfds, NULL, &tv);
-		if (ret <= 0)
-			continue;
+		pthread_mutex_lock(&pfdset->fd_mutex);
 
-		for (i = 0; i < num; i++) {
-			remove1 = remove2 = 0;
+		numfds = pfdset->num;
+
+		pthread_mutex_unlock(&pfdset->fd_mutex);
+
+		poll(pfdset->rwfds, numfds, 1000 /* millisecs */);
+
+		for (i = 0; i < numfds; ) {
 			pthread_mutex_lock(&pfdset->fd_mutex);
+
 			pfdentry = &pfdset->fd[i];
 			fd = pfdentry->fd;
+			pfd = &pfdset->rwfds[i];
+
+			if (fd < 0) {
+				/* Removed during poll */
+				/* shrink first, last migth be deleted*/
+
+				fdset_shrink(pfdset);
+				fdset_move_last(pfdset, i);
+				fdset_shrink(pfdset);
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				continue;
+			}
+
+			if (!pfd->revents) {
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				i++;
+				continue;
+			}
+
+			/* Valid fd, and at least one revent ... */
+
+			remove1 = remove2 = 0;
+
 			rcb = pfdentry->rcb;
 			wcb = pfdentry->wcb;
 			dat = pfdentry->dat;
 			pfdentry->busy = 1;
+
 			pthread_mutex_unlock(&pfdset->fd_mutex);
-			if (fd >= 0 && FD_ISSET(fd, &rfds) && rcb)
+
+			if (rcb && pfd->revents & (POLLIN | FDPOLLERR))
 				rcb(fd, dat, &remove1);
-			if (fd >= 0 && FD_ISSET(fd, &wfds) && wcb)
+			if (wcb && pfd->revents & (POLLOUT | FDPOLLERR))
 				wcb(fd, dat, &remove2);
 			pfdentry->busy = 0;
 			/*
@@ -292,8 +294,36 @@ 
 			 * listen fd in another thread, we couldn't call
 			 * fd_set_del.
 			 */
-			if (remove1 || remove2)
-				fdset_del_slot(pfdset, i);
+			if (remove1 || remove2) {
+				pthread_mutex_lock(&pfdset->fd_mutex);
+
+				/* shrink first, last migth be deleted*/
+				fdset_shrink(pfdset);
+				fdset_move_last(pfdset, i);
+				fdset_shrink(pfdset);
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				continue;
+			}
+
+			i++;
 		}
+
+		/* fdset_del do not shrink, pack eventual remainings of array */
+		pthread_mutex_lock(&pfdset->fd_mutex);
+
+		fdset_shrink(pfdset);
+
+		for ( ; i < pfdset->num; i++) {
+			pfdentry = &pfdset->fd[i];
+
+			if (pfdentry->fd < 0) {
+				fdset_move_last(pfdset, i);
+				fdset_shrink(pfdset);
+			}
+		}
+
+		pthread_mutex_unlock(&pfdset->fd_mutex);
 	}
 }
diff --git a/lib/librte_vhost/fd_man.h b/lib/librte_vhost/fd_man.h
index bd66ed1..03e7881 100644
--- a/lib/librte_vhost/fd_man.h
+++ b/lib/librte_vhost/fd_man.h
@@ -35,6 +35,7 @@ 
 #define _FD_MAN_H_
 #include <stdint.h>
 #include <pthread.h>
+#include <poll.h>
 
 #define MAX_FDS 1024
 
@@ -49,9 +50,10 @@  struct fdentry {
 };
 
 struct fdset {
+	struct pollfd rwfds[MAX_FDS];
 	struct fdentry fd[MAX_FDS];
 	pthread_mutex_t fd_mutex;
-	int num;	/* current fd number of this fdset */
+	int num;	/* highest index occupied in fd array + 1 */
 };