[dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication
Jiayu Hu
jiayu.hu at intel.com
Wed Sep 20 05:00:38 CEST 2017
Hi Jianfeng,
A few questions are inline.
Thanks,
Jiayu
On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
> Previously, there was only one way for primary/secondary to exchange
> messages, that is, primary process writes info into some predefined
> file, and secondary process reads info out. That cannot address
> the requirements:
> a. Secondary wants to send info to primary.
> b. Sending info at any time, instead of just initialization time.
> c. Share FD with the other side.
>
> This patch proposes to create a communication channel (as a unix
> socket connection) for the above requirements.
>
> Three new APIs are added:
>
> 1. rte_eal_primary_secondary_add_action is used to register an action,
> if the calling component wants to respond to the messages from the
> corresponding component in its primary process or secondary processes.
> 2. rte_eal_primary_secondary_del_action is used to unregister the
> action if the calling component does not want to respond to the messages.
> 3. rte_eal_primary_secondary_sendmsg is used to send a message.
>
> Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
> ---
> lib/librte_eal/bsdapp/eal/rte_eal_version.map | 8 +
> lib/librte_eal/common/eal_common_proc.c | 454 ++++++++++++++++++++++++
> lib/librte_eal/common/eal_filesystem.h | 18 +
> lib/librte_eal/common/eal_private.h | 10 +
> lib/librte_eal/common/include/rte_eal.h | 74 ++++
> lib/librte_eal/linuxapp/eal/eal.c | 6 +
> lib/librte_eal/linuxapp/eal/rte_eal_version.map | 8 +
> 7 files changed, 578 insertions(+)
>
> diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> index aac6fd7..f4ff29f 100644
> --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
> @@ -237,3 +237,11 @@ EXPERIMENTAL {
> rte_service_unregister;
>
> } DPDK_17.08;
> +
> +EXPERIMENTAL {
> + global:
> +
> + rte_eal_primary_secondary_add_action;
> + rte_eal_primary_secondary_del_action;
> + rte_eal_primary_secondary_sendmsg;
> +} DPDK_17.11;
> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
> index 60526ca..fa316bf 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -33,8 +33,20 @@
> #include <stdio.h>
> #include <fcntl.h>
> #include <stdlib.h>
> +#include <sys/types.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <sys/un.h>
> +#include <errno.h>
> +#include <pthread.h>
> +
> +#include <rte_log.h>
> #include <rte_eal.h>
> +#include <rte_lcore.h>
>
> +#include "eal_private.h"
> #include "eal_filesystem.h"
> #include "eal_internal_cfg.h"
>
> @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
>
> return !!ret;
> }
> +
> +struct action_entry {
> + TAILQ_ENTRY(action_entry) next; /**< Next attached action entry*/
> +
> +#define MAX_ACTION_NAME_LEN 64
> + char action_name[MAX_ACTION_NAME_LEN];
> + rte_eal_primary_secondary_t *action;
> +};
> +
> +/** Double linked list of actions. */
> +TAILQ_HEAD(action_entry_list, action_entry);
> +
> +static struct action_entry_list action_entry_list =
> + TAILQ_HEAD_INITIALIZER(action_entry_list);
> +
> +static struct action_entry *
> +find_action_entry_by_name(const char *name)
> +{
> + int len = strlen(name);
> + struct action_entry *entry;
> +
> + TAILQ_FOREACH(entry, &action_entry_list, next) {
> + if (strncmp(entry->action_name, name, len) == 0)
> + break;
> + }
> +
> + return entry;
> +}
> +
> +int
> +rte_eal_primary_secondary_add_action(const char *action_name,
> + rte_eal_primary_secondary_t action)
> +{
> + struct action_entry *entry = malloc(sizeof(struct action_entry));
> +
> + if (entry == NULL)
> + return -ENOMEM;
> +
> + strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
> + entry->action = action;
> + TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
> + return 0;
> +}
> +
> +void
> +rte_eal_primary_secondary_del_action(const char *name)
> +{
> + struct action_entry *entry = find_action_entry_by_name(name);
> +
> + TAILQ_REMOVE(&action_entry_list, entry, next);
> + free(entry);
> +}
> +
> +#define MAX_SECONDARY_PROCS 8
> +
> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
> +static int fd_listen; /* unix listen socket by primary */
> +static int fd_to_pri; /* only used by secondary process */
> +static int fds_to_sec[MAX_SECONDARY_PROCS];
> +
> +struct msg_hdr {
> + char action_name[MAX_ACTION_NAME_LEN];
> + int fds_num;
> + char params[0];
> +} __rte_packed;
> +
> +static int
> +add_sec_proc(int fd)
> +{
> + int i;
> +
> + for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> + if (fds_to_sec[i] == -1)
> + break;
> +
> + if (i >= MAX_SECONDARY_PROCS)
> + return -1;
> +
> + fds_to_sec[i] = fd;
> +
> + return i;
> +}
> +
> +static void
> +del_sec_proc(int fd)
> +{
> + int i;
> +
> + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> + if (fds_to_sec[i] == fd) {
> + fds_to_sec[i] = -1;
> + break;
> + }
> + }
> +}
> +
> +static int
> +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
> +{
> + struct iovec iov;
> + struct msghdr msgh;
> + size_t fdsize = fds_num * sizeof(int);
> + char control[CMSG_SPACE(fdsize)];
> + struct cmsghdr *cmsg;
> + int ret;
> +
> + memset(&msgh, 0, sizeof(msgh));
> + iov.iov_base = buf;
> + iov.iov_len = buflen;
> +
> + msgh.msg_iov = &iov;
> + msgh.msg_iovlen = 1;
> + msgh.msg_control = control;
> + msgh.msg_controllen = sizeof(control);
> +
> + ret = recvmsg(sockfd, &msgh, 0);
> + if (ret <= 0) {
> + RTE_LOG(ERR, EAL, "recvmsg failed\n");
> + return ret;
> + }
> +
> + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
> + RTE_LOG(ERR, EAL, "truncted msg\n");
> + return -1;
> + }
> +
> + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
> + cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
> + if ((cmsg->cmsg_level == SOL_SOCKET) &&
> + (cmsg->cmsg_type == SCM_RIGHTS)) {
> + memcpy(fds, CMSG_DATA(cmsg), fdsize);
> + break;
> + }
> + }
> +
> + return ret;
> +}
> +
> +static int
> +process_msg(int fd)
> +{
> + int len;
> + int params_len;
> + char buf[1024];
The maximum length of a received message is 1024 bytes here, but
senders have no way of knowing that limit. Would it be better to
define a macro for the maximum message length?
> + int fds[8]; /* accept at most 8 FDs per message */
> + struct msg_hdr *hdr;
> + struct action_entry *entry;
> +
> + len = read_msg(fd, buf, 1024, fds, 8);
> + if (len < 0) {
> + RTE_LOG(ERR, EAL, "failed to read message: %s\n",
> + strerror(errno));
> + return -1;
> + }
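Why not also check whether len is equal to 0? read_msg() returns 0 when recvmsg() returns 0, and in that case buf does not hold a message.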
> +
> + hdr = (struct msg_hdr *) buf;
> +
> + entry = find_action_entry_by_name(hdr->action_name);
> + if (entry == NULL) {
> + RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
> + hdr->action_name);
> + return -1;
> + }
> +
> + params_len = len - sizeof(struct msg_hdr);
> + return entry->action(hdr->params, params_len, fds, hdr->fds_num);
> +}
> +
> +static void *
> +thread_primary(__attribute__((unused)) void *arg)
> +{
> + int fd;
> + int i, n;
> + struct epoll_event event;
> + struct epoll_event *events;
> +
> + event.events = EPOLLIN | EPOLLRDHUP;
> + event.data.fd = fd_listen;
> + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
> + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
> + strerror(errno));
> + exit(EXIT_FAILURE);
> + }
> +
> + events = calloc(20, sizeof event);
A simple question: why is the maximum number of events 20?
> +
> + while (1) {
> + n = epoll_wait(efd_pri_sec, events, 20, -1);
> + for (i = 0; i < n; i++) {
> + if (events[i].data.fd == fd_listen) {
> + if (events[i].events != EPOLLIN) {
> + RTE_LOG(ERR, EAL, "what happens?\n");
> + exit(EXIT_FAILURE);
> + }
> +
> + fd = accept(fd_listen, NULL, NULL);
> + if (fd < 0) {
> + RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
This line is beyond 80 characters.
> + strerror(errno));
> + continue;
> + }
> +
> + event.data.fd = fd;
> + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd, &event) < 0) {
> + RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
These two lines are beyond 80 characters.
> + strerror(errno));
> + continue;
> + }
> + if (add_sec_proc(fd) < 0)
> + RTE_LOG(ERR, EAL, "too many secondary processes\n");
This line is beyond 80 characters.
> +
> + continue;
> + }
> +
> + fd = events[i].data.fd;
> +
> + if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> + RTE_LOG(ERR, EAL,
> + "secondary process exit: %d\n", fd);
> + epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
> + del_sec_proc(fd);
Do we need to call close(fd) here? Otherwise the fd of the exited secondary appears to be leaked.
> + continue;
> + }
> +
> + if ((events[i].events & EPOLLIN)) {
> + RTE_LOG(INFO, EAL,
> + "recv msg from secondary process\n");
> +
> + process_msg(fd);
> + }
> + }
> + }
> +
> + return NULL;
> +}
> +
> +static void *
> +thread_secondary(__attribute__((unused)) void *arg)
> +{
> + int fd;
> + int i, n;
> + struct epoll_event event;
> + struct epoll_event *events;
> +
> + event.events = EPOLLIN | EPOLLRDHUP;
> + event.data.fd = fd_to_pri;
> + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
> + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
> + exit(EXIT_FAILURE);
> + }
> +
> + events = calloc(20, sizeof event);
> +
> + while (1) {
> + n = epoll_wait(efd_pri_sec, events, 20, -1);
> + for (i = 0; i < n; i++) {
> +
> + fd = events[i].data.fd;
> +
> + if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
> + RTE_LOG(ERR, EAL, "primary exits, so do I\n");
> + /* Do we need exit secondary when primary exits? */
Do we need to call close(fd) here?
> + exit(EXIT_FAILURE);
> + }
> +
> + if ((events[i].events & EPOLLIN)) {
> + RTE_LOG(INFO, EAL,
> + "recv msg from primary process\n");
> + process_msg(fd);
> + }
> + }
> + }
> +
> + return NULL;
> +}
> +
> +int
> +rte_eal_primary_secondary_channel_init(void)
> +{
> + int i, fd, ret;
> + const char *path;
> + struct sockaddr_un un;
> + pthread_t tid;
> + void*(*fn)(void *);
> + char thread_name[RTE_MAX_THREAD_NAME_LEN];
> +
> + efd_pri_sec = epoll_create1(0);
> + if (efd_pri_sec < 0) {
> + RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
> + return -1;
> + }
> +
> + fd = socket(AF_UNIX, SOCK_STREAM, 0);
> + if (fd < 0) {
> + RTE_LOG(ERR, EAL, "Failed to create unix socket");
> + return -1;
> + }
> +
> + memset(&un, 0, sizeof(un));
> + un.sun_family = AF_UNIX;
> + path = eal_primary_secondary_unix_path();
> + strncpy(un.sun_path, path, sizeof(un.sun_path));
> + un.sun_path[sizeof(un.sun_path) - 1] = '\0';
> +
> + if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> +
> + for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
> + fds_to_sec[i] = -1;
> +
> + if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
> + RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
> + close(fd);
> + return -1;
> + }
> +
> + /* The file still exists since last run */
> + unlink(path);
> +
> + ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
> + if (ret < 0) {
> + RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
> + path, strerror(errno));
> + close(fd);
> + return -1;
> + }
> + RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
> +
> + ret = listen(fd, 1024);
> + if (ret < 0) {
> + RTE_LOG(ERR, EAL, "failed to listen: %s", strerror(errno));
> + close(fd);
> + return -1;
> + }
> +
> + fn = thread_primary;
> + fd_listen = fd;
> + } else {
> + ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
> + if (ret < 0) {
> + RTE_LOG(ERR, EAL, "failed to connect primary\n");
> + return -1;
> + }
> + fn = thread_secondary;
> + fd_to_pri = fd;
> + }
> +
> + ret = pthread_create(&tid, NULL, fn, NULL);
> + if (ret < 0) {
> + RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
> + strerror(errno));
> + close(fd);
> + close(efd_pri_sec);
> + return -1;
> + }
> +
> + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
> + "ps_channel");
> + ret = rte_thread_setname(tid, thread_name);
> + if (ret < 0) {
> + RTE_LOG(ERR, EAL, "failed to set thead name\n");
> + close(fd);
> + close(efd_pri_sec);
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +static int
> +send_msg(int fd, struct msghdr *p_msgh)
> +{
> + int ret;
> +
> + do {
> + ret = sendmsg(fd, p_msgh, 0);
> + } while (ret < 0 && errno == EINTR);
> +
> + return ret;
> +}
> +
> +int
> +rte_eal_primary_secondary_sendmsg(const char *action_name,
> + const void *params,
> + int len_params,
> + int fds[],
> + int fds_num)
> +{
> + int i;
> + int ret = 0;
> + struct msghdr msgh;
> + struct iovec iov;
> + size_t fd_size = fds_num * sizeof(int);
> + char control[CMSG_SPACE(fd_size)];
> + struct cmsghdr *cmsg;
> + struct msg_hdr *msg;
> + int len_msg;
> +
> + len_msg = sizeof(struct msg_hdr) + len_params;
> + msg = malloc(len_msg);
> + if (!msg) {
> + RTE_LOG(ERR, EAL, "Cannot alloc memory for msg");
> + return -ENOMEM;
> + }
> + memset(msg, 0, len_msg);
> + strcpy(msg->action_name, action_name);
> + msg->fds_num = fds_num;
> + memcpy(msg->params, params, len_params);
> +
> + memset(&msgh, 0, sizeof(msgh));
> + memset(control, 0, sizeof(control));
> +
> + iov.iov_base = (uint8_t *)msg;
> + iov.iov_len = len_msg;
> +
> + msgh.msg_iov = &iov;
> + msgh.msg_iovlen = 1;
> + msgh.msg_control = control;
> + msgh.msg_controllen = sizeof(control);
> +
> + cmsg = CMSG_FIRSTHDR(&msgh);
> + cmsg->cmsg_len = CMSG_LEN(fd_size);
> + cmsg->cmsg_level = SOL_SOCKET;
> + cmsg->cmsg_type = SCM_RIGHTS;
> + memcpy(CMSG_DATA(cmsg), fds, fd_size);
> +
> + if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
> + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
> + if (fds_to_sec[i] == -1)
> + continue;
> +
> + ret = send_msg(fds_to_sec[i], &msgh);
> + if (ret < 0)
> + break;
> + }
> + } else {
> + ret = send_msg(fd_to_pri, &msgh);
> + }
> +
> + free(msg);
> +
> + return ret;
> +}
> diff --git a/lib/librte_eal/common/eal_filesystem.h b/lib/librte_eal/common/eal_filesystem.h
> index 8acbd99..78bb4fb 100644
> --- a/lib/librte_eal/common/eal_filesystem.h
> +++ b/lib/librte_eal/common/eal_filesystem.h
> @@ -67,6 +67,24 @@ eal_runtime_config_path(void)
> return buffer;
> }
>
> +/** Path of primary/secondary communication unix socket file. */
> +#define PRIMARY_SECONDARY_UNIX_PATH_FMT "%s/.%s_unix"
> +static inline const char *
> +eal_primary_secondary_unix_path(void)
> +{
> + static char buffer[PATH_MAX]; /* static so auto-zeroed */
> + const char *directory = default_config_dir;
> + const char *home_dir = getenv("HOME");
> +
> + if (getuid() != 0 && home_dir != NULL)
> + directory = home_dir;
> + snprintf(buffer, sizeof(buffer) - 1, PRIMARY_SECONDARY_UNIX_PATH_FMT,
> + directory, internal_config.hugefile_prefix);
> +
> + return buffer;
> +
> +}
> +
> /** Path of hugepage info file. */
> #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info"
>
> diff --git a/lib/librte_eal/common/eal_private.h b/lib/librte_eal/common/eal_private.h
> index 597d82e..719b160 100644
> --- a/lib/librte_eal/common/eal_private.h
> +++ b/lib/librte_eal/common/eal_private.h
> @@ -355,4 +355,14 @@ bool rte_eal_using_phys_addrs(void);
> */
> struct rte_bus *rte_bus_find_by_device_name(const char *str);
>
> +/**
> + * Create the unix channel for primary/secondary communication.
> + *
> + * @return
> + * 0 on success;
> + * (<0) on failure.
> + */
> +
> +int rte_eal_primary_secondary_channel_init(void);
> +
> #endif /* _EAL_PRIVATE_H_ */
> diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
> index 0e7363d..6cfc9d6 100644
> --- a/lib/librte_eal/common/include/rte_eal.h
> +++ b/lib/librte_eal/common/include/rte_eal.h
> @@ -210,6 +210,80 @@ int rte_eal_init(int argc, char **argv);
> int rte_eal_primary_proc_alive(const char *config_file_path);
>
> /**
> + * Action function typedef used by other components.
> + *
> + * As we create unix socket channel for primary/secondary communication, use
> + * this function typedef to register action for coming messages.
> + */
> +typedef int (rte_eal_primary_secondary_t)(const char *params,
> + int len,
> + int fds[],
> + int fds_num);
> +/**
> + * Register an action function for primary/secondary communication.
> + *
> + * Call this function to register an action, if the calling component wants
> + * to response the messages from the corresponding component in its primary
> + * process or secondary processes.
> + *
> + * @param action_name
> + * The action_name argument plays as the nonredundant key to find the action.
> + *
> + * @param action
> + * The action argument is the function pointer to the action function.
> + *
> + * @return
> + * - 0 on success.
> + * - (<0) on failure.
> + */
> +int rte_eal_primary_secondary_add_action(const char *action_name,
> + rte_eal_primary_secondary_t action);
> +/**
> + * Unregister an action function for primary/secondary communication.
> + *
> + * Call this function to unregister an action if the calling component does
> + * not want to response the messages from the corresponding component in its
> + * primary process or secondary processes.
> + *
> + * @param action_name
> + * The action_name argument plays as the nonredundant key to find the action.
> + *
> + */
> +void rte_eal_primary_secondary_del_action(const char *name);
> +
> +/**
> + * Send a message to the primary process or the secondary processes.
> + *
> + * This function will send a message which will be responsed by the action
> + * identified by action_name of the process on the other side.
> + *
> + * @param action_name
> + * The action_name argument is used to identify which action will be used.
> + *
> + * @param params
> + * The params argument contains the customized message.
> + *
> + * @param len_params
> + * The len_params argument is the length of the customized message.
> + *
> + * @param fds
> + * The fds argument is an array of fds sent with sendmsg.
> + *
> + * @param fds_num
> + * The fds_num argument is number of fds to be sent with sendmsg.
> + *
> + * @return
> + * - (>=0) on success.
> + * - (<0) on failure.
> + */
> +int
> +rte_eal_primary_secondary_sendmsg(const char *action_name,
> + const void *params,
> + int len_params,
> + int fds[],
> + int fds_num);
> +
> +/**
> * Usage function typedef used by the application usage function.
> *
> * Use this function typedef to define and call rte_set_applcation_usage_hook()
> diff --git a/lib/librte_eal/linuxapp/eal/eal.c b/lib/librte_eal/linuxapp/eal/eal.c
> index 48f12f4..237c0b1 100644
> --- a/lib/librte_eal/linuxapp/eal/eal.c
> +++ b/lib/librte_eal/linuxapp/eal/eal.c
> @@ -873,6 +873,12 @@ rte_eal_init(int argc, char **argv)
>
> eal_check_mem_on_local_socket();
>
> + if (rte_eal_primary_secondary_channel_init() < 0) {
> + rte_eal_init_alert("Cannot create unix channel.");
> + rte_errno = EFAULT;
> + return -1;
> + }
> +
> if (eal_plugins_init() < 0)
> rte_eal_init_alert("Cannot init plugins\n");
>
> diff --git a/lib/librte_eal/linuxapp/eal/rte_eal_version.map b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> index 3a8f154..c618aec 100644
> --- a/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> +++ b/lib/librte_eal/linuxapp/eal/rte_eal_version.map
> @@ -242,3 +242,11 @@ EXPERIMENTAL {
> rte_service_unregister;
>
> } DPDK_17.08;
> +
> +EXPERIMENTAL {
> + global:
> +
> + rte_eal_primary_secondary_add_action;
> + rte_eal_primary_secondary_del_action;
> + rte_eal_primary_secondary_sendmsg;
> +} DPDK_17.11;
> --
> 2.7.4
More information about the dev
mailing list