[dpdk-dev] [PATCH 2/3] eal: add synchronous multi-process communication
Jianfeng Tan
jianfeng.tan at intel.com
Thu Nov 30 19:44:09 CET 2017
We need a synchronous way for multi-process communication; that
is to say, we need an immediate response after we send a message
to the other side.
We will pause the mp_handler thread, and after sending the message,
the sending thread will wait there for the response and process
it.
Suggested-by: Anatoly Burakov <anatoly.burakov at intel.com>
Signed-off-by: Jianfeng Tan <jianfeng.tan at intel.com>
---
lib/librte_eal/common/eal_common_proc.c | 53 +++++++++++++++++++++++++++++++--
lib/librte_eal/common/include/rte_eal.h | 5 +++-
2 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 5d0a095..65ebaf2 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -30,6 +30,8 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+#define _GNU_SOURCE
+
#include <stdio.h>
#include <fcntl.h>
#include <stdlib.h>
@@ -41,6 +43,8 @@
#include <sys/un.h>
#include <errno.h>
#include <pthread.h>
+#include <sys/eventfd.h>
+#include <signal.h>
#include <rte_log.h>
#include <rte_eal.h>
@@ -134,6 +138,7 @@ rte_eal_mp_action_unregister(const char *name)
struct mp_fds {
int efd;
+ int evfd; /* eventfd used for pausing mp_handler thread */
union {
/* fds for primary process */
@@ -331,6 +336,13 @@ mp_handler(void *arg __rte_unused)
exit(EXIT_FAILURE);
}
+ ev.data.fd = mp_fds.evfd;
+ if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
+ RTE_LOG(ERR, EAL, "epoll_ctl failed: %s\n",
+ strerror(errno));
+ exit(EXIT_FAILURE);
+ }
+
events = calloc(20, sizeof ev);
while (1) {
@@ -348,6 +360,14 @@ mp_handler(void *arg __rte_unused)
continue;
}
+ if (events[i].data.fd == mp_fds.evfd) {
+ RTE_LOG(INFO, EAL, "mp_handler thread will pause\n");
+ pause();
+ RTE_LOG(INFO, EAL, "mp_handler thread stops pausing\n");
+
+ continue;
+ }
+
fd = events[i].data.fd;
if ((events[i].events & EPOLLIN)) {
@@ -377,13 +397,14 @@ mp_handler(void *arg __rte_unused)
return NULL;
}
+static pthread_t tid;
+
int
rte_eal_mp_channel_init(void)
{
int i, fd, ret;
const char *path;
struct sockaddr_un un;
- pthread_t tid;
char thread_name[RTE_MAX_THREAD_NAME_LEN];
mp_fds.efd = epoll_create1(0);
@@ -462,6 +483,8 @@ rte_eal_mp_channel_init(void)
return -1;
}
+ mp_fds.evfd = eventfd(0, 0);
+
return 0;
}
@@ -485,7 +508,8 @@ rte_eal_mp_sendmsg(const char *action_name,
const void *params,
int len_params,
int fds[],
- int fds_num)
+ int fds_num,
+ int need_ack)
{
int i;
int ret = 0;
@@ -511,6 +535,11 @@ rte_eal_mp_sendmsg(const char *action_name,
RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg);
+ if (need_ack) {
+ // stop mp_handler thread.
+ eventfd_write(mp_fds.evfd, (eventfd_t)1);
+ }
+
msg = malloc(len_msg);
if (!msg) {
RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
@@ -547,12 +576,32 @@ rte_eal_mp_sendmsg(const char *action_name,
ret = send_msg(mp_fds.secondaries[i], &msgh);
if (ret < 0)
break;
+
+ if (need_ack) {
+ /* We will hang here until the other side
+ * responds. What if the other side is sending
+ * a msg at the same time?
+ */
+ process_msg(mp_fds.secondaries[i]);
+ }
}
} else {
ret = send_msg(mp_fds.primary, &msgh);
+
+ if (ret > 0 && need_ack) {
+ // We will hang here until the other side responds
+ ret = process_msg(mp_fds.primary);
+ }
}
free(msg);
+ if (need_ack) {
+ // start mp_handler thread.
+ union sigval value;
+
+ pthread_sigqueue(tid, 0, value);
+ }
+
return ret;
}
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 8776bcf..9875cae 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -274,13 +274,16 @@ void rte_eal_mp_action_unregister(const char *name);
* @param fds_num
* The fds_num argument is number of fds to be sent with sendmsg.
*
+ * @param need_ack
+ * If non-zero, the sender waits for a response from the other side after sending.
+ *
* @return
* - (>=0) on success.
* - (<0) on failure.
*/
int
rte_eal_mp_sendmsg(const char *action_name, const void *params,
- int len_params, int fds[], int fds_num);
+ int len_params, int fds[], int fds_num, int need_ack);
/**
* Usage function typedef used by the application usage function.
--
2.7.4
More information about the dev
mailing list