/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdalign.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>
#include <string.h>
#include <eal_export.h>
#include <rte_mbuf.h>
#include <rte_cycles.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor.h"
#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_dist_burst_list, rte_distributor);

static struct rte_tailq_elem rte_dist_burst_tailq = {
        .name = "RTE_DIST_BURST",
};
EAL_REGISTER_TAILQ(rte_dist_burst_tailq)

/**** APIs called by workers ****/

/**** Burst Packet APIs called by workers ****/

RTE_EXPORT_SYMBOL(rte_distributor_request_pkt)
void
rte_distributor_request_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt,
                unsigned int count)
{
        struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
        unsigned int i;

        volatile RTE_ATOMIC(int64_t) *retptr64;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                rte_distributor_request_pkt_single(d->d_single,
                        worker_id, count ? oldpkt[0] : NULL);
                return;
        }

        retptr64 = &(buf->retptr64[0]);
        /* Spin while the handshake bits are set (the distributor clears them).
         * Sync with the distributor on the GET_BUF flag.
         */
        while (unlikely(rte_atomic_load_explicit(retptr64, rte_memory_order_acquire)
                        & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
                rte_pause();
                uint64_t t = rte_rdtsc() + 100;

                while (rte_rdtsc() < t)
                        rte_pause();
        }

        /*
         * OK, if we've got here, then the distributor has just cleared the
         * handshake bits. Populate the retptrs with the returning packets.
         */

        for (i = count; i < RTE_DIST_BURST_SIZE; i++)
                buf->retptr64[i] = 0;

        /* Set the VALID_BUF bit for each packet returned */
        for (i = count; i-- > 0; )
                buf->retptr64[i] =
                        (((int64_t)(uintptr_t)(oldpkt[i])) <<
                        RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

        /*
         * Finally, set GET_BUF to signal to the distributor that the cache
         * line is ready for processing.
         * Sync with the distributor to release the retptrs.
         */
        rte_atomic_store_explicit(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
                        rte_memory_order_release);
}

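/*
 * Illustrative layout of one 64-bit exchange slot (a sketch of the packing
 * used above, not part of this file): the mbuf pointer is stored shifted
 * left by RTE_DISTRIB_FLAG_BITS, and pointer alignment leaves the low bits
 * free for the GET_BUF/RETURN_BUF/VALID_BUF handshake flags.
 */
#if 0
        /* pack an mbuf and mark the slot as holding a valid pointer */
        int64_t slot = (((int64_t)(uintptr_t)mb) << RTE_DISTRIB_FLAG_BITS) |
                        RTE_DISTRIB_VALID_BUF;
        /* unpack it again on the receiving side */
        struct rte_mbuf *mb2 =
                (struct rte_mbuf *)(uintptr_t)(slot >> RTE_DISTRIB_FLAG_BITS);
#endif
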
RTE_EXPORT_SYMBOL(rte_distributor_poll_pkt)
int
rte_distributor_poll_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts)
{
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        uint64_t ret;
        int count = 0;
        unsigned int i;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
                        worker_id);
                return (pkts[0]) ? 1 : 0;
        }

        /* If either of the bits below is set, return without packets:
         * GET_BUF is set while the distributor hasn't sent any packets yet,
         * RETURN_BUF is set while the distributor must retrieve the in-flight packets.
         * Sync with the distributor to acquire the bufptrs.
         */
        if (rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
                & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
                return -1;

        /* since bufptr64 is signed, this should be an arithmetic shift */
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
                        ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
                        pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
                }
        }

        /*
         * The cache line contents are now copied into an array of mbuf
         * pointers, so toggle the GET_BUF bit so the distributor can start
         * working on the next cache line while we work on these packets.
         * Sync with the distributor on the GET_BUF flag. Release the bufptrs.
         */
        rte_atomic_store_explicit(&(buf->bufptr64[0]),
                buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, rte_memory_order_release);

        return count;
}

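/*
 * Illustrative use of the asynchronous request/poll pair above (a sketch,
 * not part of this file): request a new burst, overlap other per-worker
 * work while the distributor refills the cache line, then poll until the
 * burst arrives. example_request_then_poll() is a hypothetical helper.
 */
#if 0
static unsigned int
example_request_then_poll(struct rte_distributor *d, unsigned int worker_id,
                struct rte_mbuf **oldpkt, unsigned int n_old,
                struct rte_mbuf **pkts)
{
        int count;

        rte_distributor_request_pkt(d, worker_id, oldpkt, n_old);

        /* ... do other per-worker housekeeping here ... */

        while ((count = rte_distributor_poll_pkt(d, worker_id, pkts)) == -1)
                rte_pause();

        return (unsigned int)count;
}
#endif
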
RTE_EXPORT_SYMBOL(rte_distributor_get_pkt)
int
rte_distributor_get_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts,
                struct rte_mbuf **oldpkt, unsigned int return_count)
{
        int count;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (return_count <= 1) {
                        pkts[0] = rte_distributor_get_pkt_single(d->d_single,
                                worker_id, return_count ? oldpkt[0] : NULL);
                        return (pkts[0]) ? 1 : 0;
                } else
                        return -EINVAL;
        }

        rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);

        count = rte_distributor_poll_pkt(d, worker_id, pkts);
        while (count == -1) {
                uint64_t t = rte_rdtsc() + 100;

                while (rte_rdtsc() < t)
                        rte_pause();

                count = rte_distributor_poll_pkt(d, worker_id, pkts);
        }
        return count;
}

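/*
 * Illustrative worker loop built on rte_distributor_get_pkt() /
 * rte_distributor_return_pkt() (a sketch, not part of this file).
 * The quit flag and example_worker() helper are hypothetical application
 * state; worker_id must be unique per worker thread.
 */
#if 0
static int
example_worker(struct rte_distributor *d, unsigned int worker_id,
                volatile bool *quit)
{
        struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
        unsigned int count = 0;

        while (!*quit) {
                /* hand back the previous burst and fetch a new one */
                count = rte_distributor_get_pkt(d, worker_id, bufs, bufs, count);
                /* ... process the 'count' mbufs in bufs[] ... */
        }
        /* return any packets still held before exiting */
        return rte_distributor_return_pkt(d, worker_id, bufs, count);
}
#endif
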
RTE_EXPORT_SYMBOL(rte_distributor_return_pkt)
int
rte_distributor_return_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
{
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        unsigned int i;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (num == 1)
                        return rte_distributor_return_pkt_single(d->d_single,
                                worker_id, oldpkt[0]);
                else if (num == 0)
                        return rte_distributor_return_pkt_single(d->d_single,
                                worker_id, NULL);
                else
                        return -EINVAL;
        }

        /* Spin while the handshake bits are set (the distributor clears them).
         * Sync with the distributor on the GET_BUF flag.
         */
        while (unlikely(rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_relaxed)
                        & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
                rte_pause();
                uint64_t t = rte_rdtsc() + 100;

                while (rte_rdtsc() < t)
                        rte_pause();
        }

        /* Sync with the distributor to acquire the retptrs */
        rte_atomic_thread_fence(rte_memory_order_acquire);
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
                /* Switch off the return bit first */
                buf->retptr64[i] = 0;

        for (i = num; i-- > 0; )
                buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
                        RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

        /* Use RETURN_BUF on bufptr64 to notify the distributor that we
         * won't read any mbufs from there even if GET_BUF is set.
         * This allows the distributor to retrieve the in-flight packets
         * it has already sent.
         */
        rte_atomic_fetch_or_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
                        rte_memory_order_acq_rel);

        /* Set RETURN_BUF on retptr64 even if we got no returns.
         * Sync with the distributor on the RETURN_BUF flag. Release the retptrs.
         * This tells the distributor that we are not requesting any more packets.
         */
        rte_atomic_store_explicit(&(buf->retptr64[0]),
                buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, rte_memory_order_release);

        return 0;
}

/**** APIs called on distributor core ****/

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor *d,
                unsigned int *ret_start, unsigned int *ret_count)
{
        if (!oldbuf)
                return;
        /* store returns in a circular buffer */
        d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
                        = (void *)oldbuf;
        *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
        *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
}

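/*
 * The branchless index update above is equivalent to the following
 * (illustrative only): once the ring holds RTE_DISTRIB_RETURNS_MASK
 * entries it stays "full" and the oldest return is overwritten by
 * advancing the start index instead of growing the count.
 */
#if 0
        if (*ret_count == RTE_DISTRIB_RETURNS_MASK)
                (*ret_start)++;
        else
                (*ret_count)++;
#endif
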
/*
 * Match the flow_ids (tags) of the incoming packets to the flow_ids of the
 * inflight packets (both those in flight on the workers and those in each
 * worker's backlog). This allows us to pin those packets to the relevant
 * workers, giving us our atomic flow pinning.
 */
void
find_match_scalar(struct rte_distributor *d,
                uint16_t *data_ptr,
                uint16_t *output_ptr)
{
        struct rte_distributor_backlog *bl;
        uint16_t i, j, w;

        /*
         * Function overview:
         * 1. Loop through all worker IDs
         * 2. Compare the current inflights to the incoming tags
         * 3. Compare the current backlog to the incoming tags
         * 4. Add any matches to the output
         */

        for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
                output_ptr[j] = 0;

        for (i = 0; i < d->num_workers; i++) {
                bl = &d->backlog[i];

                for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
                                if (d->in_flight_tags[i][w] == data_ptr[j]) {
                                        output_ptr[j] = i + 1;
                                        break;
                                }
                for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
                                if (bl->tags[w] == data_ptr[j]) {
                                        output_ptr[j] = i + 1;
                                        break;
                                }
        }

        /*
         * At this stage, the output contains 8 16-bit values, with each
         * non-zero value holding the worker ID (plus one) to which the
         * corresponding flow is pinned.
         */
}

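/*
 * Worked example (illustrative): with RTE_DIST_BURST_SIZE == 8, incoming
 * tags data_ptr = {5, 9, 7, 0, 0, 0, 0, 0} and worker 1 currently holding
 * an in-flight packet tagged 9, the result is
 * output_ptr = {0, 2, 0, 0, 0, 0, 0, 0}: tag 9 is pinned to worker 1
 * (stored as worker ID + 1), while tags 5 and 7 are free to be assigned
 * to any active worker.
 */
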
/*
 * When a worker has called rte_distributor_return_pkt() and passed the
 * RTE_DISTRIB_RETURN_BUF handshake through retptr64, the distributor must
 * retrieve both the in-flight and the backlog packets assigned to that
 * worker and reprocess them onto other workers.
 */
static void
handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        /* double BURST size for storing both inflights and backlog */
        struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
        unsigned int pkts_count = 0;
        unsigned int i;

        /* If GET_BUF is cleared, there are in-flight packets sent to a
         * worker which does not request new packets.
         * They must be retrieved and assigned to another worker.
         */
        if (!(rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
                & RTE_DISTRIB_GET_BUF))
                for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
                        if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
                                pkts[pkts_count++] = (void *)((uintptr_t)
                                        (buf->bufptr64[i]
                                        >> RTE_DISTRIB_FLAG_BITS));

        /* Perform the following operations on the bufptr64 handshake flags:
         * - set GET_BUF to indicate that the distributor can overwrite the
         *   buffer with new packets if the worker makes a new request;
         * - clear RETURN_BUF to unlock reads on the worker side.
         */
        rte_atomic_store_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
                rte_memory_order_release);

        /* Collect the backlog packets from the worker */
        for (i = 0; i < d->backlog[wkr].count; i++)
                pkts[pkts_count++] = (void *)((uintptr_t)
                        (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));

        d->backlog[wkr].count = 0;

        /* Clear both the inflight and the backlog tags */
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                d->in_flight_tags[wkr][i] = 0;
                d->backlog[wkr].tags[i] = 0;
        }

        /* Recursive call */
        if (pkts_count > 0)
                rte_distributor_process(d, pkts, pkts_count);
}

/*
 * When the handshake bits indicate that there are packets coming back from
 * a worker, this function is called to copy and store the valid returned
 * pointers (store_return).
 */
static unsigned int
handle_returns(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        uintptr_t oldbuf;
        unsigned int ret_start = d->returns.start,
                        ret_count = d->returns.count;
        unsigned int count = 0;
        unsigned int i;

        /* Sync on the GET_BUF flag. Acquire the retptrs. */
        if (rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_acquire)
                & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
                for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                        if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
                                oldbuf = ((uintptr_t)(buf->retptr64[i] >>
                                        RTE_DISTRIB_FLAG_BITS));
                                /* store returns in a circular buffer */
                                store_return(oldbuf, d, &ret_start, &ret_count);
                                count++;
                                buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
                        }
                }
                d->returns.start = ret_start;
                d->returns.count = ret_count;

                /* If the worker requested packets with GET_BUF, mark it
                 * active; otherwise (RETURN_BUF), mark it inactive.
                 */
                d->activesum -= d->active[wkr];
                d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
                d->activesum += d->active[wkr];

                /* If the worker returned packets without requesting new ones,
                 * handle all the in-flight and backlog packets assigned to it.
                 */
                if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
                        handle_worker_shutdown(d, wkr);

                /* Clear the slots for the worker to populate with more returns.
                 * Sync with the worker on the GET_BUF flag. Release the retptrs.
                 */
                rte_atomic_store_explicit(&(buf->retptr64[0]), 0, rte_memory_order_release);
        }
        return count;
}

394 : : * This function releases a burst (cache line) to a worker.
395 : : * It is called from the process function when a cacheline is
396 : : * full to make room for more packets for that worker, or when
397 : : * all packets have been assigned to bursts and need to be flushed
398 : : * to the workers.
399 : : * It also needs to wait for any outstanding packets from the worker
400 : : * before sending out new packets.
401 : : */
402 : : static unsigned int
403 : 131286 : release(struct rte_distributor *d, unsigned int wkr)
404 : : {
405 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
406 : : unsigned int i;
407 : :
408 : 131286 : handle_returns(d, wkr);
409 [ + + ]: 131286 : if (unlikely(!d->active[wkr]))
410 : : return 0;
411 : :
412 : : /* Sync with worker on GET_BUF flag */
413 : 1092045 : while (!(rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64[0]), rte_memory_order_acquire)
414 [ + + ]: 1092045 : & RTE_DISTRIB_GET_BUF)) {
415 : 960761 : handle_returns(d, wkr);
416 [ + - ]: 960761 : if (unlikely(!d->active[wkr]))
417 : : return 0;
418 : : rte_pause();
419 : : }
420 : :
421 : : buf->count = 0;
422 : :
423 [ + + ]: 1180950 : for (i = 0; i < d->backlog[wkr].count; i++) {
424 : 1049666 : d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
425 : 1049666 : RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
426 : 1049666 : d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
427 : : }
428 : 131284 : buf->count = i;
429 [ + + ]: 131890 : for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
430 : 606 : buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
431 : 606 : d->in_flight_tags[wkr][i] = 0;
432 : : }
433 : :
434 : 131284 : d->backlog[wkr].count = 0;
435 : :
436 : : /* Clear the GET bit.
437 : : * Sync with worker on GET_BUF flag. Release bufptrs.
438 : : */
439 : 131284 : rte_atomic_store_explicit(&(buf->bufptr64[0]),
440 : : buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, rte_memory_order_release);
441 : 131284 : return buf->count;
442 : :
443 : : }
444 : :
445 : :
/* process a set of packets to distribute them to workers */
RTE_EXPORT_SYMBOL(rte_distributor_process)
int
rte_distributor_process(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int num_mbufs)
{
        unsigned int next_idx = 0;
        static unsigned int wkr;
        struct rte_mbuf *next_mb = NULL;
        int64_t next_value = 0;
        uint16_t new_tag = 0;
        alignas(RTE_CACHE_LINE_SIZE) uint16_t flows[RTE_DIST_BURST_SIZE];
        unsigned int i, j, w, wid, matching_required;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_process_single(d->d_single,
                        mbufs, num_mbufs);
        }

        for (wid = 0; wid < d->num_workers; wid++)
                handle_returns(d, wid);

        if (unlikely(num_mbufs == 0)) {
                /* Flush out all non-full cache lines to the workers. */
                for (wid = 0; wid < d->num_workers; wid++) {
                        /* Sync with the worker on the GET_BUF flag. */
                        if (rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
                                rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF) {
                                d->bufs[wid].count = 0;
                                release(d, wid);
                                handle_returns(d, wid);
                        }
                }
                return 0;
        }

        if (unlikely(!d->activesum))
                return 0;

        while (next_idx < num_mbufs) {
                alignas(128) uint16_t matches[RTE_DIST_BURST_SIZE];
                unsigned int pkts;

                if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
                        pkts = num_mbufs - next_idx;
                else
                        pkts = RTE_DIST_BURST_SIZE;

                for (i = 0; i < pkts; i++) {
                        if (mbufs[next_idx + i]) {
                                /* flows have to be non-zero */
                                flows[i] = mbufs[next_idx + i]->hash.usr | 1;
                        } else
                                flows[i] = 0;
                }
                for (; i < RTE_DIST_BURST_SIZE; i++)
                        flows[i] = 0;

                matching_required = 1;

                for (j = 0; j < pkts; j++) {
                        if (unlikely(!d->activesum))
                                return next_idx;

                        if (unlikely(matching_required)) {
                                switch (d->dist_match_fn) {
                                case RTE_DIST_MATCH_VECTOR:
                                        find_match_vec(d, &flows[0],
                                                &matches[0]);
                                        break;
                                default:
                                        find_match_scalar(d, &flows[0],
                                                &matches[0]);
                                }
                                matching_required = 0;
                        }
                        /*
                         * The matches array now contains the intended worker
                         * ID (+1) of the incoming packets. Any zeroes still
                         * need to be assigned workers.
                         */

                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb) <<
                                        RTE_DISTRIB_FLAG_BITS);
                        /*
                         * The user is advised to set the tag value for each
                         * mbuf before calling rte_distributor_process().
                         * User-defined tags are used to identify flows, or
                         * sessions.
                         */
                        /* flows MUST be non-zero */
                        new_tag = (uint16_t)(next_mb->hash.usr) | 1;

                        /*
                         * Uncommenting the next line will cause the find_match
                         * function to be optimized out, making this function
                         * do parallel (non-atomic) distribution.
                         */
                        /* matches[j] = 0; */

                        if (matches[j] && d->active[matches[j] - 1]) {
                                struct rte_distributor_backlog *bl =
                                                &d->backlog[matches[j] - 1];
                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, matches[j] - 1);
                                        if (!d->active[matches[j] - 1]) {
                                                j--;
                                                next_idx--;
                                                matching_required = 1;
                                                continue;
                                        }
                                }

                                /* Add to the worker that already has this flow */
                                unsigned int idx = bl->count++;

                                bl->tags[idx] = new_tag;
                                bl->pkts[idx] = next_value;

                        } else {
                                struct rte_distributor_backlog *bl;

                                while (unlikely(!d->active[wkr]))
                                        wkr = (wkr + 1) % d->num_workers;
                                bl = &d->backlog[wkr];

                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, wkr);
                                        if (!d->active[wkr]) {
                                                j--;
                                                next_idx--;
                                                matching_required = 1;
                                                continue;
                                        }
                                }

                                /* Add to the current worker */
                                unsigned int idx = bl->count++;

                                bl->tags[idx] = new_tag;
                                bl->pkts[idx] = next_value;
                                /*
                                 * Now that we've just added an unpinned flow
                                 * to a worker, we need to ensure that all
                                 * other packets with that same flow will go
                                 * to the same worker in this burst.
                                 */
                                for (w = j; w < pkts; w++)
                                        if (flows[w] == new_tag)
                                                matches[w] = wkr + 1;
                        }
                }
                wkr = (wkr + 1) % d->num_workers;
        }

        /* Flush out all non-full cache lines to the workers. */
        for (wid = 0; wid < d->num_workers; wid++)
                /* Sync with the worker on the GET_BUF flag. */
                if ((rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
                        rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
                        d->bufs[wid].count = 0;
                        release(d, wid);
                }

        return num_mbufs;
}

/* return to the caller the packets returned from the workers */
RTE_EXPORT_SYMBOL(rte_distributor_returned_pkts)
int
rte_distributor_returned_pkts(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int max_mbufs)
{
        struct rte_distributor_returned_pkts *returns = &d->returns;
        unsigned int retval = (max_mbufs < returns->count) ?
                        max_mbufs : returns->count;
        unsigned int i;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_returned_pkts_single(d->d_single,
                        mbufs, max_mbufs);
        }

        for (i = 0; i < retval; i++) {
                unsigned int idx = (returns->start + i) &
                                RTE_DISTRIB_RETURNS_MASK;

                mbufs[i] = returns->mbufs[idx];
        }
        returns->start += i;
        returns->count -= i;

        return retval;
}

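/*
 * Illustrative distributor-core loop (a sketch, not part of this file):
 * feed received bursts into the distributor and free whatever the workers
 * hand back. The port/queue ids, quit flag and example_distribute_loop()
 * helper are hypothetical; rte_eth_rx_burst() would additionally require
 * rte_ethdev.h.
 */
#if 0
static void
example_distribute_loop(struct rte_distributor *d, volatile bool *quit)
{
        struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE * 2];
        struct rte_mbuf *returned[RTE_DIST_BURST_SIZE * 2];

        while (!*quit) {
                const uint16_t nb_rx = rte_eth_rx_burst(0, 0, bufs,
                                RTE_DIST_BURST_SIZE * 2);
                if (nb_rx > 0) {
                        /* tags (mbuf->hash.usr) identify flows and must be set */
                        rte_distributor_process(d, bufs, nb_rx);
                }
                const int nb_ret = rte_distributor_returned_pkts(d,
                                returned, RTE_DIM(returned));
                if (nb_ret > 0)
                        rte_pktmbuf_free_bulk(returned, nb_ret);
        }
}
#endif
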
/*
 * Return the number of packets in flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned int
total_outstanding(const struct rte_distributor *d)
{
        unsigned int wkr, total_outstanding = 0;

        for (wkr = 0; wkr < d->num_workers; wkr++)
                total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;

        return total_outstanding;
}

/*
 * Flush the distributor, so that there are no outstanding packets in flight
 * or queued up.
 */
RTE_EXPORT_SYMBOL(rte_distributor_flush)
int
rte_distributor_flush(struct rte_distributor *d)
{
        unsigned int flushed;
        unsigned int wkr;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_flush_single(d->d_single);
        }

        flushed = total_outstanding(d);

        while (total_outstanding(d) > 0)
                rte_distributor_process(d, NULL, 0);

        /* wait 10ms to allow all workers to drain the packets */
        rte_delay_us(10000);

        /*
         * Send an empty burst to all workers to allow them to exit
         * gracefully, should they need to.
         */
        rte_distributor_process(d, NULL, 0);

        for (wkr = 0; wkr < d->num_workers; wkr++)
                handle_returns(d, wkr);

        return flushed;
}

/* clears the internal returns array in the distributor */
RTE_EXPORT_SYMBOL(rte_distributor_clear_returns)
void
rte_distributor_clear_returns(struct rte_distributor *d)
{
        unsigned int wkr;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                rte_distributor_clear_returns_single(d->d_single);
                return;
        }

        /* throw away returns, so workers can exit */
        for (wkr = 0; wkr < d->num_workers; wkr++)
                /* Sync with the worker. Release the retptrs. */
                rte_atomic_store_explicit(&(d->bufs[wkr].retptr64[0]), 0,
                        rte_memory_order_release);

        d->returns.start = d->returns.count = 0;
}

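/*
 * Illustrative shutdown sequence on the distributor core (a sketch, not
 * part of this file): stop feeding new bursts, flush what is still in
 * flight or backlogged, then clear the returns array so that workers can
 * exit.
 */
#if 0
        rte_distributor_flush(d);
        rte_distributor_clear_returns(d);
#endif
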
/* creates a distributor instance */
RTE_EXPORT_SYMBOL(rte_distributor_create)
struct rte_distributor *
rte_distributor_create(const char *name,
                unsigned int socket_id,
                unsigned int num_workers,
                unsigned int alg_type)
{
        struct rte_distributor *d;
        struct rte_dist_burst_list *dist_burst_list;
        char mz_name[RTE_MEMZONE_NAMESIZE];
        const struct rte_memzone *mz;
        unsigned int i;

        /* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */

        /* compile-time checks */
        RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
        RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);

        if (name == NULL || num_workers >=
                        (unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
                rte_errno = EINVAL;
                return NULL;
        }

        if (alg_type == RTE_DIST_ALG_SINGLE) {
                d = malloc(sizeof(struct rte_distributor));
                if (d == NULL) {
                        rte_errno = ENOMEM;
                        return NULL;
                }
                d->d_single = rte_distributor_create_single(name,
                        socket_id, num_workers);
                if (d->d_single == NULL) {
                        free(d);
                        /* rte_errno will have been set */
                        return NULL;
                }
                d->alg_type = alg_type;
                return d;
        }

        snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
        mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
        if (mz == NULL) {
                rte_errno = ENOMEM;
                return NULL;
        }

        d = mz->addr;
        strlcpy(d->name, name, sizeof(d->name));
        d->num_workers = num_workers;
        d->alg_type = alg_type;

        d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
#if defined(RTE_ARCH_X86)
        if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
                d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
#endif

        /*
         * Set up the backlog tags so they're pointing at the second cache
         * line, for performance during flow matching.
         */
        for (i = 0; i < num_workers; i++)
                d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];

        memset(d->active, 0, sizeof(d->active));
        d->activesum = 0;

        dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
                        rte_dist_burst_list);

        rte_mcfg_tailq_write_lock();
        TAILQ_INSERT_TAIL(dist_burst_list, d, next);
        rte_mcfg_tailq_write_unlock();

        return d;
}
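
/*
 * Illustrative creation call (a sketch): a burst-mode distributor named
 * "example" on the caller's socket, serving up to 4 workers. The error
 * handling shown is one possible choice, not a requirement.
 */
#if 0
        struct rte_distributor *dist = rte_distributor_create("example",
                        rte_socket_id(), 4, RTE_DIST_ALG_BURST);
        if (dist == NULL)
                rte_exit(EXIT_FAILURE, "cannot create distributor: %s\n",
                                rte_strerror(rte_errno));
#endif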