[dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication
Tan, Jianfeng
jianfeng.tan at intel.com
Thu Sep 21 08:53:06 CEST 2017
Hi Jiayu,
On 9/20/2017 11:00 AM, Jiayu Hu wrote:
> Hi Jianfeng,
>
> Few questions are inline.
>
> Thanks,
> Jiayu
>
> On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
>> Previously, there was only one way for primary/secondary to exchange
>> messages, that is, the primary process writes info into some predefined
>> file, and the secondary process reads the info out. That cannot address
>> the following requirements:
>> a. Secondary wants to send info to primary.
>> b. Sending info at any time, instead of just initialization time.
>> c. Share FD with the other side.
>>
>> This patch proposes to create a communication channel (as a unix
>> socket connection) for the above requirements.
>>
>> Three new APIs are added:
>>
>> 1. rte_eal_primary_secondary_add_action is used to register an action,
>> if the calling component wants to respond to messages from the
>> corresponding component in its primary process or secondary processes.
>> 2. rte_eal_primary_secondary_del_action is used to unregister the
>> action if the calling component no longer wants to respond to messages.
>> 3. rte_eal_primary_secondary_sendmsg is used to send a message.
>>
>> Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
>> ---
>> lib/librte_eal/bsdapp/eal/rte_eal_version.map | 8 +
>> lib/librte_eal/common/eal_common_proc.c | 454 ++++++++++++++++++++++++
>> lib/librte_eal/common/eal_filesystem.h | 18 +
>> lib/librte_eal/common/eal_private.h | 10 +
>> lib/librte_eal/common/include/rte_eal.h | 74 ++++
>> lib/librte_eal/linuxapp/eal/eal.c | 6 +
>> lib/librte_eal/linuxapp/eal/rte_eal_version.map | 8 +
>> 7 files changed, 578 insertions(+)
>>
>> diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> index aac6fd7..f4ff29f 100644
>> --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> @@ -237,3 +237,11 @@ EXPERIMENTAL {
>> rte_service_unregister;
>>
>> } DPDK_17.08;
>> +
>> +EXPERIMENTAL {
>> + global:
>> +
>> + rte_eal_primary_secondary_add_action;
>> + rte_eal_primary_secondary_del_action;
>> + rte_eal_primary_secondary_sendmsg;
>> +} DPDK_17.11;
>> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
>> index 60526ca..fa316bf 100644
>> --- a/lib/librte_eal/common/eal_common_proc.c
>> +++ b/lib/librte_eal/common/eal_common_proc.c
>> @@ -33,8 +33,20 @@
>> #include <stdio.h>
>> #include <fcntl.h>
>> #include <stdlib.h>
>> +#include <sys/types.h>
>> +#include <sys/socket.h>
>> +#include <sys/epoll.h>
>> +#include <limits.h>
>> +#include <unistd.h>
>> +#include <sys/un.h>
>> +#include <errno.h>
>> +#include <pthread.h>
>> +
>> +#include <rte_log.h>
>> #include <rte_eal.h>
>> +#include <rte_lcore.h>
>>
>> +#include "eal_private.h"
>> #include "eal_filesystem.h"
>> #include "eal_internal_cfg.h"
>>
>> @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
>>
>> return !!ret;
>> }
>> +
>> +struct action_entry {
>> + TAILQ_ENTRY(action_entry) next; /**< Next attached action entry*/
>> +
>> +#define MAX_ACTION_NAME_LEN 64
>> + char action_name[MAX_ACTION_NAME_LEN];
>> + rte_eal_primary_secondary_t *action;
>> +};
>> +
>> +/** Double linked list of actions. */
>> +TAILQ_HEAD(action_entry_list, action_entry);
>> +
>> +static struct action_entry_list action_entry_list =
>> + TAILQ_HEAD_INITIALIZER(action_entry_list);
>> +
>> +static struct action_entry *
>> +find_action_entry_by_name(const char *name)
>> +{
>> + int len = strlen(name);
>> + struct action_entry *entry;
>> +
>> + TAILQ_FOREACH(entry, &action_entry_list, next) {
>> + if (strncmp(entry->action_name, name, len) == 0)
>> + break;
>> + }
>> +
>> + return entry;
>> +}
>> +
>> +int
>> +rte_eal_primary_secondary_add_action(const char *action_name,
>> + rte_eal_primary_secondary_t action)
>> +{
>> + struct action_entry *entry = malloc(sizeof(struct action_entry));
>> +
>> + if (entry == NULL)
>> + return -ENOMEM;
>> +
>> + strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
>> + entry->action = action;
>> + TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
>> + return 0;
>> +}
>> +
>> +void
>> +rte_eal_primary_secondary_del_action(const char *name)
>> +{
>> + struct action_entry *entry = find_action_entry_by_name(name);
>> +
>> + TAILQ_REMOVE(&action_entry_list, entry, next);
>> + free(entry);
>> +}
>> +
>> +#define MAX_SECONDARY_PROCS 8
>> +
>> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
>> +static int fd_listen; /* unix listen socket by primary */
>> +static int fd_to_pri; /* only used by secondary process */
>> +static int fds_to_sec[MAX_SECONDARY_PROCS];
>> +
>> +struct msg_hdr {
>> + char action_name[MAX_ACTION_NAME_LEN];
>> + int fds_num;
>> + char params[0];
>> +} __rte_packed;
>> +
>> +static int
>> +add_sec_proc(int fd)
>> +{
>> + int i;
>> +
>> + for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
>> + if (fds_to_sec[i] == -1)
>> + break;
>> +
>> + if (i >= MAX_SECONDARY_PROCS)
>> + return -1;
>> +
>> + fds_to_sec[i] = fd;
>> +
>> + return i;
>> +}
>> +
>> +static void
>> +del_sec_proc(int fd)
>> +{
>> + int i;
>> +
>> + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
>> + if (fds_to_sec[i] == fd) {
>> + fds_to_sec[i] = -1;
>> + break;
>> + }
>> + }
>> +}
>> +
>> +static int
>> +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
>> +{
>> + struct iovec iov;
>> + struct msghdr msgh;
>> + size_t fdsize = fds_num * sizeof(int);
>> + char control[CMSG_SPACE(fdsize)];
>> + struct cmsghdr *cmsg;
>> + int ret;
>> +
>> + memset(&msgh, 0, sizeof(msgh));
>> + iov.iov_base = buf;
>> + iov.iov_len = buflen;
>> +
>> + msgh.msg_iov = &iov;
>> + msgh.msg_iovlen = 1;
>> + msgh.msg_control = control;
>> + msgh.msg_controllen = sizeof(control);
>> +
>> + ret = recvmsg(sockfd, &msgh, 0);
>> + if (ret <= 0) {
>> + RTE_LOG(ERR, EAL, "recvmsg failed\n");
>> + return ret;
>> + }
>> +
>> + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
>> + RTE_LOG(ERR, EAL, "truncted msg\n");
>> + return -1;
>> + }
>> +
>> + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
>> + cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
>> + if ((cmsg->cmsg_level == SOL_SOCKET) &&
>> + (cmsg->cmsg_type == SCM_RIGHTS)) {
>> + memcpy(fds, CMSG_DATA(cmsg), fdsize);
>> + break;
>> + }
>> + }
>> +
>> + return ret;
>> +}
>> +
>> +static int
>> +process_msg(int fd)
>> +{
>> + int len;
>> + int params_len;
>> + char buf[1024];
> The max length of a received message is 1024 here, but the
> senders don't know this limit. Would it be better to define a macro
> for the max message length?
OK, let's make it a macro, and check the length when sending messages.
>
>> + int fds[8]; /* accept at most 8 FDs per message */
>> + struct msg_hdr *hdr;
>> + struct action_entry *entry;
>> +
>> + len = read_msg(fd, buf, 1024, fds, 8);
>> + if (len < 0) {
>> + RTE_LOG(ERR, EAL, "failed to read message: %s\n",
>> + strerror(errno));
>> + return -1;
>> + }
> Why not check whether len is equal to 0?
Nice catch!
>
>> +
>> + hdr = (struct msg_hdr *) buf;
>> +
>> + entry = find_action_entry_by_name(hdr->action_name);
>> + if (entry == NULL) {
>> + RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
>> + hdr->action_name);
>> + return -1;
>> + }
>> +
>> + params_len = len - sizeof(struct msg_hdr);
>> + return entry->action(hdr->params, params_len, fds, hdr->fds_num);
>> +}
>> +
>> +static void *
>> +thread_primary(__attribute__((unused)) void *arg)
>> +{
>> + int fd;
>> + int i, n;
>> + struct epoll_event event;
>> + struct epoll_event *events;
>> +
>> + event.events = EPOLLIN | EPOLLRDHUP;
>> + event.data.fd = fd_listen;
>> + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
>> + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
>> + strerror(errno));
>> + exit(EXIT_FAILURE);
>> + }
>> +
>> + events = calloc(20, sizeof event);
> A simple question: why is the max number of events 20?
This is another hard-coded value. It only determines how many events can be
processed in each iteration; any remaining events are kept in the kernel
for the next iteration.
>
>> +
>> + while (1) {
>> + n = epoll_wait(efd_pri_sec, events, 20, -1);
>> + for (i = 0; i < n; i++) {
>> + if (events[i].data.fd == fd_listen) {
>> + if (events[i].events != EPOLLIN) {
>> + RTE_LOG(ERR, EAL, "what happens?\n");
>> + exit(EXIT_FAILURE);
>> + }
>> +
>> + fd = accept(fd_listen, NULL, NULL);
>> + if (fd < 0) {
>> + RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> This line is beyond 80 characters.
Will fix it.
[...]
>> +
>> + continue;
>> + }
>> +
>> + fd = events[i].data.fd;
>> +
>> + if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
>> + RTE_LOG(ERR, EAL,
>> + "secondary process exit: %d\n", fd);
>> + epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
>> + del_sec_proc(fd);
> Need close(fd) here?
Nice catch.
>
>> + continue;
>> + }
>> +
>> + if ((events[i].events & EPOLLIN)) {
>> + RTE_LOG(INFO, EAL,
>> + "recv msg from secondary process\n");
>> +
>> + process_msg(fd);
>> + }
>> + }
>> + }
>> +
>> + return NULL;
>> +}
>> +
>> +static void *
>> +thread_secondary(__attribute__((unused)) void *arg)
>> +{
>> + int fd;
>> + int i, n;
>> + struct epoll_event event;
>> + struct epoll_event *events;
>> +
>> + event.events = EPOLLIN | EPOLLRDHUP;
>> + event.data.fd = fd_to_pri;
>> + if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
>> + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
>> + exit(EXIT_FAILURE);
>> + }
>> +
>> + events = calloc(20, sizeof event);
>> +
>> + while (1) {
>> + n = epoll_wait(efd_pri_sec, events, 20, -1);
>> + for (i = 0; i < n; i++) {
>> +
>> + fd = events[i].data.fd;
>> +
>> + if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
>> + RTE_LOG(ERR, EAL, "primary exits, so do I\n");
>> + /* Do we need exit secondary when primary exits? */
> Need close(fd) here?
We will exit here, so there is no need to close().
Thanks,
Jianfeng
More information about the dev
mailing list