[dpdk-dev] [PATCH 06/12] eal: add channel for primary/secondary communication

Tan, Jianfeng jianfeng.tan at intel.com
Thu Sep 21 08:53:06 CEST 2017


Hi Jiayu,


On 9/20/2017 11:00 AM, Jiayu Hu wrote:
> Hi Jianfeng,
>
> Few questions are inline.
>
> Thanks,
> Jiayu
>
> On Fri, Aug 25, 2017 at 09:40:46AM +0000, Jianfeng Tan wrote:
>> Previously, there was only one way for primary/secondary to exchange
>> messages, that is, primary process writes info into some predefined
>> file, and secondary process reads info out. That cannot address
>> the requirements:
>>    a. Secondary wants to send info to primary.
>>    b. Sending info at any time, instead of just initialization time.
>>    c. Share FD with the other side.
>>
>> This patch proposes to create a communication channel (as a unix
>> socket connection) for the above requirements.
>>
>> Three new APIs are added:
>>
>>    1. rte_eal_primary_secondary_add_action is used to register an action,
>> if the calling component wants to respond to the messages from the
>> corresponding component in its primary process or secondary processes.
>>    2. rte_eal_primary_secondary_del_action is used to unregister the
>> action if the calling component does not want to respond to the messages.
>>    3. rte_eal_primary_secondary_sendmsg is used to send a message.
>>
>> Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
>> ---
>>   lib/librte_eal/bsdapp/eal/rte_eal_version.map   |   8 +
>>   lib/librte_eal/common/eal_common_proc.c         | 454 ++++++++++++++++++++++++
>>   lib/librte_eal/common/eal_filesystem.h          |  18 +
>>   lib/librte_eal/common/eal_private.h             |  10 +
>>   lib/librte_eal/common/include/rte_eal.h         |  74 ++++
>>   lib/librte_eal/linuxapp/eal/eal.c               |   6 +
>>   lib/librte_eal/linuxapp/eal/rte_eal_version.map |   8 +
>>   7 files changed, 578 insertions(+)
>>
>> diff --git a/lib/librte_eal/bsdapp/eal/rte_eal_version.map b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> index aac6fd7..f4ff29f 100644
>> --- a/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> +++ b/lib/librte_eal/bsdapp/eal/rte_eal_version.map
>> @@ -237,3 +237,11 @@ EXPERIMENTAL {
>>   	rte_service_unregister;
>>   
>>   } DPDK_17.08;
>> +
>> +EXPERIMENTAL {
>> +	global:
>> +
>> +	rte_eal_primary_secondary_add_action;
>> +	rte_eal_primary_secondary_del_action;
>> +	rte_eal_primary_secondary_sendmsg;
>> +} DPDK_17.11;
>> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
>> index 60526ca..fa316bf 100644
>> --- a/lib/librte_eal/common/eal_common_proc.c
>> +++ b/lib/librte_eal/common/eal_common_proc.c
>> @@ -33,8 +33,20 @@
>>   #include <stdio.h>
>>   #include <fcntl.h>
>>   #include <stdlib.h>
>> +#include <sys/types.h>
>> +#include <sys/socket.h>
>> +#include <sys/epoll.h>
>> +#include <limits.h>
>> +#include <unistd.h>
>> +#include <sys/un.h>
>> +#include <errno.h>
>> +#include <pthread.h>
>> +
>> +#include <rte_log.h>
>>   #include <rte_eal.h>
>> +#include <rte_lcore.h>
>>   
>> +#include "eal_private.h"
>>   #include "eal_filesystem.h"
>>   #include "eal_internal_cfg.h"
>>   
>> @@ -59,3 +71,445 @@ rte_eal_primary_proc_alive(const char *config_file_path)
>>   
>>   	return !!ret;
>>   }
>> +
>> +struct action_entry {
>> +	TAILQ_ENTRY(action_entry) next;      /**< Next attached action entry*/
>> +
>> +#define MAX_ACTION_NAME_LEN	64
>> +	char action_name[MAX_ACTION_NAME_LEN];
>> +	rte_eal_primary_secondary_t *action;
>> +};
>> +
>> +/** Double linked list of actions. */
>> +TAILQ_HEAD(action_entry_list, action_entry);
>> +
>> +static struct action_entry_list action_entry_list =
>> +	TAILQ_HEAD_INITIALIZER(action_entry_list);
>> +
>> +static struct action_entry *
>> +find_action_entry_by_name(const char *name)
>> +{
>> +	int len = strlen(name);
>> +	struct action_entry *entry;
>> +
>> +	TAILQ_FOREACH(entry, &action_entry_list, next) {
>> +		if (strncmp(entry->action_name, name, len) == 0)
>> +				break;
>> +	}
>> +
>> +	return entry;
>> +}
>> +
>> +int
>> +rte_eal_primary_secondary_add_action(const char *action_name,
>> +				     rte_eal_primary_secondary_t action)
>> +{
>> +	struct action_entry *entry = malloc(sizeof(struct action_entry));
>> +
>> +	if (entry == NULL)
>> +		return -ENOMEM;
>> +
>> +	strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
>> +	entry->action = action;
>> +	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
>> +	return 0;
>> +}
>> +
>> +void
>> +rte_eal_primary_secondary_del_action(const char *name)
>> +{
>> +	struct action_entry *entry = find_action_entry_by_name(name);
>> +
>> +	TAILQ_REMOVE(&action_entry_list, entry, next);
>> +	free(entry);
>> +}
>> +
>> +#define MAX_SECONDARY_PROCS	8
>> +
>> +static int efd_pri_sec; /* epoll fd for primary/secondary channel thread */
>> +static int fd_listen;   /* unix listen socket by primary */
>> +static int fd_to_pri;   /* only used by secondary process */
>> +static int fds_to_sec[MAX_SECONDARY_PROCS];
>> +
>> +struct msg_hdr {
>> +	char action_name[MAX_ACTION_NAME_LEN];
>> +	int fds_num;
>> +	char params[0];
>> +} __rte_packed;
>> +
>> +static int
>> +add_sec_proc(int fd)
>> +{
>> +	int i;
>> +
>> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
>> +		if (fds_to_sec[i] == -1)
>> +			break;
>> +
>> +	if (i >= MAX_SECONDARY_PROCS)
>> +		return -1;
>> +
>> +	fds_to_sec[i] = fd;
>> +
>> +	return i;
>> +}
>> +
>> +static void
>> +del_sec_proc(int fd)
>> +{
>> +	int i;
>> +
>> +	for (i = 0; i < MAX_SECONDARY_PROCS; ++i) {
>> +		if (fds_to_sec[i] == fd) {
>> +			fds_to_sec[i] = -1;
>> +			break;
>> +		}
>> +	}
>> +}
>> +
>> +static int
>> +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num)
>> +{
>> +	struct iovec iov;
>> +	struct msghdr msgh;
>> +	size_t fdsize = fds_num * sizeof(int);
>> +	char control[CMSG_SPACE(fdsize)];
>> +	struct cmsghdr *cmsg;
>> +	int ret;
>> +
>> +	memset(&msgh, 0, sizeof(msgh));
>> +	iov.iov_base = buf;
>> +	iov.iov_len  = buflen;
>> +
>> +	msgh.msg_iov = &iov;
>> +	msgh.msg_iovlen = 1;
>> +	msgh.msg_control = control;
>> +	msgh.msg_controllen = sizeof(control);
>> +
>> +	ret = recvmsg(sockfd, &msgh, 0);
>> +	if (ret <= 0) {
>> +		RTE_LOG(ERR, EAL, "recvmsg failed\n");
>> +		return ret;
>> +	}
>> +
>> +	if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) {
>> +		RTE_LOG(ERR, EAL, "truncted msg\n");
>> +		return -1;
>> +	}
>> +
>> +	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
>> +		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
>> +		if ((cmsg->cmsg_level == SOL_SOCKET) &&
>> +			(cmsg->cmsg_type == SCM_RIGHTS)) {
>> +			memcpy(fds, CMSG_DATA(cmsg), fdsize);
>> +			break;
>> +		}
>> +	}
>> +
>> +	return ret;
>> +}
>> +
>> +static int
>> +process_msg(int fd)
>> +{
>> +	int len;
>> +	int params_len;
>> +	char buf[1024];
> The max length of message to receive is 1024 here, but the
> senders don't know the limit. It's better to define a macro
> for the max message length?

OK, let's make it a macro, and check the length when sending messages.

>
>> +	int fds[8]; /* accept at most 8 FDs per message */
>> +	struct msg_hdr *hdr;
>> +	struct action_entry *entry;
>> +
>> +	len = read_msg(fd, buf, 1024, fds, 8);
>> +	if (len < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to read message: %s\n",
>> +			strerror(errno));
>> +		return -1;
>> +	}
> Why not check whether len is equal to 0?

Nice catch!

>
>> +
>> +	hdr = (struct msg_hdr *) buf;
>> +
>> +	entry = find_action_entry_by_name(hdr->action_name);
>> +	if (entry == NULL) {
>> +		RTE_LOG(ERR, EAL, "cannot find action by: %s\n",
>> +			hdr->action_name);
>> +		return -1;
>> +	}
>> +
>> +	params_len = len - sizeof(struct msg_hdr);
>> +	return entry->action(hdr->params, params_len, fds, hdr->fds_num);
>> +}
>> +
>> +static void *
>> +thread_primary(__attribute__((unused)) void *arg)
>> +{
>> +	int fd;
>> +	int i, n;
>> +	struct epoll_event event;
>> +	struct epoll_event *events;
>> +
>> +	event.events = EPOLLIN | EPOLLRDHUP;
>> +	event.data.fd = fd_listen;
>> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_listen, &event) < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
>> +			strerror(errno));
>> +		exit(EXIT_FAILURE);
>> +	}
>> +
>> +	events = calloc(20, sizeof event);
> A simple question: Why is the max number of events 20?

Another hard-coded value. This value only decides how many events can be 
processed in each iteration; any other events will be kept in the kernel 
for the next iteration.

>
>> +
>> +	while (1) {
>> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
>> +		for (i = 0; i < n; i++) {
>> +			if (events[i].data.fd == fd_listen) {
>> +				if (events[i].events != EPOLLIN) {
>> +					RTE_LOG(ERR, EAL, "what happens?\n");
>> +					exit(EXIT_FAILURE);
>> +				}
>> +
>> +				fd = accept(fd_listen, NULL, NULL);
>> +				if (fd < 0) {
>> +					RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
> This line is beyond 80 characters.

Will fix it.
[...]
>> +
>> +				continue;
>> +			}
>> +
>> +			fd = events[i].data.fd;
>> +
>> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
>> +				RTE_LOG(ERR, EAL,
>> +					"secondary process exit: %d\n", fd);
>> +				epoll_ctl(efd_pri_sec, EPOLL_CTL_DEL, fd, NULL);
>> +				del_sec_proc(fd);
> Need close(fd) here?

Nice catch.

>
>> +				continue;
>> +			}
>> +
>> +			if ((events[i].events & EPOLLIN)) {
>> +				RTE_LOG(INFO, EAL,
>> +					"recv msg from secondary process\n");
>> +
>> +				process_msg(fd);
>> +			}
>> +		}
>> +	}
>> +
>> +	return NULL;
>> +}
>> +
>> +static void *
>> +thread_secondary(__attribute__((unused)) void *arg)
>> +{
>> +	int fd;
>> +	int i, n;
>> +	struct epoll_event event;
>> +	struct epoll_event *events;
>> +
>> +	event.events = EPOLLIN | EPOLLRDHUP;
>> +	event.data.fd = fd_to_pri;
>> +	if (epoll_ctl(efd_pri_sec, EPOLL_CTL_ADD, fd_to_pri, &event) < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", strerror(errno));
>> +		exit(EXIT_FAILURE);
>> +	}
>> +
>> +	events = calloc(20, sizeof event);
>> +
>> +	while (1) {
>> +		n = epoll_wait(efd_pri_sec, events, 20, -1);
>> +		for (i = 0; i < n; i++) {
>> +
>> +			fd = events[i].data.fd;
>> +
>> +			if ((events[i].events & (EPOLLERR | EPOLLHUP))) {
>> +				RTE_LOG(ERR, EAL, "primary exits, so do I\n");
>> +				/* Do we need exit secondary when primary exits? */
> Need close(fd) here?

We will exit here, no need to close().

Thanks,
Jianfeng


More information about the dev mailing list