Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : * Copyright(c) 2018 Ericsson AB
3 : : */
4 : :
5 : : #include "dsw_evdev.h"
6 : :
7 : : #ifdef DSW_SORT_DEQUEUED
8 : : #include "dsw_sort.h"
9 : : #endif
10 : :
11 : : #include <stdbool.h>
12 : : #include <stdlib.h>
13 : : #include <string.h>
14 : :
15 : : #include <rte_cycles.h>
16 : : #include <rte_memcpy.h>
17 : : #include <rte_random.h>
18 : :
19 : : static bool
20 : 0 : dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21 : : int32_t credits)
22 : : {
23 : 0 : int32_t inflight_credits = port->inflight_credits;
24 : 0 : int32_t missing_credits = credits - inflight_credits;
25 : : int32_t total_on_loan;
26 : : int32_t available;
27 : : int32_t acquired_credits;
28 : : int32_t new_total_on_loan;
29 : :
30 [ # # ]: 0 : if (likely(missing_credits <= 0)) {
31 : 0 : port->inflight_credits -= credits;
32 : 0 : return true;
33 : : }
34 : :
35 : 0 : total_on_loan =
36 : 0 : __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
37 : 0 : available = dsw->max_inflight - total_on_loan;
38 : 0 : acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
39 : :
40 [ # # ]: 0 : if (available < acquired_credits)
41 : : return false;
42 : :
43 : : /* This is a race, no locks are involved, and thus some other
44 : : * thread can allocate tokens in between the check and the
45 : : * allocation.
46 : : */
47 : 0 : new_total_on_loan =
48 : 0 : __atomic_fetch_add(&dsw->credits_on_loan, acquired_credits,
49 : : __ATOMIC_RELAXED) + acquired_credits;
50 : :
51 [ # # ]: 0 : if (unlikely(new_total_on_loan > dsw->max_inflight)) {
52 : : /* Some other port took the last credits */
53 : 0 : __atomic_fetch_sub(&dsw->credits_on_loan, acquired_credits,
54 : : __ATOMIC_RELAXED);
55 : 0 : return false;
56 : : }
57 : :
58 : : DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
59 : : acquired_credits);
60 : :
61 : 0 : port->inflight_credits += acquired_credits;
62 : 0 : port->inflight_credits -= credits;
63 : :
64 : 0 : return true;
65 : : }
66 : :
67 : : static void
68 : : dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
69 : : int32_t credits)
70 : : {
71 : 0 : port->inflight_credits += credits;
72 : :
73 [ # # ]: 0 : if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
74 : : int32_t leave_credits = DSW_PORT_MIN_CREDITS;
75 : 0 : int32_t return_credits =
76 : : port->inflight_credits - leave_credits;
77 : :
78 : 0 : port->inflight_credits = leave_credits;
79 : :
80 : 0 : __atomic_fetch_sub(&dsw->credits_on_loan, return_credits,
81 : : __ATOMIC_RELAXED);
82 : :
83 : : DSW_LOG_DP_PORT(DEBUG, port->id,
84 : : "Returned %d tokens to pool.\n",
85 : : return_credits);
86 : : }
87 : : }
88 : :
89 : : static void
90 : : dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
91 : : uint16_t num_forward, uint16_t num_release)
92 : : {
93 : 0 : port->new_enqueued += num_new;
94 : 0 : port->forward_enqueued += num_forward;
95 : 0 : port->release_enqueued += num_release;
96 : 0 : }
97 : :
98 : : static void
99 : : dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
100 : : {
101 : 0 : source_port->queue_enqueued[queue_id]++;
102 : : }
103 : :
104 : : static void
105 : : dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
106 : : {
107 : 0 : port->dequeued += num;
108 : : }
109 : :
110 : : static void
111 : : dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
112 : : {
113 : 0 : source_port->queue_dequeued[queue_id]++;
114 : : }
115 : :
116 : : static void
117 : : dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
118 : : {
119 [ # # ]: 0 : if (dequeued > 0 && port->busy_start == 0)
120 : : /* work period begins */
121 : 0 : port->busy_start = rte_get_timer_cycles();
122 [ # # # # ]: 0 : else if (dequeued == 0 && port->busy_start > 0) {
123 : : /* work period ends */
124 : 0 : uint64_t work_period =
125 : 0 : rte_get_timer_cycles() - port->busy_start;
126 : 0 : port->busy_cycles += work_period;
127 : 0 : port->busy_start = 0;
128 : : }
129 : : }
130 : :
131 : : static int16_t
132 : : dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
133 : : {
134 : 0 : uint64_t passed = now - port->measurement_start;
135 : 0 : uint64_t busy_cycles = port->busy_cycles;
136 : :
137 : 0 : if (port->busy_start > 0) {
138 : 0 : busy_cycles += (now - port->busy_start);
139 : 0 : port->busy_start = now;
140 : : }
141 : :
142 : 0 : int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
143 : :
144 : 0 : port->measurement_start = now;
145 : 0 : port->busy_cycles = 0;
146 : :
147 : 0 : port->total_busy_cycles += busy_cycles;
148 : :
149 : : return load;
150 : : }
151 : :
152 : : static void
153 : 0 : dsw_port_load_update(struct dsw_port *port, uint64_t now)
154 : : {
155 : : int16_t old_load;
156 : : int16_t period_load;
157 : : int16_t new_load;
158 : :
159 [ # # ]: 0 : old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
160 : :
161 : : period_load = dsw_port_load_close_period(port, now);
162 : :
163 : 0 : new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
164 : : (DSW_OLD_LOAD_WEIGHT+1);
165 : :
166 : 0 : __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
167 : :
168 : : /* The load of the recently immigrated flows should hopefully
169 : : * be reflected the load estimate by now.
170 : : */
171 : 0 : __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
172 : 0 : }
173 : :
174 : : static void
175 : : dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
176 : : {
177 [ # # ]: 0 : if (now < port->next_load_update)
178 : : return;
179 : :
180 : 0 : port->next_load_update = now + port->load_update_interval;
181 : :
182 : 0 : dsw_port_load_update(port, now);
183 : : }
184 : :
185 : : static void
186 : 0 : dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
187 : : {
188 : : /* there's always room on the ring */
189 [ # # # # : 0 : while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
# ]
190 : : rte_pause();
191 : 0 : }
192 : :
193 : : static int
194 : 0 : dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
195 : : {
196 [ # # # # : 0 : return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
# ]
197 : : }
198 : :
199 : : static void
200 : 0 : dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
201 : : uint8_t type, struct dsw_queue_flow *qfs,
202 : : uint8_t qfs_len)
203 : : {
204 : : uint16_t port_id;
205 : 0 : struct dsw_ctl_msg msg = {
206 : : .type = type,
207 : 0 : .originating_port_id = source_port->id,
208 : : .qfs_len = qfs_len
209 : : };
210 : :
211 : 0 : memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
212 : :
213 [ # # ]: 0 : for (port_id = 0; port_id < dsw->num_ports; port_id++)
214 [ # # ]: 0 : if (port_id != source_port->id)
215 : 0 : dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
216 : 0 : }
217 : :
218 : : static __rte_always_inline bool
219 : : dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
220 : : uint8_t queue_id, uint16_t flow_hash)
221 : : {
222 : : uint16_t i;
223 : :
224 [ # # # # : 0 : for (i = 0; i < qfs_len; i++)
# # # # #
# ]
225 [ # # # # : 0 : if (qfs[i].queue_id == queue_id &&
# # # # #
# ]
226 [ # # # # : 0 : qfs[i].flow_hash == flow_hash)
# # # # #
# ]
227 : : return true;
228 : :
229 : : return false;
230 : : }
231 : :
232 : : static __rte_always_inline bool
233 : : dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
234 : : uint16_t flow_hash)
235 : : {
236 : 0 : return dsw_is_queue_flow_in_ary(port->paused_flows,
237 : 0 : port->paused_flows_len,
238 : : queue_id, flow_hash);
239 : : }
240 : :
241 : : static __rte_always_inline bool
242 : : dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
243 : : uint16_t flow_hash)
244 : : {
245 : 0 : return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
246 : 0 : port->emigration_targets_len,
247 : : queue_id, flow_hash);
248 : : }
249 : :
250 : : static void
251 : : dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
252 : : uint8_t qfs_len)
253 : : {
254 : : uint8_t i;
255 : :
256 [ # # # # ]: 0 : for (i = 0; i < qfs_len; i++) {
257 : 0 : struct dsw_queue_flow *qf = &qfs[i];
258 : :
259 : : DSW_LOG_DP_PORT(DEBUG, port->id,
260 : : "Pausing queue_id %d flow_hash %d.\n",
261 : : qf->queue_id, qf->flow_hash);
262 : :
263 : 0 : port->paused_flows[port->paused_flows_len] = *qf;
264 : 0 : port->paused_flows_len++;
265 : : };
266 : : }
267 : :
268 : : static void
269 : 0 : dsw_port_remove_paused_flow(struct dsw_port *port,
270 : : struct dsw_queue_flow *target_qf)
271 : : {
272 : : uint16_t i;
273 : :
274 [ # # ]: 0 : for (i = 0; i < port->paused_flows_len; i++) {
275 : 0 : struct dsw_queue_flow *qf = &port->paused_flows[i];
276 : :
277 [ # # ]: 0 : if (qf->queue_id == target_qf->queue_id &&
278 [ # # ]: 0 : qf->flow_hash == target_qf->flow_hash) {
279 : 0 : uint16_t last_idx = port->paused_flows_len-1;
280 [ # # ]: 0 : if (i != last_idx)
281 : 0 : port->paused_flows[i] =
282 : 0 : port->paused_flows[last_idx];
283 : 0 : port->paused_flows_len--;
284 : :
285 : : DSW_LOG_DP_PORT(DEBUG, port->id,
286 : : "Unpausing queue_id %d flow_hash %d.\n",
287 : : target_qf->queue_id,
288 : : target_qf->flow_hash);
289 : :
290 : 0 : return;
291 : : }
292 : : }
293 : :
294 : 0 : DSW_LOG_DP_PORT(ERR, port->id,
295 : : "Failed to unpause queue_id %d flow_hash %d.\n",
296 : : target_qf->queue_id, target_qf->flow_hash);
297 : : }
298 : :
299 : : static void
300 : : dsw_port_remove_paused_flows(struct dsw_port *port,
301 : : struct dsw_queue_flow *qfs, uint8_t qfs_len)
302 : : {
303 : : uint8_t i;
304 : :
305 [ # # # # ]: 0 : for (i = 0; i < qfs_len; i++)
306 : 0 : dsw_port_remove_paused_flow(port, &qfs[i]);
307 : : }
308 : :
309 : : static void
310 : : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
311 : :
312 : : static void
313 : 0 : dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
314 : : uint8_t originating_port_id,
315 : : struct dsw_queue_flow *paused_qfs,
316 : : uint8_t qfs_len)
317 : : {
318 : 0 : struct dsw_ctl_msg cfm = {
319 : : .type = DSW_CTL_CFM,
320 : 0 : .originating_port_id = port->id
321 : : };
322 : :
323 : : /* There might be already-scheduled events belonging to the
324 : : * paused flow in the output buffers.
325 : : */
326 : : dsw_port_flush_out_buffers(dsw, port);
327 : :
328 : : dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
329 : :
330 : : /* Make sure any stores to the original port's in_ring is seen
331 : : * before the ctl message.
332 : : */
333 : 0 : rte_smp_wmb();
334 : :
335 : 0 : dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
336 : 0 : }
337 : :
338 : : struct dsw_queue_flow_burst {
339 : : struct dsw_queue_flow queue_flow;
340 : : uint16_t count;
341 : : };
342 : :
343 : : #define DSW_QF_TO_INT(_qf) \
344 : : ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
345 : :
346 : : static inline int
347 : 0 : dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
348 : : {
349 : : const struct dsw_queue_flow *qf_a = v_qf_a;
350 : : const struct dsw_queue_flow *qf_b = v_qf_b;
351 : :
352 : 0 : return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
353 : : }
354 : :
355 : : static uint16_t
356 : 0 : dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
357 : : struct dsw_queue_flow_burst *bursts)
358 : : {
359 : : uint16_t i;
360 : : struct dsw_queue_flow_burst *current_burst = NULL;
361 : : uint16_t num_bursts = 0;
362 : :
363 : : /* We don't need the stable property, and the list is likely
364 : : * large enough for qsort() to outperform dsw_stable_sort(),
365 : : * so we use qsort() here.
366 : : */
367 : 0 : qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
368 : :
369 : : /* arrange the (now-consecutive) events into bursts */
370 [ # # ]: 0 : for (i = 0; i < qfs_len; i++) {
371 [ # # ]: 0 : if (i == 0 ||
372 [ # # ]: 0 : dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) {
373 : 0 : current_burst = &bursts[num_bursts];
374 : 0 : current_burst->queue_flow = qfs[i];
375 : 0 : current_burst->count = 0;
376 : 0 : num_bursts++;
377 : : }
378 : 0 : current_burst->count++;
379 : : }
380 : :
381 : 0 : return num_bursts;
382 : : }
383 : :
384 : : static bool
385 : 0 : dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
386 : : int16_t load_limit)
387 : : {
388 : : bool below_limit = false;
389 : : uint16_t i;
390 : :
391 [ # # ]: 0 : for (i = 0; i < dsw->num_ports; i++) {
392 : 0 : int16_t measured_load =
393 : 0 : __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
394 : 0 : int32_t immigration_load =
395 : 0 : __atomic_load_n(&dsw->ports[i].immigration_load,
396 : : __ATOMIC_RELAXED);
397 : 0 : int32_t load = measured_load + immigration_load;
398 : :
399 : 0 : load = RTE_MIN(load, DSW_MAX_LOAD);
400 : :
401 [ # # ]: 0 : if (load < load_limit)
402 : : below_limit = true;
403 : 0 : port_loads[i] = load;
404 : : }
405 : 0 : return below_limit;
406 : : }
407 : :
408 : : static int16_t
409 : : dsw_flow_load(uint16_t num_events, int16_t port_load)
410 : : {
411 : 0 : return ((int32_t)port_load * (int32_t)num_events) /
412 : : DSW_MAX_EVENTS_RECORDED;
413 : : }
414 : :
415 : : static int16_t
416 : : dsw_evaluate_migration(int16_t source_load, int16_t target_load,
417 : : int16_t flow_load)
418 : : {
419 : : int32_t res_target_load;
420 : : int32_t imbalance;
421 : :
422 : 0 : if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
423 : : return -1;
424 : :
425 : 0 : imbalance = source_load - target_load;
426 : :
427 [ # # ]: 0 : if (imbalance < DSW_REBALANCE_THRESHOLD)
428 : : return -1;
429 : :
430 : 0 : res_target_load = target_load + flow_load;
431 : :
432 : : /* If the estimated load of the target port will be higher
433 : : * than the source port's load, it doesn't make sense to move
434 : : * the flow.
435 : : */
436 [ # # ]: 0 : if (res_target_load > source_load)
437 : : return -1;
438 : :
439 : : /* The more idle the target will be, the better. This will
440 : : * make migration prefer moving smaller flows, and flows to
441 : : * lightly loaded ports.
442 : : */
443 : 0 : return DSW_MAX_LOAD - res_target_load;
444 : : }
445 : :
446 : : static bool
447 : : dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
448 : : {
449 : : struct dsw_queue *queue = &dsw->queues[queue_id];
450 : 0 : uint64_t port_mask = UINT64_C(1) << port_id;
451 : :
452 : 0 : return queue->serving_ports & port_mask;
453 : : }
454 : :
455 : : static bool
456 : 0 : dsw_select_emigration_target(struct dsw_evdev *dsw,
457 : : struct dsw_port *source_port,
458 : : struct dsw_queue_flow_burst *bursts,
459 : : uint16_t num_bursts,
460 : : int16_t *port_loads, uint16_t num_ports,
461 : : uint8_t *target_port_ids,
462 : : struct dsw_queue_flow *target_qfs,
463 : : uint8_t *targets_len)
464 : : {
465 : 0 : int16_t source_port_load = port_loads[source_port->id];
466 : : struct dsw_queue_flow *candidate_qf = NULL;
467 : : uint8_t candidate_port_id = 0;
468 : : int16_t candidate_weight = -1;
469 : : int16_t candidate_flow_load = -1;
470 : : uint16_t i;
471 : :
472 [ # # ]: 0 : if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
473 : : return false;
474 : :
475 [ # # ]: 0 : for (i = 0; i < num_bursts; i++) {
476 : 0 : struct dsw_queue_flow_burst *burst = &bursts[i];
477 : 0 : struct dsw_queue_flow *qf = &burst->queue_flow;
478 : : int16_t flow_load;
479 : : uint16_t port_id;
480 : :
481 [ # # ]: 0 : if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
482 : 0 : qf->queue_id, qf->flow_hash))
483 : 0 : continue;
484 : :
485 : 0 : flow_load = dsw_flow_load(burst->count, source_port_load);
486 : :
487 [ # # ]: 0 : for (port_id = 0; port_id < num_ports; port_id++) {
488 : : int16_t weight;
489 : :
490 [ # # ]: 0 : if (port_id == source_port->id)
491 : 0 : continue;
492 : :
493 [ # # ]: 0 : if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
494 : 0 : continue;
495 : :
496 : 0 : weight = dsw_evaluate_migration(source_port_load,
497 [ # # ]: 0 : port_loads[port_id],
498 : : flow_load);
499 : :
500 [ # # ]: 0 : if (weight > candidate_weight) {
501 : : candidate_qf = qf;
502 : : candidate_port_id = port_id;
503 : : candidate_weight = weight;
504 : : candidate_flow_load = flow_load;
505 : : }
506 : : }
507 : : }
508 : :
509 [ # # ]: 0 : if (candidate_weight < 0)
510 : : return false;
511 : :
512 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Selected queue_id %d "
513 : : "flow_hash %d (with flow load %d) for migration "
514 : : "to port %d.\n", candidate_qf->queue_id,
515 : : candidate_qf->flow_hash,
516 : : DSW_LOAD_TO_PERCENT(candidate_flow_load),
517 : : candidate_port_id);
518 : :
519 : 0 : port_loads[candidate_port_id] += candidate_flow_load;
520 : 0 : port_loads[source_port->id] -= candidate_flow_load;
521 : :
522 : 0 : target_port_ids[*targets_len] = candidate_port_id;
523 : 0 : target_qfs[*targets_len] = *candidate_qf;
524 : 0 : (*targets_len)++;
525 : :
526 : 0 : __atomic_fetch_add(&dsw->ports[candidate_port_id].immigration_load,
527 : : candidate_flow_load, __ATOMIC_RELAXED);
528 : :
529 : 0 : return true;
530 : : }
531 : :
532 : : static void
533 : 0 : dsw_select_emigration_targets(struct dsw_evdev *dsw,
534 : : struct dsw_port *source_port,
535 : : struct dsw_queue_flow_burst *bursts,
536 : : uint16_t num_bursts, int16_t *port_loads)
537 : : {
538 : 0 : struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
539 : 0 : uint8_t *target_port_ids = source_port->emigration_target_port_ids;
540 : 0 : uint8_t *targets_len = &source_port->emigration_targets_len;
541 : : uint16_t i;
542 : :
543 [ # # ]: 0 : for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
544 : : bool found;
545 : :
546 : 0 : found = dsw_select_emigration_target(dsw, source_port,
547 : : bursts, num_bursts,
548 : 0 : port_loads, dsw->num_ports,
549 : : target_port_ids,
550 : : target_qfs,
551 : : targets_len);
552 [ # # ]: 0 : if (!found)
553 : : break;
554 : : }
555 : :
556 : : if (*targets_len == 0)
557 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
558 : : "For the %d flows considered, no target port "
559 : : "was found.\n", num_bursts);
560 : 0 : }
561 : :
562 : : static uint8_t
563 : : dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
564 : : {
565 : : struct dsw_queue *queue = &dsw->queues[queue_id];
566 : : uint8_t port_id;
567 : :
568 [ # # # # ]: 0 : if (queue->num_serving_ports > 1)
569 : 0 : port_id = queue->flow_to_port_map[flow_hash];
570 : : else
571 : : /* A single-link queue, or atomic/ordered/parallel but
572 : : * with just a single serving port.
573 : : */
574 : 0 : port_id = rte_bsf64(queue->serving_ports);
575 : :
576 : : DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
577 : : "to port %d.\n", queue_id, flow_hash, port_id);
578 : :
579 : : return port_id;
580 : : }
581 : :
582 : : static void
583 : 0 : dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
584 : : uint8_t dest_port_id)
585 : : {
586 : 0 : struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
587 : : uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
588 : 0 : struct rte_event *buffer = source_port->out_buffer[dest_port_id];
589 : :
590 [ # # ]: 0 : if (*buffer_len == 0)
591 : : return;
592 : :
593 : : /* The rings are dimensioned to fit all in-flight events (even
594 : : * on a single ring), so looping will work.
595 : : */
596 [ # # # # : 0 : rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer, *buffer_len,
# ]
597 : : NULL);
598 : :
599 : 0 : (*buffer_len) = 0;
600 : : }
601 : :
602 : : static uint16_t
603 : : dsw_port_get_parallel_flow_id(struct dsw_port *port)
604 : : {
605 : 0 : uint16_t flow_id = port->next_parallel_flow_id;
606 : :
607 : 0 : port->next_parallel_flow_id =
608 : 0 : (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
609 : :
610 : : return flow_id;
611 : : }
612 : :
613 : : static void
614 : : dsw_port_buffer_paused(struct dsw_port *port,
615 : : const struct rte_event *paused_event)
616 : : {
617 : 0 : port->paused_events[port->paused_events_len] = *paused_event;
618 : 0 : port->paused_events_len++;
619 : 0 : }
620 : :
621 : :
622 : : static void
623 : 0 : dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
624 : : uint8_t dest_port_id, const struct rte_event *event)
625 : : {
626 : 0 : struct rte_event *buffer = source_port->out_buffer[dest_port_id];
627 : : uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
628 : :
629 [ # # ]: 0 : if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
630 : 0 : dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
631 : :
632 : 0 : buffer[*buffer_len] = *event;
633 : :
634 : 0 : (*buffer_len)++;
635 : 0 : }
636 : :
637 : : #define DSW_FLOW_ID_BITS (24)
638 : : static uint16_t
639 : : dsw_flow_id_hash(uint32_t flow_id)
640 : : {
641 : : uint16_t hash = 0;
642 : : uint16_t offset = 0;
643 : :
644 : : do {
645 : 0 : hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
646 : 0 : offset += DSW_MAX_FLOWS_BITS;
647 [ # # # # : 0 : } while (offset < DSW_FLOW_ID_BITS);
# # # # #
# # # #
# ]
648 : :
649 : : return hash;
650 : : }
651 : :
652 : : static void
653 : 0 : dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
654 : : struct rte_event event)
655 : : {
656 : : uint8_t dest_port_id;
657 : :
658 : 0 : event.flow_id = dsw_port_get_parallel_flow_id(source_port);
659 : :
660 [ # # ]: 0 : dest_port_id = dsw_schedule(dsw, event.queue_id,
661 : 0 : dsw_flow_id_hash(event.flow_id));
662 : :
663 : 0 : dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
664 : 0 : }
665 : :
666 : : static void
667 : 0 : dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
668 : : const struct rte_event *event)
669 : : {
670 : : uint16_t flow_hash;
671 : : uint8_t dest_port_id;
672 : :
673 [ # # ]: 0 : if (unlikely(dsw->queues[event->queue_id].schedule_type ==
674 : : RTE_SCHED_TYPE_PARALLEL)) {
675 : 0 : dsw_port_buffer_parallel(dsw, source_port, *event);
676 : 0 : return;
677 : : }
678 : :
679 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
680 : :
681 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
682 : : flow_hash))) {
683 : : dsw_port_buffer_paused(source_port, event);
684 : 0 : return;
685 : : }
686 : :
687 : : dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
688 : :
689 : 0 : dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
690 : : }
691 : :
692 : : static void
693 : 0 : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
694 : : struct dsw_port *source_port)
695 : 0 : {
696 : 0 : uint16_t paused_events_len = source_port->paused_events_len;
697 : 0 : struct rte_event paused_events[paused_events_len];
698 : : uint16_t i;
699 : :
700 [ # # ]: 0 : if (paused_events_len == 0)
701 : 0 : return;
702 : :
703 [ # # ]: 0 : rte_memcpy(paused_events, source_port->paused_events,
704 : : paused_events_len * sizeof(struct rte_event));
705 : :
706 : 0 : source_port->paused_events_len = 0;
707 : :
708 [ # # ]: 0 : for (i = 0; i < paused_events_len; i++) {
709 : 0 : struct rte_event *event = &paused_events[i];
710 : : uint16_t flow_hash;
711 : :
712 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
713 : :
714 [ # # ]: 0 : if (dsw_port_is_flow_paused(source_port, event->queue_id,
715 : : flow_hash))
716 : : dsw_port_buffer_paused(source_port, event);
717 : : else {
718 : : uint8_t dest_port_id;
719 : :
720 : : dest_port_id = dsw_schedule(dsw, event->queue_id,
721 : : flow_hash);
722 : :
723 : 0 : dsw_port_buffer_non_paused(dsw, source_port,
724 : : dest_port_id, event);
725 : : }
726 : : }
727 : : }
728 : :
729 : : static void
730 : : dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
731 : : {
732 : : uint64_t flow_migration_latency;
733 : :
734 : 0 : flow_migration_latency =
735 : 0 : (rte_get_timer_cycles() - port->emigration_start);
736 : 0 : port->emigration_latency += (flow_migration_latency * finished);
737 : 0 : port->emigrations += finished;
738 : 0 : }
739 : :
740 : : static void
741 : 0 : dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
742 : : uint8_t schedule_type)
743 : : {
744 : : uint8_t i;
745 : : struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
746 : : uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
747 : : uint8_t left_qfs_len = 0;
748 : : uint8_t finished;
749 : :
750 [ # # ]: 0 : for (i = 0; i < port->emigration_targets_len; i++) {
751 : 0 : struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
752 : 0 : uint8_t queue_id = qf->queue_id;
753 : 0 : uint8_t queue_schedule_type =
754 : 0 : dsw->queues[queue_id].schedule_type;
755 : : uint16_t flow_hash = qf->flow_hash;
756 : :
757 [ # # ]: 0 : if (queue_schedule_type != schedule_type) {
758 : 0 : left_port_ids[left_qfs_len] =
759 : 0 : port->emigration_target_port_ids[i];
760 : 0 : left_qfs[left_qfs_len] = *qf;
761 : 0 : left_qfs_len++;
762 : 0 : continue;
763 : : }
764 : :
765 : : DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
766 : : "queue_id %d flow_hash %d.\n", queue_id,
767 : : flow_hash);
768 : : }
769 : :
770 : 0 : finished = port->emigration_targets_len - left_qfs_len;
771 : :
772 [ # # ]: 0 : if (finished > 0)
773 : : dsw_port_emigration_stats(port, finished);
774 : :
775 [ # # ]: 0 : for (i = 0; i < left_qfs_len; i++) {
776 : 0 : port->emigration_target_port_ids[i] = left_port_ids[i];
777 : 0 : port->emigration_target_qfs[i] = left_qfs[i];
778 : : }
779 : 0 : port->emigration_targets_len = left_qfs_len;
780 : :
781 [ # # ]: 0 : if (port->emigration_targets_len == 0) {
782 : 0 : port->migration_state = DSW_MIGRATION_STATE_IDLE;
783 : 0 : port->seen_events_len = 0;
784 : : }
785 : 0 : }
786 : :
787 : : static void
788 : 0 : dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
789 : : struct dsw_port *source_port)
790 : : {
791 : : uint8_t i;
792 : :
793 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
794 : : struct dsw_queue_flow *qf =
795 : 0 : &source_port->emigration_target_qfs[i];
796 : 0 : uint8_t queue_id = qf->queue_id;
797 : :
798 [ # # ]: 0 : if (dsw->queues[queue_id].schedule_type ==
799 : : RTE_SCHED_TYPE_PARALLEL) {
800 : 0 : uint8_t dest_port_id =
801 : : source_port->emigration_target_port_ids[i];
802 : 0 : uint16_t flow_hash = qf->flow_hash;
803 : :
804 : : /* Single byte-sized stores are always atomic. */
805 : 0 : dsw->queues[queue_id].flow_to_port_map[flow_hash] =
806 : : dest_port_id;
807 : : }
808 : : }
809 : :
810 : 0 : rte_smp_wmb();
811 : :
812 : 0 : dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
813 : 0 : }
814 : :
815 : : static void
816 : 0 : dsw_port_consider_emigration(struct dsw_evdev *dsw,
817 : : struct dsw_port *source_port,
818 : : uint64_t now)
819 : 0 : {
820 : : bool any_port_below_limit;
821 : 0 : struct dsw_queue_flow *seen_events = source_port->seen_events;
822 : 0 : uint16_t seen_events_len = source_port->seen_events_len;
823 : : struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
824 : : uint16_t num_bursts;
825 : : int16_t source_port_load;
826 : 0 : int16_t port_loads[dsw->num_ports];
827 : :
828 [ # # ]: 0 : if (now < source_port->next_emigration)
829 : 0 : return;
830 : :
831 [ # # ]: 0 : if (dsw->num_ports == 1)
832 : : return;
833 : :
834 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
835 : :
836 [ # # ]: 0 : if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
837 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
838 : : "are recorded to allow for a migration.\n");
839 : : return;
840 : : }
841 : :
842 : : /* A flow migration cannot be initiated if there are paused
843 : : * events, since some/all of those events may be have been
844 : : * produced as a result of processing the flow(s) selected for
845 : : * migration. Moving such a flow would potentially introduced
846 : : * reordering, since processing the migrated flow on the
847 : : * receiving flow may commence before the to-be-enqueued-to
848 : :
849 : : * flows are unpaused, leading to paused events on the second
850 : : * port as well, destined for the same paused flow(s). When
851 : : * those flows are unpaused, the resulting events are
852 : : * delivered the owning port in an undefined order.
853 : : */
854 [ # # ]: 0 : if (source_port->paused_events_len > 0) {
855 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
856 : : "events in the paus buffer.\n");
857 : : return;
858 : : }
859 : :
860 : : /* Randomize interval to avoid having all threads considering
861 : : * emigration at the same in point in time, which might lead
862 : : * to all choosing the same target port.
863 : : */
864 : 0 : source_port->next_emigration = now +
865 : 0 : source_port->migration_interval / 2 +
866 : 0 : rte_rand() % source_port->migration_interval;
867 : :
868 [ # # ]: 0 : if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
869 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
870 : : "Emigration already in progress.\n");
871 : : return;
872 : : }
873 : :
874 : : /* For simplicity, avoid migration in the unlikely case there
875 : : * is still events to consume in the in_buffer (from the last
876 : : * emigration).
877 : : */
878 [ # # ]: 0 : if (source_port->in_buffer_len > 0) {
879 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
880 : : "events in the input buffer.\n");
881 : : return;
882 : : }
883 : :
884 : 0 : source_port_load =
885 : 0 : __atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
886 [ # # ]: 0 : if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
887 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
888 : : "Load %d is below threshold level %d.\n",
889 : : DSW_LOAD_TO_PERCENT(source_port_load),
890 : : DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
891 : : return;
892 : : }
893 : :
894 : : /* Avoid starting any expensive operations (sorting etc), in
895 : : * case of a scenario with all ports above the load limit.
896 : : */
897 : : any_port_below_limit =
898 : 0 : dsw_retrieve_port_loads(dsw, port_loads,
899 : : DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
900 [ # # ]: 0 : if (!any_port_below_limit) {
901 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
902 : : "Candidate target ports are all too highly "
903 : : "loaded.\n");
904 : : return;
905 : : }
906 : :
907 : 0 : num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
908 : : bursts);
909 : :
910 : : /* For non-big-little systems, there's no point in moving the
911 : : * only (known) flow.
912 : : */
913 [ # # ]: 0 : if (num_bursts < 2) {
914 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
915 : : "queue_id %d flow_hash %d has been seen.\n",
916 : : bursts[0].queue_flow.queue_id,
917 : : bursts[0].queue_flow.flow_hash);
918 : : return;
919 : : }
920 : :
921 : 0 : dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
922 : : port_loads);
923 : :
924 [ # # ]: 0 : if (source_port->emigration_targets_len == 0)
925 : : return;
926 : :
927 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
928 : 0 : source_port->emigration_start = rte_get_timer_cycles();
929 : :
930 : : /* No need to go through the whole pause procedure for
931 : : * parallel queues, since atomic/ordered semantics need not to
932 : : * be maintained.
933 : : */
934 : 0 : dsw_port_move_parallel_flows(dsw, source_port);
935 : :
936 : : /* All flows were on PARALLEL queues. */
937 [ # # ]: 0 : if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
938 : : return;
939 : :
940 : : /* There might be 'loopback' events already scheduled in the
941 : : * output buffers.
942 : : */
943 : : dsw_port_flush_out_buffers(dsw, source_port);
944 : :
945 : 0 : dsw_port_add_paused_flows(source_port,
946 : 0 : source_port->emigration_target_qfs,
947 : 0 : source_port->emigration_targets_len);
948 : :
949 : 0 : dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
950 : : source_port->emigration_target_qfs,
951 : : source_port->emigration_targets_len);
952 : 0 : source_port->cfm_cnt = 0;
953 : : }
954 : :
955 : : static void
956 : : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
957 : : struct dsw_port *source_port);
958 : :
959 : : static void
960 : 0 : dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
961 : : uint8_t originating_port_id,
962 : : struct dsw_queue_flow *paused_qfs,
963 : : uint8_t qfs_len)
964 : : {
965 : : uint16_t i;
966 : 0 : struct dsw_ctl_msg cfm = {
967 : : .type = DSW_CTL_CFM,
968 : 0 : .originating_port_id = port->id
969 : : };
970 : :
971 : : dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
972 : :
973 : 0 : rte_smp_rmb();
974 : :
975 : 0 : dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
976 : :
977 [ # # ]: 0 : for (i = 0; i < qfs_len; i++) {
978 : 0 : struct dsw_queue_flow *qf = &paused_qfs[i];
979 : :
980 [ # # # # ]: 0 : if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
981 : 0 : port->immigrations++;
982 : : }
983 : :
984 : 0 : dsw_port_flush_no_longer_paused_events(dsw, port);
985 : 0 : }
986 : :
987 : : static void
988 : : dsw_port_buffer_in_buffer(struct dsw_port *port,
989 : : const struct rte_event *event)
990 : :
991 : : {
992 : : RTE_ASSERT(port->in_buffer_start == 0);
993 : :
994 : 0 : port->in_buffer[port->in_buffer_len] = *event;
995 : 0 : port->in_buffer_len++;
996 : 0 : }
997 : :
998 : : static void
999 : 0 : dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
1000 : : struct dsw_port *source_port,
1001 : : struct rte_event *event)
1002 : : {
1003 : : uint16_t i;
1004 : :
1005 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
1006 : : struct dsw_queue_flow *qf =
1007 : 0 : &source_port->emigration_target_qfs[i];
1008 : 0 : uint8_t dest_port_id =
1009 : : source_port->emigration_target_port_ids[i];
1010 : 0 : struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1011 : :
1012 [ # # ]: 0 : if (event->queue_id == qf->queue_id &&
1013 [ # # ]: 0 : dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
1014 : : /* No need to care about bursting forwarded
1015 : : * events (to the destination port's in_ring),
1016 : : * since migration doesn't happen very often,
1017 : : * and also the majority of the dequeued
1018 : : * events will likely *not* be forwarded.
1019 : : */
1020 [ # # # # : 0 : while (rte_event_ring_enqueue_burst(dest_port->in_ring,
# ]
1021 : : event, 1,
1022 [ # # ]: 0 : NULL) != 1)
1023 : : rte_pause();
1024 : : return;
1025 : : }
1026 : : }
1027 : :
1028 : : /* Event did not belong to the emigrated flows */
1029 : : dsw_port_buffer_in_buffer(source_port, event);
1030 : : }
1031 : :
1032 : : static void
1033 : : dsw_port_stash_migrating_event(struct dsw_port *port,
1034 : : const struct rte_event *event)
1035 : : {
1036 : 0 : port->emigrating_events[port->emigrating_events_len] = *event;
1037 : 0 : port->emigrating_events_len++;
1038 : 0 : }
1039 : :
1040 : : #define DRAIN_DEQUEUE_BURST_SIZE (32)
1041 : :
1042 : : static void
1043 : 0 : dsw_port_drain_in_ring(struct dsw_port *source_port)
1044 : : {
1045 : : uint16_t num_events;
1046 : : uint16_t dequeued;
1047 : :
1048 : : /* Control ring message should been seen before the ring count
1049 : : * is read on the port's in_ring.
1050 : : */
1051 : 0 : rte_smp_rmb();
1052 : :
1053 : 0 : num_events = rte_event_ring_count(source_port->in_ring);
1054 : :
1055 [ # # ]: 0 : for (dequeued = 0; dequeued < num_events; ) {
1056 : 0 : uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
1057 : : num_events - dequeued);
1058 : 0 : struct rte_event events[burst_size];
1059 : : uint16_t len;
1060 : : uint16_t i;
1061 : :
1062 [ # # # # : 0 : len = rte_event_ring_dequeue_burst(source_port->in_ring,
# ]
1063 : : events, burst_size,
1064 : : NULL);
1065 : :
1066 [ # # ]: 0 : for (i = 0; i < len; i++) {
1067 : 0 : struct rte_event *event = &events[i];
1068 : : uint16_t flow_hash;
1069 : :
1070 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
1071 : :
1072 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_migrating(source_port,
1073 : : event->queue_id,
1074 : : flow_hash)))
1075 : : dsw_port_stash_migrating_event(source_port,
1076 : : event);
1077 : : else
1078 : : dsw_port_buffer_in_buffer(source_port, event);
1079 : : }
1080 : :
1081 : 0 : dequeued += len;
1082 : : }
1083 : 0 : }
1084 : :
1085 : : static void
1086 : : dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
1087 : : struct dsw_port *source_port)
1088 : : {
1089 : : uint16_t i;
1090 : :
1091 [ # # ]: 0 : for (i = 0; i < source_port->emigrating_events_len; i++) {
1092 : 0 : struct rte_event *event = &source_port->emigrating_events[i];
1093 : :
1094 : 0 : dsw_port_forward_emigrated_event(dsw, source_port, event);
1095 : : }
1096 : 0 : source_port->emigrating_events_len = 0;
1097 : : }
1098 : :
1099 : : static void
1100 : 0 : dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1101 : : struct dsw_port *source_port)
1102 : : {
1103 : : uint8_t i;
1104 : :
1105 : : dsw_port_flush_out_buffers(dsw, source_port);
1106 : :
1107 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
1108 : : struct dsw_queue_flow *qf =
1109 : 0 : &source_port->emigration_target_qfs[i];
1110 : 0 : uint8_t dest_port_id =
1111 : : source_port->emigration_target_port_ids[i];
1112 : :
1113 : 0 : dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1114 : : dest_port_id;
1115 : : }
1116 : :
1117 : 0 : rte_smp_wmb();
1118 : :
1119 : 0 : dsw_port_drain_in_ring(source_port);
1120 : : dsw_port_forward_emigrated_flows(dsw, source_port);
1121 : :
1122 : : dsw_port_remove_paused_flows(source_port,
1123 : 0 : source_port->emigration_target_qfs,
1124 : 0 : source_port->emigration_targets_len);
1125 : :
1126 : 0 : dsw_port_flush_no_longer_paused_events(dsw, source_port);
1127 : :
1128 : : /* Flow table update and migration destination port's enqueues
1129 : : * must be seen before the control message.
1130 : : */
1131 : 0 : rte_smp_wmb();
1132 : :
1133 : 0 : dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1134 : : source_port->emigration_target_qfs,
1135 : 0 : source_port->emigration_targets_len);
1136 : 0 : source_port->cfm_cnt = 0;
1137 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1138 : 0 : }
1139 : :
1140 : : static void
1141 : 0 : dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1142 : : {
1143 : 0 : port->cfm_cnt++;
1144 : :
1145 [ # # ]: 0 : if (port->cfm_cnt == (dsw->num_ports-1)) {
1146 [ # # # ]: 0 : switch (port->migration_state) {
1147 : 0 : case DSW_MIGRATION_STATE_PAUSING:
1148 : 0 : dsw_port_move_emigrating_flows(dsw, port);
1149 : 0 : break;
1150 : 0 : case DSW_MIGRATION_STATE_UNPAUSING:
1151 : 0 : dsw_port_end_emigration(dsw, port,
1152 : : RTE_SCHED_TYPE_ATOMIC);
1153 : 0 : break;
1154 : : default:
1155 : : RTE_ASSERT(0);
1156 : : break;
1157 : : }
1158 : : }
1159 : 0 : }
1160 : :
1161 : : static void
1162 : 0 : dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1163 : : {
1164 : : struct dsw_ctl_msg msg;
1165 : :
1166 [ # # ]: 0 : if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1167 [ # # # # ]: 0 : switch (msg.type) {
1168 : 0 : case DSW_CTL_PAUS_REQ:
1169 : 0 : dsw_port_handle_pause_flows(dsw, port,
1170 : 0 : msg.originating_port_id,
1171 : 0 : msg.qfs, msg.qfs_len);
1172 : 0 : break;
1173 : 0 : case DSW_CTL_UNPAUS_REQ:
1174 : 0 : dsw_port_handle_unpause_flows(dsw, port,
1175 : 0 : msg.originating_port_id,
1176 : 0 : msg.qfs, msg.qfs_len);
1177 : 0 : break;
1178 : 0 : case DSW_CTL_CFM:
1179 : 0 : dsw_port_handle_confirm(dsw, port);
1180 : 0 : break;
1181 : : }
1182 : : }
1183 : 0 : }
1184 : :
1185 : : static void
1186 : : dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1187 : : {
1188 : 0 : port->ops_since_bg_task += (num_events+1);
1189 : 0 : }
1190 : :
1191 : : static void
1192 : 0 : dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1193 : : {
1194 : : /* For simplicity (in the migration logic), avoid all
1195 : : * background processing in case event processing is in
1196 : : * progress.
1197 : : */
1198 [ # # ]: 0 : if (port->pending_releases > 0)
1199 : : return;
1200 : :
1201 : : /* Polling the control ring is relatively inexpensive, and
1202 : : * polling it often helps bringing down migration latency, so
1203 : : * do this for every iteration.
1204 : : */
1205 : 0 : dsw_port_ctl_process(dsw, port);
1206 : :
1207 : : /* To avoid considering migration and flushing output buffers
1208 : : * on every dequeue/enqueue call, the scheduler only performs
1209 : : * such 'background' tasks every nth
1210 : : * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1211 : : */
1212 [ # # ]: 0 : if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1213 : : uint64_t now;
1214 : :
1215 : : now = rte_get_timer_cycles();
1216 : :
1217 : 0 : port->last_bg = now;
1218 : :
1219 : : /* Logic to avoid having events linger in the output
1220 : : * buffer too long.
1221 : : */
1222 : : dsw_port_flush_out_buffers(dsw, port);
1223 : :
1224 : : dsw_port_consider_load_update(port, now);
1225 : :
1226 : 0 : dsw_port_consider_emigration(dsw, port, now);
1227 : :
1228 : 0 : port->ops_since_bg_task = 0;
1229 : : }
1230 : : }
1231 : :
1232 : : static void
1233 : : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1234 : : {
1235 : : uint16_t dest_port_id;
1236 : :
1237 [ # # # # : 0 : for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
# # # # #
# # # # #
# # # # ]
1238 : 0 : dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1239 : : }
1240 : :
1241 : : uint16_t
1242 : 0 : dsw_event_enqueue(void *port, const struct rte_event *ev)
1243 : : {
1244 : 0 : return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1245 : : }
1246 : :
1247 : : static __rte_always_inline uint16_t
1248 : : dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1249 : : const struct rte_event events[],
1250 : : uint16_t events_len, bool op_types_known,
1251 : : uint16_t num_new, uint16_t num_release,
1252 : : uint16_t num_non_release)
1253 : : {
1254 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1255 : : bool enough_credits;
1256 : : uint16_t i;
1257 : :
1258 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1259 : : "events.\n", events_len);
1260 : :
1261 : 0 : dsw_port_bg_process(dsw, source_port);
1262 : :
1263 : : /* XXX: For performance (=ring efficiency) reasons, the
1264 : : * scheduler relies on internal non-ring buffers instead of
1265 : : * immediately sending the event to the destination ring. For
1266 : : * a producer that doesn't intend to produce or consume any
1267 : : * more events, the scheduler provides a way to flush the
1268 : : * buffer, by means of doing an enqueue of zero events. In
1269 : : * addition, a port cannot be left "unattended" (e.g. unused)
1270 : : * for long periods of time, since that would stall
1271 : : * migration. Eventdev API extensions to provide a cleaner way
1272 : : * to archive both of these functions should be
1273 : : * considered.
1274 : : */
1275 [ # # # # : 0 : if (unlikely(events_len == 0)) {
# # ]
1276 : : dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1277 : : dsw_port_flush_out_buffers(dsw, source_port);
1278 : : return 0;
1279 : : }
1280 : :
1281 : : dsw_port_note_op(source_port, events_len);
1282 : :
1283 : : if (!op_types_known)
1284 [ # # ]: 0 : for (i = 0; i < events_len; i++) {
1285 [ # # # ]: 0 : switch (events[i].op) {
1286 : 0 : case RTE_EVENT_OP_RELEASE:
1287 : 0 : num_release++;
1288 : 0 : break;
1289 : 0 : case RTE_EVENT_OP_NEW:
1290 : 0 : num_new++;
1291 : : /* Falls through. */
1292 : 0 : default:
1293 : 0 : num_non_release++;
1294 : 0 : break;
1295 : : }
1296 : : }
1297 : :
1298 : : /* Technically, we could allow the non-new events up to the
1299 : : * first new event in the array into the system, but for
1300 : : * simplicity reasons, we deny the whole burst if the port is
1301 : : * above the water mark.
1302 : : */
1303 [ # # # # : 0 : if (unlikely(num_new > 0 &&
# # # # ]
1304 : : __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
1305 : : source_port->new_event_threshold))
1306 : : return 0;
1307 : :
1308 : 0 : enough_credits = dsw_port_acquire_credits(dsw, source_port,
1309 : : num_non_release);
1310 [ # # # # : 0 : if (unlikely(!enough_credits))
# # ]
1311 : : return 0;
1312 : :
1313 : 0 : source_port->pending_releases -= num_release;
1314 : :
1315 : : dsw_port_enqueue_stats(source_port, num_new,
1316 : 0 : num_non_release-num_new, num_release);
1317 : :
1318 [ # # # # : 0 : for (i = 0; i < events_len; i++) {
# # ]
1319 : 0 : const struct rte_event *event = &events[i];
1320 : :
1321 [ # # # # ]: 0 : if (likely(num_release == 0 ||
1322 : : event->op != RTE_EVENT_OP_RELEASE))
1323 : 0 : dsw_port_buffer_event(dsw, source_port, event);
1324 : 0 : dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1325 : : }
1326 : :
1327 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1328 : : "accepted.\n", num_non_release);
1329 : :
1330 : 0 : return (num_non_release + num_release);
1331 : : }
1332 : :
1333 : : uint16_t
1334 : 0 : dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1335 : : uint16_t events_len)
1336 : : {
1337 : : struct dsw_port *source_port = port;
1338 : :
1339 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1340 : : events_len = source_port->enqueue_depth;
1341 : :
1342 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1343 : : events_len, false, 0, 0, 0);
1344 : : }
1345 : :
1346 : : uint16_t
1347 : 0 : dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1348 : : uint16_t events_len)
1349 : : {
1350 : : struct dsw_port *source_port = port;
1351 : :
1352 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1353 : : events_len = source_port->enqueue_depth;
1354 : :
1355 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1356 : : events_len, true, events_len,
1357 : : 0, events_len);
1358 : : }
1359 : :
1360 : : uint16_t
1361 : 0 : dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1362 : : uint16_t events_len)
1363 : : {
1364 : : struct dsw_port *source_port = port;
1365 : :
1366 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1367 : : events_len = source_port->enqueue_depth;
1368 : :
1369 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1370 : : events_len, true, 0, 0,
1371 : : events_len);
1372 : : }
1373 : :
1374 : : uint16_t
1375 : 0 : dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1376 : : {
1377 : 0 : return dsw_event_dequeue_burst(port, events, 1, wait);
1378 : : }
1379 : :
1380 : : static void
1381 : 0 : dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1382 : : uint16_t num)
1383 : : {
1384 : : uint16_t i;
1385 : :
1386 : 0 : dsw_port_dequeue_stats(port, num);
1387 : :
1388 [ # # ]: 0 : for (i = 0; i < num; i++) {
1389 : 0 : uint16_t l_idx = port->seen_events_idx;
1390 : 0 : struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1391 : 0 : struct rte_event *event = &events[i];
1392 : 0 : qf->queue_id = event->queue_id;
1393 : 0 : qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1394 : :
1395 : 0 : port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1396 : :
1397 : 0 : dsw_port_queue_dequeued_stats(port, event->queue_id);
1398 : : }
1399 : :
1400 [ # # ]: 0 : if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1401 : 0 : port->seen_events_len =
1402 : 0 : RTE_MIN(port->seen_events_len + num,
1403 : : DSW_MAX_EVENTS_RECORDED);
1404 : 0 : }
1405 : :
1406 : : #ifdef DSW_SORT_DEQUEUED
1407 : :
1408 : : #define DSW_EVENT_TO_INT(_event) \
1409 : : ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1410 : :
1411 : : static inline int
1412 : : dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1413 : : {
1414 : : const struct rte_event *event_a = v_event_a;
1415 : : const struct rte_event *event_b = v_event_b;
1416 : :
1417 : : return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1418 : : }
1419 : : #endif
1420 : :
1421 : : static uint16_t
1422 : 0 : dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1423 : : uint16_t num)
1424 : : {
1425 [ # # ]: 0 : if (unlikely(port->in_buffer_len > 0)) {
1426 : 0 : uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1427 : :
1428 [ # # ]: 0 : rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1429 : : dequeued * sizeof(struct rte_event));
1430 : :
1431 : 0 : port->in_buffer_start += dequeued;
1432 : 0 : port->in_buffer_len -= dequeued;
1433 : :
1434 [ # # ]: 0 : if (port->in_buffer_len == 0)
1435 : 0 : port->in_buffer_start = 0;
1436 : :
1437 : 0 : return dequeued;
1438 : : }
1439 : :
1440 [ # # # # : 0 : return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
# ]
1441 : : }
1442 : :
1443 : : static void
1444 : 0 : dsw_port_stash_migrating_events(struct dsw_port *port,
1445 : : struct rte_event *events, uint16_t *num)
1446 : : {
1447 : : uint16_t i;
1448 : :
1449 : : /* The assumption here - performance-wise - is that events
1450 : : * belonging to migrating flows are relatively rare.
1451 : : */
1452 [ # # ]: 0 : for (i = 0; i < (*num); ) {
1453 : 0 : struct rte_event *event = &events[i];
1454 : : uint16_t flow_hash;
1455 : :
1456 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
1457 : :
1458 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
1459 : : flow_hash))) {
1460 : : uint16_t left;
1461 : :
1462 : : dsw_port_stash_migrating_event(port, event);
1463 : :
1464 : 0 : (*num)--;
1465 : 0 : left = *num - i;
1466 : :
1467 [ # # ]: 0 : if (left > 0)
1468 : 0 : memmove(event, event + 1,
1469 : : left * sizeof(struct rte_event));
1470 : : } else
1471 : 0 : i++;
1472 : : }
1473 : 0 : }
1474 : :
1475 : : uint16_t
1476 : 0 : dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1477 : : uint64_t wait __rte_unused)
1478 : : {
1479 : : struct dsw_port *source_port = port;
1480 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1481 : : uint16_t dequeued;
1482 : :
1483 : 0 : source_port->pending_releases = 0;
1484 : :
1485 : 0 : dsw_port_bg_process(dsw, source_port);
1486 : :
1487 [ # # ]: 0 : if (unlikely(num > source_port->dequeue_depth))
1488 : : num = source_port->dequeue_depth;
1489 : :
1490 : 0 : dequeued = dsw_port_dequeue_burst(source_port, events, num);
1491 : :
1492 [ # # ]: 0 : if (unlikely(source_port->migration_state ==
1493 : : DSW_MIGRATION_STATE_PAUSING))
1494 : 0 : dsw_port_stash_migrating_events(source_port, events,
1495 : : &dequeued);
1496 : :
1497 : 0 : source_port->pending_releases = dequeued;
1498 : :
1499 [ # # ]: 0 : dsw_port_load_record(source_port, dequeued);
1500 : :
1501 : 0 : dsw_port_note_op(source_port, dequeued);
1502 : :
1503 [ # # ]: 0 : if (dequeued > 0) {
1504 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1505 : : dequeued);
1506 : :
1507 : : dsw_port_return_credits(dsw, source_port, dequeued);
1508 : :
1509 : : /* One potential optimization one might think of is to
1510 : : * add a migration state (prior to 'pausing'), and
1511 : : * only record seen events when the port is in this
1512 : : * state (and transit to 'pausing' when enough events
1513 : : * have been gathered). However, that schema doesn't
1514 : : * seem to improve performance.
1515 : : */
1516 : 0 : dsw_port_record_seen_events(port, events, dequeued);
1517 : : } else /* Zero-size dequeue means a likely idle port, and thus
1518 : : * we can afford trading some efficiency for a slightly
1519 : : * reduced event wall-time latency.
1520 : : */
1521 : : dsw_port_flush_out_buffers(dsw, port);
1522 : :
1523 : : #ifdef DSW_SORT_DEQUEUED
1524 : : dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1525 : : #endif
1526 : :
1527 : 0 : return dequeued;
1528 : : }
1529 : :
1530 : 0 : void dsw_event_maintain(void *port, int op)
1531 : : {
1532 : : struct dsw_port *source_port = port;
1533 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1534 : :
1535 : : dsw_port_note_op(source_port, 0);
1536 : 0 : dsw_port_bg_process(dsw, source_port);
1537 : :
1538 [ # # ]: 0 : if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH)
1539 : : dsw_port_flush_out_buffers(dsw, source_port);
1540 : 0 : }
|