[dpdk-dev,v2,3/4] eal: add synchronous multi-process communication

Message ID 1515643654-129489-4-git-send-email-jianfeng.tan@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Jianfeng Tan Jan. 11, 2018, 4:07 a.m. UTC
  We need a synchronous way for multi-process communication,
i.e., blocking until a reply message arrives after we send a request
to the peer process.

We add two APIs, rte_eal_mp_request() and rte_eal_mp_reply(), for
this use case. By invoking rte_eal_mp_request(), a request message
is sent out, and the call then waits for a reply message. The
timeout is hard-coded to 5 seconds. The reply message is copied
into the parameters of this API so that the caller can decide how
to interpret that information (including params and fds). Note that
if a primary process owns multiple secondary processes, this API
will fail.

The API rte_eal_mp_reply() is always called by an mp action handler.
Here we add another parameter to rte_eal_mp_t so that the action
handler knows which peer address to reply to.

We use a mutex in rte_eal_mp_request() to guarantee that only one
request is in flight for one pair of processes.

Suggested-by: Anatoly Burakov <anatoly.burakov@intel.com>
Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
---
 lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
 lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
 lib/librte_eal/rte_eal_version.map      |   2 +
 3 files changed, 206 insertions(+), 13 deletions(-)
  

Comments

Anatoly Burakov Jan. 13, 2018, 1:41 p.m. UTC | #1
On 11-Jan-18 4:07 AM, Jianfeng Tan wrote:
> ---
>   lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
>   lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
>   lib/librte_eal/rte_eal_version.map      |   2 +
>   3 files changed, 206 insertions(+), 13 deletions(-)
> 
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 70519cc..f194a52 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -32,6 +32,7 @@
>   static int mp_fd = -1;
>   static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
>   static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
>   
>   struct action_entry {
>   	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
> @@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
>   
>   struct mp_msghdr {
>   	char action_name[MAX_ACTION_NAME_LEN];
> +#define MP_MSG	0 /* Share message with peers, will not block */
> +#define MP_REQ	1 /* Request for information, Will block for a reply */
> +#define MP_REP	2 /* Reply to previously-received request */

nitpicking, but... response instead of reply?

> +	int type;
>   	int fds_num;
>   	int len_params;
>   	char params[0];
> @@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
>   }
>   
>   static int
> -read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
> +read_msg(int fd, char *buf, int buflen,
> +	 int *fds, int fds_num, struct sockaddr_un *s)

<snip>

> +	return mp_send(action_name, params, len_params,
> +			fds, fds_num, MP_MSG, NULL);
> +}
> +
> +int
> +rte_eal_mp_request(const char *action_name,
> +		   void *params,
> +		   int len_p,
> +		   int fds[],
> +		   int fds_in,
> +		   int fds_out)

name == NULL? name too long?

> +{
> +	int i, j;
> +	int sockfd;
> +	int nprocs;
> +	int ret = 0;
> +	struct mp_msghdr *req;
> +	struct timeval tv;
> +	char buf[MAX_MSG_LENGTH];
> +	struct mp_msghdr *hdr;
> +
> +	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
> +
> +	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
> +		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
> +		rte_errno = -E2BIG;

(this also applies to previous patches) you set rte_errno to -EINVAL in 
format_msg when a message with parameters is too big - should that be 
set to -E2BIG as well? Also, maybe don't set rte_errno in multiple 
places, and put all parameter checking (or at least errno setting) in 
the rte_eal_mp_* functions?

> +		return 0;
> +	}
> +
> +	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
> +	if (req == NULL)
> +		return 0;
> +
> +	if ((sockfd = open_unix_fd(0)) < 0) {
> +		free(req);
> +		return 0;
> +	}
> +
> +	tv.tv_sec = 5;  /* 5 Secs Timeout */
> +	tv.tv_usec = 0;
> +	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
> +			(const void *)&tv, sizeof(struct timeval)) < 0)
> +		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
> +
> +	/* Only allow one req at a time */
> +	pthread_mutex_lock(&mp_mutex_request);
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		nprocs = 0;
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)

What follows is a bit confusing; some comments explaining what happens, 
and maybe more informative variable names, would have been helpful.

> +			if (!mp_sec_sockets[i]) {
> +				j = i;
> +				nprocs++;
> +			}
> +
> +		if (nprocs > 1) {
> +			RTE_LOG(ERR, EAL,
> +				"multi secondary processes not supported\n");
> +			goto free_and_ret;
> +		}
> +

<snip>
  
Ananyev, Konstantin Jan. 16, 2018, midnight UTC | #2
> 
> We need the synchronous way for multi-process communication,
> i.e., blockingly waiting for reply message when we send a request
> to the peer process.
> 
> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> such use case. By invoking rte_eal_mp_request(), a request message
> is sent out, and then it waits there for a reply message. The
> timeout is hard-coded 5 Sec. And the replied message will be copied
> in the parameters of this API so that the caller can decide how
> to translate those information (including params and fds). Note
> if a primary process owns multiple secondary processes, this API
> will fail.
> 
> The API rte_eal_mp_reply() is always called by an mp action handler.
> Here we add another parameter for rte_eal_mp_t so that the action
> handler knows which peer address to reply.
> 
> We use mutex in rte_eal_mp_request() to guarantee that only one
> request is on the fly for one pair of processes.

You don't need to do things in such a strange and restrictive way.
Instead you can do something like this:
1) Introduce a new struct, a list for it, and a mutex:
 struct sync_request {
      int reply_received;
      char dst[PATH_MAX];
      char reply[...];
      LIST_ENTRY(sync_request) next;
};

static struct {
    LIST_HEAD(, sync_request) list;
    pthread_mutex_t lock;
    pthread_cond_t cond;
} sync_requests;

2) then at the request() call:
  - grab sync_requests.lock
  - check whether we already have a pending request for that destination;
    if yes, release the lock and return with an error
  - allocate and init a new sync_request struct, set reply_received=0
  - do send_msg()
  - then in a cycle:
    pthread_cond_timedwait(&sync_requests.cond, &sync_requests.lock, &timespec);
  - at return from it, check if sync_request.reply_received == 1; if not,
    check if the timeout expired and either return a failure or go back to
    the start of the cycle.

3) at mp_handler(), if a REPLY is received - grab sync_requests.lock,
    search through sync_requests.list for dst[],
    if found, set its reply_received=1, copy the received message into reply,
    and call pthread_cond_broadcast(&sync_requests.cond);
  
> 
> Suggested-by: Anatoly Burakov <anatoly.burakov@intel.com>
> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
> ---
>  lib/librte_eal/common/eal_common_proc.c | 144 +++++++++++++++++++++++++++++---
>  lib/librte_eal/common/include/rte_eal.h |  73 +++++++++++++++-
>  lib/librte_eal/rte_eal_version.map      |   2 +
>  3 files changed, 206 insertions(+), 13 deletions(-)
> 
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 70519cc..f194a52 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -32,6 +32,7 @@
>  static int mp_fd = -1;
>  static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
>  static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
> 
>  struct action_entry {
>  	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
> @@ -49,6 +50,10 @@ static struct action_entry_list action_entry_list =
> 
>  struct mp_msghdr {
>  	char action_name[MAX_ACTION_NAME_LEN];
> +#define MP_MSG	0 /* Share message with peers, will not block */
> +#define MP_REQ	1 /* Request for information, Will block for a reply */
> +#define MP_REP	2 /* Reply to previously-received request */

As a nit - please use enum {} instead for the above macros.


> +	int type;
>  	int fds_num;
>  	int len_params;
>  	char params[0];
> @@ -138,7 +143,8 @@ rte_eal_mp_action_unregister(const char *name)
>  }
> 
>  static int
> -read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
> +read_msg(int fd, char *buf, int buflen,
> +	 int *fds, int fds_num, struct sockaddr_un *s)
>  {
>  	int ret;
>  	struct iovec iov;
> @@ -151,6 +157,8 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
>  	iov.iov_base = buf;
>  	iov.iov_len  = buflen;
> 
> +	msgh.msg_name = s;
> +	msgh.msg_namelen = sizeof(*s);
>  	msgh.msg_iov = &iov;
>  	msgh.msg_iovlen = 1;
>  	msgh.msg_control = control;
> @@ -181,7 +189,7 @@ read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
>  }
> 
>  static int
> -process_msg(struct mp_msghdr *hdr, int len, int fds[])
> +process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
>  {
>  	int ret;
>  	int params_len;
> @@ -199,10 +207,10 @@ process_msg(struct mp_msghdr *hdr, int len, int fds[])
>  	}
> 
>  	params_len = len - sizeof(struct mp_msghdr);
> -	ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +	ret = entry->action(hdr->params, params_len,
> +			    fds, hdr->fds_num, s->sun_path);
>  	pthread_mutex_unlock(&mp_mutex_action);
>  	return ret;
> -
>  }
> 
>  static void *
> @@ -211,11 +219,12 @@ mp_handle(void *arg __rte_unused)
>  	int len;
>  	int fds[SCM_MAX_FD];
>  	char buf[MAX_MSG_LENGTH];
> +	struct sockaddr_un sa;
> 
>  	while (1) {
> -		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
> +		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
>  		if (len > 0)
> -			process_msg((struct mp_msghdr *)buf, len, fds);
> +			process_msg((struct mp_msghdr *)buf, len, fds, &sa);
>  	}
> 
>  	return NULL;
> @@ -255,7 +264,8 @@ static int
>  mp_primary_proc(const void *params,
>  		int len __rte_unused,
>  		int fds[] __rte_unused,
> -		int fds_num __rte_unused)
> +		int fds_num __rte_unused,
> +		const void *peer __rte_unused)
>  {
>  	const struct proc_request *r = (const struct proc_request *)params;
> 
> @@ -362,7 +372,8 @@ rte_eal_mp_channel_init(void)
>  }
> 
>  static inline struct mp_msghdr *
> -format_msg(const char *act_name, const void *p, int len_params, int fds_num)
> +format_msg(const char *act_name, const void *p,
> +	   int len_params, int fds_num, int type)
>  {
>  	int len_msg;
>  	struct mp_msghdr *msg;
> @@ -384,6 +395,7 @@ format_msg(const char *act_name, const void *p, int len_params, int fds_num)
>  	strcpy(msg->action_name, act_name);
>  	msg->fds_num = fds_num;
>  	msg->len_params = len_params;
> +	msg->type = type;
>  	memcpy(msg->params, p, len_params);
>  	return msg;
>  }
> @@ -455,7 +467,9 @@ mp_send(const char *action_name,
>  	const void *params,
>  	int len_params,
>  	int fds[],
> -	int fds_num)
> +	int fds_num,
> +	int type,
> +	const void *peer)
>  {
>  	int i;
>  	int n = 0;
> @@ -468,7 +482,7 @@ mp_send(const char *action_name,
>  		return 0;
>  	}
> 
> -	msg = format_msg(action_name, params, len_params, fds_num);
> +	msg = format_msg(action_name, params, len_params, fds_num, type);
>  	if (msg == NULL)
>  		return 0;
> 
> @@ -477,6 +491,11 @@ mp_send(const char *action_name,
>  		return 0;
>  	}
> 
> +	if (peer) {
> +		n += send_msg(sockfd, peer, msg, fds);
> +		goto ret;
> +	}
> +
>  	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
>  		/* broadcast to all secondaries */
>  		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> @@ -488,6 +507,7 @@ mp_send(const char *action_name,
>  	} else
>  		n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
> 
> +ret:
>  	free(msg);
>  	close(sockfd);
>  	return n;
> @@ -501,5 +521,107 @@ rte_eal_mp_sendmsg(const char *action_name,
>  		   int fds_num)
>  {
>  	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
> -	return mp_send(action_name, params, len_params, fds, fds_num);
> +	return mp_send(action_name, params, len_params,
> +			fds, fds_num, MP_MSG, NULL);
> +}
> +
> +int
> +rte_eal_mp_request(const char *action_name,
> +		   void *params,
> +		   int len_p,
> +		   int fds[],
> +		   int fds_in,
> +		   int fds_out)
> +{
> +	int i, j;
> +	int sockfd;
> +	int nprocs;
> +	int ret = 0;
> +	struct mp_msghdr *req;
> +	struct timeval tv;
> +	char buf[MAX_MSG_LENGTH];
> +	struct mp_msghdr *hdr;
> +
> +	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
> +
> +	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
> +		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
> +		rte_errno = -E2BIG;
> +		return 0;
> +	}
> +
> +	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
> +	if (req == NULL)
> +		return 0;
> +
> +	if ((sockfd = open_unix_fd(0)) < 0) {
> +		free(req);
> +		return 0;
> +	}
> +
> +	tv.tv_sec = 5;  /* 5 Secs Timeout */
> +	tv.tv_usec = 0;
> +	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
> +			(const void *)&tv, sizeof(struct timeval)) < 0)
> +		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
> +
> +	/* Only allow one req at a time */
> +	pthread_mutex_lock(&mp_mutex_request);
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		nprocs = 0;
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			if (!mp_sec_sockets[i]) {
> +				j = i;
> +				nprocs++;
> +			}
> +
> +		if (nprocs > 1) {
> +			RTE_LOG(ERR, EAL,
> +				"multi secondary processes not supported\n");
> +			goto free_and_ret;
> +		}
> +
> +		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
> +	} else
> +		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
> +
> +	if (ret == 0) {
> +		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
> +		ret = -1;
> +		goto free_and_ret;
> +	}
> +
> +	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
> +	if (ret > 0) {
> +		hdr = (struct mp_msghdr *)buf;
> +		if (hdr->len_params == len_p)
> +			memcpy(params, hdr->params, len_p);
> +		else {
> +			RTE_LOG(ERR, EAL, "invalid reply\n");
> +			ret = 0;
> +		}
> +	}
> +
> +free_and_ret:
> +	free(req);
> +	close(sockfd);
> +	pthread_mutex_unlock(&mp_mutex_request);
> +	return ret;
> +}
> +
> +int
> +rte_eal_mp_reply(const char *action_name,
> +		 const void *params,
> +		 int len_p,
> +		 int fds[],
> +		 int fds_in,
> +		 const void *peer)
> +{
> +	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
> +	if (peer == NULL) {
> +		RTE_LOG(ERR, EAL, "peer is not specified\n");
> +		return 0;
> +	}
> +	return mp_send(action_name, params, len_p, fds, fds_in, MP_REP, peer);
>  }
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 9884c0b..2690a77 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -192,7 +192,7 @@ int rte_eal_primary_proc_alive(const char *config_file_path);
>   * this function typedef to register action for coming messages.
>   */
>  typedef int (*rte_eal_mp_t)(const void *params, int len,
> -			    int fds[], int fds_num);
> +			    int fds[], int fds_num, const void *peer);
> 
>  /**
>   * Register an action function for primary/secondary communication.
> @@ -245,7 +245,7 @@ void rte_eal_mp_action_unregister(const char *name);
>   *   The fds argument is an array of fds sent with sendmsg.
>   *
>   * @param fds_num
> - *   The fds_num argument is number of fds to be sent with sendmsg.
> + *   The number of fds to be sent with sendmsg.
>   *
>   * @return
>   *  - Returns the number of messages being sent successfully.
> @@ -255,6 +255,75 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params,
>  		   int len_params, int fds[], int fds_num);
> 
>  /**
> + * Send a request to the peer process and expect a reply.
> + *
> + * This function sends a request message to the peer process, and will
> + * block until receiving reply message from the peer process. Note:
> + * this does not work for the primary process sending requests to its
> + * multiple (>1) secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message; as the reply is
> + *   received, the replied params will be copied to this pointer.
> + *
> + * @param len_p
> + *   The length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg; as the reply
> + *   is received, the replied fds will be copied into this array.
> + *
> + * @param fds_in
> + *   The number of fds to be sent.
> + *
> + * @param fds_out
> + *   The number of fds to be received.
> + *
> + * @return
> + *  - (1) on success;
> + *  - (0) on sending request successfully but no valid reply received.
> + *  - (<0) on failing to sending request.
> + */
> +int
> +rte_eal_mp_request(const char *action_name, void *params,
> +		   int len_p, int fds[], int fds_in, int fds_out);
> +
> +/**
> + * Send a reply to the peer process.
> + *
> + * This function will send a reply message in response to a request message
> + * received previously.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message.
> + *
> + * @param len_p
> + *   The length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg.
> + *
> + * @param fds_in
> + *   The number of fds to be sent with sendmsg.
> + *
> + * @param peer
> + *   The fds_num argument is number of fds to be sent with sendmsg.
> + *
> + * @return
> + *  - (1) on success;
> + *  - (0) on failure.
> + */
> +int
> +rte_eal_mp_reply(const char *action_name, const void *params,
> +		 int len_p, int fds[], int fds_in, const void *peer);
> +
> +/**
>   * Usage function typedef used by the application usage function.
>   *
>   * Use this function typedef to define and call rte_set_application_usage_hook()
> diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
> index 5dacde5..068ac0b 100644
> --- a/lib/librte_eal/rte_eal_version.map
> +++ b/lib/librte_eal/rte_eal_version.map
> @@ -243,5 +243,7 @@ DPDK_18.02 {
>  	rte_eal_mp_action_register;
>  	rte_eal_mp_action_unregister;
>  	rte_eal_mp_sendmsg;
> +	rte_eal_mp_request;
> +	rte_eal_mp_reply;
> 
>  } DPDK_17.11;
> --
> 2.7.4
  
Jianfeng Tan Jan. 16, 2018, 8:10 a.m. UTC | #3
First of all, thank you, Konstantin and Anatoly. The other comments are 
well received, and I'll send out a new version.


On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
>
>> We need the synchronous way for multi-process communication,
>> i.e., blockingly waiting for reply message when we send a request
>> to the peer process.
>>
>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
>> such use case. By invoking rte_eal_mp_request(), a request message
>> is sent out, and then it waits there for a reply message. The
>> timeout is hard-coded 5 Sec. And the replied message will be copied
>> in the parameters of this API so that the caller can decide how
>> to translate those information (including params and fds). Note
>> if a primary process owns multiple secondary processes, this API
>> will fail.
>>
>> The API rte_eal_mp_reply() is always called by an mp action handler.
>> Here we add another parameter for rte_eal_mp_t so that the action
>> handler knows which peer address to reply.
>>
>> We use mutex in rte_eal_mp_request() to guarantee that only one
>> request is on the fly for one pair of processes.
> You don't need to do things in such strange and restrictive way.
> Instead you can do something like that:
> 1) Introduce new struct, list for it and mutex
>   struct sync_request {
>        int reply_received;
>        char dst[PATH_MAX];
>        char reply[...];
>        LIST_ENTRY(sync_request) next;
> };
>
> static struct
>      LIST_HEAD(list, sync_request);
>      pthread_mutex_t lock;
>     pthead_cond_t cond;
> } sync_requests;
>
> 2) then at request() call:
>    Grab sync_requests.lock
>    Check do we already have a pending request for that destination,
>    If yes - the release the lock and returns with error.
>    - allocate and init new sync_request struct, set reply_received=0
>    - do send_msg()
>    -then in a cycle:
>    pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
>    - at return from it check if sync_request.reply_received == 1, if not
> check if timeout expired and either return a failure or go to the start of the cycle.
>
> 3) at mp_handler() if REPLY received - grab sync_request.lock,
>      search through sync_requests.list for dst[] ,
>     if found, then set it's reply_received=1, copy the received message into reply
>     and call pthread_cond_braodcast((&sync_requests.cond);

The only benefit I can see is that the sender can now send requests to 
multiple receivers at the same time. And it makes things more 
complicated. Do we really need this?

Thanks,
Jianfeng
  
Ananyev, Konstantin Jan. 16, 2018, 11:12 a.m. UTC | #4
Hi Jianfeng,

> -----Original Message-----
> From: Tan, Jianfeng
> Sent: Tuesday, January 16, 2018 8:11 AM
> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org; Burakov, Anatoly <anatoly.burakov@intel.com>
> Cc: Richardson, Bruce <bruce.richardson@intel.com>; thomas@monjalon.net
> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process communication
> 
> Thank you, Konstantin and Anatoly firstly. Other comments are well
> received and I'll send out a new version.
> 
> 
> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
> >
> >> We need the synchronous way for multi-process communication,
> >> i.e., blockingly waiting for reply message when we send a request
> >> to the peer process.
> >>
> >> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> >> such use case. By invoking rte_eal_mp_request(), a request message
> >> is sent out, and then it waits there for a reply message. The
> >> timeout is hard-coded 5 Sec. And the replied message will be copied
> >> in the parameters of this API so that the caller can decide how
> >> to translate those information (including params and fds). Note
> >> if a primary process owns multiple secondary processes, this API
> >> will fail.
> >>
> >> The API rte_eal_mp_reply() is always called by an mp action handler.
> >> Here we add another parameter for rte_eal_mp_t so that the action
> >> handler knows which peer address to reply.
> >>
> >> We use mutex in rte_eal_mp_request() to guarantee that only one
> >> request is on the fly for one pair of processes.
> > You don't need to do things in such strange and restrictive way.
> > Instead you can do something like that:
> > 1) Introduce new struct, list for it and mutex
> >   struct sync_request {
> >        int reply_received;
> >        char dst[PATH_MAX];
> >        char reply[...];
> >        LIST_ENTRY(sync_request) next;
> > };
> >
> > static struct
> >      LIST_HEAD(list, sync_request);
> >      pthread_mutex_t lock;
> >     pthead_cond_t cond;
> > } sync_requests;
> >
> > 2) then at request() call:
> >    Grab sync_requests.lock
> >    Check do we already have a pending request for that destination,
> >    If yes - the release the lock and returns with error.
> >    - allocate and init new sync_request struct, set reply_received=0
> >    - do send_msg()
> >    -then in a cycle:
> >    pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
> >    - at return from it check if sync_request.reply_received == 1, if not
> > check if timeout expired and either return a failure or go to the start of the cycle.
> >
> > 3) at mp_handler() if REPLY received - grab sync_request.lock,
> >      search through sync_requests.list for dst[] ,
> >     if found, then set it's reply_received=1, copy the received message into reply
> >     and call pthread_cond_braodcast((&sync_requests.cond);
> 
> The only benefit I can see is that now the sender can request to
> multiple receivers at the same time. And it makes things more
> complicated. Do we really need this?

The benefit is that while one thread is blocked waiting for a response,
your mp_handler can still receive and handle other messages.
Plus, as you said, other threads can keep sending messages.
Konstantin 

> 
> Thanks,
> Jianfeng
  
Jianfeng Tan Jan. 16, 2018, 4:47 p.m. UTC | #5
On 1/16/2018 7:12 PM, Ananyev, Konstantin wrote:
> Hi Jianfeng,
>
>> -----Original Message-----
>> From: Tan, Jianfeng
>> Sent: Tuesday, January 16, 2018 8:11 AM
>> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org; Burakov, Anatoly <anatoly.burakov@intel.com>
>> Cc: Richardson, Bruce <bruce.richardson@intel.com>; thomas@monjalon.net
>> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process communication
>>
>> Thank you, Konstantin and Anatoly firstly. Other comments are well
>> received and I'll send out a new version.
>>
>>
>> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
>>>> We need the synchronous way for multi-process communication,
>>>> i.e., blockingly waiting for reply message when we send a request
>>>> to the peer process.
>>>>
>>>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
>>>> such use case. By invoking rte_eal_mp_request(), a request message
>>>> is sent out, and then it waits there for a reply message. The
>>>> timeout is hard-coded 5 Sec. And the replied message will be copied
>>>> in the parameters of this API so that the caller can decide how
>>>> to translate those information (including params and fds). Note
>>>> if a primary process owns multiple secondary processes, this API
>>>> will fail.
>>>>
>>>> The API rte_eal_mp_reply() is always called by an mp action handler.
>>>> Here we add another parameter for rte_eal_mp_t so that the action
>>>> handler knows which peer address to reply.
>>>>
>>>> We use mutex in rte_eal_mp_request() to guarantee that only one
>>>> request is on the fly for one pair of processes.
>>> You don't need to do things in such strange and restrictive way.
>>> Instead you can do something like that:
>>> 1) Introduce new struct, list for it and mutex
>>>    struct sync_request {
>>>         int reply_received;
>>>         char dst[PATH_MAX];
>>>         char reply[...];
>>>         LIST_ENTRY(sync_request) next;
>>> };
>>>
>>> static struct
>>>       LIST_HEAD(list, sync_request);
>>>       pthread_mutex_t lock;
>>>      pthead_cond_t cond;
>>> } sync_requests;
>>>
>>> 2) then at request() call:
>>>     Grab sync_requests.lock
>>>     Check do we already have a pending request for that destination,
>>>     If yes - the release the lock and returns with error.
>>>     - allocate and init new sync_request struct, set reply_received=0
>>>     - do send_msg()
>>>     -then in a cycle:
>>>     pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
>>>     - at return from it check if sync_request.reply_received == 1, if not
>>> check if timeout expired and either return a failure or go to the start of the cycle.
>>>
>>> 3) at mp_handler() if REPLY received - grab sync_request.lock,
>>>       search through sync_requests.list for dst[] ,
>>>      if found, then set it's reply_received=1, copy the received message into reply
>>>      and call pthread_cond_braodcast((&sync_requests.cond);
>> The only benefit I can see is that now the sender can request to
>> multiple receivers at the same time. And it makes things more
>> complicated. Do we really need this?
> The benefit is that one thread is blocked waiting for response,
> your mp_handler can still receive and handle other messages.

This can already be done in the original implementation: mp_handler 
listens for messages and requests from the other peer(s), and replies 
to the requests; that is not affected.

> Plus as you said - other threads can keep sending messages.

For this one, in the original implementation, other threads can still 
send messages, but not requests. I suppose requests are not on a fast 
path; why do we care to make them fast?

Thanks,
Jianfeng

> Konstantin
>
>> Thanks,
>> Jianfeng
  
Ananyev, Konstantin Jan. 17, 2018, 10:50 a.m. UTC | #6
> > Hi Jianfeng,
> >
> >> -----Original Message-----
> >> From: Tan, Jianfeng
> >> Sent: Tuesday, January 16, 2018 8:11 AM
> >> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org; Burakov, Anatoly <anatoly.burakov@intel.com>
> >> Cc: Richardson, Bruce <bruce.richardson@intel.com>; thomas@monjalon.net
> >> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process communication
> >>
> >> Thank you, Konstantin and Anatoly firstly. Other comments are well
> >> received and I'll send out a new version.
> >>
> >>
> >> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
> >>>> We need the synchronous way for multi-process communication,
> >>>> i.e., blockingly waiting for reply message when we send a request
> >>>> to the peer process.
> >>>>
> >>>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> >>>> such use case. By invoking rte_eal_mp_request(), a request message
> >>>> is sent out, and then it waits there for a reply message. The
> >>>> timeout is hard-coded 5 Sec. And the replied message will be copied
> >>>> in the parameters of this API so that the caller can decide how
> >>>> to translate those information (including params and fds). Note
> >>>> if a primary process owns multiple secondary processes, this API
> >>>> will fail.
> >>>>
> >>>> The API rte_eal_mp_reply() is always called by an mp action handler.
> >>>> Here we add another parameter for rte_eal_mp_t so that the action
> >>>> handler knows which peer address to reply.
> >>>>
> >>>> We use mutex in rte_eal_mp_request() to guarantee that only one
> >>>> request is on the fly for one pair of processes.
> >>> You don't need to do things in such strange and restrictive way.
> >>> Instead you can do something like that:
> >>> 1) Introduce new struct, list for it and mutex
> >>>    struct sync_request {
> >>>         int reply_received;
> >>>         char dst[PATH_MAX];
> >>>         char reply[...];
> >>>         LIST_ENTRY(sync_request) next;
> >>> };
> >>>
> >>> static struct
> >>>       LIST_HEAD(list, sync_request);
> >>>       pthread_mutex_t lock;
> >>>      pthead_cond_t cond;
> >>> } sync_requests;
> >>>
> >>> 2) then at request() call:
> >>>     Grab sync_requests.lock
> >>>     Check do we already have a pending request for that destination,
> >>>     If yes - the release the lock and returns with error.
> >>>     - allocate and init new sync_request struct, set reply_received=0
> >>>     - do send_msg()
> >>>     -then in a cycle:
> >>>     pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
> >>>     - at return from it check if sync_request.reply_received == 1, if not
> >>> check if timeout expired and either return a failure or go to the start of the cycle.
> >>>
> >>> 3) at mp_handler() if REPLY received - grab sync_request.lock,
> >>>       search through sync_requests.list for dst[] ,
> >>>      if found, then set it's reply_received=1, copy the received message into reply
> >>>      and call pthread_cond_braodcast((&sync_requests.cond);
> >> The only benefit I can see is that now the sender can request to
> >> multiple receivers at the same time. And it makes things more
> >> complicated. Do we really need this?
> > The benefit is that one thread is blocked waiting for response,
> > your mp_handler can still receive and handle other messages.
> 
> This can already be done in the original implementation. mp_handler
> listens for msg, request from the other peer(s), and replies the
> requests, which is not affected.
> 
> > Plus as you said - other threads can keep sending messages.
> 
> For this one, in the original implementation, other threads can still
> send msg, but not request. I suppose the request is not in a fast path,
> why we care to make it fast?
> 

+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");

If you set it just for one call, why don't you restore it?
Also I don't think it is a good idea to change it here -
if you make the timeout a parameter value, it could be overwritten
by different threads.

+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (!mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);

As I remember, sendmsg() is also a blocking call, so under some conditions you can stall
there forever.
As mp_mutex_request is still held, the next rte_eal_mp_request() will also block forever here.

+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);

If the message you receive is not the reply you are expecting,
it will simply be dropped - mp_handler() will never process it.

+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}

All of the above makes me think that current implementation is erroneous
and needs to be reworked.
Konstantin
  
Jianfeng Tan Jan. 17, 2018, 1:09 p.m. UTC | #7
On 1/17/2018 6:50 PM, Ananyev, Konstantin wrote:
>
>>> Hi Jianfeng,
>>>
>>>> -----Original Message-----
>>>> From: Tan, Jianfeng
>>>> Sent: Tuesday, January 16, 2018 8:11 AM
>>>> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org; Burakov, Anatoly <anatoly.burakov@intel.com>
>>>> Cc: Richardson, Bruce <bruce.richardson@intel.com>; thomas@monjalon.net
>>>> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process communication
>>>>
>>>> Thank you, Konstantin and Anatoly firstly. Other comments are well
>>>> received and I'll send out a new version.
>>>>
>>>>
>>>> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
>>>>>> We need the synchronous way for multi-process communication,
>>>>>> i.e., blockingly waiting for reply message when we send a request
>>>>>> to the peer process.
>>>>>>
>>>>>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
>>>>>> such use case. By invoking rte_eal_mp_request(), a request message
>>>>>> is sent out, and then it waits there for a reply message. The
>>>>>> timeout is hard-coded 5 Sec. And the replied message will be copied
>>>>>> in the parameters of this API so that the caller can decide how
>>>>>> to translate those information (including params and fds). Note
>>>>>> if a primary process owns multiple secondary processes, this API
>>>>>> will fail.
>>>>>>
>>>>>> The API rte_eal_mp_reply() is always called by an mp action handler.
>>>>>> Here we add another parameter for rte_eal_mp_t so that the action
>>>>>> handler knows which peer address to reply.
>>>>>>
>>>>>> We use mutex in rte_eal_mp_request() to guarantee that only one
>>>>>> request is on the fly for one pair of processes.
>>>>> You don't need to do things in such strange and restrictive way.
>>>>> Instead you can do something like that:
>>>>> 1) Introduce new struct, list for it and mutex
>>>>>     struct sync_request {
>>>>>          int reply_received;
>>>>>          char dst[PATH_MAX];
>>>>>          char reply[...];
>>>>>          LIST_ENTRY(sync_request) next;
>>>>> };
>>>>>
>>>>> static struct
>>>>>        LIST_HEAD(list, sync_request);
>>>>>        pthread_mutex_t lock;
>>>>>       pthead_cond_t cond;
>>>>> } sync_requests;
>>>>>
>>>>> 2) then at request() call:
>>>>>      Grab sync_requests.lock
>>>>>      Check do we already have a pending request for that destination,
>>>>>      If yes - the release the lock and returns with error.
>>>>>      - allocate and init new sync_request struct, set reply_received=0
>>>>>      - do send_msg()
>>>>>      -then in a cycle:
>>>>>      pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
>>>>>      - at return from it check if sync_request.reply_received == 1, if not
>>>>> check if timeout expired and either return a failure or go to the start of the cycle.
>>>>>
>>>>> 3) at mp_handler() if REPLY received - grab sync_request.lock,
>>>>>        search through sync_requests.list for dst[] ,
>>>>>       if found, then set it's reply_received=1, copy the received message into reply
>>>>>       and call pthread_cond_braodcast((&sync_requests.cond);
>>>> The only benefit I can see is that now the sender can request to
>>>> multiple receivers at the same time. And it makes things more
>>>> complicated. Do we really need this?
>>> The benefit is that one thread is blocked waiting for response,
>>> your mp_handler can still receive and handle other messages.
>> This can already be done in the original implementation. mp_handler
>> listens for msg, request from the other peer(s), and replies the
>> requests, which is not affected.
>>
>>> Plus as you said - other threads can keep sending messages.
>> For this one, in the original implementation, other threads can still
>> send msg, but not request. I suppose the request is not in a fast path,
>> why we care to make it fast?
>>
> +int
> +rte_eal_mp_request(const char *action_name,
> +		   void *params,
> +		   int len_p,
> +		   int fds[],
> +		   int fds_in,
> +		   int fds_out)
> +{
> +	int i, j;
> +	int sockfd;
> +	int nprocs;
> +	int ret = 0;
> +	struct mp_msghdr *req;
> +	struct timeval tv;
> +	char buf[MAX_MSG_LENGTH];
> +	struct mp_msghdr *hdr;
> +
> +	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
> +
> +	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
> +		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
> +		rte_errno = -E2BIG;
> +		return 0;
> +	}
> +
> +	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
> +	if (req == NULL)
> +		return 0;
> +
> +	if ((sockfd = open_unix_fd(0)) < 0) {
> +		free(req);
> +		return 0;
> +	}
> +
> +	tv.tv_sec = 5;  /* 5 Secs Timeout */
> +	tv.tv_usec = 0;
> +	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
> +			(const void *)&tv, sizeof(struct timeval)) < 0)
> +		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
>
> If you set it just for one call, why don't you restore it?

Yes, the original code is buggy; I should have put it into the critical section.

Do you mean we just create it once and use it forever? If yes, we could put
the open and the timeout setting into mp_init().

> Also I don't think it is a good idea to change it here -
> if you'll make timeout a parameter value - then it could be overwritten
> by different threads.

For simplicity, I'm not inclined to expose the timeout as a parameter
to the caller. So if you agree, I'll put it into mp_init() along with
the open.

>
> +
> +	/* Only allow one req at a time */
> +	pthread_mutex_lock(&mp_mutex_request);
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		nprocs = 0;
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			if (!mp_sec_sockets[i]) {
> +				j = i;
> +				nprocs++;
> +			}
> +
> +		if (nprocs > 1) {
> +			RTE_LOG(ERR, EAL,
> +				"multi secondary processes not supported\n");
> +			goto free_and_ret;
> +		}
> +
> +		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
>
> As I remember - sndmsg() is also blocking call, so under some conditions you can stall
> there forever.

 From Linux's unix_dgram_sendmsg(), we see:
     timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

I assume it will not block for a datagram unix socket on Linux. But I'm
not sure how it behaves on FreeBSD.

Anyway, better to add an explicit setsockopt() to make it non-blocking.

> As mp_mutex_requestis still held - next rte_eal_mp_request(0 will also block forever here.
>
> +	} else
> +		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
> +
> +	if (ret == 0) {
> +		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
> +		ret = -1;
> +		goto free_and_ret;
> +	}
> +
> +	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
>
> if the message you receive is not a reply you are expecting -
> it will be simply dropped - mp_handler() would never process it.

We cannot detect with absolute certainty whether it's the right reply; we just
check the action_name, which means it can still get a wrong reply
if an action_name has multiple requests in flight.

Is just comparing the action_name acceptable?

>
> +	if (ret > 0) {
> +		hdr = (struct mp_msghdr *)buf;
> +		if (hdr->len_params == len_p)
> +			memcpy(params, hdr->params, len_p);
> +		else {
> +			RTE_LOG(ERR, EAL, "invalid reply\n");
> +			ret = 0;
> +		}
> +	}
> +
> +free_and_ret:
> +	free(req);
> +	close(sockfd);
> +	pthread_mutex_unlock(&mp_mutex_request);
> +	return ret;
> +}
>
> All of the above makes me think that current implementation is erroneous
> and needs to be reworked.

Thank you for your review. I'll work on a new version.

Thanks,
Jianfeng

> Konstantin
>
>
  
Jianfeng Tan Jan. 17, 2018, 1:15 p.m. UTC | #8
On 1/17/2018 9:09 PM, Tan, Jianfeng wrote:
>
>
> On 1/17/2018 6:50 PM, Ananyev, Konstantin wrote:
>
[...]
>> +int
>> +rte_eal_mp_request(const char *action_name,
>> +           void *params,
>> +           int len_p,
>> +           int fds[],
>> +           int fds_in,
>> +           int fds_out)
>> +{
>> +    int i, j;
>> +    int sockfd;
>> +    int nprocs;
>> +    int ret = 0;
>> +    struct mp_msghdr *req;
>> +    struct timeval tv;
>> +    char buf[MAX_MSG_LENGTH];
>> +    struct mp_msghdr *hdr;
>> +
>> +    RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
>> +
>> +    if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
>> +        RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", 
>> SCM_MAX_FD);
>> +        rte_errno = -E2BIG;
>> +        return 0;
>> +    }
>> +
>> +    req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
>> +    if (req == NULL)
>> +        return 0;
>> +
>> +    if ((sockfd = open_unix_fd(0)) < 0) {
>> +        free(req);
>> +        return 0;
>> +    }
>> +
>> +    tv.tv_sec = 5;  /* 5 Secs Timeout */
>> +    tv.tv_usec = 0;
>> +    if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
>> +            (const void *)&tv, sizeof(struct timeval)) < 0)
>> +        RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
>>
>> If you set it just for one call, why don't you restore it?
>
> Yes, original code is buggy, I should have put it into the critical 
> section.
>
> Do you mean we just create once and use for ever? if yes, we could put 
> the open and setting into mp_init().

On second thought, we should not put the setting into mp_init(): the socket
would need to be non-blocking when sending msgs, but blocking when receiving msgs.

Thanks,
Jianfeng
  
Ananyev, Konstantin Jan. 17, 2018, 5:20 p.m. UTC | #9
> 
> 
> On 1/17/2018 6:50 PM, Ananyev, Konstantin wrote:
> >
> >>> Hi Jianfeng,
> >>>
> >>>> -----Original Message-----
> >>>> From: Tan, Jianfeng
> >>>> Sent: Tuesday, January 16, 2018 8:11 AM
> >>>> To: Ananyev, Konstantin <konstantin.ananyev@intel.com>; dev@dpdk.org; Burakov, Anatoly <anatoly.burakov@intel.com>
> >>>> Cc: Richardson, Bruce <bruce.richardson@intel.com>; thomas@monjalon.net
> >>>> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process communication
> >>>>
> >>>> Thank you, Konstantin and Anatoly firstly. Other comments are well
> >>>> received and I'll send out a new version.
> >>>>
> >>>>
> >>>> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
> >>>>>> We need the synchronous way for multi-process communication,
> >>>>>> i.e., blockingly waiting for reply message when we send a request
> >>>>>> to the peer process.
> >>>>>>
> >>>>>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> >>>>>> such use case. By invoking rte_eal_mp_request(), a request message
> >>>>>> is sent out, and then it waits there for a reply message. The
> >>>>>> timeout is hard-coded 5 Sec. And the replied message will be copied
> >>>>>> in the parameters of this API so that the caller can decide how
> >>>>>> to translate those information (including params and fds). Note
> >>>>>> if a primary process owns multiple secondary processes, this API
> >>>>>> will fail.
> >>>>>>
> >>>>>> The API rte_eal_mp_reply() is always called by an mp action handler.
> >>>>>> Here we add another parameter for rte_eal_mp_t so that the action
> >>>>>> handler knows which peer address to reply.
> >>>>>>
> >>>>>> We use mutex in rte_eal_mp_request() to guarantee that only one
> >>>>>> request is on the fly for one pair of processes.
> >>>>> You don't need to do things in such strange and restrictive way.
> >>>>> Instead you can do something like that:
> >>>>> 1) Introduce new struct, list for it and mutex
> >>>>>     struct sync_request {
> >>>>>          int reply_received;
> >>>>>          char dst[PATH_MAX];
> >>>>>          char reply[...];
> >>>>>          LIST_ENTRY(sync_request) next;
> >>>>> };
> >>>>>
> >>>>> static struct
> >>>>>        LIST_HEAD(list, sync_request);
> >>>>>        pthread_mutex_t lock;
> >>>>>       pthead_cond_t cond;
> >>>>> } sync_requests;
> >>>>>
> >>>>> 2) then at request() call:
> >>>>>      Grab sync_requests.lock
> >>>>>      Check do we already have a pending request for that destination,
> >>>>>      If yes - the release the lock and returns with error.
> >>>>>      - allocate and init new sync_request struct, set reply_received=0
> >>>>>      - do send_msg()
> >>>>>      -then in a cycle:
> >>>>>      pthread_cond_timed_wait(&sync_requests.cond, &sync_request.lock, &timespec);
> >>>>>      - at return from it check if sync_request.reply_received == 1, if not
> >>>>> check if timeout expired and either return a failure or go to the start of the cycle.
> >>>>>
> >>>>> 3) at mp_handler() if REPLY received - grab sync_request.lock,
> >>>>>        search through sync_requests.list for dst[] ,
> >>>>>       if found, then set it's reply_received=1, copy the received message into reply
> >>>>>       and call pthread_cond_braodcast((&sync_requests.cond);
> >>>> The only benefit I can see is that now the sender can request to
> >>>> multiple receivers at the same time. And it makes things more
> >>>> complicated. Do we really need this?
> >>> The benefit is that one thread is blocked waiting for response,
> >>> your mp_handler can still receive and handle other messages.
> >> This can already be done in the original implementation. mp_handler
> >> listens for msg, request from the other peer(s), and replies the
> >> requests, which is not affected.
> >>
> >>> Plus as you said - other threads can keep sending messages.
> >> For this one, in the original implementation, other threads can still
> >> send msg, but not request. I suppose the request is not in a fast path,
> >> why we care to make it fast?
> >>
> > +int
> > +rte_eal_mp_request(const char *action_name,
> > +		   void *params,
> > +		   int len_p,
> > +		   int fds[],
> > +		   int fds_in,
> > +		   int fds_out)
> > +{
> > +	int i, j;
> > +	int sockfd;
> > +	int nprocs;
> > +	int ret = 0;
> > +	struct mp_msghdr *req;
> > +	struct timeval tv;
> > +	char buf[MAX_MSG_LENGTH];
> > +	struct mp_msghdr *hdr;
> > +
> > +	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
> > +
> > +	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
> > +		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
> > +		rte_errno = -E2BIG;
> > +		return 0;
> > +	}
> > +
> > +	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
> > +	if (req == NULL)
> > +		return 0;
> > +
> > +	if ((sockfd = open_unix_fd(0)) < 0) {
> > +		free(req);
> > +		return 0;
> > +	}
> > +
> > +	tv.tv_sec = 5;  /* 5 Secs Timeout */
> > +	tv.tv_usec = 0;
> > +	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
> > +			(const void *)&tv, sizeof(struct timeval)) < 0)
> > +		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
> >
> > If you set it just for one call, why don't you restore it?
> 
> Yes, original code is buggy, I should have put it into the critical section.
> 
> Do you mean we just create once and use for ever? if yes, we could put
> the open and setting into mp_init().
> 
> > Also I don't think it is a good idea to change it here -
> > if you'll make timeout a parameter value - then it could be overwritten
> > by different threads.
> 
> For simplicity, I'm not inclined to put the timeout as an parameter
> exposing to caller. So if you agree, I'll put it into the mp_init() with
> open.

My preference would be to have the timeout value on a per-call basis.
For one request the user would like to wait no more than 5 sec,
for another one the user would probably be ok to wait forever.

> 
> >
> > +
> > +	/* Only allow one req at a time */
> > +	pthread_mutex_lock(&mp_mutex_request);
> > +
> > +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> > +		nprocs = 0;
> > +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> > +			if (!mp_sec_sockets[i]) {
> > +				j = i;
> > +				nprocs++;
> > +			}
> > +
> > +		if (nprocs > 1) {
> > +			RTE_LOG(ERR, EAL,
> > +				"multi secondary processes not supported\n");
> > +			goto free_and_ret;
> > +		}
> > +
> > +		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
> >
> > As I remember - sndmsg() is also blocking call, so under some conditions you can stall
> > there forever.
> 
>  From linux's unix_diagram_sendmsg(), we see:
>      timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

Ok, but it would have an effect only if (msg->msg_flags & MSG_DONTWAIT) != 0.
And for that, as I remember, you need your socket in non-blocking mode, no?

> 
> I assume it will not block for datagram unix socket in Linux. But I'm
> not sure what it behaves in freebsd.
> 
> Anyway, better to add an explicit setsockopt() to make it not blocking.

You can't do that - at the same moment another thread might call your sendmsg()
and it might expect it to be a blocking call.
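For reference, a non-blocking send can instead be requested per call via the MSG_DONTWAIT flag to sendmsg(), which leaves the socket's own mode untouched for other threads — a sketch with hypothetical names:

```c
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>

/* Send one datagram to a unix-socket path without ever blocking.
 * MSG_DONTWAIT applies to this call only, so other threads sharing
 * the same fd keep the socket's blocking semantics. */
static ssize_t
send_msg_nowait(int sockfd, const char *dst_path,
		const void *buf, size_t len)
{
	struct sockaddr_un dst;
	struct iovec iov;
	struct msghdr msgh;

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strncpy(dst.sun_path, dst_path, sizeof(dst.sun_path) - 1);

	iov.iov_base = (void *)buf;
	iov.iov_len = len;
	memset(&msgh, 0, sizeof(msgh));
	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;

	/* returns -1 with errno == EAGAIN instead of blocking */
	return sendmsg(sockfd, &msgh, MSG_DONTWAIT);
}
```

With this, the caller holding mp_mutex_request can treat EAGAIN as a send failure and release the lock, rather than stalling every other requester.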

> 
> > As mp_mutex_requestis still held - next rte_eal_mp_request(0 will also block forever here.
> >
> > +	} else
> > +		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
> > +
> > +	if (ret == 0) {
> > +		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
> > +		ret = -1;
> > +		goto free_and_ret;
> > +	}
> > +
> > +	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
> >
> > if the message you receive is not a reply you are expecting -
> > it will be simply dropped - mp_handler() would never process it.
> 
> We cannot detect if it's the right reply absolutely correctly, but just
> check the action_name, which means, it still possibly gets a wrong reply
> if an action_name contains multiple requests.
> 
> Is just comparing the action_name acceptable?

As I can see, the main issue here is that you call recvmsg() from 2 different
points and they are not synchronised:
1. your mp_handler() isn't aware of the reply you are waiting for and doesn't
have any handler associated with it.
So if mp_handler() receives a reply, it will just drop it.
2. your reply() is not aware of any other messages and associated actions -
so again it can't handle them properly (and probably shouldn't).

The simplest (and most common) way: always call recvmsg() from one place -
mp_handler() - and have a special action for the reply msg.
As I wrote before, that action will just find the appropriate buffer provided
by reply(), copy the message into it, and signal the thread waiting in reply() that
it can proceed.
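In outline, the single-recv-point dispatch could look like this (hypothetical, simplified types; deliver_reply stands in for the routine that copies the message into the requester's buffer and wakes it, and find_action for the registered action lookup — the message types match those in the patch):

```c
#include <string.h>

enum mp_type { MP_MSG = 0, MP_REQ = 1, MP_REP = 2 };

struct mp_msg {
	enum mp_type type;
	char action_name[64];
	char params[256];
};

typedef int (*mp_action_t)(const struct mp_msg *m);

/* Single point consuming everything read by mp_handler(): a REPLY is
 * handed to the blocked requester; anything else goes through the
 * registered action table. */
static int
dispatch(const struct mp_msg *m,
	 void (*deliver_reply)(const struct mp_msg *),
	 mp_action_t (*find_action)(const char *))
{
	if (m->type == MP_REP) {
		deliver_reply(m);	/* wake the waiting requester */
		return 0;
	}
	mp_action_t act = find_action(m->action_name);
	if (act == NULL)
		return -1;		/* no action registered for this name */
	return act(m);
}

/* sample callbacks used to exercise the dispatcher */
static int action_hits;
static int replies_delivered;

static int
sample_action(const struct mp_msg *m)
{
	(void)m;
	action_hits++;
	return 0;
}

static void
sample_deliver(const struct mp_msg *m)
{
	(void)m;
	replies_delivered++;
}

static mp_action_t
sample_find(const char *name)
{
	return strcmp(name, "vfio") == 0 ? sample_action : NULL;
}
```

Because replies never reach the action table, a stray or late reply is either matched to a pending request or harmlessly ignored, instead of being dropped by the wrong reader.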
  
Konstantin

> 
> >
> > +	if (ret > 0) {
> > +		hdr = (struct mp_msghdr *)buf;
> > +		if (hdr->len_params == len_p)
> > +			memcpy(params, hdr->params, len_p);
> > +		else {
> > +			RTE_LOG(ERR, EAL, "invalid reply\n");
> > +			ret = 0;
> > +		}
> > +	}
> > +
> > +free_and_ret:
> > +	free(req);
> > +	close(sockfd);
> > +	pthread_mutex_unlock(&mp_mutex_request);
> > +	return ret;
> > +}
> >
> > All of the above makes me think that current implementation is erroneous
> > and needs to be reworked.
> 
> Thank you for your review. I'll work on a new version.
> 
> Thanks,
> Jianfeng
> 
> > Konstantin
> >
> >
  

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 70519cc..f194a52 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -32,6 +32,7 @@ 
 static int mp_fd = -1;
 static char *mp_sec_sockets[MAX_SECONDARY_PROCS];
 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t mp_mutex_request = PTHREAD_MUTEX_INITIALIZER;
 
 struct action_entry {
 	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry */
@@ -49,6 +50,10 @@  static struct action_entry_list action_entry_list =
 
 struct mp_msghdr {
 	char action_name[MAX_ACTION_NAME_LEN];
+#define MP_MSG	0 /* Share message with peers, will not block */
+#define MP_REQ	1 /* Request for information, will block for a reply */
+#define MP_REP	2 /* Reply to previously-received request */
+	int type;
 	int fds_num;
 	int len_params;
 	char params[0];
@@ -138,7 +143,8 @@  rte_eal_mp_action_unregister(const char *name)
 }
 
 static int
-read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
+read_msg(int fd, char *buf, int buflen,
+	 int *fds, int fds_num, struct sockaddr_un *s)
 {
 	int ret;
 	struct iovec iov;
@@ -151,6 +157,8 @@  read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 	iov.iov_base = buf;
 	iov.iov_len  = buflen;
 
+	msgh.msg_name = s;
+	msgh.msg_namelen = sizeof(*s);
 	msgh.msg_iov = &iov;
 	msgh.msg_iovlen = 1;
 	msgh.msg_control = control;
@@ -181,7 +189,7 @@  read_msg(int fd, char *buf, int buflen, int *fds, int fds_num)
 }
 
 static int
-process_msg(struct mp_msghdr *hdr, int len, int fds[])
+process_msg(struct mp_msghdr *hdr, int len, int fds[], struct sockaddr_un *s)
 {
 	int ret;
 	int params_len;
@@ -199,10 +207,10 @@  process_msg(struct mp_msghdr *hdr, int len, int fds[])
 	}
 
 	params_len = len - sizeof(struct mp_msghdr);
-	ret = entry->action(hdr->params, params_len, fds, hdr->fds_num);
+	ret = entry->action(hdr->params, params_len,
+			    fds, hdr->fds_num, s->sun_path);
 	pthread_mutex_unlock(&mp_mutex_action);
 	return ret;
-
 }
 
 static void *
@@ -211,11 +219,12 @@  mp_handle(void *arg __rte_unused)
 	int len;
 	int fds[SCM_MAX_FD];
 	char buf[MAX_MSG_LENGTH];
+	struct sockaddr_un sa;
 
 	while (1) {
-		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD);
+		len = read_msg(mp_fd, buf, MAX_MSG_LENGTH, fds, SCM_MAX_FD, &sa);
 		if (len > 0)
-			process_msg((struct mp_msghdr *)buf, len, fds);
+			process_msg((struct mp_msghdr *)buf, len, fds, &sa);
 	}
 
 	return NULL;
@@ -255,7 +264,8 @@  static int
 mp_primary_proc(const void *params,
 		int len __rte_unused,
 		int fds[] __rte_unused,
-		int fds_num __rte_unused)
+		int fds_num __rte_unused,
+		const void *peer __rte_unused)
 {
 	const struct proc_request *r = (const struct proc_request *)params;
 
@@ -362,7 +372,8 @@  rte_eal_mp_channel_init(void)
 }
 
 static inline struct mp_msghdr *
-format_msg(const char *act_name, const void *p, int len_params, int fds_num)
+format_msg(const char *act_name, const void *p,
+	   int len_params, int fds_num, int type)
 {
 	int len_msg;
 	struct mp_msghdr *msg;
@@ -384,6 +395,7 @@  format_msg(const char *act_name, const void *p, int len_params, int fds_num)
 	strcpy(msg->action_name, act_name);
 	msg->fds_num = fds_num;
 	msg->len_params = len_params;
+	msg->type = type;
 	memcpy(msg->params, p, len_params);
 	return msg;
 }
@@ -455,7 +467,9 @@  mp_send(const char *action_name,
 	const void *params,
 	int len_params,
 	int fds[],
-	int fds_num)
+	int fds_num,
+	int type,
+	const void *peer)
 {
 	int i;
 	int n = 0;
@@ -468,7 +482,7 @@  mp_send(const char *action_name,
 		return 0;
 	}
 
-	msg = format_msg(action_name, params, len_params, fds_num);
+	msg = format_msg(action_name, params, len_params, fds_num, type);
 	if (msg == NULL)
 		return 0;
 
@@ -477,6 +491,11 @@  mp_send(const char *action_name,
 		return 0;
 	}
 
+	if (peer) {
+		n += send_msg(sockfd, peer, msg, fds);
+		goto ret;
+	}
+
 	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
 		/* broadcast to all secondaries */
 		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
@@ -488,6 +507,7 @@  mp_send(const char *action_name,
 	} else
 		n += send_msg(sockfd, eal_mp_unix_path(), msg, fds);
 
+ret:
 	free(msg);
 	close(sockfd);
 	return n;
@@ -501,5 +521,107 @@  rte_eal_mp_sendmsg(const char *action_name,
 		   int fds_num)
 {
 	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", action_name);
-	return mp_send(action_name, params, len_params, fds, fds_num);
+	return mp_send(action_name, params, len_params,
+			fds, fds_num, MP_MSG, NULL);
+}
+
+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");
+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (!mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);
+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);
+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}
+
+int
+rte_eal_mp_reply(const char *action_name,
+		 const void *params,
+		 int len_p,
+		 int fds[],
+		 int fds_in,
+		 const void *peer)
+{
+	RTE_LOG(DEBUG, EAL, "reply: %s\n", action_name);
+	if (peer == NULL) {
+		RTE_LOG(ERR, EAL, "peer is not specified\n");
+		return 0;
+	}
+	return mp_send(action_name, params, len_p, fds, fds_in, MP_REP, peer);
 }
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 9884c0b..2690a77 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -192,7 +192,7 @@  int rte_eal_primary_proc_alive(const char *config_file_path);
  * this function typedef to register action for coming messages.
  */
 typedef int (*rte_eal_mp_t)(const void *params, int len,
-			    int fds[], int fds_num);
+			    int fds[], int fds_num, const void *peer);
 
 /**
  * Register an action function for primary/secondary communication.
@@ -245,7 +245,7 @@  void rte_eal_mp_action_unregister(const char *name);
  *   The fds argument is an array of fds sent with sendmsg.
  *
  * @param fds_num
- *   The fds_num argument is number of fds to be sent with sendmsg.
+ *   The number of fds to be sent with sendmsg.
  *
  * @return
  *  - Returns the number of messages being sent successfully.
@@ -255,6 +255,75 @@  rte_eal_mp_sendmsg(const char *action_name, const void *params,
 		   int len_params, int fds[], int fds_num);
 
 /**
+ * Send a request to the peer process and expect a reply.
+ *
+ * This function sends a request message to the peer process, and will
+ * block until a reply message is received from the peer process. Note:
+ * this does not work for the primary process sending requests to its
+ * multiple (>1) secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message; as the reply is
+ *   received, the replied params will be copied to this pointer.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg; as the reply
+ *   is received, the replied fds will be copied into this array.
+ *
+ * @param fds_in
+ *   The number of fds to be sent.
+ *
+ * @param fds_out
+ *   The number of fds to be received.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) when the request is sent successfully but no valid reply is received.
+ *  - (<0) on failure to send the request.
+ */
+int
+rte_eal_mp_request(const char *action_name, void *params,
+		   int len_p, int fds[], int fds_in, int fds_out);
+
+/**
+ * Send a reply to the peer process.
+ *
+ * This function will send a reply message in response to a request message
+ * received previously.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_p
+ *   The length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_in
+ *   The number of fds to be sent with sendmsg.
+ *
+ * @param peer
+ *   The peer argument is the address of the peer process; the reply will be
+ *   sent to this peer.
+ *
+ * @return
+ *  - (1) on success;
+ *  - (0) on failure.
+ */
+int
+rte_eal_mp_reply(const char *action_name, const void *params,
+		 int len_p, int fds[], int fds_in, const void *peer);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_application_usage_hook()
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index 5dacde5..068ac0b 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -243,5 +243,7 @@  DPDK_18.02 {
 	rte_eal_mp_action_register;
 	rte_eal_mp_action_unregister;
 	rte_eal_mp_sendmsg;
+	rte_eal_mp_request;
+	rte_eal_mp_reply;
 
 } DPDK_17.11;