[dpdk-dev,2/3] eal: add synchronous multi-process communication

Message ID 1512067450-59203-3-git-send-email-jianfeng.tan@intel.com (mailing list archive)
State Changes Requested, archived
Delegated to: Thomas Monjalon
Headers

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Jianfeng Tan Nov. 30, 2017, 6:44 p.m. UTC
  We need the synchronous way for multi-process communication, that
is to say we need an immediate response after we send a message
to the other side.

We will stop the mp_handler thread, and after sending message,
the send thread will wait there for reponse and process the
respond.

Suggested-by: Anatoly Burakov <anatoly.burakov@intel.com>
Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
---
 lib/librte_eal/common/eal_common_proc.c | 53 +++++++++++++++++++++++++++++++--
 lib/librte_eal/common/include/rte_eal.h |  5 +++-
 2 files changed, 55 insertions(+), 3 deletions(-)
  

Comments

Burakov, Anatoly Dec. 11, 2017, 11:39 a.m. UTC | #1
On 30-Nov-17 6:44 PM, Jianfeng Tan wrote:
> We need the synchronous way for multi-process communication, that
> is to say we need an immediate response after we send a message
> to the other side.
> 
> We will stop the mp_handler thread, and after sending message,
> the send thread will wait there for reponse and process the
> respond.
> 
> Suggested-by: Anatoly Burakov <anatoly.burakov@intel.com>
> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
> ---
>   lib/librte_eal/common/eal_common_proc.c | 53 +++++++++++++++++++++++++++++++--
>   lib/librte_eal/common/include/rte_eal.h |  5 +++-
>   2 files changed, 55 insertions(+), 3 deletions(-)
> 
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 5d0a095..65ebaf2 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -30,6 +30,8 @@
>    *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
>    */
>   
> +#define _GNU_SOURCE
> +

shouldn't this be in Makefile flags?

>   #include <stdio.h>
>   #include <fcntl.h>
>   #include <stdlib.h>
> @@ -41,6 +43,8 @@
>   #include <sys/un.h>
>   #include <errno.h>
>   #include <pthread.h>
> +#include <sys/eventfd.h>
> +#include <signal.h>
>   
>   #include <rte_log.h>
>   #include <rte_eal.h>
> @@ -134,6 +138,7 @@ rte_eal_mp_action_unregister(const char *name)
>   
>   struct mp_fds {
>   	int efd;
> +	int evfd; /* eventfd used for pausing mp_handler thread */
>   
>   	union {
>   		/* fds for primary process */
> @@ -331,6 +336,13 @@ mp_handler(void *arg __rte_unused)
>   		exit(EXIT_FAILURE);
>   	}
>   
> +	ev.data.fd = mp_fds.evfd;
> +	if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
> +		RTE_LOG(ERR, EAL, "epoll_ctl failed: %s\n",
> +			strerror(errno));
> +		exit(EXIT_FAILURE);

here and in other places - rte_exit?

> +	}
> +
>   	events = calloc(20, sizeof ev);
>   
>   	while (1) {
> @@ -348,6 +360,14 @@ mp_handler(void *arg __rte_unused)
>   				continue;
>   			}
>   
> +			if (events[i].data.fd == mp_fds.evfd) {
> +				RTE_LOG(INFO, EAL, "mp_handler thread will pause\n");
> +				pause();
> +				RTE_LOG(INFO, EAL, "mp_handler thread stops pausing\n");
> +
> +				continue;
> +			}
> +
>   			fd = events[i].data.fd;
>   
>   			if ((events[i].events & EPOLLIN)) {
> @@ -377,13 +397,14 @@ mp_handler(void *arg __rte_unused)
>   	return NULL;
>   }
>   
> +static pthread_t tid;
> +
>   int
>   rte_eal_mp_channel_init(void)
>   {
>   	int i, fd, ret;
>   	const char *path;
>   	struct sockaddr_un un;
> -	pthread_t tid;
>   	char thread_name[RTE_MAX_THREAD_NAME_LEN];
>   
>   	mp_fds.efd = epoll_create1(0);
> @@ -462,6 +483,8 @@ rte_eal_mp_channel_init(void)
>   		return -1;
>   	}
>   
> +	mp_fds.evfd = eventfd(0, 0);
> +
>   	return 0;
>   }
>   
> @@ -485,7 +508,8 @@ rte_eal_mp_sendmsg(const char *action_name,
>   		   const void *params,
>   		   int len_params,
>   		   int fds[],
> -		   int fds_num)
> +		   int fds_num,
> +		   int need_ack)

I think "need_ack" is a misnomer because what we really want is not 
"ack" but a response.

More importantly, i think for clarity's sake, this should be a separate 
function - something like rte_eal_mp_sendreq() or maybe a better name 
(reqdata? communicate?). Also, i don't think reusing send parameters is 
a good idea - a user is expecting a response, so a user allocates data 
for a response separately from requests, and passes it explicitly.

>   {
>   	int i;
>   	int ret = 0;
> @@ -511,6 +535,11 @@ rte_eal_mp_sendmsg(const char *action_name,
>   
>   	RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg);
>   
> +	if (need_ack) {
> +		// stop mp_handler thread.

Do we accept C++-style comments?

> +		eventfd_write(mp_fds.evfd, (eventfd_t)1);
> +	}
> +
>   	msg = malloc(len_msg);
>   	if (!msg) {
>   		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
> @@ -547,12 +576,32 @@ rte_eal_mp_sendmsg(const char *action_name,
>   			ret = send_msg(mp_fds.secondaries[i], &msgh);
>   			if (ret < 0)
>   				break;
> +
> +			if (need_ack) {
> +				/* We will hang there until the other side
> +				 * responses and what if other side is sending
> +				 * msg at the same time?
> +				 */
> +				process_msg(mp_fds.secondaries[i]);
> +			}
>   		}
>   	} else {
>   		ret = send_msg(mp_fds.primary, &msgh);
> +
> +		if (ret > 0 && need_ack) {
> +			// We will hang there until the other side responses
> +			ret = process_msg(mp_fds.primary);
> +		}
>   	}
>   
>   	free(msg);
>   
> +	if (need_ack) {
> +		// start mp_handler thread.
> +		union sigval value;

it's not used, but still, maybe zero-initialize it?

> +
> +		pthread_sigqueue(tid, 0, value);
> +	}
> +
>   	return ret;
>   }
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 8776bcf..9875cae 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -274,13 +274,16 @@ void rte_eal_mp_action_unregister(const char *name);
>    * @param fds_num
>    *   The fds_num argument is number of fds to be sent with sendmsg.
>    *
> + * @param need_ack
> + *   The fds_num argument is number of fds to be sent with sendmsg.
> + *
>    * @return
>    *  - (>=0) on success.
>    *  - (<0) on failure.
>    */
>   int
>   rte_eal_mp_sendmsg(const char *action_name, const void *params,
> -		   int len_params, int fds[], int fds_num);
> +		   int len_params, int fds[], int fds_num, int need_ack);
>   
>   /**
>    * Usage function typedef used by the application usage function.
>
  
Ananyev, Konstantin Dec. 11, 2017, 4:49 p.m. UTC | #2
> >

> > @@ -485,7 +508,8 @@ rte_eal_mp_sendmsg(const char *action_name,

> >   		   const void *params,

> >   		   int len_params,

> >   		   int fds[],

> > -		   int fds_num)

> > +		   int fds_num,

> > +		   int need_ack)

> 

> I think "need_ack" is a misnomer because what we really want is not

> "ack" but a response.

> 

> More importantly, i think for clarity's sake, this should be a separate

> function - something like rte_eal_mp_sendreq() or maybe a better name

> (reqdata? communicate?). 


+1 for a separate function.
Also I don't think it should disturb/block mp_handler() - there could be messages
for other actions (from other endpoints).
I think only rte_eal_mp_sendreq() should be blocked till ack/response is received.
And probably it needs max timeout to block for.
Konstantin
  

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 5d0a095..65ebaf2 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -30,6 +30,8 @@ 
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#define _GNU_SOURCE
+
 #include <stdio.h>
 #include <fcntl.h>
 #include <stdlib.h>
@@ -41,6 +43,8 @@ 
 #include <sys/un.h>
 #include <errno.h>
 #include <pthread.h>
+#include <sys/eventfd.h>
+#include <signal.h>
 
 #include <rte_log.h>
 #include <rte_eal.h>
@@ -134,6 +138,7 @@  rte_eal_mp_action_unregister(const char *name)
 
 struct mp_fds {
 	int efd;
+	int evfd; /* eventfd used for pausing mp_handler thread */
 
 	union {
 		/* fds for primary process */
@@ -331,6 +336,13 @@  mp_handler(void *arg __rte_unused)
 		exit(EXIT_FAILURE);
 	}
 
+	ev.data.fd = mp_fds.evfd;
+	if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
+		RTE_LOG(ERR, EAL, "epoll_ctl failed: %s\n",
+			strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
 	events = calloc(20, sizeof ev);
 
 	while (1) {
@@ -348,6 +360,14 @@  mp_handler(void *arg __rte_unused)
 				continue;
 			}
 
+			if (events[i].data.fd == mp_fds.evfd) {
+				RTE_LOG(INFO, EAL, "mp_handler thread will pause\n");
+				pause();
+				RTE_LOG(INFO, EAL, "mp_handler thread stops pausing\n");
+
+				continue;
+			}
+
 			fd = events[i].data.fd;
 
 			if ((events[i].events & EPOLLIN)) {
@@ -377,13 +397,14 @@  mp_handler(void *arg __rte_unused)
 	return NULL;
 }
 
+static pthread_t tid;
+
 int
 rte_eal_mp_channel_init(void)
 {
 	int i, fd, ret;
 	const char *path;
 	struct sockaddr_un un;
-	pthread_t tid;
 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
 
 	mp_fds.efd = epoll_create1(0);
@@ -462,6 +483,8 @@  rte_eal_mp_channel_init(void)
 		return -1;
 	}
 
+	mp_fds.evfd = eventfd(0, 0);
+
 	return 0;
 }
 
@@ -485,7 +508,8 @@  rte_eal_mp_sendmsg(const char *action_name,
 		   const void *params,
 		   int len_params,
 		   int fds[],
-		   int fds_num)
+		   int fds_num,
+		   int need_ack)
 {
 	int i;
 	int ret = 0;
@@ -511,6 +535,11 @@  rte_eal_mp_sendmsg(const char *action_name,
 
 	RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg);
 
+	if (need_ack) {
+		// stop mp_handler thread.
+		eventfd_write(mp_fds.evfd, (eventfd_t)1);
+	}
+
 	msg = malloc(len_msg);
 	if (!msg) {
 		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
@@ -547,12 +576,32 @@  rte_eal_mp_sendmsg(const char *action_name,
 			ret = send_msg(mp_fds.secondaries[i], &msgh);
 			if (ret < 0)
 				break;
+
+			if (need_ack) {
+				/* We will hang there until the other side
+				 * responses and what if other side is sending
+				 * msg at the same time?
+				 */
+				process_msg(mp_fds.secondaries[i]);
+			}
 		}
 	} else {
 		ret = send_msg(mp_fds.primary, &msgh);
+
+		if (ret > 0 && need_ack) {
+			// We will hang there until the other side responses
+			ret = process_msg(mp_fds.primary);
+		}
 	}
 
 	free(msg);
 
+	if (need_ack) {
+		// start mp_handler thread.
+		union sigval value;
+
+		pthread_sigqueue(tid, 0, value);
+	}
+
 	return ret;
 }
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 8776bcf..9875cae 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -274,13 +274,16 @@  void rte_eal_mp_action_unregister(const char *name);
  * @param fds_num
  *   The fds_num argument is number of fds to be sent with sendmsg.
  *
+ * @param need_ack
+ *   The fds_num argument is number of fds to be sent with sendmsg.
+ *
  * @return
  *  - (>=0) on success.
  *  - (<0) on failure.
  */
 int
 rte_eal_mp_sendmsg(const char *action_name, const void *params,
-		   int len_params, int fds[], int fds_num);
+		   int len_params, int fds[], int fds_num, int need_ack);
 
 /**
  * Usage function typedef used by the application usage function.