[dpdk-dev,v3] vhost: allow for many vhost user ports

Message ID 1481561434-28675-1-git-send-email-jan.wickbom@ericsson.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel Per-patch compilation check success Compilation OK

Commit Message

Jan Wickbom Dec. 12, 2016, 4:50 p.m. UTC
  Currently select() is used to monitor file descriptors for vhostuser
ports. This limits the number of ports possible to create since the
fd number is used as index in the fd_set and we have seen fds > 1023.
This patch changes select() to poll(). This way we can keep an
packed (pollfd) array for the fds, e.g. as many fds as the size of
the array.

Also see:
http://dpdk.org/ml/archives/dev/2016-April/037024.html

Signed-off-by: Jan Wickbom <jan.wickbom@ericsson.com>
Reported-by: Patrik Andersson <patrik.r.andersson@ericsson.com>
---

v3:
* removed unnecessary include
* removed fdset_fill, made it functionally part of poll loop

v2:
* removed unnecessary casts
* static array replacing allocated memory

 lib/librte_vhost/fd_man.c | 194 +++++++++++++++++++++++++---------------------
 lib/librte_vhost/fd_man.h |   2 +-
 2 files changed, 105 insertions(+), 91 deletions(-)
  

Comments

Yuanhan Liu Dec. 13, 2016, 9:14 a.m. UTC | #1
On Mon, Dec 12, 2016 at 05:50:34PM +0100, Jan Wickbom wrote:
> Currently select() is used to monitor file descriptors for vhostuser
> ports. This limits the number of ports possible to create since the
> fd number is used as index in the fd_set and we have seen fds > 1023.
> This patch changes select() to poll(). This way we can keep an
> packed (pollfd) array for the fds, e.g. as many fds as the size of
> the array.
> 
> Also see:
> http://dpdk.org/ml/archives/dev/2016-April/037024.html
> 
> Signed-off-by: Jan Wickbom <jan.wickbom@ericsson.com>
> Reported-by: Patrik Andersson <patrik.r.andersson@ericsson.com>
...
> +static struct pollfd rwfds[MAX_FDS];

Though it's unlikely, but just assume we have multiple instance of
fdset_event_dispatch(pfdset), a global rwfds will not work.

Thought twice, and it's better to put it into the fdset struct:

    struct fdset {
            struct fdentry fd[MAX_FDS];
            struct pollfd rwfds[MAX_FDS];
            ...

>  /**
>   * This functions runs in infinite blocking loop until there is no fd in
>   * pfdset. It calls corresponding r/w handler if there is event on the fd.
> @@ -229,55 +217,71 @@
>  void
>  fdset_event_dispatch(struct fdset *pfdset)
>  {
> -	fd_set rfds, wfds;
> -	int i, maxfds;
> +	int i;
>  	struct fdentry *pfdentry;
> -	int num = MAX_FDS;
>  	fd_cb rcb, wcb;
>  	void *dat;
>  	int fd;
>  	int remove1, remove2;
> -	int ret;
>  
>  	if (pfdset == NULL)
>  		return;
>  
> -	while (1) {
> -		struct timeval tv;
> -		tv.tv_sec = 1;
> -		tv.tv_usec = 0;
> -		FD_ZERO(&rfds);
> -		FD_ZERO(&wfds);
> -		pthread_mutex_lock(&pfdset->fd_mutex);
> -
> -		maxfds = fdset_fill(&rfds, &wfds, pfdset);
> -
> -		pthread_mutex_unlock(&pfdset->fd_mutex);
> +	memset(rwfds, 0, sizeof(rwfds));
>  
> +	while (1) {
>  		/*
> -		 * When select is blocked, other threads might unregister
> +		 * When poll is blocked, other threads might unregister
>  		 * listenfds from and register new listenfds into fdset.
> -		 * When select returns, the entries for listenfds in the fdset
> +		 * When poll returns, the entries for listenfds in the fdset
>  		 * might have been updated. It is ok if there is unwanted call
>  		 * for new listenfds.
>  		 */
> -		ret = select(maxfds + 1, &rfds, &wfds, NULL, &tv);
> -		if (ret <= 0)
> -			continue;
> +		poll(rwfds, pfdset->num, 1000 /* millisecs */);
>  
> -		for (i = 0; i < num; i++) {
> -			remove1 = remove2 = 0;
> +		for (i = 0; i < pfdset->num; ) {
>  			pthread_mutex_lock(&pfdset->fd_mutex);
> +
>  			pfdentry = &pfdset->fd[i];
>  			fd = pfdentry->fd;
> +
> +			if (fd < 0) {
> +				/* Removed during poll */
> +
> +				fdset_move_last(pfdset, i);
> +				fdset_shrink(pfdset);
> +
> +				pthread_mutex_unlock(&pfdset->fd_mutex);
> +
> +				continue;
> +			}
> +
> +			if (!rwfds[i].revents) {
> +				/* No revents, maybe added during poll */
> +
> +				rwfds[i].fd = fd;
> +				rwfds[i].events = pfdentry->rcb ? POLLIN : 0;
> +				rwfds[i].events |= pfdentry->wcb ? POLLOUT : 0;
> +				pthread_mutex_unlock(&pfdset->fd_mutex);
> +
> +				i++;
> +				continue;

I think it's error-prone to manipulate the rwfds here. Besides, it
registers an fd repeatedly.

The way I think of is:

- set rwfds[i] at fdset_add().

  This also simply makes sure that pfdset->rwfds[i] and pfdset->fd[i] is
  correctly bond.

	fdset_add(fdset, fd, ...) {
		lock();

		i = fdset_find_free_slot(..);

		pfdset->fd[i]->fd  = fd;
		pfdset->fd[i]->rcb = rcb;
		pfdset->fd[i]->...;

		pfdset->rwfds[i]->fd = fd;
		pfdset->rwfds[i]->events  = ...;
		pfdset->rwfds[i]->revents = 0;
	}


- set pfdset->fd[i]->fd = -1 on fdset_del. Note we should not decrease
  'num' here, as we may be at poll.

	fdset_del(fdset, fd) {
		lock();

		i = fdset_find_fd(pfdset, fd);
		pfdset->fd[i]->fd = -1;

		...
	}
		
		

- log down pfdset->num before poll, because 'num' may increase during poll.

  I think it's optional, since even 'num' is increased during poll, it just
  leads to few more rwfds entries will be accessed. But it's not tracked by
  kernel, and revents is initiated with 0, that there is no issue.


- shrink the fdset and rwfds (together) for those removed entries, __outside__
  the for loop after poll.

Works to you?

	--yliu
  
Jan Wickbom Dec. 13, 2016, 1:15 p.m. UTC | #2
> -----Original Message-----
> From: Yuanhan Liu [mailto:yuanhan.liu@linux.intel.com]
> Sent: den 13 december 2016 10:15
> To: Jan Wickbom <jan.wickbom@ericsson.com>
> Cc: dev@dpdk.org; Patrik Andersson R <patrik.r.andersson@ericsson.com>
> Subject: Re: [PATCH v3] vhost: allow for many vhost user ports
> 
> On Mon, Dec 12, 2016 at 05:50:34PM +0100, Jan Wickbom wrote:
> > Currently select() is used to monitor file descriptors for vhostuser
> > ports. This limits the number of ports possible to create since the
> > fd number is used as index in the fd_set and we have seen fds > 1023.
> > This patch changes select() to poll(). This way we can keep an
> > packed (pollfd) array for the fds, e.g. as many fds as the size of
> > the array.
> >
> > Also see:
> > http://dpdk.org/ml/archives/dev/2016-April/037024.html
> >
> > Signed-off-by: Jan Wickbom <jan.wickbom@ericsson.com>
> > Reported-by: Patrik Andersson <patrik.r.andersson@ericsson.com>
> ...
> > +static struct pollfd rwfds[MAX_FDS];
> 
> Though it's unlikely, but just assume we have multiple instance of
> fdset_event_dispatch(pfdset), a global rwfds will not work.
> 
> Thought twice, and it's better to put it into the fdset struct:
> 
>     struct fdset {
>             struct fdentry fd[MAX_FDS];
>             struct pollfd rwfds[MAX_FDS];
>             ...
> 

Done


> >  /**
> >   * This functions runs in infinite blocking loop until there is no fd in
> >   * pfdset. It calls corresponding r/w handler if there is event on the fd.
> > @@ -229,55 +217,71 @@
> >  void
> >  fdset_event_dispatch(struct fdset *pfdset)
> >  {
> > -	fd_set rfds, wfds;
> > -	int i, maxfds;
> > +	int i;
> >  	struct fdentry *pfdentry;
> > -	int num = MAX_FDS;
> >  	fd_cb rcb, wcb;
> >  	void *dat;
> >  	int fd;
> >  	int remove1, remove2;
> > -	int ret;
> >
> >  	if (pfdset == NULL)
> >  		return;
> >
> > -	while (1) {
> > -		struct timeval tv;
> > -		tv.tv_sec = 1;
> > -		tv.tv_usec = 0;
> > -		FD_ZERO(&rfds);
> > -		FD_ZERO(&wfds);
> > -		pthread_mutex_lock(&pfdset->fd_mutex);
> > -
> > -		maxfds = fdset_fill(&rfds, &wfds, pfdset);
> > -
> > -		pthread_mutex_unlock(&pfdset->fd_mutex);
> > +	memset(rwfds, 0, sizeof(rwfds));
> >
> > +	while (1) {
> >  		/*
> > -		 * When select is blocked, other threads might
> unregister
> > +		 * When poll is blocked, other threads might
> unregister
> >  		 * listenfds from and register new listenfds into
> fdset.
> > -		 * When select returns, the entries for listenfds
> in the fdset
> > +		 * When poll returns, the entries for listenfds in
> the fdset
> >  		 * might have been updated. It is ok if there is
> unwanted call
> >  		 * for new listenfds.
> >  		 */
> > -		ret = select(maxfds + 1, &rfds, &wfds, NULL,
> &tv);
> > -		if (ret <= 0)
> > -			continue;
> > +		poll(rwfds, pfdset->num, 1000 /* millisecs */);
> >
> > -		for (i = 0; i < num; i++) {
> > -			remove1 = remove2 = 0;
> > +		for (i = 0; i < pfdset->num; ) {
> >  			pthread_mutex_lock(&pfdset-
> >fd_mutex);
> > +
> >  			pfdentry = &pfdset->fd[i];
> >  			fd = pfdentry->fd;
> > +
> > +			if (fd < 0) {
> > +				/* Removed during
> poll */
> > +
> > +
> 	fdset_move_last(pfdset, i);
> > +
> 	fdset_shrink(pfdset);
> > +
> > +
> 	pthread_mutex_unlock(&pfdset->fd_mutex);
> > +
> > +				continue;
> > +			}
> > +
> > +			if (!rwfds[i].revents) {
> > +				/* No revents,
> maybe added during poll */
> > +
> > +				rwfds[i].fd = fd;
> > +				rwfds[i].events =
> pfdentry->rcb ? POLLIN : 0;
> > +				rwfds[i].events |=
> pfdentry->wcb ? POLLOUT : 0;
> > +
> 	pthread_mutex_unlock(&pfdset->fd_mutex);
> > +
> > +				i++;
> > +				continue;
> 
> I think it's error-prone to manipulate the rwfds here. Besides, it
> registers an fd repeatedly.
> 
> The way I think of is:
> 
> - set rwfds[i] at fdset_add().
> 
>   This also simply makes sure that pfdset->rwfds[i] and pfdset->fd[i] is
>   correctly bond.
> 
> 	fdset_add(fdset, fd, ...) {
> 		lock();
> 
> 		i = fdset_find_free_slot(..);
> 
> 		pfdset->fd[i]->fd  = fd;
> 		pfdset->fd[i]->rcb = rcb;
> 		pfdset->fd[i]->...;
> 
> 		pfdset->rwfds[i]->fd = fd;
> 		pfdset->rwfds[i]->events  = ...;
> 		pfdset->rwfds[i]->revents = 0;
> 	}
> 
> 
> - set pfdset->fd[i]->fd = -1 on fdset_del. Note we should not decrease
>   'num' here, as we may be at poll.
> 
> 	fdset_del(fdset, fd) {
> 		lock();
> 
> 		i = fdset_find_fd(pfdset, fd);
> 		pfdset->fd[i]->fd = -1;
> 
> 		...
> 	}
> 
> 
> 
> - log down pfdset->num before poll, because 'num' may increase during poll.
> 
>   I think it's optional, since even 'num' is increased during poll, it just
>   leads to few more rwfds entries will be accessed. But it's not tracked by
>   kernel, and revents is initiated with 0, that there is no issue.
> 
> 
> - shrink the fdset and rwfds (together) for those removed entries,
> __outside__
>   the for loop after poll.
> 
> Works to you?

I did quite of a rework of this yesterday before reading your mail. I think
That should work, please have a look.
V4 coming in a minute
> 
> 	--yliu
  

Patch

diff --git a/lib/librte_vhost/fd_man.c b/lib/librte_vhost/fd_man.c
index 2d3eeb7..c360d07 100644
--- a/lib/librte_vhost/fd_man.c
+++ b/lib/librte_vhost/fd_man.c
@@ -35,16 +35,40 @@ 
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/socket.h>
-#include <sys/select.h>
 #include <sys/time.h>
 #include <sys/types.h>
+#include <poll.h>
 #include <unistd.h>
+#include <string.h>
 
 #include <rte_common.h>
 #include <rte_log.h>
 
 #include "fd_man.h"
 
+#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
+
+
+static struct pollfd rwfds[MAX_FDS];
+
+/**
+ * Adjusts the highest index populated in the array of fds
+ * @return
+ *   The new size of fdset.
+ */
+static void
+fdset_shrink(struct fdset *pfdset)
+{
+	int idx;
+
+	for (idx = pfdset->num - 1;
+	     idx >= 0 && pfdset->fd[idx].fd == -1;
+	     idx--)
+		;
+
+	pfdset->num = idx + 1;
+}
+
 /**
  * Returns the index in the fdset for a given fd.
  * If fd is -1, it means to search for a free entry.
@@ -56,72 +80,32 @@ 
 {
 	int i;
 
-	if (pfdset == NULL)
-		return -1;
-
-	for (i = 0; i < MAX_FDS && pfdset->fd[i].fd != fd; i++)
+	for (i = 0; i < pfdset->num && pfdset->fd[i].fd != fd; i++)
 		;
 
-	return i ==  MAX_FDS ? -1 : i;
+	return i == pfdset->num ? -1 : i;
 }
 
 static int
 fdset_find_free_slot(struct fdset *pfdset)
 {
-	return fdset_find_fd(pfdset, -1);
+	if (pfdset->num < MAX_FDS)
+		return pfdset->num;
+	else
+		return fdset_find_fd(pfdset, -1);
 }
 
-static int
-fdset_add_fd(struct fdset  *pfdset, int idx, int fd,
+static void
+fdset_add_fd(struct fdset *pfdset, int idx, int fd,
 	fd_cb rcb, fd_cb wcb, void *dat)
 {
 	struct fdentry *pfdentry;
 
-	if (pfdset == NULL || idx >= MAX_FDS || fd >= FD_SETSIZE)
-		return -1;
-
 	pfdentry = &pfdset->fd[idx];
 	pfdentry->fd = fd;
 	pfdentry->rcb = rcb;
 	pfdentry->wcb = wcb;
 	pfdentry->dat = dat;
-
-	return 0;
-}
-
-/**
- * Fill the read/write fd_set with the fds in the fdset.
- * @return
- *  the maximum fds filled in the read/write fd_set.
- */
-static int
-fdset_fill(fd_set *rfset, fd_set *wfset, struct fdset *pfdset)
-{
-	struct fdentry *pfdentry;
-	int i, maxfds = -1;
-	int num = MAX_FDS;
-
-	if (pfdset == NULL)
-		return -1;
-
-	for (i = 0; i < num; i++) {
-		pfdentry = &pfdset->fd[i];
-		if (pfdentry->fd != -1) {
-			int added = 0;
-			if (pfdentry->rcb && rfset) {
-				FD_SET(pfdentry->fd, rfset);
-				added = 1;
-			}
-			if (pfdentry->wcb && wfset) {
-				FD_SET(pfdentry->fd, wfset);
-				added = 1;
-			}
-			if (added)
-				maxfds = pfdentry->fd < maxfds ?
-					maxfds : pfdentry->fd;
-		}
-	}
-	return maxfds;
 }
 
 void
@@ -132,6 +116,8 @@ 
 	if (pfdset == NULL)
 		return;
 
+	pthread_mutex_init(&pfdset->fd_mutex, NULL);
+
 	for (i = 0; i < MAX_FDS; i++) {
 		pfdset->fd[i].fd = -1;
 		pfdset->fd[i].dat = NULL;
@@ -152,14 +138,15 @@ 
 
 	pthread_mutex_lock(&pfdset->fd_mutex);
 
-	/* Find a free slot in the list. */
 	i = fdset_find_free_slot(pfdset);
-	if (i == -1 || fdset_add_fd(pfdset, i, fd, rcb, wcb, dat) < 0) {
+	if (i == -1) {
 		pthread_mutex_unlock(&pfdset->fd_mutex);
 		return -2;
 	}
 
-	pfdset->num++;
+	fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
+	if (i == pfdset->num)
+		pfdset->num++;
 
 	pthread_mutex_unlock(&pfdset->fd_mutex);
 
@@ -189,7 +176,7 @@ 
 			pfdset->fd[i].fd = -1;
 			pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
 			pfdset->fd[i].dat = NULL;
-			pfdset->num--;
+			fdset_shrink(pfdset);
 			i = -1;
 		}
 		pthread_mutex_unlock(&pfdset->fd_mutex);
@@ -198,25 +185,26 @@ 
 	return dat;
 }
 
+
 /**
- *  Unregister the fd at the specified slot from the fdset.
+ *  Moves the fd from last slot to specified slot, including
+ *  corresponding pollfd
  */
 static void
-fdset_del_slot(struct fdset *pfdset, int index)
+fdset_move_last(struct fdset *pfdset, int idx)
 {
-	if (pfdset == NULL || index < 0 || index >= MAX_FDS)
-		return;
-
-	pthread_mutex_lock(&pfdset->fd_mutex);
+	int last_idx = pfdset->num - 1;
 
-	pfdset->fd[index].fd = -1;
-	pfdset->fd[index].rcb = pfdset->fd[index].wcb = NULL;
-	pfdset->fd[index].dat = NULL;
-	pfdset->num--;
+	if (idx < last_idx) {
+		pfdset->fd[idx] = pfdset->fd[last_idx];
+		pfdset->fd[last_idx].fd = -1;
 
-	pthread_mutex_unlock(&pfdset->fd_mutex);
+		rwfds[idx] = rwfds[last_idx];
+		rwfds[last_idx].revents = 0;
+	}
 }
 
+
 /**
  * This functions runs in infinite blocking loop until there is no fd in
  * pfdset. It calls corresponding r/w handler if there is event on the fd.
@@ -229,55 +217,71 @@ 
 void
 fdset_event_dispatch(struct fdset *pfdset)
 {
-	fd_set rfds, wfds;
-	int i, maxfds;
+	int i;
 	struct fdentry *pfdentry;
-	int num = MAX_FDS;
 	fd_cb rcb, wcb;
 	void *dat;
 	int fd;
 	int remove1, remove2;
-	int ret;
 
 	if (pfdset == NULL)
 		return;
 
-	while (1) {
-		struct timeval tv;
-		tv.tv_sec = 1;
-		tv.tv_usec = 0;
-		FD_ZERO(&rfds);
-		FD_ZERO(&wfds);
-		pthread_mutex_lock(&pfdset->fd_mutex);
-
-		maxfds = fdset_fill(&rfds, &wfds, pfdset);
-
-		pthread_mutex_unlock(&pfdset->fd_mutex);
+	memset(rwfds, 0, sizeof(rwfds));
 
+	while (1) {
 		/*
-		 * When select is blocked, other threads might unregister
+		 * When poll is blocked, other threads might unregister
 		 * listenfds from and register new listenfds into fdset.
-		 * When select returns, the entries for listenfds in the fdset
+		 * When poll returns, the entries for listenfds in the fdset
 		 * might have been updated. It is ok if there is unwanted call
 		 * for new listenfds.
 		 */
-		ret = select(maxfds + 1, &rfds, &wfds, NULL, &tv);
-		if (ret <= 0)
-			continue;
+		poll(rwfds, pfdset->num, 1000 /* millisecs */);
 
-		for (i = 0; i < num; i++) {
-			remove1 = remove2 = 0;
+		for (i = 0; i < pfdset->num; ) {
 			pthread_mutex_lock(&pfdset->fd_mutex);
+
 			pfdentry = &pfdset->fd[i];
 			fd = pfdentry->fd;
+
+			if (fd < 0) {
+				/* Removed during poll */
+
+				fdset_move_last(pfdset, i);
+				fdset_shrink(pfdset);
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				continue;
+			}
+
+			if (!rwfds[i].revents) {
+				/* No revents, maybe added during poll */
+
+				rwfds[i].fd = fd;
+				rwfds[i].events = pfdentry->rcb ? POLLIN : 0;
+				rwfds[i].events |= pfdentry->wcb ? POLLOUT : 0;
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				i++;
+				continue;
+			}
+
+			/* Valid fd, and at least one revent ... */
+
+			remove1 = remove2 = 0;
+
 			rcb = pfdentry->rcb;
 			wcb = pfdentry->wcb;
 			dat = pfdentry->dat;
 			pfdentry->busy = 1;
+
 			pthread_mutex_unlock(&pfdset->fd_mutex);
-			if (fd >= 0 && FD_ISSET(fd, &rfds) && rcb)
+
+			if (rcb && rwfds[i].revents & (POLLIN | FDPOLLERR))
 				rcb(fd, dat, &remove1);
-			if (fd >= 0 && FD_ISSET(fd, &wfds) && wcb)
+			if (wcb && rwfds[i].revents & (POLLOUT | FDPOLLERR))
 				wcb(fd, dat, &remove2);
 			pfdentry->busy = 0;
 			/*
@@ -292,8 +296,18 @@ 
 			 * listen fd in another thread, we couldn't call
 			 * fd_set_del.
 			 */
-			if (remove1 || remove2)
-				fdset_del_slot(pfdset, i);
+			if (remove1 || remove2) {
+				pthread_mutex_lock(&pfdset->fd_mutex);
+
+				fdset_move_last(pfdset, i);
+				fdset_shrink(pfdset);
+
+				pthread_mutex_unlock(&pfdset->fd_mutex);
+
+				continue;
+			}
+
+			i++;
 		}
 	}
 }
diff --git a/lib/librte_vhost/fd_man.h b/lib/librte_vhost/fd_man.h
index bd66ed1..b5ba688 100644
--- a/lib/librte_vhost/fd_man.h
+++ b/lib/librte_vhost/fd_man.h
@@ -51,7 +51,7 @@  struct fdentry {
 struct fdset {
 	struct fdentry fd[MAX_FDS];
 	pthread_mutex_t fd_mutex;
-	int num;	/* current fd number of this fdset */
+	int num;	/* highest index occupied in fd array + 1 */
 };