[dpdk-dev,v3,2/5] eal: don't process IPC messages before init finished

Message ID 3903de6f3824e5063d49936e743e22dd819bad09.1519740527.git.anatoly.burakov@intel.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail apply patch file failure

Commit Message

Burakov, Anatoly Feb. 27, 2018, 2:35 p.m. UTC
  It is not possible for a primary process to receive any messages
while initializing, because RTE_MAGIC value is not set in the
shared config, and hence no secondary process can ever spin up
during that time.

However, it is possible for a secondary process to receive messages
from the primary during initialization. We can't just drop the
messages as they may be important, and also we might need to process
replies to our own requests (e.g. VFIO) during initialization.

Therefore, add a tailq for incoming messages, queue them up
until initialization is complete, and then process them in the
order they arrived.

Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
---

Notes:
    v3: check for init_complete after receiving message
    
    v2: no changes

 lib/librte_eal/common/eal_common_proc.c | 52 +++++++++++++++++++++++++++++----
 1 file changed, 47 insertions(+), 5 deletions(-)
  

Comments

Jianfeng Tan Feb. 28, 2018, 1:09 a.m. UTC | #1
> -----Original Message-----
> From: Burakov, Anatoly
> Sent: Tuesday, February 27, 2018 10:36 PM
> To: dev@dpdk.org
> Cc: Tan, Jianfeng
> Subject: [PATCH v3 2/5] eal: don't process IPC messages before init finished
> 
> It is not possible for a primary process to receive any messages
> while initializing, because RTE_MAGIC value is not set in the
> shared config, and hence no secondary process can ever spin up
> during that time.
> 
> However, it is possible for a secondary process to receive messages
> from the primary during initialization. We can't just drop the
> messages as they may be important, and also we might need to process
> replies to our own requests (e.g. VFIO) during initialization.
> 
> Therefore, add a tailq for incoming messages, and queue them up
> until initialization is complete, and process them in order they
> arrived.
> 
> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
> ---
> 
> Notes:
>     v3: check for init_complete after receiving message
> 
>     v2: no changes
> 
>  lib/librte_eal/common/eal_common_proc.c | 52
> +++++++++++++++++++++++++++++----
>  1 file changed, 47 insertions(+), 5 deletions(-)
> 
> diff --git a/lib/librte_eal/common/eal_common_proc.c
> b/lib/librte_eal/common/eal_common_proc.c
> index 3a1088e..a6e24e6 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -25,6 +25,7 @@
>  #include <rte_errno.h>
>  #include <rte_lcore.h>
>  #include <rte_log.h>
> +#include <rte_tailq.h>
> 
>  #include "eal_private.h"
>  #include "eal_filesystem.h"
> @@ -58,6 +59,18 @@ struct mp_msg_internal {
>  	struct rte_mp_msg msg;
>  };
> 
> +struct message_queue_entry {
> +	TAILQ_ENTRY(message_queue_entry) next;
> +	struct mp_msg_internal msg;
> +	struct sockaddr_un sa;
> +};
> +
> +/** Double linked list of received messages. */
> +TAILQ_HEAD(message_queue, message_queue_entry);
> +
> +static struct message_queue message_queue =
> +	TAILQ_HEAD_INITIALIZER(message_queue);
> +
>  struct sync_request {
>  	TAILQ_ENTRY(sync_request) next;
>  	int reply_received;
> @@ -276,12 +289,41 @@ process_msg(struct mp_msg_internal *m, struct
> sockaddr_un *s)
>  static void *
>  mp_handle(void *arg __rte_unused)
>  {
> -	struct mp_msg_internal msg;
> -	struct sockaddr_un sa;
> -
> +	struct message_queue_entry *cur_msg, *next_msg, *new_msg =
> NULL;
>  	while (1) {
> -		if (read_msg(&msg, &sa) == 0)
> -			process_msg(&msg, &sa);
> +		/* we want to process all messages in order of their arrival,
> +		 * but status of init_complete may change while we're
> iterating
> +		 * the tailq. so, store it here and check once every iteration.
> +		 */
> +		int init_complete;
> +
> +		if (new_msg == NULL)
> +			new_msg = malloc(sizeof(*new_msg));
> +		if (read_msg(&new_msg->msg, &new_msg->sa) == 0) {

Consider this case: requests and messages are received before init has completed, so we enqueue all of them in the tailq, and after that no further req/rep/msg arrives. The mp thread will then hang here waiting to read a new message, so the queued messages will not be processed even once init completes.
In such a case, might we need the master thread to signal the mp thread to wake up when init is completed?

Thanks,
Jianfeng

> +			/* we successfully read the message, so enqueue it
> */
> +			TAILQ_INSERT_TAIL(&message_queue, new_msg,
> next);
> +			new_msg = NULL;
> +		} /* reuse new_msg for next message if we couldn't
> read_msg */
> +
> +		init_complete = internal_config.init_complete;
> +
> +		/* tailq only accessed here, so no locking needed */
> +		TAILQ_FOREACH_SAFE(cur_msg, &message_queue, next,
> next_msg) {
> +			/* secondary process should not process any
> incoming
> +			 * requests until its initialization is complete, but
> +			 * it is allowed to process replies to its own queries.
> +			 */
> +			if (rte_eal_process_type() ==
> RTE_PROC_SECONDARY &&
> +					!init_complete &&
> +					cur_msg->msg.type != MP_REP)
> +				continue;
> +
> +			TAILQ_REMOVE(&message_queue, cur_msg, next);
> +
> +			process_msg(&cur_msg->msg, &cur_msg->sa);
> +
> +			free(cur_msg);
> +		}
>  	}
> 
>  	return NULL;
> --
> 2.7.4
  
Wiles, Keith Feb. 28, 2018, 4 a.m. UTC | #2
> 

> +	struct message_queue_entry *cur_msg, *next_msg, *new_msg = NULL;

> 	while (1) {

> -		if (read_msg(&msg, &sa) == 0)

> -			process_msg(&msg, &sa);

> +		/* we want to process all messages in order of their arrival,

> +		 * but status of init_complete may change while we're iterating

> +		 * the tailq. so, store it here and check once every iteration.

> +		 */

> +		int init_complete;


Do we allow variables to be defined in the middle of a block? I thought we only allowed them immediately after a '{', i.e. at the start of a block.

> +

> +		if (new_msg == NULL)

> +			new_msg = malloc(sizeof(*new_msg));


I am very concerned about allocating memory with no limit. If initialization never completes, we could keep receiving messages and consume a lot of memory. I would like to see a limit on the number of messages we can hold in the queue, just to be safe.

> +		if (read_msg(&new_msg->msg, &new_msg->sa) == 0) {

> +			/* we successfully read the message, so enqueue it */

> +			TAILQ_INSERT_TAIL(&message_queue, new_msg, next);

> +			new_msg = NULL;

> +		} /* reuse new_msg for next message if we couldn't read_msg */

> +

> +		init_complete = internal_config.init_complete;


Does internal_config.init_complete need to be volatile to make sure it is re-read each time through the loop?

> +

> +		/* tailq only accessed here, so no locking needed */

> +		TAILQ_FOREACH_SAFE(cur_msg, &message_queue, next, next_msg) {

> +			/* secondary process should not process any incoming

> +			 * requests until its initialization is complete, but

> +			 * it is allowed to process replies to its own queries.

> +			 */

> +			if (rte_eal_process_type() == RTE_PROC_SECONDARY &&

> +					!init_complete &&

> +					cur_msg->msg.type != MP_REP)

> +				continue;

> +

> +			TAILQ_REMOVE(&message_queue, cur_msg, next);

> +

> +			process_msg(&cur_msg->msg, &cur_msg->sa);

> +

> +			free(cur_msg);

> +		}

> 	}

> 

> 	return NULL;

> -- 

> 2.7.4


Regards,
Keith
  
Burakov, Anatoly Feb. 28, 2018, 9:45 a.m. UTC | #3
On 28-Feb-18 1:09 AM, Tan, Jianfeng wrote:
> 
> 
>> -----Original Message-----
>> From: Burakov, Anatoly
>> Sent: Tuesday, February 27, 2018 10:36 PM
>> To: dev@dpdk.org
>> Cc: Tan, Jianfeng
>> Subject: [PATCH v3 2/5] eal: don't process IPC messages before init finished
>>
>> It is not possible for a primary process to receive any messages
>> while initializing, because RTE_MAGIC value is not set in the
>> shared config, and hence no secondary process can ever spin up
>> during that time.
>>
>> However, it is possible for a secondary process to receive messages
>> from the primary during initialization. We can't just drop the
>> messages as they may be important, and also we might need to process
>> replies to our own requests (e.g. VFIO) during initialization.
>>
>> Therefore, add a tailq for incoming messages, and queue them up
>> until initialization is complete, and process them in order they
>> arrived.
>>
>> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
>> ---
>>
>> Notes:
>>      v3: check for init_complete after receiving message
>>
>>      v2: no changes
>>
>>   lib/librte_eal/common/eal_common_proc.c | 52
>> +++++++++++++++++++++++++++++----
>>   1 file changed, 47 insertions(+), 5 deletions(-)
>>
>> diff --git a/lib/librte_eal/common/eal_common_proc.c
>> b/lib/librte_eal/common/eal_common_proc.c
>> index 3a1088e..a6e24e6 100644
>> --- a/lib/librte_eal/common/eal_common_proc.c
>> +++ b/lib/librte_eal/common/eal_common_proc.c
>> @@ -25,6 +25,7 @@
>>   #include <rte_errno.h>
>>   #include <rte_lcore.h>
>>   #include <rte_log.h>
>> +#include <rte_tailq.h>
>>
>>   #include "eal_private.h"
>>   #include "eal_filesystem.h"
>> @@ -58,6 +59,18 @@ struct mp_msg_internal {
>>   	struct rte_mp_msg msg;
>>   };
>>
>> +struct message_queue_entry {
>> +	TAILQ_ENTRY(message_queue_entry) next;
>> +	struct mp_msg_internal msg;
>> +	struct sockaddr_un sa;
>> +};
>> +
>> +/** Double linked list of received messages. */
>> +TAILQ_HEAD(message_queue, message_queue_entry);
>> +
>> +static struct message_queue message_queue =
>> +	TAILQ_HEAD_INITIALIZER(message_queue);
>> +
>>   struct sync_request {
>>   	TAILQ_ENTRY(sync_request) next;
>>   	int reply_received;
>> @@ -276,12 +289,41 @@ process_msg(struct mp_msg_internal *m, struct
>> sockaddr_un *s)
>>   static void *
>>   mp_handle(void *arg __rte_unused)
>>   {
>> -	struct mp_msg_internal msg;
>> -	struct sockaddr_un sa;
>> -
>> +	struct message_queue_entry *cur_msg, *next_msg, *new_msg =
>> NULL;
>>   	while (1) {
>> -		if (read_msg(&msg, &sa) == 0)
>> -			process_msg(&msg, &sa);
>> +		/* we want to process all messages in order of their arrival,
>> +		 * but status of init_complete may change while we're
>> iterating
>> +		 * the tailq. so, store it here and check once every iteration.
>> +		 */
>> +		int init_complete;
>> +
>> +		if (new_msg == NULL)
>> +			new_msg = malloc(sizeof(*new_msg));
>> +		if (read_msg(&new_msg->msg, &new_msg->sa) == 0) {
> 
> Suppose a case that: req and msg received but init not completed, so we enqueue all of them in the tailq; and from now on, no req/rep/msg comes. Then mp thread will hang here for reading new message.
> In such a case, we might need the master thread to signal mp thread to wake up when init is completed?

True. Will have to think about how to do that.
  
Burakov, Anatoly Feb. 28, 2018, 9:47 a.m. UTC | #4
On 28-Feb-18 4:00 AM, Wiles, Keith wrote:
> 
> 
>>
>> +	struct message_queue_entry *cur_msg, *next_msg, *new_msg = NULL;
>> 	while (1) {
>> -		if (read_msg(&msg, &sa) == 0)
>> -			process_msg(&msg, &sa);
>> +		/* we want to process all messages in order of their arrival,
>> +		 * but status of init_complete may change while we're iterating
>> +		 * the tailq. so, store it here and check once every iteration.
>> +		 */
>> +		int init_complete;
> 
> Do we allow variables to be defined in the middle of a block, I thought we only allowed them after a ‘{‘ or open block.

Apologies, will fix.

> 
>> +
>> +		if (new_msg == NULL)
>> +			new_msg = malloc(sizeof(*new_msg));
> 
> I am very concerned about allocating memory with no limit. If the process never completes then we could receive messages and consume a lot of memory. I would want to see a limit to the number of messages we can consume in the queue just to be sure.

Sure, will do.

> 
>> +		if (read_msg(&new_msg->msg, &new_msg->sa) == 0) {
>> +			/* we successfully read the message, so enqueue it */
>> +			TAILQ_INSERT_TAIL(&message_queue, new_msg, next);
>> +			new_msg = NULL;
>> +		} /* reuse new_msg for next message if we couldn't read_msg */
>> +
>> +		init_complete = internal_config.init_complete;
> 
> Does the internal_config.init_complete need to be a volatile to make sure it is reread each time thru the loop?

Will fix.

> 
>> +
>> +		/* tailq only accessed here, so no locking needed */
>> +		TAILQ_FOREACH_SAFE(cur_msg, &message_queue, next, next_msg) {
>> +			/* secondary process should not process any incoming
>> +			 * requests until its initialization is complete, but
>> +			 * it is allowed to process replies to its own queries.
>> +			 */
>> +			if (rte_eal_process_type() == RTE_PROC_SECONDARY &&
>> +					!init_complete &&
>> +					cur_msg->msg.type != MP_REP)
>> +				continue;
>> +
>> +			TAILQ_REMOVE(&message_queue, cur_msg, next);
>> +
>> +			process_msg(&cur_msg->msg, &cur_msg->sa);
>> +
>> +			free(cur_msg);
>> +		}
>> 	}
>>
>> 	return NULL;
>> -- 
>> 2.7.4
> 
> Regards,
> Keith
>
  

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 3a1088e..a6e24e6 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -25,6 +25,7 @@ 
 #include <rte_errno.h>
 #include <rte_lcore.h>
 #include <rte_log.h>
+#include <rte_tailq.h>
 
 #include "eal_private.h"
 #include "eal_filesystem.h"
@@ -58,6 +59,18 @@  struct mp_msg_internal {
 	struct rte_mp_msg msg;
 };
 
+struct message_queue_entry {
+	TAILQ_ENTRY(message_queue_entry) next;
+	struct mp_msg_internal msg;
+	struct sockaddr_un sa;
+};
+
+/** Double linked list of received messages. */
+TAILQ_HEAD(message_queue, message_queue_entry);
+
+static struct message_queue message_queue =
+	TAILQ_HEAD_INITIALIZER(message_queue);
+
 struct sync_request {
 	TAILQ_ENTRY(sync_request) next;
 	int reply_received;
@@ -276,12 +289,41 @@  process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 static void *
 mp_handle(void *arg __rte_unused)
 {
-	struct mp_msg_internal msg;
-	struct sockaddr_un sa;
-
+	struct message_queue_entry *cur_msg, *next_msg, *new_msg = NULL;
 	while (1) {
-		if (read_msg(&msg, &sa) == 0)
-			process_msg(&msg, &sa);
+		/* we want to process all messages in order of their arrival,
+		 * but status of init_complete may change while we're iterating
+		 * the tailq. so, store it here and check once every iteration.
+		 */
+		int init_complete;
+
+		if (new_msg == NULL)
+			new_msg = malloc(sizeof(*new_msg));
+		if (read_msg(&new_msg->msg, &new_msg->sa) == 0) {
+			/* we successfully read the message, so enqueue it */
+			TAILQ_INSERT_TAIL(&message_queue, new_msg, next);
+			new_msg = NULL;
+		} /* reuse new_msg for next message if we couldn't read_msg */
+
+		init_complete = internal_config.init_complete;
+
+		/* tailq only accessed here, so no locking needed */
+		TAILQ_FOREACH_SAFE(cur_msg, &message_queue, next, next_msg) {
+			/* secondary process should not process any incoming
+			 * requests until its initialization is complete, but
+			 * it is allowed to process replies to its own queries.
+			 */
+			if (rte_eal_process_type() == RTE_PROC_SECONDARY &&
+					!init_complete &&
+					cur_msg->msg.type != MP_REP)
+				continue;
+
+			TAILQ_REMOVE(&message_queue, cur_msg, next);
+
+			process_msg(&cur_msg->msg, &cur_msg->sa);
+
+			free(cur_msg);
+		}
 	}
 
 	return NULL;