Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : * Copyright(c) 2010-2014 Intel Corporation
3 : : */
4 : :
5 : : #include <stdio.h>
6 : : #include <sys/queue.h>
7 : : #include <rte_mbuf.h>
8 : : #include <rte_memzone.h>
9 : : #include <rte_errno.h>
10 : : #include <rte_string_fns.h>
11 : : #include <rte_eal_memconfig.h>
12 : : #include <rte_pause.h>
13 : : #include <rte_tailq.h>
14 : :
15 : : #include "rte_distributor_single.h"
16 : : #include "distributor_private.h"
17 : :
18 : : TAILQ_HEAD(rte_distributor_list, rte_distributor_single);
19 : :
20 : : static struct rte_tailq_elem rte_distributor_tailq = {
21 : : .name = "RTE_DISTRIBUTOR",
22 : : };
23 [ - + ]: 235 : EAL_REGISTER_TAILQ(rte_distributor_tailq)
24 : :
25 : : /**** APIs called by workers ****/
26 : :
27 : : void
28 : 1049666 : rte_distributor_request_pkt_single(struct rte_distributor_single *d,
29 : : unsigned worker_id, struct rte_mbuf *oldpkt)
30 : : {
31 : : union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
32 : 1049666 : int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
33 : : | RTE_DISTRIB_GET_BUF;
34 [ - + ]: 1049666 : RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
35 : : ==, 0, rte_memory_order_relaxed);
36 : :
37 : : /* Sync with distributor on GET_BUF flag. */
38 : 1049666 : rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
39 : 1049666 : }
40 : :
41 : : struct rte_mbuf *
42 : 17796111 : rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
43 : : unsigned worker_id)
44 : : {
45 : : union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
46 : : /* Sync with distributor. Acquire bufptr64. */
47 : 17796111 : if (rte_atomic_load_explicit(&buf->bufptr64, rte_memory_order_acquire)
48 [ + + ]: 17796111 : & RTE_DISTRIB_GET_BUF)
49 : : return NULL;
50 : :
51 : : /* since bufptr64 is signed, this should be an arithmetic shift */
52 : 1049666 : int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
53 : 1049666 : return (struct rte_mbuf *)((uintptr_t)ret);
54 : : }
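: : /*
: :  * Editorial sketch, not part of the library: the request/poll pair above
: :  * packs an mbuf pointer and the handshake flags into a single 64-bit word.
: :  * The pointer sits in the upper bits and the low RTE_DISTRIB_FLAG_BITS bits
: :  * hold the GET_BUF/RETURN_BUF flags; the helper name below is assumed.
: :  */
: : static inline struct rte_mbuf *
: : example_bufptr64_roundtrip(struct rte_mbuf *pkt)
: : {
: : 	/* encode: shift the pointer up and set the GET_BUF flag bit */
: : 	int64_t word = (((int64_t)(uintptr_t)pkt) << RTE_DISTRIB_FLAG_BITS)
: : 			| RTE_DISTRIB_GET_BUF;
: : 	/* decode: the arithmetic shift discards the flag bits again */
: : 	return (struct rte_mbuf *)(uintptr_t)(word >> RTE_DISTRIB_FLAG_BITS);
: : }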
55 : :
56 : : struct rte_mbuf *
57 : 1049666 : rte_distributor_get_pkt_single(struct rte_distributor_single *d,
58 : : unsigned worker_id, struct rte_mbuf *oldpkt)
59 : : {
60 : : struct rte_mbuf *ret;
61 : 1049666 : rte_distributor_request_pkt_single(d, worker_id, oldpkt);
62 [ + + ]: 17796111 : while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
63 : : rte_pause();
64 : 1049666 : return ret;
65 : : }
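: : /*
: :  * Editorial sketch, not part of the library: the typical worker loop these
: :  * three calls support. The function name and the "quit" flag are assumed
: :  * for illustration; real applications run this on a worker lcore.
: :  */
: : static inline int
: : example_worker_loop(struct rte_distributor_single *d, unsigned worker_id,
: : 		volatile int *quit)
: : {
: : 	struct rte_mbuf *pkt = NULL;
: :
: : 	while (!*quit) {
: : 		/* hands back the previous packet, blocks for the next one */
: : 		pkt = rte_distributor_get_pkt_single(d, worker_id, pkt);
: : 		/* ... process pkt here ... */
: : 	}
: : 	/* on shutdown, explicitly return the last packet to the distributor */
: : 	return rte_distributor_return_pkt_single(d, worker_id, pkt);
: : }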
66 : :
67 : : int
68 : 2 : rte_distributor_return_pkt_single(struct rte_distributor_single *d,
69 : : unsigned worker_id, struct rte_mbuf *oldpkt)
70 : : {
71 : : union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
72 : 2 : uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
73 : 2 : | RTE_DISTRIB_RETURN_BUF;
74 [ - + ]: 2 : RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
75 : : ==, 0, rte_memory_order_relaxed);
76 : :
77 : : /* Sync with distributor on RETURN_BUF flag. */
78 : 2 : rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
79 : 2 : return 0;
80 : : }
81 : :
82 : : /**** APIs called on distributor core ****/
83 : :
84 : : /* as the name suggests, adds a packet to the backlog for a particular worker */
85 : : static int
86 : : add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
87 : : {
88 [ + + ]: 430 : if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
89 : : return -1;
90 : :
91 : 31 : bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
92 : 31 : = item;
93 : : return 0;
94 : : }
95 : :
96 : : /* takes the next packet for a worker off the backlog */
97 : : static int64_t
98 : : backlog_pop(struct rte_distributor_backlog *bl)
99 : : {
100 : 31 : bl->count--;
101 : 31 : return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
102 : : }
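: : /*
: :  * A worked example of the mask-based indexing above, with illustrative
: :  * numbers (the real RTE_DISTRIB_BACKLOG_SIZE is defined in
: :  * distributor_private.h and must be a power of two for this to work):
: :  * with size 8 and mask 7, start = 6 and count = 3 occupy slots
: :  * (6 + 0) & 7 = 6, (6 + 1) & 7 = 7 and (6 + 2) & 7 = 0, so the AND
: :  * wraps the index without a modulo or a branch.
: :  */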
103 : :
104 : : /* stores a packet returned from a worker inside the returns array */
105 : : static inline void
106 : : store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
107 : : unsigned *ret_start, unsigned *ret_count)
108 : : {
109 : : /* store returns in a circular buffer - code is branch-free */
110 : 52038825 : d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
111 : 52038825 : = (void *)oldbuf;
112 : 52038825 : *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
113 : 52038825 : *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
114 : : }
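: : /*
: :  * Why the two updates above need no branch, reasoning from the code: the
: :  * !!(oldbuf) factor is 0 for a NULL return and 1 otherwise, so a NULL
: :  * oldbuf leaves both start and count untouched (the slot write is then
: :  * harmless, as it lands outside the valid window). Once count has reached
: :  * RTE_DISTRIB_RETURNS_MASK, the first line advances start instead of the
: :  * second line growing count, so the oldest stored return is dropped in
: :  * favour of the new one.
: :  */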
115 : :
116 : : static inline void
117 : 2 : handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
118 : : {
119 : 2 : d->in_flight_tags[wkr] = 0;
120 : 2 : d->in_flight_bitmask &= ~(1UL << wkr);
121 : : /* Sync with worker. Release bufptr64. */
122 : 2 : rte_atomic_store_explicit(&d->bufs[wkr].bufptr64, 0, rte_memory_order_release);
123 [ - + ]: 2 : if (unlikely(d->backlog[wkr].count != 0)) {
124 : : /* When a worker returns its packet and shuts
125 : : * down, the packets queued for that core must
126 : : * be moved elsewhere. The easiest solution is
127 : : * to set things up for a recursive call: the
128 : : * packets get queued up for the next free
129 : : * core, i.e. the call returns as soon as a
130 : : * core becomes free to accept the first
131 : : * packet, and subsequent ones are added to
132 : : * the backlog for that core.
133 : : */
134 : : struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
135 : : unsigned i;
136 : : struct rte_distributor_backlog *bl = &d->backlog[wkr];
137 : :
138 [ # # ]: 0 : for (i = 0; i < bl->count; i++) {
139 : 0 : unsigned idx = (bl->start + i) &
140 : : RTE_DISTRIB_BACKLOG_MASK;
141 : 0 : pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
142 : : RTE_DISTRIB_FLAG_BITS));
143 : : }
144 : : /* recursive call.
145 : : * Note that the tags were set before first level call
146 : : * to rte_distributor_process.
147 : : */
148 : 0 : rte_distributor_process_single(d, pkts, i);
149 : 0 : bl->count = bl->start = 0;
150 : : }
151 : 2 : }
152 : :
153 : : /* This function is called when the process() function is invoked without any
154 : : * new packets. It goes through all the workers and collects any returned
155 : : * packets, performing a partial flush.
156 : : */
157 : : static int
158 : 516 : process_returns(struct rte_distributor_single *d)
159 : : {
160 : : unsigned wkr;
161 : : unsigned flushed = 0;
162 : 516 : unsigned ret_start = d->returns.start,
163 : 516 : ret_count = d->returns.count;
164 : :
165 [ + + ]: 1032 : for (wkr = 0; wkr < d->num_workers; wkr++) {
166 : : uintptr_t oldbuf = 0;
167 : : /* Sync with worker. Acquire bufptr64. */
168 : 516 : const int64_t data = rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
169 : : rte_memory_order_acquire);
170 : :
171 [ + + ]: 516 : if (data & RTE_DISTRIB_GET_BUF) {
172 : 42 : flushed++;
173 [ + + ]: 42 : if (d->backlog[wkr].count)
174 : : /* Sync with worker. Release bufptr64. */
175 : 7 : rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
176 : : backlog_pop(&d->backlog[wkr]),
177 : : rte_memory_order_release);
178 : : else {
179 : : /* Sync with worker on GET_BUF flag. */
180 : 35 : rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
181 : : RTE_DISTRIB_GET_BUF,
182 : : rte_memory_order_release);
183 : 35 : d->in_flight_tags[wkr] = 0;
184 : 35 : d->in_flight_bitmask &= ~(1UL << wkr);
185 : : }
186 : 42 : oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
187 [ + + ]: 474 : } else if (data & RTE_DISTRIB_RETURN_BUF) {
188 : 2 : handle_worker_shutdown(d, wkr);
189 : 2 : oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
190 : : }
191 : :
192 : : store_return(oldbuf, d, &ret_start, &ret_count);
193 : : }
194 : :
195 : 516 : d->returns.start = ret_start;
196 : 516 : d->returns.count = ret_count;
197 : :
198 : 516 : return flushed;
199 : : }
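: : /*
: :  * Illustrative note: rte_distributor_process_single() takes this path when
: :  * it is given num_mbufs == 0, so a caller can harvest worker returns
: :  * without offering new work, e.g. (array size assumed for illustration):
: :  *
: :  *	rte_distributor_process_single(d, NULL, 0);
: :  *	n = rte_distributor_returned_pkts_single(d, mbufs, 64);
: :  */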
200 : :
201 : : /* process a set of packets to distribute them to workers */
202 : : int
203 : 33320 : rte_distributor_process_single(struct rte_distributor_single *d,
204 : : struct rte_mbuf **mbufs, unsigned num_mbufs)
205 : : {
206 : : unsigned next_idx = 0;
207 : : unsigned wkr = 0;
208 : : struct rte_mbuf *next_mb = NULL;
209 : : int64_t next_value = 0;
210 : : uint32_t new_tag = 0;
211 : 33320 : unsigned ret_start = d->returns.start,
212 : 33320 : ret_count = d->returns.count;
213 : :
214 [ + + ]: 33320 : if (unlikely(num_mbufs == 0))
215 : 516 : return process_returns(d);
216 : :
217 [ + + ]: 52071112 : while (next_idx < num_mbufs || next_mb != NULL) {
218 : : uintptr_t oldbuf = 0;
219 : : /* Sync with worker. Acquire bufptr64. */
220 : 52038308 : int64_t data = rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64),
221 : : rte_memory_order_acquire);
222 : :
223 [ + + ]: 52038308 : if (!next_mb) {
224 : 1050065 : next_mb = mbufs[next_idx++];
225 : 1050065 : next_value = (((int64_t)(uintptr_t)next_mb)
226 : : << RTE_DISTRIB_FLAG_BITS);
227 : : /*
228 : : * The caller is expected to set a tag value on each
229 : : * mbuf before calling rte_distributor_process_single().
230 : : * User-defined tags identify flows or sessions
231 : : * (see the usage sketch after this function).
232 : : */
233 : 1050065 : new_tag = next_mb->hash.usr;
234 : :
235 : : /*
236 : : * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
237 : : * then the size of match has to be expanded.
238 : : */
239 : : uint64_t match = 0;
240 : : unsigned i;
241 : : /*
242 : : * To scan for a match, use "xor" and "not" to get a 0/1
243 : : * value, then shift and OR those values into a single "match"
244 : : * variable, where a set bit indicates a match for the
245 : : * worker given by the bit position.
246 : : */
247 [ + + ]: 2100130 : for (i = 0; i < d->num_workers; i++)
248 : 1050065 : match |= ((uint64_t)!(d->in_flight_tags[i] ^ new_tag) << i);
249 : :
250 : : /* Only bits set in the in-flight bitmask count as a match */
251 : 1050065 : match &= d->in_flight_bitmask;
252 : :
253 [ + + ]: 1050065 : if (match) {
254 : : next_mb = NULL;
255 : : unsigned int worker = rte_ctz64(match);
256 : : if (add_to_backlog(&d->backlog[worker],
257 : : next_value) < 0)
258 : : next_idx--;
259 : : }
260 : : }
261 : :
262 [ + + ]: 52038308 : if ((data & RTE_DISTRIB_GET_BUF) &&
263 [ + + + - ]: 1049658 : (d->backlog[wkr].count || next_mb)) {
264 : :
265 [ + + ]: 1049658 : if (d->backlog[wkr].count)
266 : : /* Sync with worker. Release bufptr64. */
267 : 23 : rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
268 : : backlog_pop(&d->backlog[wkr]),
269 : : rte_memory_order_release);
270 : :
271 : : else {
272 : : /* Sync with worker. Release bufptr64. */
273 : 1049635 : rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
274 : : next_value,
275 : : rte_memory_order_release);
276 : 1049635 : d->in_flight_tags[wkr] = new_tag;
277 : 1049635 : d->in_flight_bitmask |= (1UL << wkr);
278 : : next_mb = NULL;
279 : : }
280 : 1049658 : oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
281 [ - + ]: 50988650 : } else if (data & RTE_DISTRIB_RETURN_BUF) {
282 : 0 : handle_worker_shutdown(d, wkr);
283 : 0 : oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
284 : : }
285 : :
286 : : /* store returns in a circular buffer */
287 : : store_return(oldbuf, d, &ret_start, &ret_count);
288 : :
289 [ + - ]: 52038308 : if (++wkr == d->num_workers)
290 : : wkr = 0;
291 : : }
292 : : /* to finish, check all workers for backlog and schedule work for them
293 : : * if they are ready */
294 [ + + ]: 65608 : for (wkr = 0; wkr < d->num_workers; wkr++)
295 [ + + ]: 32804 : if (d->backlog[wkr].count &&
296 : : /* Sync with worker. Acquire bufptr64. */
297 : 1 : (rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
298 [ + - ]: 1 : rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
299 : :
300 : 1 : int64_t oldbuf = d->bufs[wkr].bufptr64 >>
301 : : RTE_DISTRIB_FLAG_BITS;
302 : :
303 : : store_return(oldbuf, d, &ret_start, &ret_count);
304 : :
305 : : /* Sync with worker. Release bufptr64. */
306 : 1 : rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
307 : : backlog_pop(&d->backlog[wkr]),
308 : : rte_memory_order_release);
309 : : }
310 : :
311 : 32804 : d->returns.start = ret_start;
312 : 32804 : d->returns.count = ret_count;
313 : 32804 : return num_mbufs;
314 : : }
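: : /*
: :  * Editorial sketch, not part of the library: the distributor-core side of
: :  * the call above. The tag in mbuf->hash.usr is what the matching loop
: :  * compares against in_flight_tags[], so it must be set before process()
: :  * runs. The function name, tags array and burst size are assumed.
: :  */
: : static inline void
: : example_distribute_burst(struct rte_distributor_single *d,
: : 		struct rte_mbuf **bufs, const uint32_t *tags, unsigned num)
: : {
: : 	struct rte_mbuf *done[64];
: : 	unsigned i;
: :
: : 	/* equal tags serialise on the same worker, preserving flow order */
: : 	for (i = 0; i < num; i++)
: : 		bufs[i]->hash.usr = tags[i];
: :
: : 	rte_distributor_process_single(d, bufs, num);
: :
: : 	/* collect whatever the workers have already handed back */
: : 	(void)rte_distributor_returned_pkts_single(d, done, 64);
: : }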
315 : :
316 : : /* hand back to the caller packets that workers have returned */
317 : : int
318 : 39 : rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
319 : : struct rte_mbuf **mbufs, unsigned max_mbufs)
320 : : {
321 : : struct rte_distributor_returned_pkts *returns = &d->returns;
322 : 39 : unsigned retval = (max_mbufs < returns->count) ?
323 : : max_mbufs : returns->count;
324 : : unsigned i;
325 : :
326 [ + + ]: 1129 : for (i = 0; i < retval; i++) {
327 : 1090 : unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
328 : 1090 : mbufs[i] = returns->mbufs[idx];
329 : : }
330 : 39 : returns->start += i;
331 : 39 : returns->count -= i;
332 : :
333 : 39 : return retval;
334 : : }
335 : :
336 : : /* return the number of packets in-flight in a distributor, i.e. packets
337 : : * being worked on or queued up in a backlog.
338 : : */
339 : : static inline unsigned
340 : : total_outstanding(const struct rte_distributor_single *d)
341 : : {
342 : : unsigned wkr, total_outstanding;
343 : :
344 : 553 : total_outstanding = rte_popcount64(d->in_flight_bitmask);
345 : :
346 [ + + + + ]: 1184 : for (wkr = 0; wkr < d->num_workers; wkr++)
347 : 592 : total_outstanding += d->backlog[wkr].count;
348 : :
349 : : return total_outstanding;
350 : : }
351 : :
352 : : /* flush the distributor, so that there are no outstanding packets in flight or
353 : : * queued up. */
354 : : int
355 : 39 : rte_distributor_flush_single(struct rte_distributor_single *d)
356 : : {
357 : : const unsigned flushed = total_outstanding(d);
358 : :
359 [ + + ]: 553 : while (total_outstanding(d) > 0)
360 : 514 : rte_distributor_process_single(d, NULL, 0);
361 : :
362 : 39 : return flushed;
363 : : }
364 : :
365 : : /* clears the internal returns array in the distributor */
366 : : void
367 : 3 : rte_distributor_clear_returns_single(struct rte_distributor_single *d)
368 : : {
369 : 3 : d->returns.start = d->returns.count = 0;
370 : : #ifndef __OPTIMIZE__
371 : : memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
372 : : #endif
373 : 3 : }
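: : /*
: :  * Editorial sketch, not part of the library: a typical quiesce sequence at
: :  * shutdown built from the flush, returned-packets and clear-returns calls
: :  * above. The function name and buffer size are assumed for illustration.
: :  */
: : static inline void
: : example_quiesce(struct rte_distributor_single *d)
: : {
: : 	struct rte_mbuf *done[64];
: : 	int n;
: :
: : 	/* process the backlog until no packets remain in flight */
: : 	rte_distributor_flush_single(d);
: : 	/* drain anything the workers handed back during the flush */
: : 	while ((n = rte_distributor_returned_pkts_single(d, done, 64)) > 0) {
: : 		/* ... free or forward the n drained mbufs here ... */
: : 	}
: : 	/* finally reset the internal returns ring */
: : 	rte_distributor_clear_returns_single(d);
: : }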
374 : :
375 : : /* creates a distributor instance */
376 : : struct rte_distributor_single *
377 : 1 : rte_distributor_create_single(const char *name,
378 : : unsigned socket_id,
379 : : unsigned num_workers)
380 : : {
381 : : struct rte_distributor_single *d;
382 : : struct rte_distributor_list *distributor_list;
383 : : char mz_name[RTE_MEMZONE_NAMESIZE];
384 : : const struct rte_memzone *mz;
385 : :
386 : : /* compilation-time checks */
387 : : RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
388 : : RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
389 : : RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
390 : : sizeof(d->in_flight_bitmask) * CHAR_BIT);
391 : :
392 [ - + ]: 1 : if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
393 : 0 : rte_errno = EINVAL;
394 : 0 : return NULL;
395 : : }
396 : :
397 : : snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
398 : 1 : mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
399 [ - + ]: 1 : if (mz == NULL) {
400 : 0 : rte_errno = ENOMEM;
401 : 0 : return NULL;
402 : : }
403 : :
404 : 1 : d = mz->addr;
405 : 1 : strlcpy(d->name, name, sizeof(d->name));
406 : 1 : d->num_workers = num_workers;
407 : :
408 : 1 : distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
409 : : rte_distributor_list);
410 : :
411 : 1 : rte_mcfg_tailq_write_lock();
412 : 1 : TAILQ_INSERT_TAIL(distributor_list, d, next);
413 : 1 : rte_mcfg_tailq_write_unlock();
414 : :
415 : 1 : return d;
416 : : }
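: : /*
: :  * Editorial sketch, not part of the library: typical setup using the
: :  * function above. The instance name and error handling are illustrative.
: :  */
: : static inline struct rte_distributor_single *
: : example_setup(unsigned socket_id, unsigned num_workers)
: : {
: : 	struct rte_distributor_single *d;
: :
: : 	d = rte_distributor_create_single("example_dist", socket_id,
: : 			num_workers);
: : 	if (d == NULL)
: : 		printf("distributor create failed: %s\n",
: : 				rte_strerror(rte_errno));
: : 	return d;
: : }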