[dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication
Jianfeng Tan
jianfeng.tan at intel.com
Fri Aug 25 11:40:46 CEST 2017
Previously, there was only one way for primary/secondary processes to
exchange messages: the primary process writes info into some predefined
file, and the secondary process reads that info out. That cannot address
the following requirements:
a. Secondary wants to send info to primary.
b. Sending info at any time, instead of just initialization time.
c. Share FD with the other side.
This patch proposes to create a communication channel (as a unix
socket connection) to address the above requirements.
Three new APIs are added:
1. rte_eal_primary_secondary_add_action is used to register an action,
if the calling component wants to respond to messages from the
corresponding component in its primary process or secondary processes.
2. rte_eal_primary_secondary_del_action is used to unregister the
action if the calling component no longer wants to respond to those messages.
3. rte_eal_primary_secondary_sendmsg is used to send a message.
Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
---
lib/librte_eal/bsdapp/eal/rte_eal_version.map | 8 +
lib/librte_eal/common/eal_common_proc.c | 454 ++++++++++++++++++++++++
lib/librte_eal/common/eal_filesystem.h | 18 +
lib/librte_eal/common/eal_private.h | 10 +
lib/librte_eal/common/include/rte_eal.h | 74 ++++
lib/librte_eal/linuxapp/eal/eal.c | 6 +
lib/librte_eal/linuxapp/eal/rte_eal_version.map | 8 +
7 files changed, 578 insertions(+)
diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
index aac6fd7..f4ff29f 100644
--- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
@@ -237,3 +237,11 @@ EXPERIMENTAL {
rte_service_unregister;
} DPDK_17.08;
+
+EXPERIMENTAL {
+ global:
+
+ rte_eal_primary_secondary_add_action;
+ rte_eal_primary_secondary_del_action;
+ rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 60526ca..fa316bf 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -33,8 +33,20 @@
#include <stdio.h>
#include <fcntl.h>
#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <limits.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <rte_log.h>
#include <rte_eal.h>
+#include <rte_lcore.h>
+#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"
@@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
return !!ret;
}
+
+struct action_entry { /* one registered action: a name and its callback */
+ TAILQ_ENTRY(action_entry) next; /**< Linkage in action_entry_list. */
+
+#define MAX_ACTION_NAME_LEN 64 /* size in bytes of the action_name buffers */
+ char action_name[MAX_ACTION_NAME_LEN]; /* key compared by find_action_entry_by_name() */
+ rte_eal_primary_secondary_t *action; /* callback invoked by process_msg() for a matching message */
+};
+
+/** Doubly linked list of registered actions. */
+TAILQ_HEAD(action_entry_list, action_entry);
+
+static struct action_entry_list action_entry_list =
+ TAILQ_HEAD_INITIALIZER(action_entry_list);
+
+/**
+ * Look up a registered action by its exact name.
+ *
+ * @param name
+ *   Name to search for. At most MAX_ACTION_NAME_LEN bytes are examined.
+ * @return
+ *   The matching entry, or NULL if no action is registered under that name.
+ */
+static struct action_entry *
+find_action_entry_by_name(const char *name)
+{
+	struct action_entry *entry;
+
+	TAILQ_FOREACH(entry, &action_entry_list, next) {
+		/*
+		 * Compare over the whole name field rather than only
+		 * strlen(name) bytes: a prefix comparison would let "foo"
+		 * match an entry registered as "foobar".
+		 */
+		if (strncmp(entry->action_name, name,
+			    MAX_ACTION_NAME_LEN) == 0)
+			break;
+	}
+
+	/* TAILQ_FOREACH leaves entry == NULL when the list is exhausted. */
+	return entry;
+}
+
+/**
+ * Register an action under the given name.
+ *
+ * @param action_name
+ *   Non-empty, NUL-terminated key; must be shorter than MAX_ACTION_NAME_LEN.
+ * @param action
+ *   Callback stored for this name.
+ * @return
+ *   0 on success, -EINVAL on a NULL/empty argument, -E2BIG if the name does
+ *   not fit in the name field, -ENOMEM if the entry cannot be allocated.
+ */
+int
+rte_eal_primary_secondary_add_action(const char *action_name,
+				     rte_eal_primary_secondary_t action)
+{
+	struct action_entry *entry;
+
+	if (action_name == NULL || action_name[0] == '\0' || action == NULL)
+		return -EINVAL;
+
+	/*
+	 * Refuse names that would be truncated: a truncated (and, with
+	 * strncpy, possibly unterminated) key could never be matched
+	 * reliably and may be read past its end by string functions.
+	 */
+	if (strlen(action_name) >= MAX_ACTION_NAME_LEN)
+		return -E2BIG;
+
+	entry = malloc(sizeof(*entry));
+	if (entry == NULL)
+		return -ENOMEM;
+
+	/* Length was checked above, so this always NUL-terminates. */
+	snprintf(entry->action_name, sizeof(entry->action_name), "%s",
+		 action_name);
+	entry->action = action;
+	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
+	return 0;
+}
+
+/**
+ * Unregister the action stored under the given name and free its entry.
+ *
+ * Does nothing if name is NULL or no such action is registered.
+ *
+ * @param name
+ *   Key the action was registered with.
+ */
+void
+rte_eal_primary_secondary_del_action(const char *name)
+{
+	struct action_entry *entry;
+
+	if (name == NULL)
+		return;
+
+	entry = find_action_entry_by_name(name);
+	/* TAILQ_REMOVE dereferences the element, so it must not be NULL. */
+	if (entry == NULL)
+		return;
+
+	TAILQ_REMOVE(&action_entry_list, entry, next);
+	free(entry);
+}
+
+#define MAX_SECONDARY_PROCS 8 /* number of slots in fds_to_sec */
+
+static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
+static int fd_listen; /* unix listen socket by primary */
+static int fd_to_pri; /* only used by secondary process */
+static int fds_to_sec[MAX_SECONDARY_PROCS]; /* primary only: connected secondary sockets, -1 marks a free slot */
+
+struct msg_hdr { /* header placed at the start of every message on the channel */
+ char action_name[MAX_ACTION_NAME_LEN]; /* name of the action the receiver should run */
+ int fds_num; /* number of fds the sender says it attached */
+ char params[0]; /* caller-defined payload, immediately after the header */
+} __rte_packed;
+
+/*
+ * Record a newly connected secondary's socket in the first free slot of
+ * fds_to_sec (a slot is free when it holds -1).
+ *
+ * Returns the slot index used, or -1 when every slot is occupied.
+ */
+static int
+add_sec_proc(int fd)
+{
+	int slot = 0;
+
+	/* Advance to the first free slot, if any. */
+	while (slot < MAX_SECONDARY_PROCS && fds_to_sec[slot] != -1)
+		slot++;
+
+	if (slot >= MAX_SECONDARY_PROCS)
+		return -1;
+
+	fds_to_sec[slot] = fd;
+	return slot;
+}
+
+/*
+ * Release the fds_to_sec slot holding the given socket by resetting it to
+ * -1. Only the first matching slot is cleared; no-op when fd is not found.
+ */
+static void
+del_sec_proc(int fd)
+{
+	int slot;
+
+	for (slot = 0; slot < MAX_SECONDARY_PROCS; slot++) {
+		if (fds_to_sec[slot] != fd)
+			continue;
+
+		fds_to_sec[slot] = -1;
+		return;
+	}
+}
+
+/**
+ * Receive one message, and any file descriptors passed with it, from a
+ * channel socket.
+ *
+ * @param sockfd
+ *   Socket to read from.
+ * @param buf
+ *   Buffer receiving the message bytes.
+ * @param buflen
+ *   Size of buf in bytes.
+ * @param fds
+ *   Array receiving descriptors carried in an SCM_RIGHTS control message.
+ *   Slots for which no descriptor was received are set to -1.
+ * @param fds_num
+ *   Capacity of fds, in elements.
+ * @return
+ *   Number of message bytes received (> 0), 0 if recvmsg() reported
+ *   end-of-file, or a negative value on error or truncation.
+ */
+static int
+read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
+{
+	struct iovec iov;
+	struct msghdr msgh;
+	size_t fdsize = fds_num * sizeof(int);
+	char control[CMSG_SPACE(fdsize)];
+	struct cmsghdr *cmsg;
+	int i;
+	int ret;
+
+	/* Never hand uninitialised values back to the caller as fds. */
+	for (i = 0; i < fds_num; i++)
+		fds[i] = -1;
+
+	memset(&msgh, 0, sizeof(msgh));
+	iov.iov_base = buf;
+	iov.iov_len = buflen;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+	msgh.msg_control = control;
+	msgh.msg_controllen = sizeof(control);
+
+	/* A signal interrupting the call is not a channel error: retry. */
+	do {
+		ret = recvmsg(sockfd, &msgh, 0);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "recvmsg failed\n");
+		return ret;
+	}
+	if (ret == 0)
+		return 0; /* end-of-file, not a failure */
+
+	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
+		RTE_LOG(ERR, EAL, "truncted msg\n");
+		return -1;
+	}
+
+	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
+	     cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
+		if ((cmsg->cmsg_level == SOL_SOCKET) &&
+		    (cmsg->cmsg_type == SCM_RIGHTS)) {
+			size_t datalen = 0;
+
+			/*
+			 * Copy only what was actually received: the sender
+			 * may have attached fewer fds than we have room
+			 * for, and the rest of control[] is uninitialised.
+			 */
+			if (cmsg->cmsg_len > CMSG_LEN(0))
+				datalen = cmsg->cmsg_len - CMSG_LEN(0);
+			if (datalen > fdsize)
+				datalen = fdsize;
+			memcpy(fds, CMSG_DATA(cmsg), datalen);
+			break;
+		}
+	}
+
+	return ret;
+}
+
+/**
+ * Read one message from a channel socket and dispatch it to the action
+ * registered under the name carried in its header.
+ *
+ * @param fd
+ *   Connected channel socket that has data to read.
+ * @return
+ *   The action's return value, or -1 if the message could not be read, is
+ *   malformed, or names an action that is not registered.
+ */
+static int
+process_msg(int fd)
+{
+	int len;
+	int params_len;
+	char buf[1024];
+	int fds[8]; /* accept at most 8 FDs per message */
+	const int max_fds = sizeof(fds) / sizeof(fds[0]);
+	struct msg_hdr *hdr;
+	struct action_entry *entry;
+
+	len = read_msg(fd, buf, sizeof(buf), fds, max_fds);
+	if (len < 0) {
+		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
+			strerror(errno));
+		return -1;
+	}
+
+	/*
+	 * Anything shorter than a header (including a zero-length read at
+	 * end-of-file) cannot be parsed; the header fields would be
+	 * uninitialised stack contents.
+	 */
+	if ((size_t)len < sizeof(struct msg_hdr)) {
+		RTE_LOG(ERR, EAL, "message too short: %d bytes\n", len);
+		return -1;
+	}
+
+	hdr = (struct msg_hdr *) buf;
+
+	/* The header comes from another process: do not trust it. */
+	hdr->action_name[MAX_ACTION_NAME_LEN - 1] = '\0';
+	if (hdr->fds_num < 0 || hdr->fds_num > max_fds) {
+		RTE_LOG(ERR, EAL, "invalid number of fds in message: %d\n",
+			hdr->fds_num);
+		return -1;
+	}
+
+	entry = find_action_entry_by_name(hdr->action_name);
+	if (entry == NULL) {
+		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
+			hdr->action_name);
+		return -1;
+	}
+
+	params_len = len - sizeof(struct msg_hdr);
+	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
+}
+
+/**
+ * Channel thread body for the primary process.
+ *
+ * Registers the listen socket with the channel's epoll instance, then loops
+ * forever: accepts connections from secondary processes, dispatches their
+ * messages via process_msg(), and drops secondaries whose socket reports an
+ * error or hang-up. Fatal setup errors terminate the process.
+ *
+ * @param arg
+ *   Unused.
+ * @return
+ *   Never returns in practice; NULL to satisfy the pthread signature.
+ */
+static void *
+thread_primary(__attribute__((unused)) void *arg)
+{
+	const int max_events = 20;
+	int fd;
+	int i, n;
+	struct epoll_event event;
+	struct epoll_event *events;
+
+	event.events = EPOLLIN | EPOLLRDHUP;
+	event.data.fd = fd_listen;
+	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
+		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
+			strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	events = calloc(max_events, sizeof(*events));
+	if (events == NULL) {
+		RTE_LOG(ERR, EAL, "failed to allocate epoll events\n");
+		exit(EXIT_FAILURE);
+	}
+
+	while (1) {
+		n = epoll_wait(efd_pri_sec, events, max_events, -1);
+		for (i = 0; i < n; i++) {
+			if (events[i].data.fd == fd_listen) {
+				if (events[i].events != EPOLLIN) {
+					RTE_LOG(ERR, EAL, "what happens?\n");
+					exit(EXIT_FAILURE);
+				}
+
+				fd = accept(fd_listen, NULL, NULL);
+				if (fd < 0) {
+					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
+						strerror(errno));
+					continue;
+				}
+
+				event.data.fd = fd;
+				if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
+					RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
+						strerror(errno));
+					/* Nobody else knows this fd: do not leak it. */
+					close(fd);
+					continue;
+				}
+				if (add_sec_proc(fd) < 0) {
+					RTE_LOG(ERR, EAL, "too many secondary processes\n");
+					/*
+					 * Without a slot the connection can never
+					 * be sent to nor cleaned up later, so
+					 * refuse it now.
+					 */
+					epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
+					close(fd);
+				}
+
+				continue;
+			}
+
+			fd = events[i].data.fd;
+
+			/*
+			 * EPOLLRDHUP was requested at registration, so test it
+			 * too; otherwise a peer that only shut down its write
+			 * side would keep the socket readable forever.
+			 */
+			if ((events[i].events &
+			     (EPOLLERR | EPOLLHUP | EPOLLRDHUP))) {
+				RTE_LOG(ERR, EAL,
+					"secondary process exit: %d\n", fd);
+				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
+				del_sec_proc(fd);
+				close(fd);
+				continue;
+			}
+
+			if ((events[i].events & EPOLLIN)) {
+				RTE_LOG(INFO, EAL,
+					"recv msg from secondary process\n");
+
+				process_msg(fd);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+/**
+ * Channel thread body for a secondary process.
+ *
+ * Registers the socket connected to the primary with the channel's epoll
+ * instance, then loops forever dispatching incoming messages via
+ * process_msg(). The process exits when the connection to the primary
+ * reports an error or hang-up, or when setup fails.
+ *
+ * @param arg
+ *   Unused.
+ * @return
+ *   Never returns in practice; NULL to satisfy the pthread signature.
+ */
+static void *
+thread_secondary(__attribute__((unused)) void *arg)
+{
+	const int max_events = 20;
+	int fd;
+	int i, n;
+	struct epoll_event event;
+	struct epoll_event *events;
+
+	event.events = EPOLLIN | EPOLLRDHUP;
+	event.data.fd = fd_to_pri;
+	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
+		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	events = calloc(max_events, sizeof(*events));
+	if (events == NULL) {
+		RTE_LOG(ERR, EAL, "failed to allocate epoll events\n");
+		exit(EXIT_FAILURE);
+	}
+
+	while (1) {
+		n = epoll_wait(efd_pri_sec, events, max_events, -1);
+		for (i = 0; i < n; i++) {
+
+			fd = events[i].data.fd;
+
+			/*
+			 * EPOLLRDHUP was requested at registration, so test it
+			 * too; otherwise a primary that only shut down its
+			 * write side would keep the socket readable forever.
+			 */
+			if ((events[i].events &
+			     (EPOLLERR | EPOLLHUP | EPOLLRDHUP))) {
+				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
+				/* Do we need exit secondary when primary exits? */
+				exit(EXIT_FAILURE);
+			}
+
+			if ((events[i].events & EPOLLIN)) {
+				RTE_LOG(INFO, EAL,
+					"recv msg from primary process\n");
+				process_msg(fd);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+/**
+ * Create the primary/secondary unix-socket channel and start its thread.
+ *
+ * In the primary process this binds and listens on the channel socket path
+ * and starts thread_primary(); in a secondary process it connects to that
+ * path and starts thread_secondary().
+ *
+ * @return
+ *   0 on success, -1 on failure. On failure every descriptor opened by this
+ *   function has been closed again.
+ */
+int
+rte_eal_primary_secondary_channel_init(void)
+{
+	int i, ret;
+	int fd = -1;
+	const char *path;
+	struct sockaddr_un un;
+	pthread_t tid;
+	void *(*fn)(void *);
+
+	efd_pri_sec = epoll_create1(0);
+	if (efd_pri_sec < 0) {
+		RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
+		return -1;
+	}
+
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Failed to create unix socket\n");
+		goto fail;
+	}
+
+	path = eal_primary_secondary_unix_path();
+	/* A silently truncated path would bind/connect to the wrong file. */
+	if (strlen(path) >= sizeof(un.sun_path)) {
+		RTE_LOG(ERR, EAL, "unix socket path too long: %s\n", path);
+		goto fail;
+	}
+	memset(&un, 0, sizeof(un));
+	un.sun_family = AF_UNIX;
+	snprintf(un.sun_path, sizeof(un.sun_path), "%s", path);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			fds_to_sec[i] = -1;
+
+		if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+			RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
+			goto fail;
+		}
+
+		/* The file still exists since last run */
+		unlink(path);
+
+		ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
+				path, strerror(errno));
+			goto fail;
+		}
+		RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
+
+		ret = listen(fd, 1024);
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to listen: %s\n",
+				strerror(errno));
+			goto fail;
+		}
+
+		fn = thread_primary;
+		fd_listen = fd;
+	} else {
+		ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
+		if (ret < 0) {
+			RTE_LOG(ERR, EAL, "failed to connect primary\n");
+			goto fail;
+		}
+		fn = thread_secondary;
+		fd_to_pri = fd;
+	}
+
+	/*
+	 * pthread_create() reports failure by returning a positive error
+	 * number; it does not return -1 nor set errno.
+	 */
+	ret = pthread_create(&tid, NULL, fn, NULL);
+	if (ret != 0) {
+		RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
+			strerror(ret));
+		goto fail;
+	}
+
+	/*
+	 * The thread is already running on these descriptors, so a naming
+	 * failure must not close them; the name is only cosmetic.
+	 */
+	if (rte_thread_setname(tid, "ps_channel") < 0)
+		RTE_LOG(ERR, EAL, "failed to set thead name\n");
+
+	return 0;
+
+fail:
+	if (fd >= 0)
+		close(fd);
+	close(efd_pri_sec);
+	return -1;
+}
+
+/*
+ * Send a prepared message on a channel socket, transparently retrying when
+ * the call is interrupted by a signal (EINTR).
+ *
+ * Returns whatever sendmsg() finally returned: the number of bytes sent, or
+ * a negative value with errno set on any error other than EINTR.
+ */
+static int
+send_msg(int fd, struct msghdr *p_msgh)
+{
+	for (;;) {
+		int sent = sendmsg(fd, p_msgh, 0);
+
+		/* Done unless the failure was merely an interruption. */
+		if (sent >= 0 || errno != EINTR)
+			return sent;
+	}
+}
+
+/**
+ * Send a message, optionally with file descriptors, over the channel.
+ *
+ * Called in the primary process, the message is sent to every connected
+ * secondary, stopping at the first send failure; called in a secondary
+ * process, it is sent to the primary.
+ *
+ * @param action_name
+ *   Name of the action the receiver should run; must be shorter than
+ *   MAX_ACTION_NAME_LEN.
+ * @param params
+ *   Payload bytes; may be NULL only when len_params is 0.
+ * @param len_params
+ *   Payload length in bytes.
+ * @param fds
+ *   Descriptors to pass along; may be NULL only when fds_num is 0.
+ * @param fds_num
+ *   Number of descriptors in fds.
+ * @return
+ *   Result of the last send (>= 0) on success; -EINVAL or -E2BIG when the
+ *   arguments are invalid or exceed what the receiver accepts; -ENOMEM when
+ *   the message cannot be allocated; the negative sendmsg() result otherwise.
+ */
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+				  const void *params,
+				  int len_params,
+				  int fds[],
+				  int fds_num)
+{
+	/* These mirror the receive-side limits in process_msg(). */
+	enum { MAX_MSG_FDS = 8, MAX_MSG_LEN = 1024 };
+	int i;
+	int ret = 0;
+	struct msghdr msgh;
+	struct iovec iov;
+	size_t fd_size;
+	char control[CMSG_SPACE(MAX_MSG_FDS * sizeof(int))];
+	struct cmsghdr *cmsg;
+	struct msg_hdr *msg;
+	int len_msg;
+
+	if (action_name == NULL ||
+	    len_params < 0 || (len_params > 0 && params == NULL) ||
+	    fds_num < 0 || (fds_num > 0 && fds == NULL)) {
+		RTE_LOG(ERR, EAL, "invalid argument for sendmsg\n");
+		return -EINVAL;
+	}
+
+	/* The name is copied into a fixed-size field: refuse to overflow it. */
+	if (strlen(action_name) >= MAX_ACTION_NAME_LEN) {
+		RTE_LOG(ERR, EAL, "action name too long\n");
+		return -E2BIG;
+	}
+
+	if (fds_num > MAX_MSG_FDS ||
+	    (size_t)len_params > MAX_MSG_LEN - sizeof(struct msg_hdr)) {
+		RTE_LOG(ERR, EAL, "message too big for receiver\n");
+		return -E2BIG;
+	}
+
+	fd_size = fds_num * sizeof(int);
+	len_msg = sizeof(struct msg_hdr) + len_params;
+	msg = calloc(1, len_msg);
+	if (!msg) {
+		RTE_LOG(ERR, EAL, "Cannot alloc memory for msg");
+		return -ENOMEM;
+	}
+	/* Length was checked above, so this always NUL-terminates. */
+	snprintf(msg->action_name, sizeof(msg->action_name), "%s",
+		 action_name);
+	msg->fds_num = fds_num;
+	if (len_params > 0)
+		memcpy(msg->params, params, len_params);
+
+	memset(&msgh, 0, sizeof(msgh));
+	memset(control, 0, sizeof(control));
+
+	iov.iov_base = (uint8_t *)msg;
+	iov.iov_len = len_msg;
+
+	msgh.msg_iov = &iov;
+	msgh.msg_iovlen = 1;
+
+	/* Attach a control message only when there is something to pass. */
+	if (fds_num > 0) {
+		msgh.msg_control = control;
+		msgh.msg_controllen = CMSG_SPACE(fd_size);
+
+		cmsg = CMSG_FIRSTHDR(&msgh);
+		cmsg->cmsg_len = CMSG_LEN(fd_size);
+		cmsg->cmsg_level = SOL_SOCKET;
+		cmsg->cmsg_type = SCM_RIGHTS;
+		memcpy(CMSG_DATA(cmsg), fds, fd_size);
+	}
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
+			if (fds_to_sec[i] == -1)
+				continue;
+
+			ret = send_msg(fds_to_sec[i], &msgh);
+			if (ret < 0)
+				break;
+		}
+	} else {
+		ret = send_msg(fd_to_pri, &msgh);
+	}
+
+	free(msg);
+
+	return ret;
+}
diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
index 8acbd99..78bb4fb 100644
--- a/lib/librte_eal/common/eal_filesystem.h
+++ b/lib/librte_eal/common/eal_filesystem.h
@@ -67,6 +67,24 @@ eal_runtime_config_path(void)
return buffer;
}
+/** Path of primary/secondary communication unix socket file. */
+#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
+/*
+ * Build the path of the primary/secondary unix socket file.
+ *
+ * The file lives in default_config_dir, except that a non-root user with
+ * HOME set gets it in the home directory instead. The result is formatted
+ * into a static buffer that is overwritten by every call.
+ */
+static inline const char *
+eal_primary_secondary_unix_path(void)
+{
+	static char buffer[PATH_MAX]; /* static so auto-zeroed */
+	const char *home_dir = getenv("HOME");
+	const char *directory;
+
+	if (getuid() == 0 || home_dir == NULL)
+		directory = default_config_dir;
+	else
+		directory = home_dir;
+
+	snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
+		 directory, internal_config.hugefile_prefix);
+
+	return buffer;
+}
+
/** Path of hugepage info file. */
#define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
index 597d82e..719b160 100644
--- a/lib/librte_eal/common/eal_private.h
+++ b/lib/librte_eal/common/eal_private.h
@@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void);
*/
struct rte_bus *rte_bus_find_by_device_name(const char *str);
+/**
+ * Create the unix channel for primary/secondary communication.
+ *
+ * @return
+ * 0 on success;
+ * (<0) on failure.
+ */
+
+int rte_eal_primary_secondary_channel_init(void);
+
#endif /* _EAL_PRIVATE_H_ */
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 0e7363d..6cfc9d6 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv);
int rte_eal_primary_proc_alive(const char *config_file_path);
/**
+ * Action function typedef used by other components.
+ *
+ * As we create unix socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming messages.
+ */
+typedef int (rte_eal_primary_secondary_t)(const char *params,
+ int len,
+ int fds[],
+ int fds_num);
+/**
+ * Register an action function for primary/secondary communication.
+ *
+ * Call this function to register an action, if the calling component wants
+ * to respond to messages from the corresponding component in its primary
+ * process or secondary processes.
+ *
+ * @param action_name
+ * The action_name argument serves as the unique key used to find the action.
+ *
+ * @param action
+ * The action argument is the function pointer to the action function.
+ *
+ * @return
+ * - 0 on success.
+ * - (<0) on failure.
+ */
+int rte_eal_primary_secondary_add_action(const char *action_name,
+ rte_eal_primary_secondary_t action);
+/**
+ * Unregister an action function for primary/secondary communication.
+ *
+ * Call this function to unregister an action if the calling component no
+ * longer wants to respond to messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param name
+ * The name argument serves as the unique key used to find the action.
+ *
+ */
+void rte_eal_primary_secondary_del_action(const char *name);
+
+/**
+ * Send a message to the primary process or the secondary processes.
+ *
+ * This function will send a message which will be handled by the action
+ * identified by action_name in the process on the other side.
+ *
+ * @param action_name
+ * The action_name argument is used to identify which action will be used.
+ *
+ * @param params
+ * The params argument contains the customized message.
+ *
+ * @param len_params
+ * The len_params argument is the length of the customized message.
+ *
+ * @param fds
+ * The fds argument is an array of fds sent with sendmsg.
+ *
+ * @param fds_num
+ * The fds_num argument is number of fds to be sent with sendmsg.
+ *
+ * @return
+ * - (>=0) on success.
+ * - (<0) on failure.
+ */
+int
+rte_eal_primary_secondary_sendmsg(const char *action_name,
+ const void *params,
+ int len_params,
+ int fds[],
+ int fds_num);
+
+/**
* Usage function typedef used by the application usage function.
*
* Use this function typedef to define and call rte_set_applcation_usage_hook()
diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
index 48f12f4..237c0b1 100644
--- a/lib/librte_eal/linuxapp/eal/eal.c
+++ b/lib/librte_eal/linuxapp/eal/eal.c
@@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
eal_check_mem_on_local_socket();
+ if (rte_eal_primary_secondary_channel_init() < 0) {
+ rte_eal_init_alert("Cannot create unix channel.");
+ rte_errno = EFAULT;
+ return -1;
+ }
+
if (eal_plugins_init() < 0)
rte_eal_init_alert("Cannot init plugins\n");
diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
index 3a8f154..c618aec 100644
--- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
+++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
@@ -242,3 +242,11 @@ EXPERIMENTAL {
rte_service_unregister;
} DPDK_17.08;
+
+EXPERIMENTAL {
+ global:
+
+ rte_eal_primary_secondary_add_action;
+ rte_eal_primary_secondary_del_action;
+ rte_eal_primary_secondary_sendmsg;
+} DPDK_17.11;
--
2.7.4
More information about the dev
mailing list