[dpdk-dev] [PATCH v5 10/14] examples/quota_watermark: use ring space for watermarks

Bruce Richardson bruce.richardson at intel.com
Wed Mar 29 15:09:37 CEST 2017


Now that the enqueue function returns the amount of space in the ring,
we can use that to replace the old watermark functionality. Update the
example app to do so, and re-enable it in the examples Makefile.

Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
Reviewed-by: Yuanhan Liu <yuanhan.liu at linux.intel.com>
Acked-by: Olivier Matz <olivier.matz at 6wind.com>
---
v4: updated rst doc to match the code changes
---
 doc/guides/sample_app_ug/quota_watermark.rst | 148 ++++++++++++++++-----------
 examples/Makefile                            |   2 +-
 examples/quota_watermark/qw/init.c           |   5 +-
 examples/quota_watermark/qw/main.c           |  16 +--
 examples/quota_watermark/qw/main.h           |   1 +
 examples/quota_watermark/qwctl/commands.c    |   4 +-
 examples/quota_watermark/qwctl/qwctl.c       |   2 +
 examples/quota_watermark/qwctl/qwctl.h       |   1 +
 8 files changed, 106 insertions(+), 73 deletions(-)

diff --git a/doc/guides/sample_app_ug/quota_watermark.rst b/doc/guides/sample_app_ug/quota_watermark.rst
index 9f86e10..09530f2 100644
--- a/doc/guides/sample_app_ug/quota_watermark.rst
+++ b/doc/guides/sample_app_ug/quota_watermark.rst
@@ -1,5 +1,5 @@
 ..  BSD LICENSE
-    Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+    Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
     All rights reserved.
 
     Redistribution and use in source and binary forms, with or without
@@ -31,11 +31,13 @@
 Quota and Watermark Sample Application
 ======================================
 
-The Quota and Watermark sample application is a simple example of packet processing using Data Plane Development Kit (DPDK) that
-showcases the use of a quota as the maximum number of packets enqueue/dequeue at a time and low and high watermarks
-to signal low and high ring usage respectively.
+The Quota and Watermark sample application is a simple example of packet
+processing using Data Plane Development Kit (DPDK) that showcases the use
+of a quota as the maximum number of packets enqueue/dequeue at a time and
+low and high thresholds, or watermarks, to signal low and high ring usage
+respectively.
 
-Additionally, it shows how ring watermarks can be used to feedback congestion notifications to data producers by
+Additionally, it shows how the thresholds can be used to feedback congestion notifications to data producers by
 temporarily stopping processing overloaded rings and sending Ethernet flow control frames.
 
 This sample application is split in two parts:
@@ -64,7 +66,7 @@ each stage of which being connected by rings, as shown in :numref:`figure_pipeli
 
 
 An adjustable quota value controls how many packets are being moved through the pipeline per enqueue and dequeue.
-Adjustable watermark values associated with the rings control a back-off mechanism that
+Adjustable threshold values associated with the rings control a back-off mechanism that
 tries to prevent the pipeline from being overloaded by:
 
 *   Stopping enqueuing on rings for which the usage has crossed the high watermark threshold
@@ -216,25 +218,26 @@ in the *DPDK Getting Started Guide* and the *DPDK API Reference*.
 Shared Variables Setup
 ^^^^^^^^^^^^^^^^^^^^^^
 
-The quota and low_watermark shared variables are put into an rte_memzone using a call to setup_shared_variables():
+The quota and high and low watermark shared variables are put into an rte_memzone using a call to setup_shared_variables():
 
 .. code-block:: c
 
     void
     setup_shared_variables(void)
     {
-        const struct rte_memzone *qw_memzone;
-
-        qw_memzone = rte_memzone_reserve(QUOTA_WATERMARK_MEMZONE_NAME, 2 * sizeof(int), rte_socket_id(), RTE_MEMZONE_2MB);
+           const struct rte_memzone *qw_memzone;
 
-        if (qw_memzone == NULL)
-            rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
+           qw_memzone = rte_memzone_reserve(QUOTA_WATERMARK_MEMZONE_NAME,
+                          3 * sizeof(int), rte_socket_id(), 0);
+           if (qw_memzone == NULL)
+                   rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
 
-        quota = qw_memzone->addr;
-        low_watermark = (unsigned int *) qw_memzone->addr + sizeof(int);
-   }
+           quota = qw_memzone->addr;
+           low_watermark = (unsigned int *) qw_memzone->addr + 1;
+           high_watermark = (unsigned int *) qw_memzone->addr + 2;
+    }
 
-These two variables are initialized to a default value in main() and
+These three variables are initialized to a default value in main() and
 can be changed while qw is running using the qwctl control program.
 
 Application Arguments
@@ -349,27 +352,37 @@ This is done using the following code:
     /* Process each port round robin style */
 
     for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
-        if (!is_bit_set(port_id, portmask))
-            continue;
-
-        ring = rings[lcore_id][port_id];
-
-        if (ring_state[port_id] != RING_READY) {
-            if (rte_ring_count(ring) > *low_watermark)
-                continue;
-        else
-            ring_state[port_id] = RING_READY;
-        }
-
-        /* Enqueue received packets on the RX ring */
-
-        nb_rx_pkts = rte_eth_rx_burst(port_id, 0, pkts, *quota);
-
-        ret = rte_ring_enqueue_bulk(ring, (void *) pkts, nb_rx_pkts);
-        if (ret == -EDQUOT) {
-            ring_state[port_id] = RING_OVERLOADED;
-            send_pause_frame(port_id, 1337);
-        }
+            if (!is_bit_set(port_id, portmask))
+                    continue;
+
+            ring = rings[lcore_id][port_id];
+
+            if (ring_state[port_id] != RING_READY) {
+                    if (rte_ring_count(ring) > *low_watermark)
+                            continue;
+                    else
+                            ring_state[port_id] = RING_READY;
+            }
+
+            /* Enqueue received packets on the RX ring */
+            nb_rx_pkts = rte_eth_rx_burst(port_id, 0, pkts,
+                            (uint16_t) *quota);
+            ret = rte_ring_enqueue_bulk(ring, (void *) pkts,
+                            nb_rx_pkts, &free);
+            if (RING_SIZE - free > *high_watermark) {
+                    ring_state[port_id] = RING_OVERLOADED;
+                    send_pause_frame(port_id, 1337);
+            }
+
+            if (ret == 0) {
+
+                    /*
+                     * Return  mbufs to the pool,
+                     * effectively dropping packets
+                     */
+                    for (i = 0; i < nb_rx_pkts; i++)
+                            rte_pktmbuf_free(pkts[i]);
+            }
     }
 
 For each port in the port mask, the corresponding ring's pointer is fetched into ring and that ring's state is checked:
@@ -390,30 +403,40 @@ This thread is running on most of the logical cores to create and arbitrarily lo
     previous_lcore_id = get_previous_lcore_id(lcore_id);
 
     for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
-        if (!is_bit_set(port_id, portmask))
-            continue;
-
-        tx = rings[lcore_id][port_id];
-        rx = rings[previous_lcore_id][port_id];
-        if (ring_state[port_id] != RING_READY) {
-            if (rte_ring_count(tx) > *low_watermark)
-                continue;
-        else
-            ring_state[port_id] = RING_READY;
-        }
-
-        /* Dequeue up to quota mbuf from rx */
-
-        nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts, *quota);
-
-        if (unlikely(nb_dq_pkts < 0))
-            continue;
-
-        /* Enqueue them on tx */
-
-        ret = rte_ring_enqueue_bulk(tx, pkts, nb_dq_pkts);
-        if (ret == -EDQUOT)
-            ring_state[port_id] = RING_OVERLOADED;
+            if (!is_bit_set(port_id, portmask))
+                    continue;
+
+            tx = rings[lcore_id][port_id];
+            rx = rings[previous_lcore_id][port_id];
+
+            if (ring_state[port_id] != RING_READY) {
+                    if (rte_ring_count(tx) > *low_watermark)
+                            continue;
+                    else
+                            ring_state[port_id] = RING_READY;
+            }
+
+            /* Dequeue up to quota mbuf from rx */
+            nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts,
+                            *quota, NULL);
+            if (unlikely(nb_dq_pkts < 0))
+                    continue;
+
+            /* Enqueue them on tx */
+            ret = rte_ring_enqueue_bulk(tx, pkts,
+                            nb_dq_pkts, &free);
+            if (RING_SIZE - free > *high_watermark)
+                    ring_state[port_id] = RING_OVERLOADED;
+
+            if (ret == 0) {
+
+                    /*
+                     * Return  mbufs to the pool,
+                     * effectively dropping packets
+                     */
+                    for (i = 0; i < nb_dq_pkts; i++)
+                            rte_pktmbuf_free(pkts[i]);
+            }
     }
 
 The thread's logic works mostly like receive_stage(),
@@ -482,5 +505,6 @@ low_watermark from the rte_memzone previously created by qw.
 
         quota = qw_memzone->addr;
 
-        low_watermark = (unsigned int *) qw_memzone->addr + sizeof(int);
+        low_watermark = (unsigned int *) qw_memzone->addr + 1;
+        high_watermark = (unsigned int *) qw_memzone->addr + 2;
     }
diff --git a/examples/Makefile b/examples/Makefile
index 19cd5ad..da2bfdd 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -81,7 +81,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_REORDER) += packet_ordering
 DIRS-$(CONFIG_RTE_LIBRTE_IEEE1588) += ptpclient
 DIRS-$(CONFIG_RTE_LIBRTE_METER) += qos_meter
 DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += qos_sched
-#DIRS-y += quota_watermark
+DIRS-y += quota_watermark
 DIRS-$(CONFIG_RTE_ETHDEV_RXTX_CALLBACKS) += rxtx_callbacks
 DIRS-y += skeleton
 ifeq ($(CONFIG_RTE_LIBRTE_HASH),y)
diff --git a/examples/quota_watermark/qw/init.c b/examples/quota_watermark/qw/init.c
index 95a9f94..6babfea 100644
--- a/examples/quota_watermark/qw/init.c
+++ b/examples/quota_watermark/qw/init.c
@@ -140,7 +140,7 @@ void init_ring(int lcore_id, uint8_t port_id)
 	if (ring == NULL)
 		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
 
-	rte_ring_set_water_mark(ring, 80 * RING_SIZE / 100);
+	*high_watermark = 80 * RING_SIZE / 100;
 
 	rings[lcore_id][port_id] = ring;
 }
@@ -168,10 +168,11 @@ setup_shared_variables(void)
 	const struct rte_memzone *qw_memzone;
 
 	qw_memzone = rte_memzone_reserve(QUOTA_WATERMARK_MEMZONE_NAME,
-			2 * sizeof(int), rte_socket_id(), 0);
+			3 * sizeof(int), rte_socket_id(), 0);
 	if (qw_memzone == NULL)
 		rte_exit(EXIT_FAILURE, "%s\n", rte_strerror(rte_errno));
 
 	quota = qw_memzone->addr;
 	low_watermark = (unsigned int *) qw_memzone->addr + 1;
+	high_watermark = (unsigned int *) qw_memzone->addr + 2;
 }
diff --git a/examples/quota_watermark/qw/main.c b/examples/quota_watermark/qw/main.c
index 2dcddea..bdb8a43 100644
--- a/examples/quota_watermark/qw/main.c
+++ b/examples/quota_watermark/qw/main.c
@@ -67,6 +67,7 @@ struct ether_fc_frame {
 
 int *quota;
 unsigned int *low_watermark;
+unsigned int *high_watermark;
 
 uint8_t port_pairs[RTE_MAX_ETHPORTS];
 
@@ -158,6 +159,7 @@ receive_stage(__attribute__((unused)) void *args)
 	uint16_t nb_rx_pkts;
 
 	unsigned int lcore_id;
+	unsigned int free;
 
 	struct rte_mbuf *pkts[MAX_PKT_QUOTA];
 	struct rte_ring *ring;
@@ -189,13 +191,13 @@ receive_stage(__attribute__((unused)) void *args)
 			nb_rx_pkts = rte_eth_rx_burst(port_id, 0, pkts,
 					(uint16_t) *quota);
 			ret = rte_ring_enqueue_bulk(ring, (void *) pkts,
-					nb_rx_pkts);
-			if (ret == -EDQUOT) {
+					nb_rx_pkts, &free);
+			if (RING_SIZE - free > *high_watermark) {
 				ring_state[port_id] = RING_OVERLOADED;
 				send_pause_frame(port_id, 1337);
 			}
 
-			else if (ret == -ENOBUFS) {
+			if (ret == 0) {
 
 				/*
 				 * Return  mbufs to the pool,
@@ -217,6 +219,7 @@ pipeline_stage(__attribute__((unused)) void *args)
 	uint8_t port_id;
 
 	unsigned int lcore_id, previous_lcore_id;
+	unsigned int free;
 
 	void *pkts[MAX_PKT_QUOTA];
 	struct rte_ring *rx, *tx;
@@ -253,11 +256,12 @@ pipeline_stage(__attribute__((unused)) void *args)
 				continue;
 
 			/* Enqueue them on tx */
-			ret = rte_ring_enqueue_bulk(tx, pkts, nb_dq_pkts);
-			if (ret == -EDQUOT)
+			ret = rte_ring_enqueue_bulk(tx, pkts,
+					nb_dq_pkts, &free);
+			if (RING_SIZE - free > *high_watermark)
 				ring_state[port_id] = RING_OVERLOADED;
 
-			else if (ret == -ENOBUFS) {
+			if (ret == 0) {
 
 				/*
 				 * Return  mbufs to the pool,
diff --git a/examples/quota_watermark/qw/main.h b/examples/quota_watermark/qw/main.h
index 545ba42..8c8e311 100644
--- a/examples/quota_watermark/qw/main.h
+++ b/examples/quota_watermark/qw/main.h
@@ -43,6 +43,7 @@ enum ring_state {
 
 extern int *quota;
 extern unsigned int *low_watermark;
+extern unsigned int *high_watermark;
 
 extern uint8_t port_pairs[RTE_MAX_ETHPORTS];
 
diff --git a/examples/quota_watermark/qwctl/commands.c b/examples/quota_watermark/qwctl/commands.c
index 036bf80..5cac0e1 100644
--- a/examples/quota_watermark/qwctl/commands.c
+++ b/examples/quota_watermark/qwctl/commands.c
@@ -140,8 +140,8 @@ cmd_set_handler(__attribute__((unused)) void *parsed_result,
 		else
 			if (tokens->value >= *low_watermark * 100 / RING_SIZE
 					&& tokens->value <= 100)
-				rte_ring_set_water_mark(ring,
-					tokens->value * RING_SIZE / 100);
+				*high_watermark = tokens->value *
+						RING_SIZE / 100;
 			else
 				cmdline_printf(cl,
 					"ring high watermark must be between %u%% and 100%%\n",
diff --git a/examples/quota_watermark/qwctl/qwctl.c b/examples/quota_watermark/qwctl/qwctl.c
index 3a85cc3..7e7a396 100644
--- a/examples/quota_watermark/qwctl/qwctl.c
+++ b/examples/quota_watermark/qwctl/qwctl.c
@@ -55,6 +55,7 @@
 
 int *quota;
 unsigned int *low_watermark;
+unsigned int *high_watermark;
 
 
 static void
@@ -68,6 +69,7 @@ setup_shared_variables(void)
 
 	quota = qw_memzone->addr;
 	low_watermark = (unsigned int *) qw_memzone->addr + 1;
+	high_watermark = (unsigned int *) qw_memzone->addr + 2;
 }
 
 int main(int argc, char **argv)
diff --git a/examples/quota_watermark/qwctl/qwctl.h b/examples/quota_watermark/qwctl/qwctl.h
index 8d146e5..545914b 100644
--- a/examples/quota_watermark/qwctl/qwctl.h
+++ b/examples/quota_watermark/qwctl/qwctl.h
@@ -36,5 +36,6 @@
 
 extern int *quota;
 extern unsigned int *low_watermark;
+extern unsigned int *high_watermark;
 
 #endif /* _MAIN_H_ */
-- 
2.9.3



More information about the dev mailing list