[dpdk-dev,06/12] eal: add channel for primary/secondary communication

Message ID 1503654052-84730-7-git-send-email-jianfeng.tan@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Yuanhan Liu
Headers

Checks

Context Check Description
ci/checkpatch warning coding style issues
ci/Intel-compilation fail Compilation issues

Commit Message

Jianfeng Tan Aug. 25, 2017, 9:40 a.m. UTC
  Previously, there was only one way for the primary and secondary
processes to exchange messages: the primary process writes info into a
predefined file, and the secondary process reads that info out. That
cannot address the following requirements:
  a. Secondary wants to send info to primary.
  b. Sending info at any time, instead of just initialization time.
  c. Share FD with the other side.

This patch proposes to create a communication channel (as a unix
socket connection) to meet the above requirements.

Three new APIs are added:

  1. rte_eal_primary_secondary_add_action is used to register an action,
if the calling component wants to respond to the messages from the
corresponding component in its primary process or secondary processes.
  2. rte_eal_primary_secondary_del_action is used to unregister the
action if the calling component no longer wants to respond to the messages.
  3. rte_eal_primary_secondary_sendmsg is used to send a message.

Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
---
 lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
 lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
 lib/librte_eal/common/eal_filesystem.h          |  18 +
 lib/librte_eal/common/eal_private.h             |  10 +
 lib/librte_eal/common/include/rte_eal.h         |  74 ++++
 lib/librte_eal/linuxapp/eal/eal.c               |   6 +
 lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
 7 files changed, 578 insertions(+)
  

Comments

Hu, Jiayu Sept. 18, 2017, 1:49 p.m. UTC | #1
Hi Jianfeng,


On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
> Previouly, there is only one way for primary/secondary to exchange
> messages, that is, primary process writes info into some predefind
> file, and secondary process reads info out. That cannot address
> the requirements:
>   a. Secondary wants to send info to primary.
>   b. Sending info at any time, instead of just initialization time.
>   c. Share FD with the other side.

It would be better if you could explain here why the above three
characteristics are required for enabling vdev in the secondary process. For
example, a vdev may be hot-plugged or removed, so the primary and secondary
processes need to exchange data bidirectionally and dynamically.

> 
> This patch proposes to create a communication channel (as an unix
> socket connection) for above requirements.

Can you explain in more detail how the channel works? For example, both
the primary and the secondary register actions for specific messages, and
another thread is created to listen for and react to incoming messages.

> 
> Three new APIs are added:
> 
>   1. rte_eal_primary_secondary_add_action is used to register an action,
> if the calling component wants to response the messages from the
> corresponding component in its primary process or secondary processes.
>   2. rte_eal_primary_secondary_del_action is used to unregister the
> action if the calling component does not want to response the messages.
>   3. rte_eal_primary_secondary_sendmsg is used to send a message.
> 
> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
> ---
>  lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
>  lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
>  lib/librte_eal/common/eal_filesystem.h          |  18 +
>  lib/librte_eal/common/eal_private.h             |  10 +
>  lib/librte_eal/common/include/rte_eal.h         |  74 ++++
>  lib/librte_eal/linuxapp/eal/eal.c               |   6 +
>  lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
>  7 files changed, 578 insertions(+)
> 
> diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> index aac6fd7..f4ff29f 100644
> --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> @@ -237,3 +237,11 @@ EXPERIMENTAL {
>  	rte_service_unregister;
>  
>  } DPDK_17.08;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_eal_primary_secondary_add_action;
> +	rte_eal_primary_secondary_del_action;
> +	rte_eal_primary_secondary_sendmsg;
> +} DPDK_17.11;
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 60526ca..fa316bf 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -33,8 +33,20 @@
>  #include <stdio.h>
>  #include <fcntl.h>
>  #include <stdlib.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <sys/un.h>
> +#include <errno.h>
> +#include <pthread.h>
> +
> +#include <rte_log.h>
>  #include <rte_eal.h>
> +#include <rte_lcore.h>
>  
> +#include "eal_private.h"
>  #include "eal_filesystem.h"
>  #include "eal_internal_cfg.h"
>  
> @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
>  
>  	return !!ret;
>  }
> +
> +struct action_entry {
> +	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
> +
> +#define MAX_ACTION_NAME_LEN	64
> +	char action_name[MAX_ACTION_NAME_LEN];
> +	rte_eal_primary_secondary_t *action;
> +};
> +
> +/** Double linked list of actions. */
> +TAILQ_HEAD(action_entry_list, action_entry);
> +
> +static struct action_entry_list action_entry_list =
> +	TAILQ_HEAD_INITIALIZER(action_entry_list);
> +
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> +	int len = strlen(name);
> +	struct action_entry *entry;
> +
> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
> +		if (strncmp(entry->action_name, name, len) == 0)
> +				break;
> +	}
> +
> +	return entry;
> +}
> +
> +int
> +rte_eal_primary_secondary_add_action(const char *action_name,
> +				     rte_eal_primary_secondary_t action)
> +{
> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +	if (entry == NULL)
> +		return -ENOMEM;
> +
> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +	entry->action = action;

In struct action_entry, the type of action is 'rte_eal_primary_secondary_t *',
but you assign an object to action here.

> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);

What would happen if two actions are registered for the same message name?

> +	return 0;
> +}
> +
> +void
> +rte_eal_primary_secondary_del_action(const char *name)
> +{
> +	struct action_entry *entry = find_action_entry_by_name(name);
> +
> +	TAILQ_REMOVE(&action_entry_list, entry, next);
> +	free(entry);
> +}
> +
> +#define MAX_SECONDARY_PROCS	8

A simple question: why is the max number 8?

> +
> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
> +static int fd_listen;   /* unix listen socket by primary */
> +static int fd_to_pri;   /* only used by secondary process */
> +static int fds_to_sec[MAX_SECONDARY_PROCS];
> +
> +struct msg_hdr {
> +	char action_name[MAX_ACTION_NAME_LEN];
> +	int fds_num;
> +	char params[0];
> +} __rte_packed;
> +
> +static int
> +add_sec_proc(int fd)
> +{
> +	int i;
> +
> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +		if (fds_to_sec[i] == -1)
> +			break;
> +
> +	if (i >= MAX_SECONDARY_PROCS)
> +		return -1;
> +
> +	fds_to_sec[i] = fd;
> +
> +	return i;
> +}
> +
> +static void
> +del_sec_proc(int fd)
> +{
> +	int i;
> +
> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> +		if (fds_to_sec[i] == fd) {
> +			fds_to_sec[i] = -1;
> +			break;
> +		}
> +	}
> +}
> +
> +static int
> +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
> +{
> +	struct iovec iov;
> +	struct msghdr msgh;
> +	size_t fdsize = fds_num * sizeof(int);
> +	char control[CMSG_SPACE(fdsize)];
> +	struct cmsghdr *cmsg;
> +	int ret;
> +
> +	memset(&msgh, 0, sizeof(msgh));
> +	iov.iov_base = buf;
> +	iov.iov_len  = buflen;
> +
> +	msgh.msg_iov = &iov;
> +	msgh.msg_iovlen = 1;
> +	msgh.msg_control = control;
> +	msgh.msg_controllen = sizeof(control);
> +
> +	ret = recvmsg(sockfd, &msgh, 0);
> +	if (ret <= 0) {
> +		RTE_LOG(ERR, EAL, "recvmsg failed\n");
> +		return ret;
> +	}
> +
> +	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
> +		RTE_LOG(ERR, EAL, "truncted msg\n");
> +		return -1;
> +	}
> +
> +	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
> +		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
> +		if ((cmsg->cmsg_level == SOL_SOCKET) &&
> +			(cmsg->cmsg_type == SCM_RIGHTS)) {
> +			memcpy(fds, CMSG_DATA(cmsg), fdsize);
> +			break;
> +		}
> +	}
> +
> +	return ret;
> +}
> +
> +static int
> +process_msg(int fd)
> +{
> +	int len;
> +	int params_len;
> +	char buf[1024];
> +	int fds[8]; /* accept at most 8 FDs per message */
> +	struct msg_hdr *hdr;
> +	struct action_entry *entry;
> +
> +	len = read_msg(fd, buf, 1024, fds, 8);
> +	if (len < 0) {
> +		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
> +			strerror(errno));
> +		return -1;
> +	}
> +
> +	hdr = (struct msg_hdr *) buf;
> +
> +	entry = find_action_entry_by_name(hdr->action_name);
> +	if (entry == NULL) {
> +		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
> +			hdr->action_name);
> +		return -1;
> +	}
> +
> +	params_len = len - sizeof(struct msg_hdr);
> +	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +}
> +
> +static void *
> +thread_primary(__attribute__((unused)) void *arg)
> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_listen;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
> +			strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +			if (events[i].data.fd == fd_listen) {
> +				if (events[i].events != EPOLLIN) {
> +					RTE_LOG(ERR, EAL, "what happens?\n");
> +					exit(EXIT_FAILURE);
> +				}
> +
> +				fd = accept(fd_listen, NULL, NULL);
> +				if (fd < 0) {
> +					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> +						strerror(errno));
> +					continue;
> +				}
> +
> +				event.data.fd = fd;
> +				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
> +					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
> +						strerror(errno));
> +					continue;
> +				}
> +				if (add_sec_proc(fd) < 0)
> +					RTE_LOG(ERR, EAL, "too many secondary processes\n");
> +
> +				continue;
> +			}
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL,
> +					"secondary process exit: %d\n", fd);
> +				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
> +				del_sec_proc(fd);
> +				continue;
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from secondary process\n");
> +
> +				process_msg(fd);
> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +static void *
> +thread_secondary(__attribute__((unused)) void *arg)
> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_to_pri;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
> +				/* Do we need exit secondary when primary exits? */
> +				exit(EXIT_FAILURE);
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from primary process\n");
> +				process_msg(fd);
> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +int
> +rte_eal_primary_secondary_channel_init(void)
> +{
> +	int i, fd, ret;
> +	const char *path;
> +	struct sockaddr_un un;
> +	pthread_t tid;
> +	void*(*fn)(void *);
> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +	efd_pri_sec = epoll_create1(0);
> +	if (efd_pri_sec < 0) {
> +		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
> +		return -1;
> +	}
> +
> +	fd = socket(AF_UNIX, SOCK_STREAM, 0);
> +	if (fd < 0) {
> +		RTE_LOG(ERR, EAL, "Failed to create unix socket");
> +		return -1;
> +	}
> +
> +	memset(&un, 0, sizeof(un));
> +	un.sun_family = AF_UNIX;
> +	path = eal_primary_secondary_unix_path();
> +	strncpy(un.sun_path, path, sizeof(un.sun_path));
> +	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			fds_to_sec[i] = -1;
> +
> +		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
> +			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
> +			close(fd);
> +			return -1;
> +		}
> +
> +		/* The file still exists since last run */
> +		unlink(path);
> +
> +		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
> +				path, strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
> +
> +		ret = listen(fd, 1024);
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +
> +		fn = thread_primary;
> +		fd_listen = fd;
> +	} else {
> +		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to connect primary\n");
> +			return -1;
> +		}
> +		fn = thread_secondary;
> +		fd_to_pri = fd;
> +	}
> +
> +	ret = pthread_create(&tid, NULL, fn, NULL);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
> +			strerror(errno));
> +		close(fd);
> +		close(efd_pri_sec);
> +		return -1;
> +	}
> +
> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
> +		 "ps_channel");
> +	ret = rte_thread_setname(tid, thread_name);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to set thead name\n");
> +		close(fd);
> +		close(efd_pri_sec);
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +send_msg(int fd, struct msghdr *p_msgh)
> +{
> +	int ret;
> +
> +	do {
> +		ret = sendmsg(fd, p_msgh, 0);
> +	} while (ret < 0 && errno == EINTR);
> +
> +	return ret;
> +}
> +
> +int
> +rte_eal_primary_secondary_sendmsg(const char *action_name,
> +				  const void *params,
> +				  int len_params,
> +				  int fds[],
> +				  int fds_num)
> +{
> +	int i;
> +	int ret = 0;
> +	struct msghdr msgh;
> +	struct iovec iov;
> +	size_t fd_size = fds_num * sizeof(int);
> +	char control[CMSG_SPACE(fd_size)];
> +	struct cmsghdr *cmsg;
> +	struct msg_hdr *msg;
> +	int len_msg;
> +
> +	len_msg = sizeof(struct msg_hdr) + len_params;
> +	msg = malloc(len_msg);
> +	if (!msg) {
> +		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg");
> +		return -ENOMEM;
> +	}
> +	memset(msg, 0, len_msg);
> +	strcpy(msg->action_name, action_name);
> +	msg->fds_num = fds_num;
> +	memcpy(msg->params, params, len_params);
> +
> +	memset(&msgh, 0, sizeof(msgh));
> +	memset(control, 0, sizeof(control));
> +
> +	iov.iov_base = (uint8_t *)msg;
> +	iov.iov_len = len_msg;
> +
> +	msgh.msg_iov = &iov;
> +	msgh.msg_iovlen = 1;
> +	msgh.msg_control = control;
> +	msgh.msg_controllen = sizeof(control);
> +
> +	cmsg = CMSG_FIRSTHDR(&msgh);
> +	cmsg->cmsg_len = CMSG_LEN(fd_size);
> +	cmsg->cmsg_level = SOL_SOCKET;
> +	cmsg->cmsg_type = SCM_RIGHTS;
> +	memcpy(CMSG_DATA(cmsg), fds, fd_size);
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> +			if (fds_to_sec[i] == -1)
> +				continue;
> +
> +			ret = send_msg(fds_to_sec[i], &msgh);
> +			if (ret < 0)
> +				break;
> +		}
> +	} else {
> +		ret = send_msg(fd_to_pri, &msgh);
> +	}
> +
> +	free(msg);
> +
> +	return ret;
> +}
> diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
> index 8acbd99..78bb4fb 100644
> --- a/lib/librte_eal/common/eal_filesystem.h
> +++ b/lib/librte_eal/common/eal_filesystem.h
> @@ -67,6 +67,24 @@ eal_runtime_config_path(void)
>  	return buffer;
>  }
>  
> +/** Path of primary/secondary communication unix socket file. */
> +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> +static inline const char *
> +eal_primary_secondary_unix_path(void)
> +{
> +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> +	const char *directory = default_config_dir;
> +	const char *home_dir = getenv("HOME");
> +
> +	if (getuid() != 0 && home_dir != NULL)
> +		directory = home_dir;
> +	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
> +		 directory, internal_config.hugefile_prefix);
> +
> +	return buffer;
> +
> +}
> +
>  /** Path of hugepage info file. */
>  #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
>  
> diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
> index 597d82e..719b160 100644
> --- a/lib/librte_eal/common/eal_private.h
> +++ b/lib/librte_eal/common/eal_private.h
> @@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void);
>   */
>  struct rte_bus *rte_bus_find_by_device_name(const char *str);
>  
> +/**
> + * Create the unix channel for primary/secondary communication.
> + *
> + * @return
> + *   0 on success;
> + *   (<0) on failure.
> + */
> +
> +int rte_eal_primary_secondary_channel_init(void);
> +
>  #endif /* _EAL_PRIVATE_H_ */
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 0e7363d..6cfc9d6 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv);
>  int rte_eal_primary_proc_alive(const char *config_file_path);
>  
>  /**
> + * Action function typedef used by other components.
> + *
> + * As we create unix socket channel for primary/secondary communication, use
> + * this function typedef to register action for coming messages.
> + */
> +typedef int (rte_eal_primary_secondary_t)(const char *params,
> +					  int len,
> +					  int fds[],
> +					  int fds_num);
> +/**
> + * Register an action function for primary/secondary communication.
> + *
> + * Call this function to register an action, if the calling component wants
> + * to response the messages from the corresponding component in its primary
> + * process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the action.
> + *
> + * @param action
> + *   The action argument is the function pointer to the action function.
> + *
> + * @return
> + *  - 0 on success.
> + *  - (<0) on failure.
> + */
> +int rte_eal_primary_secondary_add_action(const char *action_name,
> +					 rte_eal_primary_secondary_t action);
> +/**
> + * Unregister an action function for primary/secondary communication.
> + *
> + * Call this function to unregister an action  if the calling component does
> + * not want to response the messages from the corresponding component in its
> + * primary process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the action.
> + *
> + */
> +void rte_eal_primary_secondary_del_action(const char *name);
> +
> +/**
> + * Send a message to the primary process or the secondary processes.
> + *
> + * This function will send a message which will be responsed by the action
> + * identified by action_name of the process on the other side.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message.
> + *
> + * @param len_params
> + *   The len_params argument is the length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg.
> + *
> + * @param fds_num
> + *   The fds_num argument is number of fds to be sent with sendmsg.
> + *
> + * @return
> + *  - (>=0) on success.
> + *  - (<0) on failure.
> + */
> +int
> +rte_eal_primary_secondary_sendmsg(const char *action_name,
> +				  const void *params,
> +				  int len_params,
> +				  int fds[],
> +				  int fds_num);
> +
> +/**
>   * Usage function typedef used by the application usage function.
>   *
>   * Use this function typedef to define and call rte_set_applcation_usage_hook()
> diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
> index 48f12f4..237c0b1 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
>  
>  	eal_check_mem_on_local_socket();
>  
> +	if (rte_eal_primary_secondary_channel_init() < 0) {
> +		rte_eal_init_alert("Cannot create unix channel.");
> +		rte_errno = EFAULT;
> +		return -1;
> +	}
> +
>  	if (eal_plugins_init() < 0)
>  		rte_eal_init_alert("Cannot init plugins\n");
>  
> diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> index 3a8f154..c618aec 100644
> --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> @@ -242,3 +242,11 @@ EXPERIMENTAL {
>  	rte_service_unregister;
>  
>  } DPDK_17.08;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_eal_primary_secondary_add_action;
> +	rte_eal_primary_secondary_del_action;
> +	rte_eal_primary_secondary_sendmsg;
> +} DPDK_17.11;
> -- 
> 2.7.4
  
Hu, Jiayu Sept. 20, 2017, 3 a.m. UTC | #2
Hi Jianfeng,

A few questions are inline.

Thanks,
Jiayu

On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
> Previouly, there is only one way for primary/secondary to exchange
> messages, that is, primary process writes info into some predefind
> file, and secondary process reads info out. That cannot address
> the requirements:
>   a. Secondary wants to send info to primary.
>   b. Sending info at any time, instead of just initialization time.
>   c. Share FD with the other side.
> 
> This patch proposes to create a communication channel (as an unix
> socket connection) for above requirements.
> 
> Three new APIs are added:
> 
>   1. rte_eal_primary_secondary_add_action is used to register an action,
> if the calling component wants to response the messages from the
> corresponding component in its primary process or secondary processes.
>   2. rte_eal_primary_secondary_del_action is used to unregister the
> action if the calling component does not want to response the messages.
>   3. rte_eal_primary_secondary_sendmsg is used to send a message.
> 
> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
> ---
>  lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
>  lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
>  lib/librte_eal/common/eal_filesystem.h          |  18 +
>  lib/librte_eal/common/eal_private.h             |  10 +
>  lib/librte_eal/common/include/rte_eal.h         |  74 ++++
>  lib/librte_eal/linuxapp/eal/eal.c               |   6 +
>  lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
>  7 files changed, 578 insertions(+)
> 
> diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> index aac6fd7..f4ff29f 100644
> --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> @@ -237,3 +237,11 @@ EXPERIMENTAL {
>  	rte_service_unregister;
>  
>  } DPDK_17.08;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_eal_primary_secondary_add_action;
> +	rte_eal_primary_secondary_del_action;
> +	rte_eal_primary_secondary_sendmsg;
> +} DPDK_17.11;
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 60526ca..fa316bf 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -33,8 +33,20 @@
>  #include <stdio.h>
>  #include <fcntl.h>
>  #include <stdlib.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <sys/un.h>
> +#include <errno.h>
> +#include <pthread.h>
> +
> +#include <rte_log.h>
>  #include <rte_eal.h>
> +#include <rte_lcore.h>
>  
> +#include "eal_private.h"
>  #include "eal_filesystem.h"
>  #include "eal_internal_cfg.h"
>  
> @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
>  
>  	return !!ret;
>  }
> +
> +struct action_entry {
> +	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
> +
> +#define MAX_ACTION_NAME_LEN	64
> +	char action_name[MAX_ACTION_NAME_LEN];
> +	rte_eal_primary_secondary_t *action;
> +};
> +
> +/** Double linked list of actions. */
> +TAILQ_HEAD(action_entry_list, action_entry);
> +
> +static struct action_entry_list action_entry_list =
> +	TAILQ_HEAD_INITIALIZER(action_entry_list);
> +
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> +	int len = strlen(name);
> +	struct action_entry *entry;
> +
> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
> +		if (strncmp(entry->action_name, name, len) == 0)
> +				break;
> +	}
> +
> +	return entry;
> +}
> +
> +int
> +rte_eal_primary_secondary_add_action(const char *action_name,
> +				     rte_eal_primary_secondary_t action)
> +{
> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +	if (entry == NULL)
> +		return -ENOMEM;
> +
> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +	entry->action = action;
> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> +	return 0;
> +}
> +
> +void
> +rte_eal_primary_secondary_del_action(const char *name)
> +{
> +	struct action_entry *entry = find_action_entry_by_name(name);
> +
> +	TAILQ_REMOVE(&action_entry_list, entry, next);
> +	free(entry);
> +}
> +
> +#define MAX_SECONDARY_PROCS	8
> +
> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
> +static int fd_listen;   /* unix listen socket by primary */
> +static int fd_to_pri;   /* only used by secondary process */
> +static int fds_to_sec[MAX_SECONDARY_PROCS];
> +
> +struct msg_hdr {
> +	char action_name[MAX_ACTION_NAME_LEN];
> +	int fds_num;
> +	char params[0];
> +} __rte_packed;
> +
> +static int
> +add_sec_proc(int fd)
> +{
> +	int i;
> +
> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +		if (fds_to_sec[i] == -1)
> +			break;
> +
> +	if (i >= MAX_SECONDARY_PROCS)
> +		return -1;
> +
> +	fds_to_sec[i] = fd;
> +
> +	return i;
> +}
> +
> +static void
> +del_sec_proc(int fd)
> +{
> +	int i;
> +
> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> +		if (fds_to_sec[i] == fd) {
> +			fds_to_sec[i] = -1;
> +			break;
> +		}
> +	}
> +}
> +
> +static int
> +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
> +{
> +	struct iovec iov;
> +	struct msghdr msgh;
> +	size_t fdsize = fds_num * sizeof(int);
> +	char control[CMSG_SPACE(fdsize)];
> +	struct cmsghdr *cmsg;
> +	int ret;
> +
> +	memset(&msgh, 0, sizeof(msgh));
> +	iov.iov_base = buf;
> +	iov.iov_len  = buflen;
> +
> +	msgh.msg_iov = &iov;
> +	msgh.msg_iovlen = 1;
> +	msgh.msg_control = control;
> +	msgh.msg_controllen = sizeof(control);
> +
> +	ret = recvmsg(sockfd, &msgh, 0);
> +	if (ret <= 0) {
> +		RTE_LOG(ERR, EAL, "recvmsg failed\n");
> +		return ret;
> +	}
> +
> +	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
> +		RTE_LOG(ERR, EAL, "truncted msg\n");
> +		return -1;
> +	}
> +
> +	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
> +		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
> +		if ((cmsg->cmsg_level == SOL_SOCKET) &&
> +			(cmsg->cmsg_type == SCM_RIGHTS)) {
> +			memcpy(fds, CMSG_DATA(cmsg), fdsize);
> +			break;
> +		}
> +	}
> +
> +	return ret;
> +}
> +
> +static int
> +process_msg(int fd)
> +{
> +	int len;
> +	int params_len;
> +	char buf[1024];

The max length of a message that can be received is 1024 here, but
senders don't know this limit. Would it be better to define a macro
for the max message length?

> +	int fds[8]; /* accept at most 8 FDs per message */
> +	struct msg_hdr *hdr;
> +	struct action_entry *entry;
> +
> +	len = read_msg(fd, buf, 1024, fds, 8);
> +	if (len < 0) {
> +		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
> +			strerror(errno));
> +		return -1;
> +	}

Why not check whether len is equal to 0?

> +
> +	hdr = (struct msg_hdr *) buf;
> +
> +	entry = find_action_entry_by_name(hdr->action_name);
> +	if (entry == NULL) {
> +		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
> +			hdr->action_name);
> +		return -1;
> +	}
> +
> +	params_len = len - sizeof(struct msg_hdr);
> +	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +}
> +
> +static void *
> +thread_primary(__attribute__((unused)) void *arg)
> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_listen;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
> +			strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);

A simple question: why is the max number of events 20?

> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +			if (events[i].data.fd == fd_listen) {
> +				if (events[i].events != EPOLLIN) {
> +					RTE_LOG(ERR, EAL, "what happens?\n");
> +					exit(EXIT_FAILURE);
> +				}
> +
> +				fd = accept(fd_listen, NULL, NULL);
> +				if (fd < 0) {
> +					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",

This line is beyond 80 characters.

> +						strerror(errno));
> +					continue;
> +				}
> +
> +				event.data.fd = fd;
> +				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
> +					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",

These two lines are beyond 80 characters.

> +						strerror(errno));
> +					continue;
> +				}
> +				if (add_sec_proc(fd) < 0)
> +					RTE_LOG(ERR, EAL, "too many secondary processes\n");

This line is beyond 80 characters.

> +
> +				continue;
> +			}
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL,
> +					"secondary process exit: %d\n", fd);
> +				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
> +				del_sec_proc(fd);

Need close(fd) here?

> +				continue;
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from secondary process\n");
> +
> +				process_msg(fd);
> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +static void *
> +thread_secondary(__attribute__((unused)) void *arg)
> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_to_pri;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
> +				/* Do we need exit secondary when primary exits? */

Need close(fd) here?

> +				exit(EXIT_FAILURE);
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from primary process\n");
> +				process_msg(fd);
> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +int
> +rte_eal_primary_secondary_channel_init(void)
> +{
> +	int i, fd, ret;
> +	const char *path;
> +	struct sockaddr_un un;
> +	pthread_t tid;
> +	void*(*fn)(void *);
> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +	efd_pri_sec = epoll_create1(0);
> +	if (efd_pri_sec < 0) {
> +		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
> +		return -1;
> +	}
> +
> +	fd = socket(AF_UNIX, SOCK_STREAM, 0);
> +	if (fd < 0) {
> +		RTE_LOG(ERR, EAL, "Failed to create unix socket");
> +		return -1;
> +	}
> +
> +	memset(&un, 0, sizeof(un));
> +	un.sun_family = AF_UNIX;
> +	path = eal_primary_secondary_unix_path();
> +	strncpy(un.sun_path, path, sizeof(un.sun_path));
> +	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			fds_to_sec[i] = -1;
> +
> +		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
> +			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
> +			close(fd);
> +			return -1;
> +		}
> +
> +		/* The file still exists since last run */
> +		unlink(path);
> +
> +		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
> +				path, strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
> +
> +		ret = listen(fd, 1024);
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +
> +		fn = thread_primary;
> +		fd_listen = fd;
> +	} else {
> +		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to connect primary\n");
> +			return -1;
> +		}
> +		fn = thread_secondary;
> +		fd_to_pri = fd;
> +	}
> +
> +	ret = pthread_create(&tid, NULL, fn, NULL);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
> +			strerror(errno));
> +		close(fd);
> +		close(efd_pri_sec);
> +		return -1;
> +	}
> +
> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
> +		 "ps_channel");
> +	ret = rte_thread_setname(tid, thread_name);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to set thead name\n");
> +		close(fd);
> +		close(efd_pri_sec);
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +static int
> +send_msg(int fd, struct msghdr *p_msgh)
> +{
> +	int ret;
> +
> +	do {
> +		ret = sendmsg(fd, p_msgh, 0);
> +	} while (ret < 0 && errno == EINTR);
> +
> +	return ret;
> +}
> +
> +int
> +rte_eal_primary_secondary_sendmsg(const char *action_name,
> +				  const void *params,
> +				  int len_params,
> +				  int fds[],
> +				  int fds_num)
> +{
> +	int i;
> +	int ret = 0;
> +	struct msghdr msgh;
> +	struct iovec iov;
> +	size_t fd_size = fds_num * sizeof(int);
> +	char control[CMSG_SPACE(fd_size)];
> +	struct cmsghdr *cmsg;
> +	struct msg_hdr *msg;
> +	int len_msg;
> +
> +	len_msg = sizeof(struct msg_hdr) + len_params;
> +	msg = malloc(len_msg);
> +	if (!msg) {
> +		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg");
> +		return -ENOMEM;
> +	}
> +	memset(msg, 0, len_msg);
> +	strcpy(msg->action_name, action_name);
> +	msg->fds_num = fds_num;
> +	memcpy(msg->params, params, len_params);
> +
> +	memset(&msgh, 0, sizeof(msgh));
> +	memset(control, 0, sizeof(control));
> +
> +	iov.iov_base = (uint8_t *)msg;
> +	iov.iov_len = len_msg;
> +
> +	msgh.msg_iov = &iov;
> +	msgh.msg_iovlen = 1;
> +	msgh.msg_control = control;
> +	msgh.msg_controllen = sizeof(control);
> +
> +	cmsg = CMSG_FIRSTHDR(&msgh);
> +	cmsg->cmsg_len = CMSG_LEN(fd_size);
> +	cmsg->cmsg_level = SOL_SOCKET;
> +	cmsg->cmsg_type = SCM_RIGHTS;
> +	memcpy(CMSG_DATA(cmsg), fds, fd_size);
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> +			if (fds_to_sec[i] == -1)
> +				continue;
> +
> +			ret = send_msg(fds_to_sec[i], &msgh);
> +			if (ret < 0)
> +				break;
> +		}
> +	} else {
> +		ret = send_msg(fd_to_pri, &msgh);
> +	}
> +
> +	free(msg);
> +
> +	return ret;
> +}
> diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
> index 8acbd99..78bb4fb 100644
> --- a/lib/librte_eal/common/eal_filesystem.h
> +++ b/lib/librte_eal/common/eal_filesystem.h
> @@ -67,6 +67,24 @@ eal_runtime_config_path(void)
>  	return buffer;
>  }
>  
> +/** Path of primary/secondary communication unix socket file. */
> +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> +static inline const char *
> +eal_primary_secondary_unix_path(void)
> +{
> +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> +	const char *directory = default_config_dir;
> +	const char *home_dir = getenv("HOME");
> +
> +	if (getuid() != 0 && home_dir != NULL)
> +		directory = home_dir;
> +	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
> +		 directory, internal_config.hugefile_prefix);
> +
> +	return buffer;
> +
> +}
> +
>  /** Path of hugepage info file. */
>  #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
>  
> diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
> index 597d82e..719b160 100644
> --- a/lib/librte_eal/common/eal_private.h
> +++ b/lib/librte_eal/common/eal_private.h
> @@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void);
>   */
>  struct rte_bus *rte_bus_find_by_device_name(const char *str);
>  
> +/**
> + * Create the unix channel for primary/secondary communication.
> + *
> + * @return
> + *   0 on success;
> + *   (<0) on failure.
> + */
> +
> +int rte_eal_primary_secondary_channel_init(void);
> +
>  #endif /* _EAL_PRIVATE_H_ */
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 0e7363d..6cfc9d6 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv);
>  int rte_eal_primary_proc_alive(const char *config_file_path);
>  
>  /**
> + * Action function typedef used by other components.
> + *
> + * As we create unix socket channel for primary/secondary communication, use
> + * this function typedef to register action for coming messages.
> + */
> +typedef int (rte_eal_primary_secondary_t)(const char *params,
> +					  int len,
> +					  int fds[],
> +					  int fds_num);
> +/**
> + * Register an action function for primary/secondary communication.
> + *
> + * Call this function to register an action, if the calling component wants
> + * to response the messages from the corresponding component in its primary
> + * process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the action.
> + *
> + * @param action
> + *   The action argument is the function pointer to the action function.
> + *
> + * @return
> + *  - 0 on success.
> + *  - (<0) on failure.
> + */
> +int rte_eal_primary_secondary_add_action(const char *action_name,
> +					 rte_eal_primary_secondary_t action);
> +/**
> + * Unregister an action function for primary/secondary communication.
> + *
> + * Call this function to unregister an action  if the calling component does
> + * not want to response the messages from the corresponding component in its
> + * primary process or secondary processes.
> + *
> + * @param action_name
> + *   The action_name argument plays as the nonredundant key to find the action.
> + *
> + */
> +void rte_eal_primary_secondary_del_action(const char *name);
> +
> +/**
> + * Send a message to the primary process or the secondary processes.
> + *
> + * This function will send a message which will be responsed by the action
> + * identified by action_name of the process on the other side.
> + *
> + * @param action_name
> + *   The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + *   The params argument contains the customized message.
> + *
> + * @param len_params
> + *   The len_params argument is the length of the customized message.
> + *
> + * @param fds
> + *   The fds argument is an array of fds sent with sendmsg.
> + *
> + * @param fds_num
> + *   The fds_num argument is number of fds to be sent with sendmsg.
> + *
> + * @return
> + *  - (>=0) on success.
> + *  - (<0) on failure.
> + */
> +int
> +rte_eal_primary_secondary_sendmsg(const char *action_name,
> +				  const void *params,
> +				  int len_params,
> +				  int fds[],
> +				  int fds_num);
> +
> +/**
>   * Usage function typedef used by the application usage function.
>   *
>   * Use this function typedef to define and call rte_set_applcation_usage_hook()
> diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
> index 48f12f4..237c0b1 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
>  
>  	eal_check_mem_on_local_socket();
>  
> +	if (rte_eal_primary_secondary_channel_init() < 0) {
> +		rte_eal_init_alert("Cannot create unix channel.");
> +		rte_errno = EFAULT;
> +		return -1;
> +	}
> +
>  	if (eal_plugins_init() < 0)
>  		rte_eal_init_alert("Cannot init plugins\n");
>  
> diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> index 3a8f154..c618aec 100644
> --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> @@ -242,3 +242,11 @@ EXPERIMENTAL {
>  	rte_service_unregister;
>  
>  } DPDK_17.08;
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	rte_eal_primary_secondary_add_action;
> +	rte_eal_primary_secondary_del_action;
> +	rte_eal_primary_secondary_sendmsg;
> +} DPDK_17.11;
> -- 
> 2.7.4
  
Jianfeng Tan Sept. 21, 2017, 6:11 a.m. UTC | #3
Hi Jiayu,


On 9/18/2017 9:49 PM, Jiayu Hu wrote:
> Hi Jianfeng,
>
>
> On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
>> Previously, there was only one way for primary/secondary to exchange
>> messages, that is, primary process writes info into some predefined
>> file, and secondary process reads info out. That cannot address
>> the requirements:
>>    a. Secondary wants to send info to primary.
>>    b. Sending info at any time, instead of just initialization time.
>>    c. Share FD with the other side.
> If you can explain more about why the above three characteristics are
> required for enabling vdev in the secondary process here, that would be
> better. For example, a vdev may be hot-plugged or removed, so the primary
> and the secondary processes need to exchange data bidirectionally and
> dynamically.

OK, I'll exemplify each item with a case.

>
>> This patch proposes to create a communication channel (as an unix
>> socket connection) for above requirements.
> Can you give more explanations about how the channel works? For example,
> both the primary and the secondary register actions for specific messages,
> and another thread is created to listen for and react to incoming messages.

I suppose that for users/developers who want to use it, the description
below about how to use the related APIs is enough. As for how the channel
is created, I'll try to describe more here.

>
>> Three new APIs are added:
>>
>>    1. rte_eal_primary_secondary_add_action is used to register an action,
>> if the calling component wants to response the messages from the
>> corresponding component in its primary process or secondary processes.
>>    2. rte_eal_primary_secondary_del_action is used to unregister the
>> action if the calling component does not want to response the messages.
>>    3. rte_eal_primary_secondary_sendmsg is used to send a message.
>>
>> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
>> ---
>>   lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
>>   lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
>>   lib/librte_eal/common/eal_filesystem.h          |  18 +
>>   lib/librte_eal/common/eal_private.h             |  10 +
>>   lib/librte_eal/common/include/rte_eal.h         |  74 ++++
>>   lib/librte_eal/linuxapp/eal/eal.c               |   6 +
>>   lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
>>   7 files changed, 578 insertions(+)
...
>> +
>> +int
>> +rte_eal_primary_secondary_add_action(const char *action_name,
>> +				     rte_eal_primary_secondary_t action)
>> +{
>> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
>> +
>> +	if (entry == NULL)
>> +		return -ENOMEM;
>> +
>> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
>> +	entry->action = action;
> In struct action_entry, the type of action is 'rte_eal_primary_secondary_t *',
> but you assign an object to action here.

Nice catch!

>
>> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> What would happen if two actions are registered for the same message name?

Hmm, yes, let's return error if there's an existing one for that name.

>
>> +	return 0;
>> +}
>> +
>> +void
>> +rte_eal_primary_secondary_del_action(const char *name)
>> +{
>> +	struct action_entry *entry = find_action_entry_by_name(name);
>> +
>> +	TAILQ_REMOVE(&action_entry_list, entry, next);
>> +	free(entry);
>> +}
>> +
>> +#define MAX_SECONDARY_PROCS	8
> A simple question: why is the max number 8?

Just a hard-coded value.

Thanks,
Jianfeng
  
Jianfeng Tan Sept. 21, 2017, 6:53 a.m. UTC | #4
Hi Jiayu,


On 9/20/2017 11:00 AM, Jiayu Hu wrote:
> Hi Jianfeng,
>
> Few questions are inline.
>
> Thanks,
> Jiayu
>
> On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
>> Previously, there was only one way for primary/secondary to exchange
>> messages, that is, primary process writes info into some predefined
>> file, and secondary process reads info out. That cannot address
>> the requirements:
>>    a. Secondary wants to send info to primary.
>>    b. Sending info at any time, instead of just initialization time.
>>    c. Share FD with the other side.
>>
>> This patch proposes to create a communication channel (as an unix
>> socket connection) for above requirements.
>>
>> Three new APIs are added:
>>
>>    1. rte_eal_primary_secondary_add_action is used to register an action,
>> if the calling component wants to response the messages from the
>> corresponding component in its primary process or secondary processes.
>>    2. rte_eal_primary_secondary_del_action is used to unregister the
>> action if the calling component does not want to response the messages.
>>    3. rte_eal_primary_secondary_sendmsg is used to send a message.
>>
>> Signed-off-by: Jianfeng Tan <jianfeng.tan@intel.com>
>> ---
>>   lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
>>   lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
>>   lib/librte_eal/common/eal_filesystem.h          |  18 +
>>   lib/librte_eal/common/eal_private.h             |  10 +
>>   lib/librte_eal/common/include/rte_eal.h         |  74 ++++
>>   lib/librte_eal/linuxapp/eal/eal.c               |   6 +
>>   lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
>>   7 files changed, 578 insertions(+)
>>
>> diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> index aac6fd7..f4ff29f 100644
>> --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> @@ -237,3 +237,11 @@ EXPERIMENTAL {
>>   	rte_service_unregister;
>>   
>>   } DPDK_17.08;
>> +
>> +EXPERIMENTAL {
>> +	global:
>> +
>> +	rte_eal_primary_secondary_add_action;
>> +	rte_eal_primary_secondary_del_action;
>> +	rte_eal_primary_secondary_sendmsg;
>> +} DPDK_17.11;
>> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
>> index 60526ca..fa316bf 100644
>> --- a/lib/librte_eal/common/eal_common_proc.c
>> +++ b/lib/librte_eal/common/eal_common_proc.c
>> @@ -33,8 +33,20 @@
>>   #include <stdio.h>
>>   #include <fcntl.h>
>>   #include <stdlib.h>
>> +#include <sys/types.h>
>> +#include <sys/socket.h>
>> +#include <sys/epoll.h>
>> +#include <limits.h>
>> +#include <unistd.h>
>> +#include <sys/un.h>
>> +#include <errno.h>
>> +#include <pthread.h>
>> +
>> +#include <rte_log.h>
>>   #include <rte_eal.h>
>> +#include <rte_lcore.h>
>>   
>> +#include "eal_private.h"
>>   #include "eal_filesystem.h"
>>   #include "eal_internal_cfg.h"
>>   
>> @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
>>   
>>   	return !!ret;
>>   }
>> +
>> +struct action_entry {
>> +	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
>> +
>> +#define MAX_ACTION_NAME_LEN	64
>> +	char action_name[MAX_ACTION_NAME_LEN];
>> +	rte_eal_primary_secondary_t *action;
>> +};
>> +
>> +/** Double linked list of actions. */
>> +TAILQ_HEAD(action_entry_list, action_entry);
>> +
>> +static struct action_entry_list action_entry_list =
>> +	TAILQ_HEAD_INITIALIZER(action_entry_list);
>> +
>> +static struct action_entry *
>> +find_action_entry_by_name(const char *name)
>> +{
>> +	int len = strlen(name);
>> +	struct action_entry *entry;
>> +
>> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
>> +		if (strncmp(entry->action_name, name, len) == 0)
>> +				break;
>> +	}
>> +
>> +	return entry;
>> +}
>> +
>> +int
>> +rte_eal_primary_secondary_add_action(const char *action_name,
>> +				     rte_eal_primary_secondary_t action)
>> +{
>> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
>> +
>> +	if (entry == NULL)
>> +		return -ENOMEM;
>> +
>> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
>> +	entry->action = action;
>> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
>> +	return 0;
>> +}
>> +
>> +void
>> +rte_eal_primary_secondary_del_action(const char *name)
>> +{
>> +	struct action_entry *entry = find_action_entry_by_name(name);
>> +
>> +	TAILQ_REMOVE(&action_entry_list, entry, next);
>> +	free(entry);
>> +}
>> +
>> +#define MAX_SECONDARY_PROCS	8
>> +
>> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
>> +static int fd_listen;   /* unix listen socket by primary */
>> +static int fd_to_pri;   /* only used by secondary process */
>> +static int fds_to_sec[MAX_SECONDARY_PROCS];
>> +
>> +struct msg_hdr {
>> +	char action_name[MAX_ACTION_NAME_LEN];
>> +	int fds_num;
>> +	char params[0];
>> +} __rte_packed;
>> +
>> +static int
>> +add_sec_proc(int fd)
>> +{
>> +	int i;
>> +
>> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
>> +		if (fds_to_sec[i] == -1)
>> +			break;
>> +
>> +	if (i >= MAX_SECONDARY_PROCS)
>> +		return -1;
>> +
>> +	fds_to_sec[i] = fd;
>> +
>> +	return i;
>> +}
>> +
>> +static void
>> +del_sec_proc(int fd)
>> +{
>> +	int i;
>> +
>> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
>> +		if (fds_to_sec[i] == fd) {
>> +			fds_to_sec[i] = -1;
>> +			break;
>> +		}
>> +	}
>> +}
>> +
>> +static int
>> +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
>> +{
>> +	struct iovec iov;
>> +	struct msghdr msgh;
>> +	size_t fdsize = fds_num * sizeof(int);
>> +	char control[CMSG_SPACE(fdsize)];
>> +	struct cmsghdr *cmsg;
>> +	int ret;
>> +
>> +	memset(&msgh, 0, sizeof(msgh));
>> +	iov.iov_base = buf;
>> +	iov.iov_len  = buflen;
>> +
>> +	msgh.msg_iov = &iov;
>> +	msgh.msg_iovlen = 1;
>> +	msgh.msg_control = control;
>> +	msgh.msg_controllen = sizeof(control);
>> +
>> +	ret = recvmsg(sockfd, &msgh, 0);
>> +	if (ret <= 0) {
>> +		RTE_LOG(ERR, EAL, "recvmsg failed\n");
>> +		return ret;
>> +	}
>> +
>> +	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
>> +		RTE_LOG(ERR, EAL, "truncted msg\n");
>> +		return -1;
>> +	}
>> +
>> +	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
>> +		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
>> +		if ((cmsg->cmsg_level == SOL_SOCKET) &&
>> +			(cmsg->cmsg_type == SCM_RIGHTS)) {
>> +			memcpy(fds, CMSG_DATA(cmsg), fdsize);
>> +			break;
>> +		}
>> +	}
>> +
>> +	return ret;
>> +}
>> +
>> +static int
>> +process_msg(int fd)
>> +{
>> +	int len;
>> +	int params_len;
>> +	char buf[1024];
> The max length of a message to receive is 1024 here, but the
> senders don't know the limit. Would it be better to define a macro
> for the max message length?

OK, let's make it a macro, and check the length when sending messages.

>
>> +	int fds[8]; /* accept at most 8 FDs per message */
>> +	struct msg_hdr *hdr;
>> +	struct action_entry *entry;
>> +
>> +	len = read_msg(fd, buf, 1024, fds, 8);
>> +	if (len < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
>> +			strerror(errno));
>> +		return -1;
>> +	}
> Why not check whether len is equal to 0?

Nice catch!

>
>> +
>> +	hdr = (struct msg_hdr *) buf;
>> +
>> +	entry = find_action_entry_by_name(hdr->action_name);
>> +	if (entry == NULL) {
>> +		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
>> +			hdr->action_name);
>> +		return -1;
>> +	}
>> +
>> +	params_len = len - sizeof(struct msg_hdr);
>> +	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
>> +}
>> +
>> +static void *
>> +thread_primary(__attribute__((unused)) void *arg)
>> +{
>> +	int fd;
>> +	int i, n;
>> +	struct epoll_event event;
>> +	struct epoll_event *events;
>> +
>> +	event.events = EPOLLIN | EPOLLRDHUP;
>> +	event.data.fd = fd_listen;
>> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
>> +			strerror(errno));
>> +		exit(EXIT_FAILURE);
>> +	}
>> +
>> +	events = calloc(20, sizeof event);
> A simple question: why is the max number of events 20?

Another hard-coded value. This value only decides how many events can be
processed in each iteration; other events will be kept in the kernel for
another iteration.

>
>> +
>> +	while (1) {
>> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
>> +		for (i = 0; i < n; i++) {
>> +			if (events[i].data.fd == fd_listen) {
>> +				if (events[i].events != EPOLLIN) {
>> +					RTE_LOG(ERR, EAL, "what happens?\n");
>> +					exit(EXIT_FAILURE);
>> +				}
>> +
>> +				fd = accept(fd_listen, NULL, NULL);
>> +				if (fd < 0) {
>> +					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> This line is beyond 80 characters.

Will fix it.
[...]
>> +
>> +				continue;
>> +			}
>> +
>> +			fd = events[i].data.fd;
>> +
>> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
>> +				RTE_LOG(ERR, EAL,
>> +					"secondary process exit: %d\n", fd);
>> +				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
>> +				del_sec_proc(fd);
> Need close(fd) here?

Nice catch.

>
>> +				continue;
>> +			}
>> +
>> +			if ((events[i].events & EPOLLIN)) {
>> +				RTE_LOG(INFO, EAL,
>> +					"recv msg from secondary process\n");
>> +
>> +				process_msg(fd);
>> +			}
>> +		}
>> +	}
>> +
>> +	return NULL;
>> +}
>> +
>> +static void *
>> +thread_secondary(__attribute__((unused)) void *arg)
>> +{
>> +	int fd;
>> +	int i, n;
>> +	struct epoll_event event;
>> +	struct epoll_event *events;
>> +
>> +	event.events = EPOLLIN | EPOLLRDHUP;
>> +	event.data.fd = fd_to_pri;
>> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
>> +		exit(EXIT_FAILURE);
>> +	}
>> +
>> +	events = calloc(20, sizeof event);
>> +
>> +	while (1) {
>> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
>> +		for (i = 0; i < n; i++) {
>> +
>> +			fd = events[i].data.fd;
>> +
>> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
>> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
>> +				/* Do we need exit secondary when primary exits? */
> Need close(fd) here?

We will exit here, no need to close().

Thanks,
Jianfeng
  
Yuanhan Liu Sept. 27, 2017, 12:19 p.m. UTC | #5
On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
> Previously, there was only one way for primary/secondary to exchange
> messages, that is, primary process writes info into some predefined
> file, and secondary process reads info out. That cannot address
> the requirements:
>   a. Secondary wants to send info to primary.
>   b. Sending info at any time, instead of just initialization time.
>   c. Share FD with the other side.
> 
> This patch proposes to create a communication channel (as an unix
> socket connection) for above requirements.

Firstly, I think it's a good idea to have such generic interfaces for
multiple process.

> Three new APIs are added:
> 
>   1. rte_eal_primary_secondary_add_action is used to register an action,

As you have said, it's for registration, so why use the verb "add" here?
Normally, "register" implies a one-time action, while "add" means
it could be a repeated action.

Also, the function name is a bit long. Maybe something like
"rte_eal_mp_xxx" is shorter and better.

[...]
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> +	int len = strlen(name);
> +	struct action_entry *entry;
> +
> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
> +		if (strncmp(entry->action_name, name, len) == 0)
> +				break;

Broken indentation here.

> +	}
> +
> +	return entry;
> +}
> +
> +int
> +rte_eal_primary_secondary_add_action(const char *action_name,
> +				     rte_eal_primary_secondary_t action)
> +{
> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> +	if (entry == NULL)
> +		return -ENOMEM;
> +
> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> +	entry->action = action;
> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);

Since you intend to support "one primary process and multiple secondary
processes", we need a lock here to protect the list.

I also wonder whether we really need that, I mean the 1:N model?

> +	return 0;
> +}
> +
> +void
> +rte_eal_primary_secondary_del_action(const char *name)
> +{
> +	struct action_entry *entry = find_action_entry_by_name(name);
> +
> +	TAILQ_REMOVE(&action_entry_list, entry, next);
> +	free(entry);
> +}
> +
> +#define MAX_SECONDARY_PROCS	8
> +
> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */

I think it's not a good idea to use "pri". For me, "private" comes to
mind first, not "primary".

> +static int fd_listen;   /* unix listen socket by primary */
> +static int fd_to_pri;   /* only used by secondary process */
> +static int fds_to_sec[MAX_SECONDARY_PROCS];

Too many vars. I'd suggest using a struct here, which could also make
the naming a bit simpler. For instance,

struct mp_fds {
        int efd;

        union {
                /* fds for primary process */
                struct {
                        int listen;
			/* fds used to send msg to secondary process */
                        int secondaries[...];
                };

                /* fds for secondary process */
                struct {
			/* fds used to send msg to the primary process */
                        int primary;
                };
        };
};

It also separates the scope well. Note that the above field naming does
not look perfect, though. Feel free to come up with some better names.

[...]
> +static int
> +process_msg(int fd)
> +{
> +	int len;
> +	int params_len;
> +	char buf[1024];
> +	int fds[8]; /* accept at most 8 FDs per message */

Why is it 8? I think you are adding a vhost-user specific limitation to a
generic interface, which isn't quite right.

> +	struct msg_hdr *hdr;
> +	struct action_entry *entry;
> +
> +	len = read_msg(fd, buf, 1024, fds, 8);
> +	if (len < 0) {
> +		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
> +			strerror(errno));
> +		return -1;
> +	}
> +
> +	hdr = (struct msg_hdr *) buf;
                                ^
An extra whitespace.

> +
> +	entry = find_action_entry_by_name(hdr->action_name);
> +	if (entry == NULL) {
> +		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
> +			hdr->action_name);
> +		return -1;
> +	}
> +
> +	params_len = len - sizeof(struct msg_hdr);
> +	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +}
> +
> +static void *
> +thread_primary(__attribute__((unused)) void *arg)

Use __rte_unused instead, and put it after the var name.

> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_listen;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
> +			strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +			if (events[i].data.fd == fd_listen) {
> +				if (events[i].events != EPOLLIN) {
> +					RTE_LOG(ERR, EAL, "what happens?\n");
> +					exit(EXIT_FAILURE);
> +				}
> +
> +				fd = accept(fd_listen, NULL, NULL);
> +				if (fd < 0) {
> +					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> +						strerror(errno));
> +					continue;
> +				}
> +
> +				event.data.fd = fd;
> +				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
> +					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
> +						strerror(errno));
> +					continue;
> +				}
> +				if (add_sec_proc(fd) < 0)
> +					RTE_LOG(ERR, EAL, "too many secondary processes\n");
> +
> +				continue;
> +			}
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL,
> +					"secondary process exit: %d\n", fd);
> +				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
> +				del_sec_proc(fd);
> +				continue;
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from secondary process\n");
> +
> +				process_msg(fd);
> +			}
> +		}
> +	}

Too much redundant code. You are doing the check twice and it could be
simplified.

> +
> +	return NULL;
> +}
> +
> +static void *
> +thread_secondary(__attribute__((unused)) void *arg)

I'm also wondering whether this one can be removed. I think we just need
a single thread to do the handling.

> +{
> +	int fd;
> +	int i, n;
> +	struct epoll_event event;
> +	struct epoll_event *events;
> +
> +	event.events = EPOLLIN | EPOLLRDHUP;
> +	event.data.fd = fd_to_pri;
> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
> +		exit(EXIT_FAILURE);
> +	}
> +
> +	events = calloc(20, sizeof event);
> +
> +	while (1) {
> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
> +		for (i = 0; i < n; i++) {
> +
> +			fd = events[i].data.fd;
> +
> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
> +				/* Do we need exit secondary when primary exits? */
> +				exit(EXIT_FAILURE);
> +			}
> +
> +			if ((events[i].events & EPOLLIN)) {
> +				RTE_LOG(INFO, EAL,
> +					"recv msg from primary process\n");
> +				process_msg(fd);
> +			}
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
> +int
> +rte_eal_primary_secondary_channel_init(void)
> +{
> +	int i, fd, ret;
> +	const char *path;
> +	struct sockaddr_un un;
> +	pthread_t tid;
> +	void*(*fn)(void *);
> +	char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> +	efd_pri_sec = epoll_create1(0);
> +	if (efd_pri_sec < 0) {
> +		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
> +		return -1;
> +	}
> +
> +	fd = socket(AF_UNIX, SOCK_STREAM, 0);
> +	if (fd < 0) {
> +		RTE_LOG(ERR, EAL, "Failed to create unix socket");
> +		return -1;
> +	}
> +
> +	memset(&un, 0, sizeof(un));
> +	un.sun_family = AF_UNIX;
> +	path = eal_primary_secondary_unix_path();
> +	strncpy(un.sun_path, path, sizeof(un.sun_path));
> +	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
> +
> +	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +

Do not leave an extra whitespace line here.
> +		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> +			fds_to_sec[i] = -1;
> +
> +		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
> +			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
> +			close(fd);
> +			return -1;
> +		}
> +
> +		/* The file still exists since last run */
> +		unlink(path);
> +
> +		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
> +				path, strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
> +
> +		ret = listen(fd, 1024);
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
> +			close(fd);
> +			return -1;
> +		}
> +
> +		fn = thread_primary;
> +		fd_listen = fd;
> +	} else {
> +		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
> +		if (ret < 0) {
> +			RTE_LOG(ERR, EAL, "failed to connect primary\n");
> +			return -1;
> +		}
> +		fn = thread_secondary;
> +		fd_to_pri = fd;
> +	}
> +
> +	ret = pthread_create(&tid, NULL, fn, NULL);
> +	if (ret < 0) {
> +		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
> +			strerror(errno));
> +		close(fd);
> +		close(efd_pri_sec);
> +		return -1;
> +	}
> +
> +	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
> +		 "ps_channel");

It may not be that easy to know what "ps" stands for. Maybe "rte_mp_xx"
is better. Starting the name with "rte_" easily reminds people that it's
a thread from DPDK.

[...]
> diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
> index 8acbd99..78bb4fb 100644
> --- a/lib/librte_eal/common/eal_filesystem.h
> +++ b/lib/librte_eal/common/eal_filesystem.h
> @@ -67,6 +67,24 @@ eal_runtime_config_path(void)
>  	return buffer;
>  }
>  
> +/** Path of primary/secondary communication unix socket file. */
> +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> +static inline const char *
> +eal_primary_secondary_unix_path(void)
> +{
> +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> +	const char *directory = default_config_dir;
> +	const char *home_dir = getenv("HOME");

It's not a good practice to generate such a file in the HOME dir. The user
would be surprised to find it there. In the worst case, the user might
delete it.

The more common way is to put it to tmp dir, like "/tmp".

> +
> +	if (getuid() != 0 && home_dir != NULL)
> +		directory = home_dir;
> +	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
> +		 directory, internal_config.hugefile_prefix);
> +
> +	return buffer;
> +
> +}
> +
[...]
> diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
> index 48f12f4..237c0b1 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
>  
>  	eal_check_mem_on_local_socket();
>  
> +	if (rte_eal_primary_secondary_channel_init() < 0) {
> +		rte_eal_init_alert("Cannot create unix channel.");

The alert message doesn't quite match the function name. Actually, you
have already printed the specific error inside that init function when it
fails. Thus, you could just say "failed to init mp channel" or something
like this.

	--yliu
  
Jianfeng Tan Sept. 28, 2017, 1:50 p.m. UTC | #6
Yuanhan,

Thank you for the detailed review! Most of your suggestions are very good and I'll fix them in next version.

> -----Original Message-----
> From: Yuanhan Liu [mailto:yliu@fridaylinux.org]
> Sent: Wednesday, September 27, 2017 8:20 PM
> To: Tan, Jianfeng
> Cc: dev@dpdk.org; Richardson, Bruce; Ananyev, Konstantin; De Lara Guarch,
> Pablo; thomas@monjalon.net; maxime.coquelin@redhat.com;
> mtetsuyah@gmail.com; Yigit, Ferruh
> Subject: Re: [PATCH 06/12] eal: add channel for primary/secondary
> communication
> 
[...]
> > +int
> > +rte_eal_primary_secondary_add_action(const char *action_name,
> > +				     rte_eal_primary_secondary_t action)
> > +{
> > +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> > +
> > +	if (entry == NULL)
> > +		return -ENOMEM;
> > +
> > +	strncpy(entry->action_name, action_name,
> MAX_ACTION_NAME_LEN);
> > +	entry->action = action;
> > +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> 
> Since you intended to support "one primary process and multiple secondary
> process", here we need a lock to protect the list.

Only one thread of each process (either primary or secondary) does the registration. So I wonder whether we need to add a lock at all? Of course, there is no harm in adding one.

> 
> Another wonder is do we really need that, I mean 1:N model?

I'm open to suggestions. IMO, not much extra code for 1:N model than 1:1 model. So not necessary to restrict that.

> 
> > +	return 0;
> > +}
> > +
> > +void
> > +rte_eal_primary_secondary_del_action(const char *name)
> > +{
> > +	struct action_entry *entry = find_action_entry_by_name(name);
> > +
> > +	TAILQ_REMOVE(&action_entry_list, entry, next);
> > +	free(entry);
> > +}
> > +
> > +#define MAX_SECONDARY_PROCS	8
> > +
> > +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
> 
> I think it's not a good idea to use "pri". For me, "private" comes to
> my mind firstly but not "primary".
> 
> > +static int fd_listen;   /* unix listen socket by primary */
> > +static int fd_to_pri;   /* only used by secondary process */
> > +static int fds_to_sec[MAX_SECONDARY_PROCS];
> 
> Too many vars. I'd suggest to use a struct here, which could also make
> the naming a bit simpler. For instance,
> 
> struct mp_fds {
>         int efd;
> 
>         union {
>                 /* fds for primary process */
>                 struct {
>                         int listen;
> 			/* fds used to send msg to secondary process */
>                         int secondaries[...];
>                 };
> 
>                 /* fds for secondary process */
>                 struct {
> 			/* fds used to send msg to the primary process */
>                         int primary;
>                 };
>         };
> };
> 
> It also separates the scope well. Note that above field naming does
> not like perfect though. Feel free to come up with some better names.

You can always make the code look so clean, thank you!

[...]

> > +/** Path of primary/secondary communication unix socket file. */
> > +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> > +static inline const char *
> > +eal_primary_secondary_unix_path(void)
> > +{
> > +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> > +	const char *directory = default_config_dir;
> > +	const char *home_dir = getenv("HOME");
> 
> It's not a good practice to generate such file at HOME dir. User would
> be surprised to find it at HOME dir. In the worst case, user might delete
> it.

This way is the legacy way in DPDK, for example the config path. So I think we should fix that in another patch.

> 
> The more common way is to put it to tmp dir, like "/tmp".

Thanks,
Jianfeng
  
Yuanhan Liu Sept. 29, 2017, 1:24 a.m. UTC | #7
On Thu, Sep 28, 2017 at 01:50:20PM +0000, Tan, Jianfeng wrote:
> > > +int
> > > +rte_eal_primary_secondary_add_action(const char *action_name,
> > > +				     rte_eal_primary_secondary_t action)
> > > +{
> > > +	struct action_entry *entry = malloc(sizeof(struct action_entry));
> > > +
> > > +	if (entry == NULL)
> > > +		return -ENOMEM;
> > > +
> > > +	strncpy(entry->action_name, action_name,
> > MAX_ACTION_NAME_LEN);
> > > +	entry->action = action;
> > > +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> > 
> > Since you intended to support "one primary process and multiple secondary
> > process", here we need a lock to protect the list.
> 
> Only one thread of each process (either primary or secondary) does the register. So I wonder we don't have to add lock? Of course, no harm to add a lock.

Right, I was wrong. I was thinking that secondary processes could start at
the same time, so we would need a lock here. But as you said, the list is
process specific: each process has its own copy. No lock is needed then.

> > 
> > Another wonder is do we really need that, I mean 1:N model?
> 
> I'm open to suggestions. IMO, not much extra code for 1:N model than 1:1 model. So not necessary to restrict that.

Fair enough. I was just wondering; I have no objection though.

> > > +/** Path of primary/secondary communication unix socket file. */
> > > +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> > > +static inline const char *
> > > +eal_primary_secondary_unix_path(void)
> > > +{
> > > +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> > > +	const char *directory = default_config_dir;
> > > +	const char *home_dir = getenv("HOME");
> > 
> > It's not a good practice to generate such file at HOME dir. User would
> > be surprised to find it at HOME dir. In the worst case, user might delete
> > it.
> 
> This way is the legacy way in DPDK, for example the config path. So I think we should fix that in another patch.

Yes, I think so.

	--yliu
> 
> > 
> > The more common way is to put it to tmp dir, like "/tmp".
> 
> Thanks,
> Jianfeng
  
Burakov, Anatoly Sept. 29, 2017, 10:09 a.m. UTC | #8
On 29-Sep-17 2:24 AM, Yuanhan Liu wrote:
> On Thu, Sep 28, 2017 at 01:50:20PM +0000, Tan, Jianfeng wrote:
>>>> +/** Path of primary/secondary communication unix socket file. */
>>>> +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
>>>> +static inline const char *
>>>> +eal_primary_secondary_unix_path(void)
>>>> +{
>>>> +	static char buffer[PATH_MAX]; /* static so auto-zeroed */
>>>> +	const char *directory = default_config_dir;
>>>> +	const char *home_dir = getenv("HOME");
>>>
>>> It's not a good practice to generate such file at HOME dir. User would
>>> be surprised to find it at HOME dir. In the worst case, user might delete
>>> it.
>>
>> This way is the legacy way in DPDK, for example the config path. So I think we should fix that in another patch.
> 
> Yes, I think so.
> 
> 	--yliu
>>
>>>
>>> The more common way is to put it to tmp dir, like "/tmp".
>>
>> Thanks,
>> Jianfeng
> 

The way VFIO does it: if we have permissions, we put the socket file
in /var/run (which I also think is a better place for a socket than
/tmp). If we don't, we fall back to HOME.
  
Yuanhan Liu Sept. 29, 2017, 10:25 a.m. UTC | #9
On Fri, Sep 29, 2017 at 11:09:23AM +0100, Burakov, Anatoly wrote:
> On 29-Sep-17 2:24 AM, Yuanhan Liu wrote:
> >On Thu, Sep 28, 2017 at 01:50:20PM +0000, Tan, Jianfeng wrote:
> >>>>+/** Path of primary/secondary communication unix socket file. */
> >>>>+#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> >>>>+static inline const char *
> >>>>+eal_primary_secondary_unix_path(void)
> >>>>+{
> >>>>+	static char buffer[PATH_MAX]; /* static so auto-zeroed */
> >>>>+	const char *directory = default_config_dir;
> >>>>+	const char *home_dir = getenv("HOME");
> >>>
> >>>It's not a good practice to generate such file at HOME dir. User would
> >>>be surprised to find it at HOME dir. In the worst case, user might delete
> >>>it.
> >>
> >>This way is the legacy way in DPDK, for example the config path. So I think we should fix that in another patch.
> >
> >Yes, I think so.
> >
> >	--yliu
> >>
> >>>
> >>>The more common way is to put it to tmp dir, like "/tmp".
> >>
> >>Thanks,
> >>Jianfeng
> >
> 
> The way VFIO does it is, if we have permissions, we put the socket file in
> /var/run (which i also think is a better place for a socket than /tmp). If
> we don't, we fall back to HOME.

I have no objection with /var/run. But HOME, no.

	--yliu
  

Patch

diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
index aac6fd7..f4ff29f 100644
--- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
@@ -237,3 +237,11 @@  EXPERIMENTAL {
 	rte_service_unregister;
 
 } DPDK_17.08;
+
+EXPERIMENTAL {
+	global:
+
+	rte_eal_primary_secondary_add_action;
+	rte_eal_primary_secondary_del_action;
+	rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 60526ca..fa316bf 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -33,8 +33,20 @@ 
 #include <stdio.h>
 #include <fcntl.h>
 #include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <rte_log.h>
 #include <rte_eal.h>
+#include <rte_lcore.h>
 
+#include "eal_private.h"
 #include "eal_filesystem.h"
 #include "eal_internal_cfg.h"
 
@@ -59,3 +71,445 @@  rte_eal_primary_proc_alive(const char *config_file_path)
 
 	return !!ret;
 }
+
+struct action_entry {
+	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
+
+#define MAX_ACTION_NAME_LEN	64
+	char action_name[MAX_ACTION_NAME_LEN];
+	rte_eal_primary_secondary_t *action;
+};
+
+/** Double linked list of actions. */
+TAILQ_HEAD(action_entry_list, action_entry);
+
+static struct action_entry_list action_entry_list =
+	TAILQ_HEAD_INITIALIZER(action_entry_list);
+
+static struct action_entry *
+find_action_entry_by_name(const char *name)
+{
+	int len = strlen(name);
+	struct action_entry *entry;
+
+	TAILQ_FOREACH(entry, &action_entry_list, next) {
+		if (strncmp(entry->action_name, name, len) == 0)
+				break;
+	}
+
+	return entry;
+}
+
+int
+rte_eal_primary_secondary_add_action(const char *action_name,
+				     rte_eal_primary_secondary_t action)
+{
+	struct action_entry *entry = malloc(sizeof(struct action_entry));
+
+	if (entry == NULL)
+		return -ENOMEM;
+
+	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
+	entry->action = action;
+	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
+	return 0;
+}
+
+void
+rte_eal_primary_secondary_del_action(const char *name)
+{
+	struct action_entry *entry = find_action_entry_by_name(name);
+
+	TAILQ_REMOVE(&action_entry_list, entry, next);
+	free(entry);
+}
+
+#define MAX_SECONDARY_PROCS	8
+
+static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
+static int fd_listen;   /* unix listen socket by primary */
+static int fd_to_pri;   /* only used by secondary process */
+static int fds_to_sec[MAX_SECONDARY_PROCS];
+
+struct msg_hdr {
+	char action_name[MAX_ACTION_NAME_LEN];
+	int fds_num;
+	char params[0];
+} __rte_packed;
+
+static int
+add_sec_proc(int fd)
+{
+	int i;
+
+	for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+		if (fds_to_sec[i] == -1)
+			break;
+
+	if (i >= MAX_SECONDARY_PROCS)
+		return -1;
+
+	fds_to_sec[i] = fd;
+
+	return i;
+}
+
+static void
+del_sec_proc(int fd)
+{
+	int i;
+
+	for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+		if (fds_to_sec[i] == fd) {
+			fds_to_sec[i] = -1;
+			break;
+		}
+	}
+}
+
+static int
+read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
+{
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fds_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int ret;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len  = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	ret = recvmsg(sockfd, &msgh, 0);
+	if (ret <= 0) {
+		RTE_LOG(ERR, EAL, "recvmsg failed\n");
+		return ret;
+	}
+
+	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+		RTE_LOG(ERR, EAL, "truncted msg\n");
+		return -1;
+	}
+
+	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+		if ((cmsg->cmsg_level == SOL_SOCKET) &&
+			(cmsg->cmsg_type == SCM_RIGHTS)) {
+			memcpy(fds, CMSG_DATA(cmsg), fdsize);
+			break;
+		}
+	}
+
+	return ret;
+}
+
+static int
+process_msg(int fd)
+{
+	int len;
+	int params_len;
+	char buf[1024];
+	int fds[8]; /* accept at most 8 FDs per message */
+	struct msg_hdr *hdr;
+	struct action_entry *entry;
+
+	len = read_msg(fd, buf, 1024, fds, 8);
+	if (len < 0) {
+		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
+			strerror(errno));
+		return -1;
+	}
+
+	hdr = (struct msg_hdr *) buf;
+
+	entry = find_action_entry_by_name(hdr->action_name);
+	if (entry == NULL) {
+		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
+			hdr->action_name);
+		return -1;
+	}
+
+	params_len = len - sizeof(struct msg_hdr);
+	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
+}
+
+static void *
+thread_primary(__attribute__((unused)) void *arg)
+{
+	int fd;
+	int i, n;
+	struct epoll_event event;
+	struct epoll_event *events;
+
+	event.events = EPOLLIN | EPOLLRDHUP;
+	event.data.fd = fd_listen;
+	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
+		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
+			strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	events = calloc(20, sizeof event);
+
+	while (1) {
+		n = epoll_wait(efd_pri_sec, events, 20, -1);
+		for (i = 0; i < n; i++) {
+			if (events[i].data.fd == fd_listen) {
+				if (events[i].events != EPOLLIN) {
+					RTE_LOG(ERR, EAL, "what happens?\n");
+					exit(EXIT_FAILURE);
+				}
+
+				fd = accept(fd_listen, NULL, NULL);
+				if (fd < 0) {
+					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
+						strerror(errno));
+					continue;
+				}
+
+				event.data.fd = fd;
+				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
+					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
+						strerror(errno));
+					continue;
+				}
+				if (add_sec_proc(fd) < 0)
+					RTE_LOG(ERR, EAL, "too many secondary processes\n");
+
+				continue;
+			}
+
+			fd = events[i].data.fd;
+
+			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
+				RTE_LOG(ERR, EAL,
+					"secondary process exit: %d\n", fd);
+				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
+				del_sec_proc(fd);
+				continue;
+			}
+
+			if ((events[i].events & EPOLLIN)) {
+				RTE_LOG(INFO, EAL,
+					"recv msg from secondary process\n");
+
+				process_msg(fd);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+static void *
+thread_secondary(__attribute__((unused)) void *arg)
+{
+	int fd;
+	int i, n;
+	struct epoll_event event;
+	struct epoll_event *events;
+
+	event.events = EPOLLIN | EPOLLRDHUP;
+	event.data.fd = fd_to_pri;
+	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
+		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	events = calloc(20, sizeof event);
+
+	while (1) {
+		n = epoll_wait(efd_pri_sec, events, 20, -1);
+		for (i = 0; i < n; i++) {
+
+			fd = events[i].data.fd;
+
+			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
+				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
+				/* Do we need exit secondary when primary exits? */
+				exit(EXIT_FAILURE);
+			}
+
+			if ((events[i].events & EPOLLIN)) {
+				RTE_LOG(INFO, EAL,
+					"recv msg from primary process\n");
+				process_msg(fd);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+int
+rte_eal_primary_secondary_channel_init(void)
+{
+	int i, fd, ret;
+	const char *path;
+	struct sockaddr_un un;
+	pthread_t tid;
+	void*(*fn)(void *);
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	efd_pri_sec = epoll_create1(0);
+	if (efd_pri_sec < 0) {
+		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
+		return -1;
+	}
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Failed to create unix socket");
+		return -1;
+	}
+
+	memset(&un, 0, sizeof(un));
+	un.sun_family = AF_UNIX;
+	path = eal_primary_secondary_unix_path();
+	strncpy(un.sun_path, path, sizeof(un.sun_path));
+	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			fds_to_sec[i] = -1;
+
+		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
+			close(fd);
+			return -1;
+		}
+
+		/* The file still exists since last run */
+		unlink(path);
+
+		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
+				path, strerror(errno));
+			close(fd);
+			return -1;
+		}
+		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
+
+		ret = listen(fd, 1024);
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
+			close(fd);
+			return -1;
+		}
+
+		fn = thread_primary;
+		fd_listen = fd;
+	} else {
+		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to connect primary\n");
+			return -1;
+		}
+		fn = thread_secondary;
+		fd_to_pri = fd;
+	}
+
+	ret = pthread_create(&tid, NULL, fn, NULL);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
+			strerror(errno));
+		close(fd);
+		close(efd_pri_sec);
+		return -1;
+	}
+
+	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+		 "ps_channel");
+	ret = rte_thread_setname(tid, thread_name);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "failed to set thead name\n");
+		close(fd);
+		close(efd_pri_sec);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int
+send_msg(int fd, struct msghdr *p_msgh)
+{
+	int ret;
+
+	do {
+		ret = sendmsg(fd, p_msgh, 0);
+	} while (ret < 0 && errno == EINTR);
+
+	return ret;
+}
+
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+				  const void *params,
+				  int len_params,
+				  int fds[],
+				  int fds_num)
+{
+	int i;
+	int ret = 0;
+	struct msghdr msgh;
+	struct iovec iov;
+	size_t fd_size = fds_num * sizeof(int);
+	char control[CMSG_SPACE(fd_size)];
+	struct cmsghdr *cmsg;
+	struct msg_hdr *msg;
+	int len_msg;
+
+	len_msg = sizeof(struct msg_hdr) + len_params;
+	msg = malloc(len_msg);
+	if (!msg) {
+		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg");
+		return -ENOMEM;
+	}
+	memset(msg, 0, len_msg);
+	strcpy(msg->action_name, action_name);
+	msg->fds_num = fds_num;
+	memcpy(msg->params, params, len_params);
+
+	memset(&msgh, 0, sizeof(msgh));
+	memset(control, 0, sizeof(control));
+
+	iov.iov_base = (uint8_t *)msg;
+	iov.iov_len = len_msg;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	cmsg = CMSG_FIRSTHDR(&msgh);
+	cmsg->cmsg_len = CMSG_LEN(fd_size);
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	memcpy(CMSG_DATA(cmsg), fds, fd_size);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+			if (fds_to_sec[i] == -1)
+				continue;
+
+			ret = send_msg(fds_to_sec[i], &msgh);
+			if (ret < 0)
+				break;
+		}
+	} else {
+		ret = send_msg(fd_to_pri, &msgh);
+	}
+
+	free(msg);
+
+	return ret;
+}
diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
index 8acbd99..78bb4fb 100644
--- a/lib/librte_eal/common/eal_filesystem.h
+++ b/lib/librte_eal/common/eal_filesystem.h
@@ -67,6 +67,24 @@  eal_runtime_config_path(void)
 	return buffer;
 }
 
+/** Path of primary/secondary communication unix socket file. */
+#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
+static inline const char *
+eal_primary_secondary_unix_path(void)
+{
+	static char buffer[PATH_MAX]; /* static so auto-zeroed */
+	const char *directory = default_config_dir;
+	const char *home_dir = getenv("HOME");
+
+	if (getuid() != 0 && home_dir != NULL)
+		directory = home_dir;
+	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
+		 directory, internal_config.hugefile_prefix);
+
+	return buffer;
+
+}
+
 /** Path of hugepage info file. */
 #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
 
diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
index 597d82e..719b160 100644
--- a/lib/librte_eal/common/eal_private.h
+++ b/lib/librte_eal/common/eal_private.h
@@ -355,4 +355,14 @@  bool rte_eal_using_phys_addrs(void);
  */
 struct rte_bus *rte_bus_find_by_device_name(const char *str);
 
+/**
+ * Create the unix channel for primary/secondary communication.
+ *
+ * @return
+ *   0 on success;
+ *   (<0) on failure.
+ */
+
+int rte_eal_primary_secondary_channel_init(void);
+
 #endif /* _EAL_PRIVATE_H_ */
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 0e7363d..6cfc9d6 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -210,6 +210,80 @@  int rte_eal_init(int argc, char **argv);
 int rte_eal_primary_proc_alive(const char *config_file_path);
 
 /**
+ * Action function typedef used by other components.
+ *
+ * As we create unix socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming messages.
+ */
+typedef int (rte_eal_primary_secondary_t)(const char *params,
+					  int len,
+					  int fds[],
+					  int fds_num);
+/**
+ * Register an action function for primary/secondary communication.
+ *
+ * Call this function to register an action, if the calling component wants
+ * to response the messages from the corresponding component in its primary
+ * process or secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument plays as the nonredundant key to find the action.
+ *
+ * @param action
+ *   The action argument is the function pointer to the action function.
+ *
+ * @return
+ *  - 0 on success.
+ *  - (<0) on failure.
+ */
+int rte_eal_primary_secondary_add_action(const char *action_name,
+					 rte_eal_primary_secondary_t action);
+/**
+ * Unregister an action function for primary/secondary communication.
+ *
+ * Call this function to unregister an action  if the calling component does
+ * not want to response the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument plays as the nonredundant key to find the action.
+ *
+ */
+void rte_eal_primary_secondary_del_action(const char *name);
+
+/**
+ * Send a message to the primary process or the secondary processes.
+ *
+ * This function will send a message which will be responsed by the action
+ * identified by action_name of the process on the other side.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_params
+ *   The len_params argument is the length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_num
+ *   The fds_num argument is number of fds to be sent with sendmsg.
+ *
+ * @return
+ *  - (>=0) on success.
+ *  - (<0) on failure.
+ */
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+				  const void *params,
+				  int len_params,
+				  int fds[],
+				  int fds_num);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_applcation_usage_hook()
diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
index 48f12f4..237c0b1 100644
--- a/lib/librte_eal/linuxapp/eal/eal.c
+++ b/lib/librte_eal/linuxapp/eal/eal.c
@@ -873,6 +873,12 @@  rte_eal_init(int argc, char **argv)
 
 	eal_check_mem_on_local_socket();
 
+	if (rte_eal_primary_secondary_channel_init() < 0) {
+		rte_eal_init_alert("Cannot create unix channel.");
+		rte_errno = EFAULT;
+		return -1;
+	}
+
 	if (eal_plugins_init() < 0)
 		rte_eal_init_alert("Cannot init plugins\n");
 
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 3a8f154..c618aec 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -242,3 +242,11 @@  EXPERIMENTAL {
 	rte_service_unregister;
 
 } DPDK_17.08;
+
+EXPERIMENTAL {
+	global:
+
+	rte_eal_primary_secondary_add_action;
+	rte_eal_primary_secondary_del_action;
+	rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;