[dpdk-dev] [PATCH v2 2/2] Add in_flight_bitmask so as to use full 32 bits of tag.
Qinglai Xiao
jigsaw at gmail.com
Mon Nov 10 13:52:47 CET 2014
With the introduction of in_flight_bitmask, all 32 bits of the tag can be
used. Furthermore, this patch fixes the integer overflow when finding
the matched tags.
Note that librte_distributor currently supports up to 64 worker threads.
If more workers are needed, the size of in_flight_bitmask and the
algorithm for finding matching tags must be revised.
Signed-off-by: Qinglai Xiao <jigsaw at gmail.com>
---
lib/librte_distributor/rte_distributor.c | 45 ++++++++++++++++++++++--------
lib/librte_distributor/rte_distributor.h | 4 ++
2 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 3dfec4a..3dfccae 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -92,7 +92,13 @@ struct rte_distributor {
unsigned num_workers; /**< Number of workers polling */
uint32_t in_flight_tags[RTE_MAX_LCORE];
- /**< Tracks the tag being processed per core, 0 == no pkt */
+ /**< Tracks the tag being processed per core */
+ uint64_t in_flight_bitmask;
+ /**< on/off bits for in-flight tags.
+ * Note that if RTE_MAX_LCORE is larger than 64 then
+ * the bitmask has to be expanded.
+ */
+
struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
union rte_distributor_buffer bufs[RTE_MAX_LCORE];
@@ -189,6 +195,7 @@ static inline void
handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
{
d->in_flight_tags[wkr] = 0;
+ d->in_flight_bitmask &= ~(1UL << wkr);
d->bufs[wkr].bufptr64 = 0;
if (unlikely(d->backlog[wkr].count != 0)) {
/* On return of a packet, we need to move the
@@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
RTE_DISTRIB_FLAG_BITS));
}
- /* recursive call */
+ /* recursive call.
+ * Note that the tags were set before first level call
+ * to rte_distributor_process.
+ */
rte_distributor_process(d, pkts, i);
bl->count = bl->start = 0;
}
@@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
else {
d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
d->in_flight_tags[wkr] = 0;
+ d->in_flight_bitmask &= ~(1UL << wkr);
}
oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
} else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -284,14 +295,18 @@ rte_distributor_process(struct rte_distributor *d,
next_value = (((int64_t)(uintptr_t)next_mb)
<< RTE_DISTRIB_FLAG_BITS);
/*
- * Set the low bit on the tag, so we can guarantee that
- * we never store a tag value of zero. That means we can
- * use the zero-value to indicate that no packet is
- * being processed by a worker.
+ * Users are advised to set the tag value for each
+ * mbuf before calling rte_distributor_process.
+ * User-defined tags are used to identify flows
+ * or sessions.
*/
- new_tag = (next_mb->hash.usr | 1);
+ new_tag = next_mb->hash.usr;
- uint32_t match = 0;
+ /*
+ * Note that if RTE_MAX_LCORE is larger than 64 then
+ * the size of match has to be expanded.
+ */
+ uint64_t match = 0;
unsigned i;
/*
* to scan for a match use "xor" and "not" to get a 0/1
@@ -303,9 +318,12 @@ rte_distributor_process(struct rte_distributor *d,
match |= (!(d->in_flight_tags[i] ^ new_tag)
<< i);
+ /* Only turned-on bits are considered as match */
+ match &= d->in_flight_bitmask;
+
if (match) {
next_mb = NULL;
- unsigned worker = __builtin_ctz(match);
+ unsigned worker = __builtin_ctzl(match);
if (add_to_backlog(&d->backlog[worker],
next_value) < 0)
next_idx--;
@@ -322,6 +340,7 @@ rte_distributor_process(struct rte_distributor *d,
else {
d->bufs[wkr].bufptr64 = next_value;
d->in_flight_tags[wkr] = new_tag;
+ d->in_flight_bitmask |= (1UL << wkr);
next_mb = NULL;
}
oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -379,11 +398,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
static inline unsigned
total_outstanding(const struct rte_distributor *d)
{
- unsigned wkr, total_outstanding = 0;
+ unsigned wkr, total_outstanding;
+
+ total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
for (wkr = 0; wkr < d->num_workers; wkr++)
- total_outstanding += d->backlog[wkr].count +
- !!(d->in_flight_tags[wkr]);
+ total_outstanding += d->backlog[wkr].count;
+
return total_outstanding;
}
diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
index ec0d74a..cc1d559 100644
--- a/lib/librte_distributor/rte_distributor.h
+++ b/lib/librte_distributor/rte_distributor.h
@@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
* packets. The distributor will ensure that no two packets that have the
* same flow id, or tag, in the mbuf will be procesed at the same time.
*
+ * Users are advised to set the tag for each mbuf before calling this function.
+ * If the user doesn't set the tag, its value may vary depending on the
+ * driver implementation and configuration.
+ *
* This is not multi-thread safe and should only be called on a single lcore.
*
* @param d
--
1.7.1
More information about the dev
mailing list