[dpdk-dev] [PATCH v2 3/4] eal: add synchronous multi-process communication

Jianfeng Tan jianfeng.tan at intel.com
Thu Jan 11 05:07:33 CET 2018


We need a synchronous way for multi-process communication, i.e.,
blocking while waiting for a reply message after we send a request
to the peer process.

We add two APIs, rte_eal_mp_request() and rte_eal_mp_reply(), for
this use case. Invoking rte_eal_mp_request() sends out a request
message and then waits for a reply message. The timeout is
hard-coded to 5 seconds. The reply message is copied into the
parameters of this API so that the caller can decide how to
interpret that information (including params and fds). Note that
if a primary process owns multiple secondary processes, this API
will fail.

The API rte_eal_mp_reply() is always called by an mp action handler.
Here we add another parameter to rte_eal_mp_t so that the action
handler knows which peer address to reply to.

We use a mutex in rte_eal_mp_request() to guarantee that only one
request is in flight at a time for one pair of processes.

Suggested-by: Anatoly Burakov <anatoly.burakov at intel.com>
Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
---
 lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
 lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
 lib/librte_eal/rte_eal_version.map      |   2 +
 3 files changed, 206 insertions(+), 13 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 70519cc..f194a52 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -32,6 +32,7 @@
 static int mp_fd = -1;
 static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
 
 struct action_entry {
 	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
@@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
 
 struct mp_msghdr {
 	char action_name[MAX_ACTION_NAME_LEN];
+#define MP_MSG	0 /* Share message with peers, will not block */
+#define MP_REQ	1 /* Request for information, Will block for a reply */
+#define MP_REP	2 /* Reply to previously-received request */
+	int type;
 	int fds_num;
 	int len_params;
 	char params[0];
@@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
 }
 
 static int
-read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
+read_msg(int fd, char *buf, int buflen,
+	 int *fds, int fds_num, struct sockaddr_un *s)
 {
 	int ret;
 	struct iovec iov;
@@ -151,6 +157,8 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 	iov.iov_base = buf;
 	iov.iov_len  = buflen;
 
+	msgh.msg_name = s;
+	msgh.msg_namelen = sizeof(*s);
 	msgh.msg_iov = &iov;
 	msgh.msg_iovlen = 1;
 	msgh.msg_control = control;
@@ -181,7 +189,7 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 }
 
 static int
-process_msg(struct mp_msghdr *hdr, int len, int fds[])
+process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
 {
 	int ret;
 	int params_len;
@@ -199,10 +207,10 @@ process_msg(struct mp_msghdr *hdr, int len, int fds[])
 	}
 
 	params_len = len - sizeof(struct mp_msghdr);
-	ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
+	ret = entry->action(hdr->params, params_len,
+			    fds, hdr->fds_num, s->sun_path);
 	pthread_mutex_unlock(&mp_mutex_action);
 	return ret;
-
 }
 
 static void *
@@ -211,11 +219,12 @@ mp_handle(void *arg __rte_unused)
 	int len;
 	int fds[SCM_MAX_FD];
 	char buf[MAX_MSG_LENGTH];
+	struct sockaddr_un sa;
 
 	while (1) {
-		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
+		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
 		if (len > 0)
-			process_msg((struct mp_msghdr *)buf, len, fds);
+			process_msg((struct mp_msghdr *)buf, len, fds, &sa);
 	}
 
 	return NULL;
@@ -255,7 +264,8 @@ static int
 mp_primary_proc(const void *params,
 		int len __rte_unused,
 		int fds[] __rte_unused,
-		int fds_num __rte_unused)
+		int fds_num __rte_unused,
+		const void *peer __rte_unused)
 {
 	const struct proc_request *r = (const struct proc_request *)params;
 
@@ -362,7 +372,8 @@ rte_eal_mp_channel_init(void)
 }
 
 static inline struct mp_msghdr *
-format_msg(const char *act_name, const void *p, int len_params, int fds_num)
+format_msg(const char *act_name, const void *p,
+	   int len_params, int fds_num, int type)
 {
 	int len_msg;
 	struct mp_msghdr *msg;
@@ -384,6 +395,7 @@ format_msg(const char *act_name, const void *p, int len_params, int fds_num)
 	strcpy(msg->action_name, act_name);
 	msg->fds_num = fds_num;
 	msg->len_params = len_params;
+	msg->type = type;
 	memcpy(msg->params, p, len_params);
 	return msg;
 }
@@ -455,7 +467,9 @@ mp_send(const char *action_name,
 	const void *params,
 	int len_params,
 	int fds[],
-	int fds_num)
+	int fds_num,
+	int type,
+	const void *peer)
 {
 	int i;
 	int n = 0;
@@ -468,7 +482,7 @@ mp_send(const char *action_name,
 		return 0;
 	}
 
-	msg = format_msg(action_name, params, len_params, fds_num);
+	msg = format_msg(action_name, params, len_params, fds_num, type);
 	if (msg == NULL)
 		return 0;
 
@@ -477,6 +491,11 @@ mp_send(const char *action_name,
 		return 0;
 	}
 
+	if (peer) {
+		n += send_msg(sockfd, peer, msg, fds);
+		goto ret;
+	}
+
 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
 		/* broadcast to all secondaries */
 		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
@@ -488,6 +507,7 @@ mp_send(const char *action_name,
 	} else
 		n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
 
+ret:
 	free(msg);
 	close(sockfd);
 	return n;
@@ -501,5 +521,107 @@ rte_eal_mp_sendmsg(const char *action_name,
 		   int fds_num)
 {
 	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
-	return mp_send(action_name, params, len_params, fds, fds_num);
+	return mp_send(action_name, params, len_params,
+			fds, fds_num, MP_MSG, NULL);
+}
+
+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (!mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}
+
+int
+rte_eal_mp_reply(const char *action_name,
+		 const void *params,
+		 int len_p,
+		 int fds[],
+		 int fds_in,
+		 const void *peer)
+{
+	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
+	if (peer == NULL) {
+		RTE_LOG(ERR, EAL, "peer is not specified\n");
+		return 0;
+	}
+	return mp_send(action_name, params, len_p, fds, fds_in, MP_REP, peer);
 }
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 9884c0b..2690a77 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -192,7 +192,7 @@ int rte_eal_primary_proc_alive(const char *config_file_path);
  * this function typedef to register action for coming messages.
  */
 typedef int (*rte_eal_mp_t)(const void *params, int len,
-			    int fds[], int fds_num);
+			    int fds[], int fds_num, const void *peer);
 
 /**
  * Register an action function for primary/secondary communication.
@@ -245,7 +245,7 @@ void rte_eal_mp_action_unregister(const char *name);
  *   The fds argument is an array of fds sent with sendmsg.
  *
  * @param fds_num
- *   The fds_num argument is number of fds to be sent with sendmsg.
+ *   The number of fds to be sent with sendmsg.
  *
  * @return
  *  - Returns the number of messages being sent successfully.
@@ -255,6 +255,75 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params,
 		   int len_params, int fds[], int fds_num);
 
 /**
+ * Send a request to the peer process and expect a reply.
+ *
+ * This function sends a request message to the peer process and then
+ * blocks until a reply message is received from the peer process. Note:
+ * this does not work for a primary process sending requests when it
+ * has multiple (>1) secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message; when the reply
+ *   is received, the replied params will be copied to this pointer.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg; when the reply
+ *   is received, the replied fds will be copied into this array.
+ *
+ * @param fds_in
+ *   The number of fds to be sent.
+ *
+ * @param fds_out
+ *   The number of fds to be received.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on sending the request successfully but receiving no valid reply;
+ *  - (<0) on failure to send the request.
+ */
+int
+rte_eal_mp_request(const char *action_name, void *params,
+		   int len_p, int fds[], int fds_in, int fds_out);
+
+/**
+ * Send a reply to the peer process.
+ *
+ * This function will send a reply message in response to a request message
+ * received previously.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_in
+ *   The number of fds to be sent with sendmsg.
+ *
+ * @param peer
+ *   The address of the peer to reply to, as given to the action handler.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on failure.
+ */
+int
+rte_eal_mp_reply(const char *action_name, const void *params,
+		 int len_p, int fds[], int fds_in, const void *peer);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_application_usage_hook()
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index 5dacde5..068ac0b 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -243,5 +243,7 @@ DPDK_18.02 {
 	rte_eal_mp_action_register;
 	rte_eal_mp_action_unregister;
 	rte_eal_mp_sendmsg;
+	rte_eal_mp_request;
+	rte_eal_mp_reply;
 
 } DPDK_17.11;
-- 
2.7.4



More information about the dev mailing list