[dpdk-dev] [PATCH v1 3/4] ixgbe: Kill ixgbe_recv_scattered_pkts()

Ananyev, Konstantin konstantin.ananyev at intel.com
Wed Apr 29 11:28:50 CEST 2015



> -----Original Message-----
> From: Vlad Zolotarov [mailto:vladz at cloudius-systems.com]
> Sent: Wednesday, April 29, 2015 7:50 AM
> To: Ananyev, Konstantin; dev at dpdk.org
> Subject: Re: [dpdk-dev] [PATCH v1 3/4] ixgbe: Kill ixgbe_recv_scattered_pkts()
> 
> 
> 
> On 04/29/15 09:47, Vlad Zolotarov wrote:
> >
> >
> > On 04/28/15 20:42, Ananyev, Konstantin wrote:
> >> Hi Vlad,
> >>
> >>> -----Original Message-----
> >>> From: dev [mailto:dev-bounces at dpdk.org] On Behalf Of Vlad Zolotarov
> >>> Sent: Sunday, April 26, 2015 3:46 PM
> >>> To: dev at dpdk.org
> >>> Subject: [dpdk-dev] [PATCH v1 3/4] ixgbe: Kill
> >>> ixgbe_recv_scattered_pkts()
> >>>
> >>> Kill ixgbe_recv_scattered_pkts() - use
> >>> ixgbe_recv_pkts_lro_single_alloc()
> >>> instead.
> >>>
> >>> Work against HW queues in LRO and scattered Rx cases is exactly the
> >>> same.
> >>> Therefore we may drop the inferior callback.
> >>>
> >>> Signed-off-by: Vlad Zolotarov <vladz at cloudius-systems.com>
> >>> ---
> >>>   lib/librte_pmd_ixgbe/ixgbe_ethdev.c |   2 +-
> >>>   lib/librte_pmd_ixgbe/ixgbe_ethdev.h |   3 -
> >>>   lib/librte_pmd_ixgbe/ixgbe_rxtx.c   | 243
> >>> +-----------------------------------
> >>>   3 files changed, 7 insertions(+), 241 deletions(-)
> >>>
> >>> diff --git a/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
> >>> b/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
> >>> index aec1de9..5f9a1cf 100644
> >>> --- a/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
> >>> +++ b/lib/librte_pmd_ixgbe/ixgbe_ethdev.c
> >>> @@ -986,7 +986,7 @@ eth_ixgbevf_dev_init(struct rte_eth_dev *eth_dev)
> >>>        * RX function */
> >>>       if (rte_eal_process_type() != RTE_PROC_PRIMARY){
> >>>           if (eth_dev->data->scattered_rx)
> >>> -            eth_dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
> >>> +            eth_dev->rx_pkt_burst = ixgbe_recv_pkts_lro_single_alloc;
> >>>           return 0;
> >>>       }
> >>>
> >>> diff --git a/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
> >>> b/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
> >>> index 5b90115..419ea5d 100644
> >>> --- a/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
> >>> +++ b/lib/librte_pmd_ixgbe/ixgbe_ethdev.h
> >>> @@ -352,9 +352,6 @@ void ixgbevf_dev_rxtx_start(struct rte_eth_dev
> >>> *dev);
> >>>   uint16_t ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
> >>>           uint16_t nb_pkts);
> >>>
> >>> -uint16_t ixgbe_recv_scattered_pkts(void *rx_queue,
> >>> -        struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
> >>> -
> >>>   uint16_t ixgbe_recv_pkts_lro_single_alloc(void *rx_queue,
> >>>           struct rte_mbuf **rx_pkts, uint16_t nb_pkts);
> >>>   uint16_t ixgbe_recv_pkts_lro_bulk_alloc(void *rx_queue,
> >>> diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
> >>> b/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
> >>> index a45f51e..c23e20f 100644
> >>> --- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
> >>> +++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.c
> >>> @@ -1722,239 +1722,6 @@ ixgbe_recv_pkts_lro_bulk_alloc(void
> >>> *rx_queue, struct rte_mbuf **rx_pkts,
> >>>       return ixgbe_recv_pkts_lro(rx_queue, rx_pkts, nb_pkts, true);
> >>>   }
> >>>
> >>> -uint16_t
> >>> -ixgbe_recv_scattered_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
> >>> -              uint16_t nb_pkts)
> >>> -{
> >>> -    struct ixgbe_rx_queue *rxq;
> >>> -    volatile union ixgbe_adv_rx_desc *rx_ring;
> >>> -    volatile union ixgbe_adv_rx_desc *rxdp;
> >>> -    struct ixgbe_rx_entry *sw_ring;
> >>> -    struct ixgbe_rx_entry *rxe;
> >>> -    struct rte_mbuf *first_seg;
> >>> -    struct rte_mbuf *last_seg;
> >>> -    struct rte_mbuf *rxm;
> >>> -    struct rte_mbuf *nmb;
> >>> -    union ixgbe_adv_rx_desc rxd;
> >>> -    uint64_t dma; /* Physical address of mbuf data buffer */
> >>> -    uint32_t staterr;
> >>> -    uint16_t rx_id;
> >>> -    uint16_t nb_rx;
> >>> -    uint16_t nb_hold;
> >>> -    uint16_t data_len;
> >>> -
> >>> -    nb_rx = 0;
> >>> -    nb_hold = 0;
> >>> -    rxq = rx_queue;
> >>> -    rx_id = rxq->rx_tail;
> >>> -    rx_ring = rxq->rx_ring;
> >>> -    sw_ring = rxq->sw_ring;
> >>> -
> >>> -    /*
> >>> -     * Retrieve RX context of current packet, if any.
> >>> -     */
> >>> -    first_seg = rxq->pkt_first_seg;
> >>> -    last_seg = rxq->pkt_last_seg;
> >>> -
> >>> -    while (nb_rx < nb_pkts) {
> >>> -    next_desc:
> >>> -        /*
> >>> -         * The order of operations here is important as the DD status
> >>> -         * bit must not be read after any other descriptor fields.
> >>> -         * rx_ring and rxdp are pointing to volatile data so the order
> >>> -         * of accesses cannot be reordered by the compiler. If they
> >>> were
> >>> -         * not volatile, they could be reordered which could lead to
> >>> -         * using invalid descriptor fields when read from rxd.
> >>> -         */
> >>> -        rxdp = &rx_ring[rx_id];
> >>> -        staterr = rxdp->wb.upper.status_error;
> >>> -        if (! (staterr & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
> >>> -            break;
> >>> -        rxd = *rxdp;
> >>> -
> >>> -        /*
> >>> -         * Descriptor done.
> >>> -         *
> >>> -         * Allocate a new mbuf to replenish the RX ring descriptor.
> >>> -         * If the allocation fails:
> >>> -         *    - arrange for that RX descriptor to be the first one
> >>> -         *      being parsed the next time the receive function is
> >>> -         *      invoked [on the same queue].
> >>> -         *
> >>> -         *    - Stop parsing the RX ring and return immediately.
> >>> -         *
> >>> -         * This policy does not drop the packet received in the RX
> >>> -         * descriptor for which the allocation of a new mbuf failed.
> >>> -         * Thus, it allows that packet to be later retrieved if
> >>> -         * mbuf have been freed in the mean time.
> >>> -         * As a side effect, holding RX descriptors instead of
> >>> -         * systematically giving them back to the NIC may lead to
> >>> -         * RX ring exhaustion situations.
> >>> -         * However, the NIC can gracefully prevent such situations
> >>> -         * to happen by sending specific "back-pressure" flow control
> >>> -         * frames to its peer(s).
> >>> -         */
> >>> -        PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
> >>> -               "staterr=0x%x data_len=%u",
> >>> -               (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
> >>> -               (unsigned) rx_id, (unsigned) staterr,
> >>> -               (unsigned) rte_le_to_cpu_16(rxd.wb.upper.length));
> >>> -
> >>> -        nmb = rte_rxmbuf_alloc(rxq->mb_pool);
> >>> -        if (nmb == NULL) {
> >>> -            PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
> >>> -                   "queue_id=%u", (unsigned) rxq->port_id,
> >>> -                   (unsigned) rxq->queue_id);
> >>> - rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
> >>> -            break;
> >>> -        }
> >>> -
> >>> -        nb_hold++;
> >>> -        rxe = &sw_ring[rx_id];
> >>> -        rx_id++;
> >>> -        if (rx_id == rxq->nb_rx_desc)
> >>> -            rx_id = 0;
> >>> -
> >>> -        /* Prefetch next mbuf while processing current one. */
> >>> -        rte_ixgbe_prefetch(sw_ring[rx_id].mbuf);
> >>> -
> >>> -        /*
> >>> -         * When next RX descriptor is on a cache-line boundary,
> >>> -         * prefetch the next 4 RX descriptors and the next 8 pointers
> >>> -         * to mbufs.
> >>> -         */
> >>> -        if ((rx_id & 0x3) == 0) {
> >>> -            rte_ixgbe_prefetch(&rx_ring[rx_id]);
> >>> -            rte_ixgbe_prefetch(&sw_ring[rx_id]);
> >>> -        }
> >>> -
> >>> -        /*
> >>> -         * Update RX descriptor with the physical address of the new
> >>> -         * data buffer of the new allocated mbuf.
> >>> -         */
> >>> -        rxm = rxe->mbuf;
> >>> -        rxe->mbuf = nmb;
> >>> -        dma = rte_cpu_to_le_64(RTE_MBUF_DATA_DMA_ADDR_DEFAULT(nmb));
> >>> -        rxdp->read.hdr_addr = dma;
> >>> -        rxdp->read.pkt_addr = dma;
> >>> -
> >>> -        /*
> >>> -         * Set data length & data buffer address of mbuf.
> >>> -         */
> >>> -        data_len = rte_le_to_cpu_16(rxd.wb.upper.length);
> >>> -        rxm->data_len = data_len;
> >>> -        rxm->data_off = RTE_PKTMBUF_HEADROOM;
> >>> -
> >>> -        /*
> >>> -         * If this is the first buffer of the received packet,
> >>> -         * set the pointer to the first mbuf of the packet and
> >>> -         * initialize its context.
> >>> -         * Otherwise, update the total length and the number of
> >>> segments
> >>> -         * of the current scattered packet, and update the pointer to
> >>> -         * the last mbuf of the current packet.
> >>> -         */
> >>> -        if (first_seg == NULL) {
> >>> -            first_seg = rxm;
> >>> -            first_seg->pkt_len = data_len;
> >>> -            first_seg->nb_segs = 1;
> >>> -        } else {
> >>> -            first_seg->pkt_len = (uint16_t)(first_seg->pkt_len
> >>> -                    + data_len);
> >>> -            first_seg->nb_segs++;
> >>> -            last_seg->next = rxm;
> >>> -        }
> >>> -
> >>> -        /*
> >>> -         * If this is not the last buffer of the received packet,
> >>> -         * update the pointer to the last mbuf of the current
> >>> scattered
> >>> -         * packet and continue to parse the RX ring.
> >>> -         */
> >>> -        if (! (staterr & IXGBE_RXDADV_STAT_EOP)) {
> >>> -            last_seg = rxm;
> >>> -            goto next_desc;
> >>> -        }
> >>> -
> >>> -        /*
> >>> -         * This is the last buffer of the received packet.
> >>> -         * If the CRC is not stripped by the hardware:
> >>> -         *   - Subtract the CRC    length from the total packet
> >>> length.
> >>> -         *   - If the last buffer only contains the whole CRC or a
> >>> part
> >>> -         *     of it, free the mbuf associated to the last buffer.
> >>> -         *     If part of the CRC is also contained in the previous
> >>> -         *     mbuf, subtract the length of that CRC part from the
> >>> -         *     data length of the previous mbuf.
> >>> -         */
> >>> -        rxm->next = NULL;
> >>> -        if (unlikely(rxq->crc_len > 0)) {
> >>> -            first_seg->pkt_len -= ETHER_CRC_LEN;
> >>> -            if (data_len <= ETHER_CRC_LEN) {
> >>> -                rte_pktmbuf_free_seg(rxm);
> >>> -                first_seg->nb_segs--;
> >>> -                last_seg->data_len = (uint16_t)
> >>> -                    (last_seg->data_len -
> >>> -                     (ETHER_CRC_LEN - data_len));
> >>> -                last_seg->next = NULL;
> >>> -            } else
> >>> -                rxm->data_len =
> >>> -                    (uint16_t) (data_len - ETHER_CRC_LEN);
> >>> -        }
> >>> -
> >>> -        /* Initialize the first mbuf of the returned packet */
> >>> -        ixgbe_fill_cluster_head_buf(first_seg, &rxd, rxq->port_id,
> >>> -                        staterr);
> >>> -
> >>> -        /* Prefetch data of first segment, if configured to do so. */
> >>> -        rte_packet_prefetch((char *)first_seg->buf_addr +
> >>> -            first_seg->data_off);
> >>> -
> >>> -        /*
> >>> -         * Store the mbuf address into the next entry of the array
> >>> -         * of returned packets.
> >>> -         */
> >>> -        rx_pkts[nb_rx++] = first_seg;
> >>> -
> >>> -        /*
> >>> -         * Setup receipt context for a new packet.
> >>> -         */
> >>> -        first_seg = NULL;
> >>> -    }
> >>> -
> >>> -    /*
> >>> -     * Record index of the next RX descriptor to probe.
> >>> -     */
> >>> -    rxq->rx_tail = rx_id;
> >>> -
> >>> -    /*
> >>> -     * Save receive context.
> >>> -     */
> >>> -    rxq->pkt_first_seg = first_seg;
> >>> -    rxq->pkt_last_seg = last_seg;
> >>> -
> >>> -    /*
> >>> -     * If the number of free RX descriptors is greater than the RX
> >>> free
> >>> -     * threshold of the queue, advance the Receive Descriptor Tail
> >>> (RDT)
> >>> -     * register.
> >>> -     * Update the RDT with the value of the last processed RX
> >>> descriptor
> >>> -     * minus 1, to guarantee that the RDT register is never equal
> >>> to the
> >>> -     * RDH register, which creates a "full" ring situtation from the
> >>> -     * hardware point of view...
> >>> -     */
> >>> -    nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
> >>> -    if (nb_hold > rxq->rx_free_thresh) {
> >>> -        PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
> >>> -               "nb_hold=%u nb_rx=%u",
> >>> -               (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
> >>> -               (unsigned) rx_id, (unsigned) nb_hold,
> >>> -               (unsigned) nb_rx);
> >>> -        rx_id = (uint16_t) ((rx_id == 0) ?
> >>> -                     (rxq->nb_rx_desc - 1) : (rx_id - 1));
> >>> -        IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
> >>> -        nb_hold = 0;
> >>> -    }
> >>> -    rxq->nb_rx_hold = nb_hold;
> >>> -    return (nb_rx);
> >>> -}
> >>> -
> >>> /*********************************************************************
> >>>    *
> >>>    *  Queue management functions
> >>> @@ -2623,7 +2390,7 @@ ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
> >>>           return (-ENOMEM);
> >>>       }
> >>>
> >>> -    if (rsc_requested) {
> >>> +    if (rsc_requested || dev_rx_mode->enable_scatter) {
> >>>           rxq->sw_rsc_ring =
> >>>               rte_zmalloc_socket("rxq->sw_rsc_ring",
> >>>                          sizeof(struct ixgbe_rsc_entry) * len,
> >> I think here is a problem:
> >> We allocate sw_rsc_ring only if the user explicitly requested LRO or
> >> scattered Rx.
> >> However, ixgbe_dev_rx_init() might later implicitly enable scattered
> >> Rx if the provided mbufs are too small:
> >>
> >> buf_size = (uint16_t) ((srrctl & IXGBE_SRRCTL_BSIZEPKT_MASK) <<
> >> IXGBE_SRRCTL_BSIZEPKT_SHIFT);
> >>
> >>   /* It adds dual VLAN length for supporting dual VLAN */
> >>   if (dev->data->dev_conf.rxmode.max_rx_pkt_len +
> >>                                             2 * IXGBE_VLAN_TAG_SIZE >
> >> buf_size)
> >> dev->data->scattered_rx = 1;
> >>
> >> So, ixgbe_recv_pkts_lro_*_alloc() will be selected, but
> >> ixgbe_recv_pkts_lro will be 0.
> >
> > You meant "sw_ring will be NULL", I guess... ;)
> 
> sw_rsc_ring ;)

Yes, sw_rsc_ring of course - my typing is bad, as usual :)
Konstantin


> 
> > Yeah, you are right. I missed that.
> >
> >>
> >> Probably the easiest and safest fix is to always allocate
> >> sw_rsc_ring for the queue.
> >> After all, it would consume at most a bit more than 32KB - that
> >> doesn't seem like much to me.
> >
> > I agree. I should have dropped this condition...
> > Sending v2... ;)
> >
> >> Konstantin
> >>
> >>> @@ -4017,12 +3784,13 @@ void ixgbe_set_rx_function(struct
> >>> rte_eth_dev *dev)
> >>>
> >>>               dev->rx_pkt_burst = ixgbe_recv_scattered_pkts_vec;
> >>>           } else {
> >>> -            PMD_INIT_LOG(DEBUG, "Using Regualr (non-vector) "
> >>> +            PMD_INIT_LOG(DEBUG, "Using Regualr (non-vector, "
> >>> +                        "single allocation) "
> >>>                           "Scattered Rx callback "
> >>>                           "(port=%d).",
> >>>                        dev->data->port_id);
> >>>
> >>> -            dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
> >>> +            dev->rx_pkt_burst = ixgbe_recv_pkts_lro_single_alloc;
> >>>           }
> >>>       /*
> >>>        * Below we set "simple" callbacks according to port/queues
> >>> parameters.
> >>> @@ -4855,7 +4623,8 @@ ixgbevf_dev_rx_init(struct rte_eth_dev *dev)
> >>>                       ixgbe_recv_scattered_pkts_vec;
> >>>               else
> >>>   #endif
> >>> -                dev->rx_pkt_burst = ixgbe_recv_scattered_pkts;
> >>> +                dev->rx_pkt_burst =
> >>> +                    ixgbe_recv_pkts_lro_single_alloc;
> >>>           }
> >>>       }
> >>>
> >>> --
> >>> 2.1.0
> >



More information about the dev mailing list