[dpdk-dev] eal: add asynchronous request API to DPDK IPC

Message ID 92186ea34a31743ed76dbd9267f0586da22575f3.1519742486.git.anatoly.burakov@intel.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail apply patch file failure

Commit Message

Burakov, Anatoly Feb. 27, 2018, 2:59 p.m. UTC
  This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.

Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).

Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
---

Notes:
    This patch is dependent upon previously published patchsets
    for IPC fixes [1] and improvements [2].
    
    rte_mp_action_unregister and rte_mp_async_reply_unregister
    do the same thing - should we perhaps make it one function?
    
    [1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
    [2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/

 lib/librte_eal/common/eal_common_proc.c | 528 +++++++++++++++++++++++++++++---
 lib/librte_eal/common/include/rte_eal.h |  71 +++++
 2 files changed, 564 insertions(+), 35 deletions(-)
  

Comments

Burakov, Anatoly Feb. 28, 2018, 10:22 a.m. UTC | #1
On 27-Feb-18 2:59 PM, Anatoly Burakov wrote:
> This API is similar to the blocking API that is already present,
> but reply will be received in a separate callback by the caller.
> 
> Under the hood, we create a separate thread to deal with replies to
> asynchronous requests, that will just wait to be notified by the
> main thread, or woken up on a timer (it'll wake itself up every
> minute regardless of whether it was called, but if there are no
> requests in the queue, nothing will be done and it'll go to sleep
> for another minute).
> 
> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
> ---

Missed updating .map file, will respin.
  
Stephen Hemminger March 2, 2018, 6:48 p.m. UTC | #2
On Tue, 27 Feb 2018 14:59:29 +0000
Anatoly Burakov <anatoly.burakov@intel.com> wrote:

> +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
>  {
> +	struct sync_request *dummy;
> +	struct async_request_shared_param *param = NULL;
> +	struct rte_mp_reply *reply = NULL;
> +	int dir_fd, ret = 0;
> +	DIR *mp_dir;
> +	struct dirent *ent;
> +	struct timeval now;
> +	struct timespec *end = NULL;
> +
> +	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
> +
> +	if (check_input(req) == false)
> +		return -1;
> +	if (gettimeofday(&now, NULL) < 0) {
> +		RTE_LOG(ERR, EAL, "Faile to get current time\n");
> +		rte_errno = errno;
> +		return -1;
> +	}

gettimeofday is not a good API to use in DPDK.
It gets changed by NTP; if you have to use system time you want monotonic clock
  
Stephen Hemminger March 2, 2018, 6:51 p.m. UTC | #3
On Tue, 27 Feb 2018 14:59:29 +0000
Anatoly Burakov <anatoly.burakov@intel.com> wrote:

> This API is similar to the blocking API that is already present,
> but reply will be received in a separate callback by the caller.
> 
> Under the hood, we create a separate thread to deal with replies to
> asynchronous requests, that will just wait to be notified by the
> main thread, or woken up on a timer (it'll wake itself up every
> minute regardless of whether it was called, but if there are no
> requests in the queue, nothing will be done and it'll go to sleep
> for another minute).
> 
> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>

The problem with this callback model is it makes it possible to
have a single wait for multiple events model (like epoll) which
is the most scaleable way to write applications.
  
Burakov, Anatoly March 3, 2018, 12:29 p.m. UTC | #4
On 02-Mar-18 6:48 PM, Stephen Hemminger wrote:
> On Tue, 27 Feb 2018 14:59:29 +0000
> Anatoly Burakov <anatoly.burakov@intel.com> wrote:
> 
>> +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
>>   {
>> +	struct sync_request *dummy;
>> +	struct async_request_shared_param *param = NULL;
>> +	struct rte_mp_reply *reply = NULL;
>> +	int dir_fd, ret = 0;
>> +	DIR *mp_dir;
>> +	struct dirent *ent;
>> +	struct timeval now;
>> +	struct timespec *end = NULL;
>> +
>> +	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
>> +
>> +	if (check_input(req) == false)
>> +		return -1;
>> +	if (gettimeofday(&now, NULL) < 0) {
>> +		RTE_LOG(ERR, EAL, "Faile to get current time\n");
>> +		rte_errno = errno;
>> +		return -1;
>> +	}
> 
> gettimeofday is not a good API to use in DPDK.
> It gets changed by NTP; if you have to use system time you want monotonic clock
> 

We need current time because pthread_cond_timedwait() accepts current 
time. So it's either that, or reimplementing pthread_cond_timedwait() in 
DPDK using monotonic clock :) Unless, of course, there already are 
alternatives that use monotonic clock and that don't need other DPDK 
machinery (e.g. rte_malloc) to work (like rte_alarm callbacks).
  
Burakov, Anatoly March 3, 2018, 1:44 p.m. UTC | #5
On 02-Mar-18 6:51 PM, Stephen Hemminger wrote:
> On Tue, 27 Feb 2018 14:59:29 +0000
> Anatoly Burakov <anatoly.burakov@intel.com> wrote:
> 
>> This API is similar to the blocking API that is already present,
>> but reply will be received in a separate callback by the caller.
>>
>> Under the hood, we create a separate thread to deal with replies to
>> asynchronous requests, that will just wait to be notified by the
>> main thread, or woken up on a timer (it'll wake itself up every
>> minute regardless of whether it was called, but if there are no
>> requests in the queue, nothing will be done and it'll go to sleep
>> for another minute).
>>
>> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
> 
> The problem with this callback model is it makes it possible to
> have a single wait for multiple events model (like epoll) which
> is the most scaleable way to write applications.
> 
> 

I assume there's a typo in there somewhere, because the way it's written 
makes it look like an advantage, not a problem :) Some more details on 
what exactly is the issue would be welcome though.
  

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index bdea6d6..c5ae569 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -41,7 +41,11 @@  static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
 struct action_entry {
 	TAILQ_ENTRY(action_entry) next;
 	char action_name[RTE_MP_MAX_NAME_LEN];
-	rte_mp_t action;
+	RTE_STD_C11
+	union {
+		rte_mp_t action;
+		rte_mp_async_reply_t reply;
+	};
 };
 
 /** Double linked list of actions. */
@@ -73,13 +77,37 @@  TAILQ_HEAD(message_queue, message_queue_entry);
 static struct message_queue message_queue =
 	TAILQ_HEAD_INITIALIZER(message_queue);
 
+enum mp_request_type {
+	REQUEST_TYPE_SYNC,
+	REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+	struct rte_mp_reply *user_reply;
+	struct timespec *end;
+	int n_requests_processed;
+};
+
+struct async_request_param {
+	struct async_request_shared_param *param;
+};
+
+struct sync_request_param {
+	pthread_cond_t cond;
+};
+
 struct sync_request {
 	TAILQ_ENTRY(sync_request) next;
-	int reply_received;
+	enum mp_request_type type;
 	char dst[PATH_MAX];
 	struct rte_mp_msg *request;
-	struct rte_mp_msg *reply;
-	pthread_cond_t cond;
+	struct rte_mp_msg *reply_msg;
+	int reply_received;
+	RTE_STD_C11
+	union {
+		struct sync_request_param sync;
+		struct async_request_param async;
+	};
 };
 
 TAILQ_HEAD(sync_request_list, sync_request);
@@ -87,9 +115,12 @@  TAILQ_HEAD(sync_request_list, sync_request);
 static struct {
 	struct sync_request_list requests;
 	pthread_mutex_t lock;
+	pthread_cond_t async_cond;
 } sync_requests = {
 	.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
-	.lock = PTHREAD_MUTEX_INITIALIZER
+	.lock = PTHREAD_MUTEX_INITIALIZER,
+	.async_cond = PTHREAD_COND_INITIALIZER
+	/**< used in async requests only */
 };
 
 static struct sync_request *
@@ -201,53 +232,97 @@  validate_action_name(const char *name)
 	return 0;
 }
 
-int __rte_experimental
-rte_mp_action_register(const char *name, rte_mp_t action)
+static struct action_entry *
+action_register(const char *name)
 {
 	struct action_entry *entry;
 
 	if (validate_action_name(name))
-		return -1;
+		return NULL;
 
 	entry = malloc(sizeof(struct action_entry));
 	if (entry == NULL) {
 		rte_errno = ENOMEM;
-		return -1;
+		return NULL;
 	}
 	strcpy(entry->action_name, name);
-	entry->action = action;
 
-	pthread_mutex_lock(&mp_mutex_action);
 	if (find_action_entry_by_name(name) != NULL) {
 		pthread_mutex_unlock(&mp_mutex_action);
 		rte_errno = EEXIST;
 		free(entry);
-		return -1;
+		return NULL;
 	}
 	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
-	pthread_mutex_unlock(&mp_mutex_action);
-	return 0;
+
+	/* async and sync replies are handled by different threads, so even
+	 * though they a share pointer in a union, one will never trigger in
+	 * place of the other.
+	 */
+
+	return entry;
 }
 
-void __rte_experimental
-rte_mp_action_unregister(const char *name)
+static void
+action_unregister(const char *name)
 {
 	struct action_entry *entry;
 
 	if (validate_action_name(name))
 		return;
 
-	pthread_mutex_lock(&mp_mutex_action);
 	entry = find_action_entry_by_name(name);
 	if (entry == NULL) {
-		pthread_mutex_unlock(&mp_mutex_action);
 		return;
 	}
 	TAILQ_REMOVE(&action_entry_list, entry, next);
-	pthread_mutex_unlock(&mp_mutex_action);
 	free(entry);
 }
 
+int __rte_experimental
+rte_mp_action_register(const char *name, rte_mp_t action)
+{
+	struct action_entry *entry;
+
+	pthread_mutex_lock(&mp_mutex_action);
+
+	entry = action_register(name);
+	if (entry != NULL)
+		entry->action = action;
+	pthread_mutex_unlock(&mp_mutex_action);
+
+	return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_action_unregister(const char *name)
+{
+	pthread_mutex_lock(&mp_mutex_action);
+	action_unregister(name);
+	pthread_mutex_unlock(&mp_mutex_action);
+}
+
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)
+{
+	struct action_entry *entry;
+
+	pthread_mutex_lock(&mp_mutex_action);
+
+	entry = action_register(name);
+	if (entry != NULL)
+		entry->reply = reply;
+	pthread_mutex_unlock(&mp_mutex_action);
+
+	return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name)
+{
+	rte_mp_action_unregister(name);
+}
+
 static int
 read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 {
@@ -307,9 +382,13 @@  process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 		pthread_mutex_lock(&sync_requests.lock);
 		sync_req = find_sync_request(s->sun_path, msg->name);
 		if (sync_req) {
-			memcpy(sync_req->reply, msg, sizeof(*msg));
+			memcpy(sync_req->reply_msg, msg, sizeof(*msg));
 			sync_req->reply_received = 1;
-			pthread_cond_signal(&sync_req->cond);
+
+			if (sync_req->type == REQUEST_TYPE_SYNC)
+				pthread_cond_signal(&sync_req->sync.cond);
+			else if (sync_req->type == REQUEST_TYPE_ASYNC)
+				pthread_cond_signal(&sync_requests.async_cond);
 		} else
 			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
 		pthread_mutex_unlock(&sync_requests.lock);
@@ -370,6 +449,166 @@  mp_handle(void *arg __rte_unused)
 }
 
 static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+	if (a->tv_sec < b->tv_sec)
+		return -1;
+	if (a->tv_sec > b->tv_sec)
+		return 1;
+	if (a->tv_nsec < b->tv_nsec)
+		return -1;
+	if (a->tv_nsec > b->tv_nsec)
+		return 1;
+	return 0;
+}
+
+static int
+process_async_request(struct sync_request *sr, const struct timespec *now)
+{
+	struct async_request_shared_param *param;
+	struct rte_mp_reply *reply;
+	int ret;
+	bool timeout, received, last_msg;
+
+	param = sr->async.param;
+	reply = param->user_reply;
+
+	/* did we timeout? */
+	timeout = timespec_cmp(param->end, now) <= 0;
+
+	/* did we receive a response? */
+	received = sr->reply_received != 0;
+
+	/* if we didn't time out, and we didn't receive a response, ignore */
+	if (!timeout && !received)
+		return 0;
+
+	ret = 1;
+
+	/* if we received a response, adjust relevant data and copy mesasge. */
+	if (received && reply->nb_sent != 0) {
+		struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+		msg = sr->reply_msg;
+		user_msgs = reply->msgs;
+
+		tmp = realloc(user_msgs, sizeof(*msg) *
+				(reply->nb_received + 1));
+		if (!tmp) {
+			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+				sr->dst, sr->request->name);
+			/* this entry is going to be removed and its message
+			 * dropped, but we don't want to leak memory, so
+			 * continue.
+			 */
+			ret = -1;
+		} else {
+			user_msgs = tmp;
+			reply->msgs = user_msgs;
+			memcpy(&user_msgs[reply->nb_received],
+					msg, sizeof(*msg));
+			reply->nb_received++;
+		}
+	}
+	free(sr->reply_msg);
+
+	/* mark this request as processed */
+	param->n_requests_processed++;
+
+	/* if number of sent messages is zero, we're short-circuiting */
+	last_msg = param->n_requests_processed == reply->nb_sent ||
+			reply->nb_sent == 0;
+
+	/* if this was the last request, we can call the callback */
+	if (last_msg) {
+		pthread_mutex_lock(&mp_mutex_action);
+		struct action_entry *entry =
+				find_action_entry_by_name(sr->request->name);
+		if (!entry) {
+			RTE_LOG(ERR, EAL, "Cannot find async request callback for %s\n",
+					sr->request->name);
+			ret = -1;
+		} else {
+			entry->reply(reply);
+		}
+		pthread_mutex_unlock(&mp_mutex_action);
+		/* clean up */
+		free(sr->async.param->user_reply->msgs);
+		free(sr->async.param->user_reply);
+		free(sr->async.param->end);
+		free(sr->async.param);
+	}
+	return ret;
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+	struct sync_request *sr;
+	struct timeval now;
+	struct timespec timeout, ts_now;
+	do {
+		int ret;
+		pthread_mutex_lock(&sync_requests.lock);
+
+		if (gettimeofday(&now, NULL) < 0) {
+			RTE_LOG(ERR, EAL, "Cannot get current time\n");
+			pthread_mutex_unlock(&sync_requests.lock);
+			break;
+		}
+
+		/* set a 60 second timeout by default */
+		timeout.tv_nsec = (now.tv_usec * 1000 + 60) % 1000000000;
+		timeout.tv_sec = now.tv_sec + 60 +
+				(now.tv_usec * 1000 + 60) / 1000000000;
+
+		/* scan through the list and see if there are any timeouts that
+		 * are earlier than our current timeout.
+		 */
+		TAILQ_FOREACH(sr, &sync_requests.requests, next) {
+			if (sr->type != REQUEST_TYPE_ASYNC)
+				continue;
+			if (timespec_cmp(sr->async.param->end, &timeout) < 0)
+				memcpy(&timeout, sr->async.param->end,
+					sizeof(timeout));
+		}
+
+		/* now, wait until we either time out or get woken up */
+		ret = pthread_cond_timedwait(&sync_requests.async_cond,
+				&sync_requests.lock, &timeout);
+
+		if (gettimeofday(&now, NULL) < 0) {
+			RTE_LOG(ERR, EAL, "Cannot get current time\n");
+			break;
+		}
+		ts_now.tv_nsec = now.tv_usec * 1000;
+		ts_now.tv_sec = now.tv_sec;
+
+		if (ret == 0 || ret == ETIMEDOUT) {
+			struct sync_request *next;
+			/* we've either been woken up, or we timed out */
+
+			/* we have still the lock, check if anything needs
+			 * processing.
+			 */
+			TAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,
+					next) {
+				if (process_async_request(sr, &ts_now)) {
+					TAILQ_REMOVE(&sync_requests.requests,
+							sr, next);
+					free(sr);
+				}
+			}
+		}
+		pthread_mutex_unlock(&sync_requests.lock);
+	} while (1);
+
+	RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+	return NULL;
+}
+
+static int
 open_socket_fd(void)
 {
 	char peer_name[PATH_MAX] = {0};
@@ -506,7 +745,7 @@  rte_mp_channel_init(void)
 	char thread_name[RTE_MAX_THREAD_NAME_LEN];
 	char path[PATH_MAX];
 	int dir_fd;
-	pthread_t tid;
+	pthread_t mp_handle_tid, async_reply_handle_tid;
 
 	/* create filter path */
 	create_socket_path("*", path, sizeof(path));
@@ -543,7 +782,16 @@  rte_mp_channel_init(void)
 		return -1;
 	}
 
-	if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+	if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+			strerror(errno));
+		close(mp_fd);
+		mp_fd = -1;
+		return -1;
+	}
+
+	if (pthread_create(&async_reply_handle_tid, NULL,
+			async_reply_handle, NULL) < 0) {
 		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
 			strerror(errno));
 		close(mp_fd);
@@ -553,7 +801,11 @@  rte_mp_channel_init(void)
 
 	/* try best to set thread name */
 	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
-	rte_thread_setname(tid, thread_name);
+	rte_thread_setname(mp_handle_tid, thread_name);
+
+	/* try best to set thread name */
+	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+	rte_thread_setname(async_reply_handle_tid, thread_name);
 
 	/* unlock the directory */
 	flock(dir_fd, LOCK_UN);
@@ -750,19 +1002,76 @@  rte_mp_sendmsg(struct rte_mp_msg *msg)
 }
 
 static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+		struct async_request_shared_param *param)
+{
+	struct rte_mp_msg *reply_msg;
+	struct sync_request *sync_req, *exist;
+	int ret;
+
+	sync_req = malloc(sizeof(*sync_req));
+	reply_msg = malloc(sizeof(*reply_msg));
+	if (sync_req == NULL || reply_msg == NULL) {
+		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+		rte_errno = ENOMEM;
+		ret = -1;
+		goto fail;
+	}
+
+	sync_req->type = REQUEST_TYPE_ASYNC;
+	strcpy(sync_req->dst, dst);
+	sync_req->request = req;
+	sync_req->reply_msg = reply_msg;
+	sync_req->async.param = param;
+
+	/* queue already locked by caller */
+
+	exist = find_sync_request(dst, req->name);
+	if (!exist)
+		TAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);
+	if (exist) {
+		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+		rte_errno = EEXIST;
+		ret = -1;
+		goto fail;
+	}
+
+	ret = send_msg(dst, req, MP_REQ);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+			dst, req->name);
+		ret = -1;
+		goto fail;
+	} else if (ret == 0) {
+		ret = 0;
+		goto fail;
+	}
+
+	param->user_reply->nb_sent++;
+
+	return 0;
+fail:
+	free(sync_req);
+	free(reply_msg);
+	return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
 	       struct rte_mp_reply *reply, const struct timespec *ts)
 {
 	int ret;
 	struct timeval now;
+	struct timespec ts_now;
 	struct rte_mp_msg msg, *tmp;
 	struct sync_request sync_req, *exist;
 
+	sync_req.type = REQUEST_TYPE_SYNC;
 	sync_req.reply_received = 0;
 	strcpy(sync_req.dst, dst);
 	sync_req.request = req;
-	sync_req.reply = &msg;
-	pthread_cond_init(&sync_req.cond, NULL);
+	sync_req.reply_msg = &msg;
+	pthread_cond_init(&sync_req.sync.cond, NULL);
 
 	pthread_mutex_lock(&sync_requests.lock);
 	exist = find_sync_request(dst, req->name);
@@ -787,17 +1096,17 @@  mp_request_one(const char *dst, struct rte_mp_msg *req,
 
 	pthread_mutex_lock(&sync_requests.lock);
 	do {
-		pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts);
+		pthread_cond_timedwait(&sync_req.sync.cond,
+				&sync_requests.lock, ts);
 		/* Check spurious wakeups */
 		if (sync_req.reply_received == 1)
 			break;
 		/* Check if time is out */
 		if (gettimeofday(&now, NULL) < 0)
 			break;
-		if (ts->tv_sec < now.tv_sec)
-			break;
-		else if (now.tv_sec == ts->tv_sec &&
-			 now.tv_usec * 1000 < ts->tv_nsec)
+		ts_now.tv_nsec = now.tv_usec * 1000;
+		ts_now.tv_sec = now.tv_sec;
+		if (timespec_cmp(ts, &ts_now) < 0)
 			break;
 	} while (1);
 	/* We got the lock now */
@@ -854,7 +1163,7 @@  rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
 
 	/* for secondary process, send request to the primary process only */
 	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
-		return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+		return mp_request_sync(eal_mp_socket_path(), req, reply, &end);
 
 	/* for primary process, broadcast request, and collect reply 1 by 1 */
 	mp_dir = opendir(mp_dir_path);
@@ -896,7 +1205,7 @@  rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
 			continue;
 		}
 
-		if (mp_request_one(path, req, reply, &end))
+		if (mp_request_sync(path, req, reply, &end))
 			ret = -1;
 	}
 	/* unlock the directory */
@@ -907,9 +1216,158 @@  rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
 }
 
 int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
 {
+	struct sync_request *dummy;
+	struct async_request_shared_param *param = NULL;
+	struct rte_mp_reply *reply = NULL;
+	int dir_fd, ret = 0;
+	DIR *mp_dir;
+	struct dirent *ent;
+	struct timeval now;
+	struct timespec *end = NULL;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+	if (check_input(req) == false)
+		return -1;
+	if (gettimeofday(&now, NULL) < 0) {
+		RTE_LOG(ERR, EAL, "Faile to get current time\n");
+		rte_errno = errno;
+		return -1;
+	}
+	dummy = malloc(sizeof(*dummy));
+	param = malloc(sizeof(*param));
+	reply = malloc(sizeof(*reply));
+	end = malloc(sizeof(*end));
+	if (reply == NULL || end == NULL || param == NULL || dummy == NULL) {
+		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+		rte_errno = ENOMEM;
+		goto fail;
+	}
+
+	param->n_requests_processed = 0;
+	param->end = end;
+	param->user_reply = reply;
+
+	end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+	end->tv_sec = now.tv_sec + ts->tv_sec +
+			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+	reply->nb_sent = 0;
+	reply->nb_received = 0;
+	reply->msgs = NULL;
 
+	/* we have to lock the request queue here, as we will be adding a bunch
+	 * of requests to the queue at once, and some of the replies may arrive
+	 * before we add all of the requests to the queue.
+	 */
+	pthread_mutex_lock(&sync_requests.lock);
+
+	/* we have to ensure that callback gets triggered even if we don't send
+	 * anything, therefore earlier we have allocated a dummy request. put it
+	 * on the queue and fill it. we will remove it once we know we sent
+	 * something.
+	 */
+	dummy->type = REQUEST_TYPE_ASYNC;
+	dummy->request = req;
+	dummy->reply_msg = NULL;
+	dummy->async.param = param;
+	dummy->reply_received = 1; /* short-circuit the timeout */
+
+	TAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);
+
+	/* for secondary process, send request to the primary process only */
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+		ret = mp_request_async(eal_mp_socket_path(), req, param);
+
+		/* if we sent something, remove dummy request from the queue */
+		if (reply->nb_sent != 0) {
+			TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+			free(dummy);
+			dummy = NULL;
+		}
+
+		pthread_mutex_unlock(&sync_requests.lock);
+
+		/* if we couldn't send anything, clean up */
+		if (ret != 0)
+			goto fail;
+		return 0;
+	}
+
+	/* for primary process, broadcast request */
+	mp_dir = opendir(mp_dir_path);
+	if (!mp_dir) {
+		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+		rte_errno = errno;
+		goto unlock_fail;
+	}
+	dir_fd = dirfd(mp_dir);
+
+	/* lock the directory to prevent processes spinning up while we send */
+	if (flock(dir_fd, LOCK_EX)) {
+		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+			mp_dir_path);
+		closedir(mp_dir);
+		rte_errno = errno;
+		goto unlock_fail;
+	}
+
+	while ((ent = readdir(mp_dir))) {
+		const char *peer_name;
+		char path[PATH_MAX];
+		int active;
+
+		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+			continue;
+
+		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+			 ent->d_name);
+		peer_name = get_peer_name(path);
+
+		active = socket_is_active(peer_name);
+
+		if (active < 0) {
+			ret = -1;
+			break;
+		} else if (active == 0) {
+			unlinkat(dir_fd, ent->d_name, 0);
+			continue;
+		}
+
+		if (mp_request_async(path, req, param))
+			ret = -1;
+	}
+	/* if we sent something, remove dummy request from the queue */
+	if (reply->nb_sent != 0) {
+		TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+		free(dummy);
+		dummy = NULL;
+	}
+	/* trigger async request thread wake up */
+	pthread_cond_signal(&sync_requests.async_cond);
+
+	/* finally, unlock the queue */
+	pthread_mutex_unlock(&sync_requests.lock);
+
+	/* unlock the directory */
+	flock(dir_fd, LOCK_UN);
+
+	closedir(mp_dir);
+	return ret;
+unlock_fail:
+	pthread_mutex_unlock(&sync_requests.lock);
+fail:
+	free(dummy);
+	free(param);
+	free(reply);
+	free(end);
+	return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
 	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
 
 	if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 5269dab..78a40de 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -231,6 +231,15 @@  struct rte_mp_reply {
 typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);
 
 /**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create  socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_reply *msg);
+
+/**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
  *
@@ -274,6 +283,46 @@  rte_mp_action_unregister(const char *name);
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
  *
+ * Register an asynchronous reply callback for primary/secondary communication.
+ *
+ * Call this function to register a callback for asynchronous requests, if the
+ * calling component wants to receive responses to its own asynchronous requests
+ * from the corresponding component in its primary or secondary processes.
+ *
+ * @param name
+ *   The name argument plays as a unique key to find the action.
+ *
+ * @param reply
+ *   The reply argument is the function pointer to the reply callback.
+ *
+ * @return
+ *  - 0 on success.
+ *  - (<0) on failure.
+ */
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Unregister an asynchronous reply callback.
+ *
+ * Call this function to unregister a callback if the calling component does
+ * not want responses the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param name
+ *   The name argument plays as a unique key to find the action.
+ *
+ */
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
  * Send a message to the peer process.
  *
  * This function will send a message which will be responsed by the action
@@ -322,6 +371,28 @@  rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
  *
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ *   The req argument contains the customized request message.
+ *
+ * @param ts
+ *   The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @return
+ *  - On success, return 0.
+ *  - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
  * Send a reply to the peer process.
  *
  * This function will send a reply message in response to a request message