[v2,02/10] telemetry: add initial connection socket

Message ID 20181003173612.67101-3-kevin.laatz@intel.com (mailing list archive)
State Superseded, archived
Delegated to: Thomas Monjalon
Headers
Series introduce telemetry library |

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation success Compilation OK

Commit Message

Kevin Laatz Oct. 3, 2018, 5:36 p.m. UTC
  From: Ciara Power <ciara.power@intel.com>

This patch adds the telemetry UNIX socket. It is used to
allow connections from external clients.

On the initial connection from a client, ethdev stats are
registered in the metrics library, to allow for their retrieval
at a later stage.

Signed-off-by: Ciara Power <ciara.power@intel.com>
Signed-off-by: Brian Archbold <brian.archbold@intel.com>
Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
---
 lib/librte_telemetry/rte_telemetry.c          | 210 ++++++++++++++++++++++++++
 lib/librte_telemetry/rte_telemetry_internal.h |   4 +
 2 files changed, 214 insertions(+)
  

Comments

Mattias Rönnblom Oct. 3, 2018, 6:40 p.m. UTC | #1
On 2018-10-03 19:36, Kevin Laatz wrote:
> From: Ciara Power <ciara.power@intel.com>
> 
> This patch adds the telemetry UNIX socket. It is used to
> allow connections from external clients.
> 
> On the initial connection from a client, ethdev stats are
> registered in the metrics library, to allow for their retrieval
> at a later stage.
> 
> Signed-off-by: Ciara Power <ciara.power@intel.com>
> Signed-off-by: Brian Archbold <brian.archbold@intel.com>
> Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
> ---
>   lib/librte_telemetry/rte_telemetry.c          | 210 ++++++++++++++++++++++++++
>   lib/librte_telemetry/rte_telemetry_internal.h |   4 +
>   2 files changed, 214 insertions(+)
> 
> diff --git a/lib/librte_telemetry/rte_telemetry.c b/lib/librte_telemetry/rte_telemetry.c
> index d9ffec2..0c99d66 100644
> --- a/lib/librte_telemetry/rte_telemetry.c
> +++ b/lib/librte_telemetry/rte_telemetry.c
> @@ -3,22 +3,158 @@
>    */
>   
>   #include <unistd.h>
> +#include <fcntl.h>
>   #include <pthread.h>
> +#include <sys/socket.h>
> +#include <sys/un.h>
>   
>   #include <rte_eal.h>
>   #include <rte_ethdev.h>
>   #include <rte_metrics.h>
> +#include <rte_string_fns.h>
>   
>   #include "rte_telemetry.h"
>   #include "rte_telemetry_internal.h"
>   
>   #define SLEEP_TIME 10
>   
> +#define DEFAULT_DPDK_PATH "/var/run/.rte_telemetry"
> +

FHS 3.0 recommends "/run" over "/var/run". Maybe a more descriptive 
macro name would be in order, too.

> +const char *socket_path = DEFAULT_DPDK_PATH;

Is there really a need for this variable? Why not use the macro directly?

>   static telemetry_impl *static_telemetry;
>   
> +int32_t
> +rte_telemetry_is_port_active(int port_id)
> +{
> +	int ret;
> +
> +	ret = rte_eth_find_next(port_id);
> +	if (ret == port_id)
> +		return 1;
> +
> +	TELEMETRY_LOG_ERR("port_id: %d is invalid, not active",
> +		port_id);
> +	return 0;
> +}
> +
> +static int32_t
> +rte_telemetry_reg_ethdev_to_metrics(uint16_t port_id)
> +{
> +	int ret, num_xstats, ret_val, i;
> +	struct rte_eth_xstat *eth_xstats = NULL;
> +	struct rte_eth_xstat_name *eth_xstats_names = NULL;
> +
> +	if (!rte_eth_dev_is_valid_port(port_id)) {
> +		TELEMETRY_LOG_ERR("port_id: %d is invalid", port_id);
> +		return -EINVAL;
> +	}
> +
> +	num_xstats = rte_eth_xstats_get(port_id, NULL, 0);
> +	if (num_xstats < 0) {
> +		TELEMETRY_LOG_ERR("rte_eth_xstats_get(%u) failed: %d",
> +				port_id, num_xstats);
> +		return -EPERM;
> +	}
> +
> +	eth_xstats = malloc(sizeof(struct rte_eth_xstat) * num_xstats);
> +	if (!eth_xstats) {

eth_stats == NULL per 1.8.1 in the DPDK coding style guide.

> +		TELEMETRY_LOG_ERR("Failed to malloc memory for xstats");
> +		return -ENOMEM;
> +	}
> +
> +	ret = rte_eth_xstats_get(port_id, eth_xstats, num_xstats);
> +	const char *xstats_names[num_xstats];
> +	eth_xstats_names = malloc(sizeof(struct rte_eth_xstat_name) * num_xstats);
> +	if (ret < 0 || ret > num_xstats) {
> +		TELEMETRY_LOG_ERR("rte_eth_xstats_get(%u) len%i failed: %d",
> +				port_id, num_xstats, ret);
> +		ret_val = -EPERM;
> +		goto free_xstats;
> +	}
> +
> +	if (!eth_xstats_names) {
> +		TELEMETRY_LOG_ERR("Failed to malloc memory for xstats_names");
> +		ret_val = -ENOMEM;
> +		goto free_xstats;
> +	}
> +
> +	ret = rte_eth_xstats_get_names(port_id, eth_xstats_names, num_xstats);
> +	if (ret < 0 || ret > num_xstats) {
> +		TELEMETRY_LOG_ERR("rte_eth_xstats_get_names(%u) len%i failed: %d",
> +				port_id, num_xstats, ret);
> +		ret_val = -EPERM;
> +		goto free_xstats;
> +	}
> +
> +	for (i = 0; i < num_xstats; i++)
> +		xstats_names[i] = eth_xstats_names[eth_xstats[i].id].name;
> +
> +	ret_val = rte_metrics_reg_names(xstats_names, num_xstats);
> +	if (ret_val < 0) {
> +		TELEMETRY_LOG_ERR("rte_metrics_reg_names failed - metrics may already be registered");
> +		ret_val = -1;
> +		goto free_xstats;
> +	}
> +
> +	goto free_xstats;

This goto seems a little redundant to me.

> +
> +free_xstats:
> +	free(eth_xstats);
> +	free(eth_xstats_names);
> +	return ret_val;
> +}
> +
> +static int32_t
> +rte_telemetry_initial_accept(struct telemetry_impl *telemetry)
> +{
> +	int pid;

Ethernet port ids are uint16_t.

> +
> +	RTE_ETH_FOREACH_DEV(pid) {
> +		telemetry->reg_index =
> +			rte_telemetry_reg_ethdev_to_metrics(pid);
> +		break;
> +	}
> +
> +	if (telemetry->reg_index < 0) {
> +		TELEMETRY_LOG_ERR("Failed to register ethdev metrics");
> +		return -1;
> +	}
> +
> +	telemetry->metrics_register_done = 1;
> +
> +	return 0;
> +}
> +
> +static int32_t
> +rte_telemetry_accept_new_client(struct telemetry_impl *telemetry)
> +{
> +	int ret;
> +
> +	if (telemetry->accept_fd == 0 || telemetry->accept_fd == -1) {
> +		ret = listen(telemetry->server_fd, 1);
> +		if (ret < 0) {
> +			TELEMETRY_LOG_ERR("Listening error with server fd");
> +			return -1;
> +		}
> +		telemetry->accept_fd = accept(telemetry->server_fd, NULL, NULL);
> +
> +		if (telemetry->accept_fd > 0 &&

accept() returns -1 on error. 0 is a valid fd (although in this case it 
can't be returned, since at least one fd - the server socket - is open).

> +			telemetry->metrics_register_done == 0) {
> +			ret = rte_telemetry_initial_accept(telemetry);
> +			if (ret < 0) {
> +				TELEMETRY_LOG_ERR("Failed to run initial configurations/tests");
> +				return -1;
> +			}
> +		}
> +	}
> +
> +	return 0;
> +}
> +
>   static int32_t
>   rte_telemetry_run(void *userdata)
>   {
> +	int ret;
>   	struct telemetry_impl *telemetry = userdata;
>   
>   	if (!telemetry) {

1.8.1 again.

> @@ -26,6 +162,12 @@ rte_telemetry_run(void *userdata)
>   		return -1;
>   	}
>   
> +	ret = rte_telemetry_accept_new_client(telemetry);
> +	if (ret < 0) {
> +		TELEMETRY_LOG_ERR("Accept and read new client failed");
> +		return -1;
> +	}
> +
>   	return 0;
>   }
>   
> @@ -49,6 +191,56 @@ static void
>   	pthread_exit(0);
>   }
>   
> +static int32_t
> +rte_telemetry_set_socket_nonblock(int fd)
> +{
> +	int flags = fcntl(fd, F_GETFL, 0);
> +
> +	if (fd < 0) {
> +		TELEMETRY_LOG_ERR("Invalid fd provided");
> +		return -1;
> +	}

Shouldn't you do this check before you do the first fcntl()? If at all.

> +
> +	if (flags < 0)
> +		flags = 0;
> +
> +	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
> +}
> +
> +static int32_t
> +rte_telemetry_create_socket(struct telemetry_impl *telemetry)
> +{
> +	int ret;
> +	struct sockaddr_un addr = {0};

Aren't you planning to set all the relevant fields? No need to zero.

> +
> +	if (!telemetry)

1.8.1

> +		return -1;
> +
> +	telemetry->server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
> +	if (telemetry->server_fd == -1) {
> +		TELEMETRY_LOG_ERR("Failed to open socket");
> +		return -1;
> +	}
> +
> +	ret  = rte_telemetry_set_socket_nonblock(telemetry->server_fd);
> +	if (ret < 0) {
> +		TELEMETRY_LOG_ERR("Could not set socket to NONBLOCK");

Close the socket?

> +		return -1;
> +	}
> +
> +	addr.sun_family = AF_UNIX;
> +	strlcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
> +	unlink(socket_path);
> +
> +	if (bind(telemetry->server_fd, (struct sockaddr *)&addr,
> +		sizeof(addr)) < 0) {
> +		TELEMETRY_LOG_ERR("Socket binding error");

Close it here as well.

> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
>   int32_t
>   rte_telemetry_init()
>   {
> @@ -69,6 +261,14 @@ rte_telemetry_init()
>   
>   	static_telemetry->socket_id = rte_socket_id();
>   	rte_metrics_init(static_telemetry->socket_id);
> +	ret = rte_telemetry_create_socket(static_telemetry);
> +	if (ret < 0) {
> +		ret = rte_telemetry_cleanup();
> +		if (ret < 0)
> +			TELEMETRY_LOG_ERR("TELEMETRY cleanup failed");
> +		return -EPERM;
> +	}
> +
>   	pthread_attr_init(&attr);

Not a comment on this patch, but... technically, this call may fail.

>   	ret = rte_ctrl_thread_create(&static_telemetry->thread_id,
>   		telemetry_ctrl_thread, &attr, rte_telemetry_run_thread_func,
> @@ -88,11 +288,21 @@ rte_telemetry_init()
>   int32_t
>   rte_telemetry_cleanup(void)
>   {
> +	int ret;
>   	struct telemetry_impl *telemetry = static_telemetry;
> +
> +	ret = close(telemetry->server_fd);
> +	if (ret < 0) {
> +		TELEMETRY_LOG_ERR("Close TELEMETRY socket failed");
> +		free(telemetry);
> +		return -EPERM;
> +	}
> +
>   	telemetry->thread_status = 0;
>   	pthread_join(telemetry->thread_id, NULL);
>   	free(telemetry);
>   	static_telemetry = NULL;
> +
>   	return 0;
>   }
>   
> diff --git a/lib/librte_telemetry/rte_telemetry_internal.h b/lib/librte_telemetry/rte_telemetry_internal.h
> index 4e810a8..569d56a 100644
> --- a/lib/librte_telemetry/rte_telemetry_internal.h
> +++ b/lib/librte_telemetry/rte_telemetry_internal.h
> @@ -24,9 +24,13 @@ extern int telemetry_log_level;
>   	TELEMETRY_LOG(INFO, fmt, ## args)
>   
>   typedef struct telemetry_impl {
> +	int accept_fd;
> +	int server_fd;
>   	pthread_t thread_id;
>   	int thread_status;
>   	uint32_t socket_id;
> +	int reg_index;
> +	int metrics_register_done;
>   } telemetry_impl;
>   
>   #endif
>
  
Thomas Monjalon Oct. 3, 2018, 7:36 p.m. UTC | #2
03/10/2018 20:40, Mattias Rönnblom:
> On 2018-10-03 19:36, Kevin Laatz wrote:
> > +#define DEFAULT_DPDK_PATH "/var/run/.rte_telemetry"
> > +
> 
> FHS 3.0 recommends "/run" over "/var/run". Maybe a more descriptive 
> macro name would be in order, too.

Kevin, you should use eal_get_runtime_dir().
You may need to export this function as a public API.

Mattias, thank you for the detailed review.
  
Mattias Rönnblom Oct. 3, 2018, 7:49 p.m. UTC | #3
On 2018-10-03 21:36, Thomas Monjalon wrote:
> 03/10/2018 20:40, Mattias Rönnblom:
>> On 2018-10-03 19:36, Kevin Laatz wrote:
>>> +#define DEFAULT_DPDK_PATH "/var/run/.rte_telemetry"
>>> +
>>
>> FHS 3.0 recommends "/run" over "/var/run". Maybe a more descriptive
>> macro name would be in order, too.
> 
> Kevin, you should use eal_get_runtime_dir().
> You may need to export this function as a public API.
> 

You could also use the AF_UNIX abstract namespace. Convenient, as in you 
don't have to bother with directories and dangling files from crashed 
processes, but on the other hand it's Linux-specific and a little obscure.
  

Patch

diff --git a/lib/librte_telemetry/rte_telemetry.c b/lib/librte_telemetry/rte_telemetry.c
index d9ffec2..0c99d66 100644
--- a/lib/librte_telemetry/rte_telemetry.c
+++ b/lib/librte_telemetry/rte_telemetry.c
@@ -3,22 +3,158 @@ 
  */
 
 #include <unistd.h>
+#include <fcntl.h>
 #include <pthread.h>
+#include <sys/socket.h>
+#include <sys/un.h>
 
 #include <rte_eal.h>
 #include <rte_ethdev.h>
 #include <rte_metrics.h>
+#include <rte_string_fns.h>
 
 #include "rte_telemetry.h"
 #include "rte_telemetry_internal.h"
 
 #define SLEEP_TIME 10
 
+#define DEFAULT_DPDK_PATH "/var/run/.rte_telemetry"
+
+const char *socket_path = DEFAULT_DPDK_PATH;
 static telemetry_impl *static_telemetry;
 
+int32_t
+rte_telemetry_is_port_active(int port_id)
+{
+	int ret;
+
+	ret = rte_eth_find_next(port_id);
+	if (ret == port_id)
+		return 1;
+
+	TELEMETRY_LOG_ERR("port_id: %d is invalid, not active",
+		port_id);
+	return 0;
+}
+
+static int32_t
+rte_telemetry_reg_ethdev_to_metrics(uint16_t port_id)
+{
+	int ret, num_xstats, ret_val, i;
+	struct rte_eth_xstat *eth_xstats = NULL;
+	struct rte_eth_xstat_name *eth_xstats_names = NULL;
+
+	if (!rte_eth_dev_is_valid_port(port_id)) {
+		TELEMETRY_LOG_ERR("port_id: %d is invalid", port_id);
+		return -EINVAL;
+	}
+
+	num_xstats = rte_eth_xstats_get(port_id, NULL, 0);
+	if (num_xstats < 0) {
+		TELEMETRY_LOG_ERR("rte_eth_xstats_get(%u) failed: %d",
+				port_id, num_xstats);
+		return -EPERM;
+	}
+
+	eth_xstats = malloc(sizeof(struct rte_eth_xstat) * num_xstats);
+	if (!eth_xstats) {
+		TELEMETRY_LOG_ERR("Failed to malloc memory for xstats");
+		return -ENOMEM;
+	}
+
+	ret = rte_eth_xstats_get(port_id, eth_xstats, num_xstats);
+	const char *xstats_names[num_xstats];
+	eth_xstats_names = malloc(sizeof(struct rte_eth_xstat_name) * num_xstats);
+	if (ret < 0 || ret > num_xstats) {
+		TELEMETRY_LOG_ERR("rte_eth_xstats_get(%u) len%i failed: %d",
+				port_id, num_xstats, ret);
+		ret_val = -EPERM;
+		goto free_xstats;
+	}
+
+	if (!eth_xstats_names) {
+		TELEMETRY_LOG_ERR("Failed to malloc memory for xstats_names");
+		ret_val = -ENOMEM;
+		goto free_xstats;
+	}
+
+	ret = rte_eth_xstats_get_names(port_id, eth_xstats_names, num_xstats);
+	if (ret < 0 || ret > num_xstats) {
+		TELEMETRY_LOG_ERR("rte_eth_xstats_get_names(%u) len%i failed: %d",
+				port_id, num_xstats, ret);
+		ret_val = -EPERM;
+		goto free_xstats;
+	}
+
+	for (i = 0; i < num_xstats; i++)
+		xstats_names[i] = eth_xstats_names[eth_xstats[i].id].name;
+
+	ret_val = rte_metrics_reg_names(xstats_names, num_xstats);
+	if (ret_val < 0) {
+		TELEMETRY_LOG_ERR("rte_metrics_reg_names failed - metrics may already be registered");
+		ret_val = -1;
+		goto free_xstats;
+	}
+
+	goto free_xstats;
+
+free_xstats:
+	free(eth_xstats);
+	free(eth_xstats_names);
+	return ret_val;
+}
+
+static int32_t
+rte_telemetry_initial_accept(struct telemetry_impl *telemetry)
+{
+	int pid;
+
+	RTE_ETH_FOREACH_DEV(pid) {
+		telemetry->reg_index =
+			rte_telemetry_reg_ethdev_to_metrics(pid);
+		break;
+	}
+
+	if (telemetry->reg_index < 0) {
+		TELEMETRY_LOG_ERR("Failed to register ethdev metrics");
+		return -1;
+	}
+
+	telemetry->metrics_register_done = 1;
+
+	return 0;
+}
+
+static int32_t
+rte_telemetry_accept_new_client(struct telemetry_impl *telemetry)
+{
+	int ret;
+
+	if (telemetry->accept_fd == 0 || telemetry->accept_fd == -1) {
+		ret = listen(telemetry->server_fd, 1);
+		if (ret < 0) {
+			TELEMETRY_LOG_ERR("Listening error with server fd");
+			return -1;
+		}
+		telemetry->accept_fd = accept(telemetry->server_fd, NULL, NULL);
+
+		if (telemetry->accept_fd > 0 &&
+			telemetry->metrics_register_done == 0) {
+			ret = rte_telemetry_initial_accept(telemetry);
+			if (ret < 0) {
+				TELEMETRY_LOG_ERR("Failed to run initial configurations/tests");
+				return -1;
+			}
+		}
+	}
+
+	return 0;
+}
+
 static int32_t
 rte_telemetry_run(void *userdata)
 {
+	int ret;
 	struct telemetry_impl *telemetry = userdata;
 
 	if (!telemetry) {
@@ -26,6 +162,12 @@  rte_telemetry_run(void *userdata)
 		return -1;
 	}
 
+	ret = rte_telemetry_accept_new_client(telemetry);
+	if (ret < 0) {
+		TELEMETRY_LOG_ERR("Accept and read new client failed");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -49,6 +191,56 @@  static void
 	pthread_exit(0);
 }
 
+static int32_t
+rte_telemetry_set_socket_nonblock(int fd)
+{
+	int flags = fcntl(fd, F_GETFL, 0);
+
+	if (fd < 0) {
+		TELEMETRY_LOG_ERR("Invalid fd provided");
+		return -1;
+	}
+
+	if (flags < 0)
+		flags = 0;
+
+	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+static int32_t
+rte_telemetry_create_socket(struct telemetry_impl *telemetry)
+{
+	int ret;
+	struct sockaddr_un addr = {0};
+
+	if (!telemetry)
+		return -1;
+
+	telemetry->server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (telemetry->server_fd == -1) {
+		TELEMETRY_LOG_ERR("Failed to open socket");
+		return -1;
+	}
+
+	ret  = rte_telemetry_set_socket_nonblock(telemetry->server_fd);
+	if (ret < 0) {
+		TELEMETRY_LOG_ERR("Could not set socket to NONBLOCK");
+		return -1;
+	}
+
+	addr.sun_family = AF_UNIX;
+	strlcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
+	unlink(socket_path);
+
+	if (bind(telemetry->server_fd, (struct sockaddr *)&addr,
+		sizeof(addr)) < 0) {
+		TELEMETRY_LOG_ERR("Socket binding error");
+		return -1;
+	}
+
+	return 0;
+}
+
 int32_t
 rte_telemetry_init()
 {
@@ -69,6 +261,14 @@  rte_telemetry_init()
 
 	static_telemetry->socket_id = rte_socket_id();
 	rte_metrics_init(static_telemetry->socket_id);
+	ret = rte_telemetry_create_socket(static_telemetry);
+	if (ret < 0) {
+		ret = rte_telemetry_cleanup();
+		if (ret < 0)
+			TELEMETRY_LOG_ERR("TELEMETRY cleanup failed");
+		return -EPERM;
+	}
+
 	pthread_attr_init(&attr);
 	ret = rte_ctrl_thread_create(&static_telemetry->thread_id,
 		telemetry_ctrl_thread, &attr, rte_telemetry_run_thread_func,
@@ -88,11 +288,21 @@  rte_telemetry_init()
 int32_t
 rte_telemetry_cleanup(void)
 {
+	int ret;
 	struct telemetry_impl *telemetry = static_telemetry;
+
+	ret = close(telemetry->server_fd);
+	if (ret < 0) {
+		TELEMETRY_LOG_ERR("Close TELEMETRY socket failed");
+		free(telemetry);
+		return -EPERM;
+	}
+
 	telemetry->thread_status = 0;
 	pthread_join(telemetry->thread_id, NULL);
 	free(telemetry);
 	static_telemetry = NULL;
+
 	return 0;
 }
 
diff --git a/lib/librte_telemetry/rte_telemetry_internal.h b/lib/librte_telemetry/rte_telemetry_internal.h
index 4e810a8..569d56a 100644
--- a/lib/librte_telemetry/rte_telemetry_internal.h
+++ b/lib/librte_telemetry/rte_telemetry_internal.h
@@ -24,9 +24,13 @@  extern int telemetry_log_level;
 	TELEMETRY_LOG(INFO, fmt, ## args)
 
 typedef struct telemetry_impl {
+	int accept_fd;
+	int server_fd;
 	pthread_t thread_id;
 	int thread_status;
 	uint32_t socket_id;
+	int reg_index;
+	int metrics_register_done;
 } telemetry_impl;
 
 #endif