Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : *
3 : : * Copyright (c) 2018-2020 Arm Limited
4 : : */
5 : :
6 : : #include <stdio.h>
7 : : #include <string.h>
8 : : #include <stdint.h>
9 : : #include <inttypes.h>
10 : : #include <errno.h>
11 : :
12 : : #include <eal_export.h>
13 : : #include <rte_common.h>
14 : : #include <rte_log.h>
15 : : #include <rte_memory.h>
16 : : #include <rte_malloc.h>
17 : : #include <rte_errno.h>
18 : : #include <rte_ring_elem.h>
19 : :
20 : : #include "rte_rcu_qsbr.h"
21 : : #include "rcu_qsbr_pvt.h"
22 : :
23 : : #define RCU_LOG(level, ...) \
24 : : RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
25 : :
26 : : /* Get the memory size of QSBR variable */
27 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_get_memsize)
28 : : size_t
29 : 7929 : rte_rcu_qsbr_get_memsize(uint32_t max_threads)
30 : : {
31 : : size_t sz;
32 : :
33 [ + + ]: 7929 : if (max_threads == 0) {
34 : 1 : RCU_LOG(ERR, "Invalid max_threads %u", max_threads);
35 : 1 : rte_errno = EINVAL;
36 : :
37 : 1 : return 1;
38 : : }
39 : :
40 : : sz = sizeof(struct rte_rcu_qsbr);
41 : :
42 : : /* Add the size of quiescent state counter array */
43 : 7928 : sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
44 : :
45 : : /* Add the size of the registered thread ID bitmap array */
46 : 7928 : sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
47 : :
48 : 7928 : return sz;
49 : : }
50 : :
51 : : /* Initialize a quiescent state variable */
52 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_init)
53 : : int
54 : 7448 : rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
55 : : {
56 : : size_t sz;
57 : :
58 [ + + ]: 7448 : if (v == NULL) {
59 : 1 : RCU_LOG(ERR, "Invalid input parameter");
60 : 1 : rte_errno = EINVAL;
61 : :
62 : 1 : return 1;
63 : : }
64 : :
65 : 7447 : sz = rte_rcu_qsbr_get_memsize(max_threads);
66 [ + - ]: 7447 : if (sz == 1)
67 : : return 1;
68 : :
69 : : /* Set all the threads to offline */
70 : : memset(v, 0, sz);
71 : 7447 : v->max_threads = max_threads;
72 : 7447 : v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
73 : : __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
74 : : __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
75 : 7447 : v->token = __RTE_QSBR_CNT_INIT;
76 : 7447 : v->acked_token = __RTE_QSBR_CNT_INIT - 1;
77 : :
78 : 7447 : return 0;
79 : : }
80 : :
81 : : /* Register a reader thread to report its quiescent state
82 : : * on a QS variable.
83 : : */
84 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_thread_register)
85 : : int
86 : 543 : rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
87 : : {
88 : : unsigned int i, id;
89 : : uint64_t old_bmap;
90 : :
91 [ + + + + ]: 543 : if (v == NULL || thread_id >= v->max_threads) {
92 : 3 : RCU_LOG(ERR, "Invalid input parameter");
93 : 3 : rte_errno = EINVAL;
94 : :
95 : 3 : return 1;
96 : : }
97 : :
98 : : __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
99 : : v->qsbr_cnt[thread_id].lock_cnt);
100 : :
101 : 540 : id = thread_id & __RTE_QSBR_THRID_MASK;
102 : 540 : i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
103 : :
104 : : /* Add the thread to the bitmap of registered threads */
105 : 540 : old_bmap = rte_atomic_fetch_or_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
106 : : RTE_BIT64(id), rte_memory_order_release);
107 : :
108 : : /* Increment the number of threads registered only if the thread was not already
109 : : * registered
110 : : */
111 [ + + ]: 540 : if (!(old_bmap & RTE_BIT64(id)))
112 : 539 : rte_atomic_fetch_add_explicit(&v->num_threads, 1, rte_memory_order_relaxed);
113 : :
114 : : return 0;
115 : : }
116 : :
117 : : /* Remove a reader thread, from the list of threads reporting their
118 : : * quiescent state on a QS variable.
119 : : */
120 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_thread_unregister)
121 : : int
122 : 276 : rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
123 : : {
124 : : unsigned int i, id;
125 : : uint64_t old_bmap;
126 : :
127 [ + + + + ]: 276 : if (v == NULL || thread_id >= v->max_threads) {
128 : 3 : RCU_LOG(ERR, "Invalid input parameter");
129 : 3 : rte_errno = EINVAL;
130 : :
131 : 3 : return 1;
132 : : }
133 : :
134 : : __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
135 : : v->qsbr_cnt[thread_id].lock_cnt);
136 : :
137 : 273 : id = thread_id & __RTE_QSBR_THRID_MASK;
138 : 273 : i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
139 : :
140 : : /* Make sure any loads of the shared data structure are
141 : : * completed before removal of the thread from the bitmap of
142 : : * reporting threads.
143 : : */
144 : 273 : old_bmap = rte_atomic_fetch_and_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
145 : : ~RTE_BIT64(id), rte_memory_order_release);
146 : :
147 : : /* Decrement the number of threads unregistered only if the thread was not already
148 : : * unregistered
149 : : */
150 [ + + ]: 273 : if (old_bmap & RTE_BIT64(id))
151 : 270 : rte_atomic_fetch_sub_explicit(&v->num_threads, 1, rte_memory_order_relaxed);
152 : :
153 : : return 0;
154 : : }
155 : :
156 : : /* Wait till the reader threads have entered quiescent state. */
157 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_synchronize)
158 : : void
159 [ + + ]: 2058 : rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
160 : : {
161 : : uint64_t t;
162 : :
163 : : RTE_ASSERT(v != NULL);
164 : :
165 : : t = rte_rcu_qsbr_start(v);
166 : :
167 : : /* If the current thread has readside critical section,
168 : : * update its quiescent state status.
169 : : */
170 [ + + ]: 2058 : if (thread_id != RTE_QSBR_THRID_INVALID)
171 : : rte_rcu_qsbr_quiescent(v, thread_id);
172 : :
173 : : /* Wait for other readers to enter quiescent state */
174 : : rte_rcu_qsbr_check(v, t, true);
175 : 2058 : }
176 : :
177 : : /* Dump the details of a single quiescent state variable to a file. */
178 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_dump)
179 : : int
180 : 6 : rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
181 : : {
182 : : uint64_t bmap;
183 : : uint32_t i, t, id;
184 : :
185 [ + + ]: 6 : if (v == NULL || f == NULL) {
186 : 3 : RCU_LOG(ERR, "Invalid input parameter");
187 : 3 : rte_errno = EINVAL;
188 : :
189 : 3 : return 1;
190 : : }
191 : :
192 : : fprintf(f, "\nQuiescent State Variable @%p\n", v);
193 : :
194 : 3 : fprintf(f, " QS variable memory size = %zu\n",
195 : : rte_rcu_qsbr_get_memsize(v->max_threads));
196 : 3 : fprintf(f, " Given # max threads = %u\n", v->max_threads);
197 : 3 : fprintf(f, " Current # threads = %u\n", v->num_threads);
198 : :
199 : : fprintf(f, " Registered thread IDs = ");
200 [ + + ]: 9 : for (i = 0; i < v->num_elems; i++) {
201 : 6 : bmap = rte_atomic_load_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
202 : : rte_memory_order_acquire);
203 : 6 : id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
204 [ + + ]: 7 : while (bmap) {
205 : : t = rte_ctz64(bmap);
206 : 1 : fprintf(f, "%u ", id + t);
207 : :
208 : 1 : bmap &= ~RTE_BIT64(t);
209 : : }
210 : : }
211 : :
212 : : fprintf(f, "\n");
213 : :
214 : 3 : fprintf(f, " Token = %" PRIu64 "\n",
215 : 3 : rte_atomic_load_explicit(&v->token, rte_memory_order_acquire));
216 : :
217 : 3 : fprintf(f, " Least Acknowledged Token = %" PRIu64 "\n",
218 : 3 : rte_atomic_load_explicit(&v->acked_token, rte_memory_order_acquire));
219 : :
220 : : fprintf(f, "Quiescent State Counts for readers:\n");
221 [ + + ]: 9 : for (i = 0; i < v->num_elems; i++) {
222 : 6 : bmap = rte_atomic_load_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
223 : : rte_memory_order_acquire);
224 : 6 : id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
225 [ + + ]: 7 : while (bmap) {
226 : : t = rte_ctz64(bmap);
227 : 1 : fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
228 : : id + t,
229 : 1 : rte_atomic_load_explicit(
230 : : &v->qsbr_cnt[id + t].cnt,
231 : : rte_memory_order_relaxed),
232 : 1 : rte_atomic_load_explicit(
233 : : &v->qsbr_cnt[id + t].lock_cnt,
234 : : rte_memory_order_relaxed));
235 : 1 : bmap &= ~RTE_BIT64(t);
236 : : }
237 : : }
238 : :
239 : : return 0;
240 : : }
241 : :
242 : : /* Create a queue used to store the data structure elements that can
243 : : * be freed later. This queue is referred to as 'defer queue'.
244 : : */
245 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_dq_create)
246 : : struct rte_rcu_qsbr_dq *
247 : 24 : rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
248 : : {
249 : : struct rte_rcu_qsbr_dq *dq;
250 : : uint32_t qs_fifo_size;
251 : : unsigned int flags;
252 : :
253 [ + + + + ]: 24 : if (params == NULL || params->free_fn == NULL ||
254 [ + + + - ]: 21 : params->v == NULL || params->name == NULL ||
255 [ + + + + ]: 20 : params->size == 0 || params->esize == 0 ||
256 [ + + ]: 18 : (params->esize % 4 != 0)) {
257 : 9 : RCU_LOG(ERR, "Invalid input parameter");
258 : 9 : rte_errno = EINVAL;
259 : :
260 : 9 : return NULL;
261 : : }
262 : : /* If auto reclamation is configured, reclaim limit
263 : : * should be a valid value.
264 : : */
265 [ + - ]: 15 : if ((params->trigger_reclaim_limit <= params->size) &&
266 [ - + ]: 15 : (params->max_reclaim_size == 0)) {
267 : 0 : RCU_LOG(ERR,
268 : : "Invalid input parameter, size = %u, trigger_reclaim_limit = %u, "
269 : : "max_reclaim_size = %u",
270 : : params->size, params->trigger_reclaim_limit,
271 : : params->max_reclaim_size);
272 : 0 : rte_errno = EINVAL;
273 : :
274 : 0 : return NULL;
275 : : }
276 : :
277 : 15 : dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
278 : : RTE_CACHE_LINE_SIZE);
279 [ - + ]: 15 : if (dq == NULL) {
280 : 0 : rte_errno = ENOMEM;
281 : :
282 : 0 : return NULL;
283 : : }
284 : :
285 : : /* Decide the flags for the ring.
286 : : * If MT safety is requested, use RTS for ring enqueue as most
287 : : * use cases involve dq-enqueue happening on the control plane.
288 : : * Ring dequeue is always HTS due to the possibility of revert.
289 : : */
290 : : flags = RING_F_MP_RTS_ENQ;
291 [ + + ]: 15 : if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
292 : : flags = RING_F_SP_ENQ;
293 : 15 : flags |= RING_F_MC_HTS_DEQ;
294 : : /* round up qs_fifo_size to next power of two that is not less than
295 : : * max_size.
296 : : */
297 : 15 : qs_fifo_size = rte_align32pow2(params->size + 1);
298 : : /* Add token size to ring element size */
299 : 30 : dq->r = rte_ring_create_elem(params->name,
300 : 15 : __RTE_QSBR_TOKEN_SIZE + params->esize,
301 : : qs_fifo_size, SOCKET_ID_ANY, flags);
302 [ - + ]: 15 : if (dq->r == NULL) {
303 : 0 : RCU_LOG(ERR, "defer queue create failed");
304 : 0 : rte_free(dq);
305 : 0 : return NULL;
306 : : }
307 : :
308 : 15 : dq->v = params->v;
309 : 15 : dq->size = params->size;
310 : 15 : dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
311 : 15 : dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
312 : 15 : dq->max_reclaim_size = params->max_reclaim_size;
313 : 15 : dq->free_fn = params->free_fn;
314 : 15 : dq->p = params->p;
315 : :
316 : 15 : return dq;
317 : : }
318 : :
319 : : /* Enqueue one resource to the defer queue to free after the grace
320 : : * period is over.
321 : : */
322 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_dq_enqueue)
323 : 1102 : int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
324 : 1102 : {
325 : : __rte_rcu_qsbr_dq_elem_t *dq_elem;
326 : : uint32_t cur_size;
327 : :
328 [ + + ]: 1102 : if (dq == NULL || e == NULL) {
329 : 3 : RCU_LOG(ERR, "Invalid input parameter");
330 : 3 : rte_errno = EINVAL;
331 : :
332 : 3 : return 1;
333 : : }
334 : :
335 : 1099 : char data[dq->esize];
336 : : dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
337 : : /* Start the grace period */
338 [ + + ]: 1099 : dq_elem->token = rte_rcu_qsbr_start(dq->v);
339 : :
340 : : /* Reclaim resources if the queue size has hit the reclaim
341 : : * limit. This helps the queue from growing too large and
342 : : * allows time for reader threads to report their quiescent state.
343 : : */
344 [ + + ]: 1099 : cur_size = rte_ring_count(dq->r);
345 [ + + ]: 1099 : if (cur_size > dq->trigger_reclaim_limit) {
346 : 1036 : RCU_LOG(INFO, "Triggering reclamation");
347 : 1036 : rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
348 : : NULL, NULL, NULL);
349 : : }
350 : :
351 : : /* Enqueue the token and resource. Generating the token and
352 : : * enqueuing (token + resource) on the queue is not an
353 : : * atomic operation. When the defer queue is shared by multiple
354 : : * writers, this might result in tokens enqueued out of order
355 : : * on the queue. So, some tokens might wait longer than they
356 : : * are required to be reclaimed.
357 : : */
358 [ - + + - : 1099 : memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
- ]
359 : : /* Check the status as enqueue might fail since the other threads
360 : : * might have used up the freed space.
361 : : * Enqueue uses the configured flags when the DQ was created.
362 : : */
363 [ - + + - : 1099 : if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
- ]
364 : 8 : RCU_LOG(ERR, "Enqueue failed");
365 : : /* Note that the token generated above is not used.
366 : : * Other than wasting tokens, it should not cause any
367 : : * other issues.
368 : : */
369 : 8 : RCU_LOG(INFO, "Skipped enqueuing token = %" PRIu64, dq_elem->token);
370 : :
371 : 8 : rte_errno = ENOSPC;
372 : 8 : return 1;
373 : : }
374 : :
375 : 1091 : RCU_LOG(INFO, "Enqueued token = %" PRIu64, dq_elem->token);
376 : :
377 : 1091 : return 0;
378 : : }
379 : :
380 : : /* Reclaim resources from the defer queue. */
381 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_dq_reclaim)
382 : : int
383 : 1064 : rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
384 : : unsigned int *freed, unsigned int *pending,
385 : : unsigned int *available)
386 : 1064 : {
387 : : uint32_t cnt;
388 : : __rte_rcu_qsbr_dq_elem_t *dq_elem;
389 : :
390 [ + + ]: 1064 : if (dq == NULL || n == 0) {
391 : 2 : RCU_LOG(ERR, "Invalid input parameter");
392 : 2 : rte_errno = EINVAL;
393 : :
394 : 2 : return 1;
395 : : }
396 : :
397 : : cnt = 0;
398 : :
399 : 1062 : char data[dq->esize];
400 : : /* Check reader threads quiescent state and reclaim resources */
401 [ + + + + ]: 4236 : while (cnt < n &&
402 [ - + - ]: 2083 : rte_ring_dequeue_bulk_elem_start(dq->r, &data,
403 : : dq->esize, 1, available) != 0) {
404 : : dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
405 : :
406 : : /* Reclaim the resource */
407 [ + + ]: 2065 : if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
408 [ - + - ]: 974 : rte_ring_dequeue_elem_finish(dq->r, 0);
409 : : break;
410 : : }
411 [ - + - ]: 1091 : rte_ring_dequeue_elem_finish(dq->r, 1);
412 : :
413 : 1091 : RCU_LOG(INFO, "Reclaimed token = %" PRIu64, dq_elem->token);
414 : :
415 : 1091 : dq->free_fn(dq->p, dq_elem->elem, 1);
416 : :
417 : 1091 : cnt++;
418 : : }
419 : :
420 : 1062 : RCU_LOG(INFO, "Reclaimed %u resources", cnt);
421 : :
422 [ + + ]: 1062 : if (freed != NULL)
423 : 1 : *freed = cnt;
424 [ + + ]: 1062 : if (pending != NULL)
425 : 20 : *pending = rte_ring_count(dq->r);
426 : :
427 : : return 0;
428 : : }
429 : :
430 : : /* Delete a defer queue. */
431 : : RTE_EXPORT_SYMBOL(rte_rcu_qsbr_dq_delete)
432 : : int
433 : 25 : rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
434 : : {
435 : : unsigned int pending;
436 : :
437 [ + + ]: 25 : if (dq == NULL) {
438 : 6 : RCU_LOG(DEBUG, "Invalid input parameter");
439 : :
440 : 6 : return 0;
441 : : }
442 : :
443 : : /* Reclaim all the resources */
444 : 19 : rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
445 [ + + ]: 19 : if (pending != 0) {
446 : 4 : rte_errno = EAGAIN;
447 : :
448 : 4 : return 1;
449 : : }
450 : :
451 : 15 : rte_ring_free(dq->r);
452 : 15 : rte_free(dq);
453 : :
454 : 15 : return 0;
455 : : }
456 : :
457 : : RTE_EXPORT_SYMBOL(rte_rcu_log_type)
458 [ - + ]: 252 : RTE_LOG_REGISTER_DEFAULT(rte_rcu_log_type, ERR);
|