[dpdk-dev] [PATCH v2 3/4] eal: add synchronous multi-process communication

Ananyev, Konstantin konstantin.ananyev at intel.com
Wed Jan 17 11:50:22 CET 2018



> > Hi Jianfeng,
> >
> >> -----Original Message-----
> >> From: Tan, Jianfeng
> >> Sent: Tuesday, January 16, 2018 8:11 AM
> >> To: Ananyev, Konstantin <konstantin.ananyev at intel.com>; dev at dpdk.org; Burakov, Anatoly <anatoly.burakov at intel.com>
> >> Cc: Richardson, Bruce <bruce.richardson at intel.com>; thomas at monjalon.net
> >> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process communication
> >>
> >> Thank you, Konstantin and Anatoly firstly. Other comments are well
> >> received and I'll send out a new version.
> >>
> >>
> >> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
> >>>> We need the synchronous way for multi-process communication,
> >>>> i.e., blockingly waiting for reply message when we send a request
> >>>> to the peer process.
> >>>>
> >>>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> >>>> such use case. By invoking rte_eal_mp_request(), a request message
> >>>> is sent out, and then it waits there for a reply message. The
> >>>> timeout is hard-coded 5 Sec. And the replied message will be copied
> >>>> in the parameters of this API so that the caller can decide how
> >>>> to translate those information (including params and fds). Note
> >>>> if a primary process owns multiple secondary processes, this API
> >>>> will fail.
> >>>>
> >>>> The API rte_eal_mp_reply() is always called by an mp action handler.
> >>>> Here we add another parameter for rte_eal_mp_t so that the action
> >>>> handler knows which peer address to reply.
> >>>>
> >>>> We use mutex in rte_eal_mp_request() to guarantee that only one
> >>>> request is on the fly for one pair of processes.
> >>> You don't need to do things in such strange and restrictive way.
> >>> Instead you can do something like that:
> >>> 1) Introduce new struct, list for it and mutex
> >>>    struct sync_request {
> >>>         int reply_received;
> >>>         char dst[PATH_MAX];
> >>>         char reply[...];
> >>>         LIST_ENTRY(sync_request) next;
> >>> };
> >>>
> >>> static struct
> >>>       LIST_HEAD(list, sync_request);
> >>>       pthread_mutex_t lock;
> >>>      pthead_cond_t cond;
> >>> } sync_requests;
> >>>
> >>> 2) then at request() call:
> >>>     Grab sync_requests.lock
> >>>     Check do we already have a pending request for that destination,
> >>>     If yes - the release the lock and returns with error.
> >>>     - allocate and init new sync_request struct, set reply_received=0
> >>>     - do send_msg()
> >>>     -then in a cycle:
> >>>     pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
> >>>     - at return from it check if sync_request.reply_received == 1, if not
> >>> check if timeout expired and either return a failure or go to the start of the cycle.
> >>>
> >>> 3) at mp_handler() if REPLY received - grab sync_request.lock,
> >>>       search through sync_requests.list for dst[] ,
> >>>      if found, then set it's reply_received=1, copy the received message into reply
> >>>      and call pthread_cond_braodcast((&sync_requests.cond);
> >> The only benefit I can see is that now the sender can request to
> >> multiple receivers at the same time. And it makes things more
> >> complicated. Do we really need this?
> > The benefit is that one thread is blocked waiting for response,
> > your mp_handler can still receive and handle other messages.
> 
> This can already be done in the original implementation. mp_handler
> listens for msg, request from the other peer(s), and replies the
> requests, which is not affected.
> 
> > Plus as you said - other threads can keep sending messages.
> 
> For this one, in the original implementation, other threads can still
> send msg, but not request. I suppose the request is not in a fast path,
> why we care to make it fast?
> 

+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");

I f you set it just for one call, why do you not restore it?
Also I don't think it is a good idea to change it here - 
if you'll make timeout a parameter value - then it could be overwritten
by different threads. 

+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (!mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);

As I remember - sndmsg() is also blocking call, so under some conditions you can stall
there forever.
As mp_mutex_requestis still held - next rte_eal_mp_request(0 will also block forever here.

+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);

if the message you receive is not a reply you are expecting -
it will be simply dropped - mp_handler() would never process it.

+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}

All of the above makes me think that current implementation is erroneous
and needs to be reworked.
Konstantin




More information about the dev mailing list