[dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication

Jianfeng Tan jianfeng.tan at intel.com
Fri Aug 25 11:40:46 CEST 2017


Previouly, there is only one way for primary/secondary to exchange
messages, that is, primary process writes info into some predefind
file, and secondary process reads info out. That cannot address
the requirements:
  a. Secondary wants to send info to primary.
  b. Sending info at any time, instead of just initialization time.
  c. Share FD with the other side.

This patch proposes to create a communication channel (as an unix
socket connection) for above requirements.

Three new APIs are added:

  1. rte_eal_primary_secondary_add_action is used to register an action,
if the calling component wants to response the messages from the
corresponding component in its primary process or secondary processes.
  2. rte_eal_primary_secondary_del_action is used to unregister the
action if the calling component does not want to response the messages.
  3. rte_eal_primary_secondary_sendmsg is used to send a message.

Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
---
 lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
 lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
 lib/librte_eal/common/eal_filesystem.h          |  18 +
 lib/librte_eal/common/eal_private.h             |  10 +
 lib/librte_eal/common/include/rte_eal.h         |  74 ++++
 lib/librte_eal/linuxapp/eal/eal.c               |   6 +
 lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
 7 files changed, 578 insertions(+)

diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
index aac6fd7..f4ff29f 100644
--- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
@@ -237,3 +237,11 @@ EXPERIMENTAL {
 	rte_service_unregister;
 
 } DPDK_17.08;
+
+EXPERIMENTAL {
+	global:
+
+	rte_eal_primary_secondary_add_action;
+	rte_eal_primary_secondary_del_action;
+	rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 60526ca..fa316bf 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -33,8 +33,20 @@
 #include <stdio.h>
 #include <fcntl.h>
 #include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <rte_log.h>
 #include <rte_eal.h>
+#include <rte_lcore.h>
 
+#include "eal_private.h"
 #include "eal_filesystem.h"
 #include "eal_internal_cfg.h"
 
@@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
 
 	return !!ret;
 }
+
+struct action_entry {
+	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
+
+#define MAX_ACTION_NAME_LEN	64
+	char action_name[MAX_ACTION_NAME_LEN];
+	rte_eal_primary_secondary_t *action;
+};
+
+/** Double linked list of actions. */
+TAILQ_HEAD(action_entry_list, action_entry);
+
+static struct action_entry_list action_entry_list =
+	TAILQ_HEAD_INITIALIZER(action_entry_list);
+
+static struct action_entry *
+find_action_entry_by_name(const char *name)
+{
+	int len = strlen(name);
+	struct action_entry *entry;
+
+	TAILQ_FOREACH(entry, &action_entry_list, next) {
+		if (strncmp(entry->action_name, name, len) == 0)
+				break;
+	}
+
+	return entry;
+}
+
+int
+rte_eal_primary_secondary_add_action(const char *action_name,
+				     rte_eal_primary_secondary_t action)
+{
+	struct action_entry *entry = malloc(sizeof(struct action_entry));
+
+	if (entry == NULL)
+		return -ENOMEM;
+
+	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
+	entry->action = action;
+	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
+	return 0;
+}
+
+void
+rte_eal_primary_secondary_del_action(const char *name)
+{
+	struct action_entry *entry = find_action_entry_by_name(name);
+
+	TAILQ_REMOVE(&action_entry_list, entry, next);
+	free(entry);
+}
+
+#define MAX_SECONDARY_PROCS	8
+
+static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
+static int fd_listen;   /* unix listen socket by primary */
+static int fd_to_pri;   /* only used by secondary process */
+static int fds_to_sec[MAX_SECONDARY_PROCS];
+
+struct msg_hdr {
+	char action_name[MAX_ACTION_NAME_LEN];
+	int fds_num;
+	char params[0];
+} __rte_packed;
+
+static int
+add_sec_proc(int fd)
+{
+	int i;
+
+	for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+		if (fds_to_sec[i] == -1)
+			break;
+
+	if (i >= MAX_SECONDARY_PROCS)
+		return -1;
+
+	fds_to_sec[i] = fd;
+
+	return i;
+}
+
+static void
+del_sec_proc(int fd)
+{
+	int i;
+
+	for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+		if (fds_to_sec[i] == fd) {
+			fds_to_sec[i] = -1;
+			break;
+		}
+	}
+}
+
+static int
+read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
+{
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fds_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int ret;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len  = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	ret = recvmsg(sockfd, &msgh, 0);
+	if (ret <= 0) {
+		RTE_LOG(ERR, EAL, "recvmsg failed\n");
+		return ret;
+	}
+
+	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+		RTE_LOG(ERR, EAL, "truncted msg\n");
+		return -1;
+	}
+
+	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+		if ((cmsg->cmsg_level == SOL_SOCKET) &&
+			(cmsg->cmsg_type == SCM_RIGHTS)) {
+			memcpy(fds, CMSG_DATA(cmsg), fdsize);
+			break;
+		}
+	}
+
+	return ret;
+}
+
+static int
+process_msg(int fd)
+{
+	int len;
+	int params_len;
+	char buf[1024];
+	int fds[8]; /* accept at most 8 FDs per message */
+	struct msg_hdr *hdr;
+	struct action_entry *entry;
+
+	len = read_msg(fd, buf, 1024, fds, 8);
+	if (len < 0) {
+		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
+			strerror(errno));
+		return -1;
+	}
+
+	hdr = (struct msg_hdr *) buf;
+
+	entry = find_action_entry_by_name(hdr->action_name);
+	if (entry == NULL) {
+		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
+			hdr->action_name);
+		return -1;
+	}
+
+	params_len = len - sizeof(struct msg_hdr);
+	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
+}
+
+static void *
+thread_primary(__attribute__((unused)) void *arg)
+{
+	int fd;
+	int i, n;
+	struct epoll_event event;
+	struct epoll_event *events;
+
+	event.events = EPOLLIN | EPOLLRDHUP;
+	event.data.fd = fd_listen;
+	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
+		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
+			strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	events = calloc(20, sizeof event);
+
+	while (1) {
+		n = epoll_wait(efd_pri_sec, events, 20, -1);
+		for (i = 0; i < n; i++) {
+			if (events[i].data.fd == fd_listen) {
+				if (events[i].events != EPOLLIN) {
+					RTE_LOG(ERR, EAL, "what happens?\n");
+					exit(EXIT_FAILURE);
+				}
+
+				fd = accept(fd_listen, NULL, NULL);
+				if (fd < 0) {
+					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
+						strerror(errno));
+					continue;
+				}
+
+				event.data.fd = fd;
+				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
+					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
+						strerror(errno));
+					continue;
+				}
+				if (add_sec_proc(fd) < 0)
+					RTE_LOG(ERR, EAL, "too many secondary processes\n");
+
+				continue;
+			}
+
+			fd = events[i].data.fd;
+
+			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
+				RTE_LOG(ERR, EAL,
+					"secondary process exit: %d\n", fd);
+				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
+				del_sec_proc(fd);
+				continue;
+			}
+
+			if ((events[i].events & EPOLLIN)) {
+				RTE_LOG(INFO, EAL,
+					"recv msg from secondary process\n");
+
+				process_msg(fd);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+static void *
+thread_secondary(__attribute__((unused)) void *arg)
+{
+	int fd;
+	int i, n;
+	struct epoll_event event;
+	struct epoll_event *events;
+
+	event.events = EPOLLIN | EPOLLRDHUP;
+	event.data.fd = fd_to_pri;
+	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
+		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	events = calloc(20, sizeof event);
+
+	while (1) {
+		n = epoll_wait(efd_pri_sec, events, 20, -1);
+		for (i = 0; i < n; i++) {
+
+			fd = events[i].data.fd;
+
+			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
+				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
+				/* Do we need exit secondary when primary exits? */
+				exit(EXIT_FAILURE);
+			}
+
+			if ((events[i].events & EPOLLIN)) {
+				RTE_LOG(INFO, EAL,
+					"recv msg from primary process\n");
+				process_msg(fd);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+int
+rte_eal_primary_secondary_channel_init(void)
+{
+	int i, fd, ret;
+	const char *path;
+	struct sockaddr_un un;
+	pthread_t tid;
+	void*(*fn)(void *);
+	char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+	efd_pri_sec = epoll_create1(0);
+	if (efd_pri_sec < 0) {
+		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
+		return -1;
+	}
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Failed to create unix socket");
+		return -1;
+	}
+
+	memset(&un, 0, sizeof(un));
+	un.sun_family = AF_UNIX;
+	path = eal_primary_secondary_unix_path();
+	strncpy(un.sun_path, path, sizeof(un.sun_path));
+	un.sun_path[sizeof(un.sun_path) - 1] = '\0';
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			fds_to_sec[i] = -1;
+
+		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
+			close(fd);
+			return -1;
+		}
+
+		/* The file still exists since last run */
+		unlink(path);
+
+		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
+				path, strerror(errno));
+			close(fd);
+			return -1;
+		}
+		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
+
+		ret = listen(fd, 1024);
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
+			close(fd);
+			return -1;
+		}
+
+		fn = thread_primary;
+		fd_listen = fd;
+	} else {
+		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to connect primary\n");
+			return -1;
+		}
+		fn = thread_secondary;
+		fd_to_pri = fd;
+	}
+
+	ret = pthread_create(&tid, NULL, fn, NULL);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
+			strerror(errno));
+		close(fd);
+		close(efd_pri_sec);
+		return -1;
+	}
+
+	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+		 "ps_channel");
+	ret = rte_thread_setname(tid, thread_name);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "failed to set thead name\n");
+		close(fd);
+		close(efd_pri_sec);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int
+send_msg(int fd, struct msghdr *p_msgh)
+{
+	int ret;
+
+	do {
+		ret = sendmsg(fd, p_msgh, 0);
+	} while (ret < 0 && errno == EINTR);
+
+	return ret;
+}
+
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+				  const void *params,
+				  int len_params,
+				  int fds[],
+				  int fds_num)
+{
+	int i;
+	int ret = 0;
+	struct msghdr msgh;
+	struct iovec iov;
+	size_t fd_size = fds_num * sizeof(int);
+	char control[CMSG_SPACE(fd_size)];
+	struct cmsghdr *cmsg;
+	struct msg_hdr *msg;
+	int len_msg;
+
+	len_msg = sizeof(struct msg_hdr) + len_params;
+	msg = malloc(len_msg);
+	if (!msg) {
+		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg");
+		return -ENOMEM;
+	}
+	memset(msg, 0, len_msg);
+	strcpy(msg->action_name, action_name);
+	msg->fds_num = fds_num;
+	memcpy(msg->params, params, len_params);
+
+	memset(&msgh, 0, sizeof(msgh));
+	memset(control, 0, sizeof(control));
+
+	iov.iov_base = (uint8_t *)msg;
+	iov.iov_len = len_msg;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	cmsg = CMSG_FIRSTHDR(&msgh);
+	cmsg->cmsg_len = CMSG_LEN(fd_size);
+	cmsg->cmsg_level = SOL_SOCKET;
+	cmsg->cmsg_type = SCM_RIGHTS;
+	memcpy(CMSG_DATA(cmsg), fds, fd_size);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+			if (fds_to_sec[i] == -1)
+				continue;
+
+			ret = send_msg(fds_to_sec[i], &msgh);
+			if (ret < 0)
+				break;
+		}
+	} else {
+		ret = send_msg(fd_to_pri, &msgh);
+	}
+
+	free(msg);
+
+	return ret;
+}
diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
index 8acbd99..78bb4fb 100644
--- a/lib/librte_eal/common/eal_filesystem.h
+++ b/lib/librte_eal/common/eal_filesystem.h
@@ -67,6 +67,24 @@ eal_runtime_config_path(void)
 	return buffer;
 }
 
+/** Path of primary/secondary communication unix socket file. */
+#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
+static inline const char *
+eal_primary_secondary_unix_path(void)
+{
+	static char buffer[PATH_MAX]; /* static so auto-zeroed */
+	const char *directory = default_config_dir;
+	const char *home_dir = getenv("HOME");
+
+	if (getuid() != 0 && home_dir != NULL)
+		directory = home_dir;
+	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
+		 directory, internal_config.hugefile_prefix);
+
+	return buffer;
+
+}
+
 /** Path of hugepage info file. */
 #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
 
diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
index 597d82e..719b160 100644
--- a/lib/librte_eal/common/eal_private.h
+++ b/lib/librte_eal/common/eal_private.h
@@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void);
  */
 struct rte_bus *rte_bus_find_by_device_name(const char *str);
 
+/**
+ * Create the unix channel for primary/secondary communication.
+ *
+ * @return
+ *   0 on success;
+ *   (<0) on failure.
+ */
+
+int rte_eal_primary_secondary_channel_init(void);
+
 #endif /* _EAL_PRIVATE_H_ */
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 0e7363d..6cfc9d6 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv);
 int rte_eal_primary_proc_alive(const char *config_file_path);
 
 /**
+ * Action function typedef used by other components.
+ *
+ * As we create unix socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming messages.
+ */
+typedef int (rte_eal_primary_secondary_t)(const char *params,
+					  int len,
+					  int fds[],
+					  int fds_num);
+/**
+ * Register an action function for primary/secondary communication.
+ *
+ * Call this function to register an action, if the calling component wants
+ * to response the messages from the corresponding component in its primary
+ * process or secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument plays as the nonredundant key to find the action.
+ *
+ * @param action
+ *   The action argument is the function pointer to the action function.
+ *
+ * @return
+ *  - 0 on success.
+ *  - (<0) on failure.
+ */
+int rte_eal_primary_secondary_add_action(const char *action_name,
+					 rte_eal_primary_secondary_t action);
+/**
+ * Unregister an action function for primary/secondary communication.
+ *
+ * Call this function to unregister an action  if the calling component does
+ * not want to response the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument plays as the nonredundant key to find the action.
+ *
+ */
+void rte_eal_primary_secondary_del_action(const char *name);
+
+/**
+ * Send a message to the primary process or the secondary processes.
+ *
+ * This function will send a message which will be responsed by the action
+ * identified by action_name of the process on the other side.
+ *
+ * @param action_name
+ *   The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ *   The params argument contains the customized message.
+ *
+ * @param len_params
+ *   The len_params argument is the length of the customized message.
+ *
+ * @param fds
+ *   The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_num
+ *   The fds_num argument is number of fds to be sent with sendmsg.
+ *
+ * @return
+ *  - (>=0) on success.
+ *  - (<0) on failure.
+ */
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+				  const void *params,
+				  int len_params,
+				  int fds[],
+				  int fds_num);
+
+/**
  * Usage function typedef used by the application usage function.
  *
  * Use this function typedef to define and call rte_set_applcation_usage_hook()
diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
index 48f12f4..237c0b1 100644
--- a/lib/librte_eal/linuxapp/eal/eal.c
+++ b/lib/librte_eal/linuxapp/eal/eal.c
@@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
 
 	eal_check_mem_on_local_socket();
 
+	if (rte_eal_primary_secondary_channel_init() < 0) {
+		rte_eal_init_alert("Cannot create unix channel.");
+		rte_errno = EFAULT;
+		return -1;
+	}
+
 	if (eal_plugins_init() < 0)
 		rte_eal_init_alert("Cannot init plugins\n");
 
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 3a8f154..c618aec 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -242,3 +242,11 @@ EXPERIMENTAL {
 	rte_service_unregister;
 
 } DPDK_17.08;
+
+EXPERIMENTAL {
+	global:
+
+	rte_eal_primary_secondary_add_action;
+	rte_eal_primary_secondary_del_action;
+	rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
-- 
2.7.4



More information about the dev mailing list