LCOV - code coverage report
Current view: top level - drivers/event/dsw - dsw_event.c (source / functions) Hit Total Coverage
Test: Code coverage Lines: 0 480 0.0 %
Date: 2024-02-14 00:53:57 Functions: 0 37 0.0 %
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 0 334 0.0 %

           Branch data     Line data    Source code
       1                 :            : /* SPDX-License-Identifier: BSD-3-Clause
       2                 :            :  * Copyright(c) 2018 Ericsson AB
       3                 :            :  */
       4                 :            : 
       5                 :            : #include "dsw_evdev.h"
       6                 :            : 
       7                 :            : #ifdef DSW_SORT_DEQUEUED
       8                 :            : #include "dsw_sort.h"
       9                 :            : #endif
      10                 :            : 
      11                 :            : #include <stdbool.h>
      12                 :            : #include <stdlib.h>
      13                 :            : #include <string.h>
      14                 :            : 
      15                 :            : #include <rte_cycles.h>
      16                 :            : #include <rte_memcpy.h>
      17                 :            : #include <rte_random.h>
      18                 :            : 
      19                 :            : static bool
      20                 :          0 : dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
      21                 :            :                          int32_t credits)
      22                 :            : {
      23                 :          0 :         int32_t inflight_credits = port->inflight_credits;
      24                 :          0 :         int32_t missing_credits = credits - inflight_credits;
      25                 :            :         int32_t total_on_loan;
      26                 :            :         int32_t available;
      27                 :            :         int32_t acquired_credits;
      28                 :            :         int32_t new_total_on_loan;
      29                 :            : 
      30         [ #  # ]:          0 :         if (likely(missing_credits <= 0)) {
      31                 :          0 :                 port->inflight_credits -= credits;
      32                 :          0 :                 return true;
      33                 :            :         }
      34                 :            : 
      35                 :          0 :         total_on_loan =
      36                 :          0 :                 __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
      37                 :          0 :         available = dsw->max_inflight - total_on_loan;
      38                 :          0 :         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
      39                 :            : 
      40         [ #  # ]:          0 :         if (available < acquired_credits)
      41                 :            :                 return false;
      42                 :            : 
      43                 :            :         /* This is a race, no locks are involved, and thus some other
      44                 :            :          * thread can allocate tokens in between the check and the
      45                 :            :          * allocation.
      46                 :            :          */
      47                 :          0 :         new_total_on_loan =
      48                 :          0 :             __atomic_fetch_add(&dsw->credits_on_loan, acquired_credits,
      49                 :            :                                __ATOMIC_RELAXED) + acquired_credits;
      50                 :            : 
      51         [ #  # ]:          0 :         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
      52                 :            :                 /* Some other port took the last credits */
      53                 :          0 :                 __atomic_fetch_sub(&dsw->credits_on_loan, acquired_credits,
      54                 :            :                                    __ATOMIC_RELAXED);
      55                 :          0 :                 return false;
      56                 :            :         }
      57                 :            : 
      58                 :            :         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
      59                 :            :                         acquired_credits);
      60                 :            : 
      61                 :          0 :         port->inflight_credits += acquired_credits;
      62                 :          0 :         port->inflight_credits -= credits;
      63                 :            : 
      64                 :          0 :         return true;
      65                 :            : }
      66                 :            : 
      67                 :            : static void
      68                 :            : dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
      69                 :            :                         int32_t credits)
      70                 :            : {
      71                 :          0 :         port->inflight_credits += credits;
      72                 :            : 
      73         [ #  # ]:          0 :         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
      74                 :            :                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
      75                 :          0 :                 int32_t return_credits =
      76                 :            :                         port->inflight_credits - leave_credits;
      77                 :            : 
      78                 :          0 :                 port->inflight_credits = leave_credits;
      79                 :            : 
      80                 :          0 :                 __atomic_fetch_sub(&dsw->credits_on_loan, return_credits,
      81                 :            :                                    __ATOMIC_RELAXED);
      82                 :            : 
      83                 :            :                 DSW_LOG_DP_PORT(DEBUG, port->id,
      84                 :            :                                 "Returned %d tokens to pool.\n",
      85                 :            :                                 return_credits);
      86                 :            :         }
      87                 :            : }
      88                 :            : 
      89                 :            : static void
      90                 :            : dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
      91                 :            :                        uint16_t num_forward, uint16_t num_release)
      92                 :            : {
      93                 :          0 :         port->new_enqueued += num_new;
      94                 :          0 :         port->forward_enqueued += num_forward;
      95                 :          0 :         port->release_enqueued += num_release;
      96                 :          0 : }
      97                 :            : 
      98                 :            : static void
      99                 :            : dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
     100                 :            : {
     101                 :          0 :         source_port->queue_enqueued[queue_id]++;
     102                 :            : }
     103                 :            : 
     104                 :            : static void
     105                 :            : dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
     106                 :            : {
     107                 :          0 :         port->dequeued += num;
     108                 :            : }
     109                 :            : 
     110                 :            : static void
     111                 :            : dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
     112                 :            : {
     113                 :          0 :         source_port->queue_dequeued[queue_id]++;
     114                 :            : }
     115                 :            : 
     116                 :            : static void
     117                 :            : dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
     118                 :            : {
     119         [ #  # ]:          0 :         if (dequeued > 0 && port->busy_start == 0)
     120                 :            :                 /* work period begins */
     121                 :          0 :                 port->busy_start = rte_get_timer_cycles();
     122   [ #  #  #  # ]:          0 :         else if (dequeued == 0 && port->busy_start > 0) {
     123                 :            :                 /* work period ends */
     124                 :          0 :                 uint64_t work_period =
     125                 :          0 :                         rte_get_timer_cycles() - port->busy_start;
     126                 :          0 :                 port->busy_cycles += work_period;
     127                 :          0 :                 port->busy_start = 0;
     128                 :            :         }
     129                 :            : }
     130                 :            : 
     131                 :            : static int16_t
     132                 :            : dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
     133                 :            : {
     134                 :          0 :         uint64_t passed = now - port->measurement_start;
     135                 :          0 :         uint64_t busy_cycles = port->busy_cycles;
     136                 :            : 
     137                 :          0 :         if (port->busy_start > 0) {
     138                 :          0 :                 busy_cycles += (now - port->busy_start);
     139                 :          0 :                 port->busy_start = now;
     140                 :            :         }
     141                 :            : 
     142                 :          0 :         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
     143                 :            : 
     144                 :          0 :         port->measurement_start = now;
     145                 :          0 :         port->busy_cycles = 0;
     146                 :            : 
     147                 :          0 :         port->total_busy_cycles += busy_cycles;
     148                 :            : 
     149                 :            :         return load;
     150                 :            : }
     151                 :            : 
     152                 :            : static void
     153                 :          0 : dsw_port_load_update(struct dsw_port *port, uint64_t now)
     154                 :            : {
     155                 :            :         int16_t old_load;
     156                 :            :         int16_t period_load;
     157                 :            :         int16_t new_load;
     158                 :            : 
     159         [ #  # ]:          0 :         old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
     160                 :            : 
     161                 :            :         period_load = dsw_port_load_close_period(port, now);
     162                 :            : 
     163                 :          0 :         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
     164                 :            :                 (DSW_OLD_LOAD_WEIGHT+1);
     165                 :            : 
     166                 :          0 :         __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
     167                 :            : 
     168                 :            :         /* The load of the recently immigrated flows should hopefully
     169                 :            :          * be reflected the load estimate by now.
     170                 :            :          */
     171                 :          0 :         __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
     172                 :          0 : }
     173                 :            : 
     174                 :            : static void
     175                 :            : dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
     176                 :            : {
     177         [ #  # ]:          0 :         if (now < port->next_load_update)
     178                 :            :                 return;
     179                 :            : 
     180                 :          0 :         port->next_load_update = now + port->load_update_interval;
     181                 :            : 
     182                 :          0 :         dsw_port_load_update(port, now);
     183                 :            : }
     184                 :            : 
     185                 :            : static void
     186                 :          0 : dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
     187                 :            : {
     188                 :            :         /* there's always room on the ring */
     189   [ #  #  #  #  :          0 :         while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
                      # ]
     190                 :            :                 rte_pause();
     191                 :          0 : }
     192                 :            : 
     193                 :            : static int
     194                 :          0 : dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
     195                 :            : {
     196   [ #  #  #  #  :          0 :         return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
                      # ]
     197                 :            : }
     198                 :            : 
     199                 :            : static void
     200                 :          0 : dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
     201                 :            :                        uint8_t type, struct dsw_queue_flow *qfs,
     202                 :            :                        uint8_t qfs_len)
     203                 :            : {
     204                 :            :         uint16_t port_id;
     205                 :          0 :         struct dsw_ctl_msg msg = {
     206                 :            :                 .type = type,
     207                 :          0 :                 .originating_port_id = source_port->id,
     208                 :            :                 .qfs_len = qfs_len
     209                 :            :         };
     210                 :            : 
     211                 :          0 :         memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
     212                 :            : 
     213         [ #  # ]:          0 :         for (port_id = 0; port_id < dsw->num_ports; port_id++)
     214         [ #  # ]:          0 :                 if (port_id != source_port->id)
     215                 :          0 :                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
     216                 :          0 : }
     217                 :            : 
     218                 :            : static __rte_always_inline bool
     219                 :            : dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
     220                 :            :                          uint8_t queue_id, uint16_t flow_hash)
     221                 :            : {
     222                 :            :         uint16_t i;
     223                 :            : 
     224   [ #  #  #  #  :          0 :         for (i = 0; i < qfs_len; i++)
          #  #  #  #  #  
                      # ]
     225   [ #  #  #  #  :          0 :                 if (qfs[i].queue_id == queue_id &&
          #  #  #  #  #  
                      # ]
     226   [ #  #  #  #  :          0 :                     qfs[i].flow_hash == flow_hash)
          #  #  #  #  #  
                      # ]
     227                 :            :                         return true;
     228                 :            : 
     229                 :            :         return false;
     230                 :            : }
     231                 :            : 
     232                 :            : static __rte_always_inline bool
     233                 :            : dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
     234                 :            :                         uint16_t flow_hash)
     235                 :            : {
     236                 :          0 :         return dsw_is_queue_flow_in_ary(port->paused_flows,
     237                 :          0 :                                         port->paused_flows_len,
     238                 :            :                                         queue_id, flow_hash);
     239                 :            : }
     240                 :            : 
     241                 :            : static __rte_always_inline bool
     242                 :            : dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
     243                 :            :                            uint16_t flow_hash)
     244                 :            : {
     245                 :          0 :         return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
     246                 :          0 :                                         port->emigration_targets_len,
     247                 :            :                                         queue_id, flow_hash);
     248                 :            : }
     249                 :            : 
     250                 :            : static void
     251                 :            : dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
     252                 :            :                           uint8_t qfs_len)
     253                 :            : {
     254                 :            :         uint8_t i;
     255                 :            : 
     256   [ #  #  #  # ]:          0 :         for (i = 0; i < qfs_len; i++) {
     257                 :          0 :                 struct dsw_queue_flow *qf = &qfs[i];
     258                 :            : 
     259                 :            :                 DSW_LOG_DP_PORT(DEBUG, port->id,
     260                 :            :                                 "Pausing queue_id %d flow_hash %d.\n",
     261                 :            :                                 qf->queue_id, qf->flow_hash);
     262                 :            : 
     263                 :          0 :                 port->paused_flows[port->paused_flows_len] = *qf;
     264                 :          0 :                 port->paused_flows_len++;
     265                 :            :         };
     266                 :            : }
     267                 :            : 
     268                 :            : static void
     269                 :          0 : dsw_port_remove_paused_flow(struct dsw_port *port,
     270                 :            :                             struct dsw_queue_flow *target_qf)
     271                 :            : {
     272                 :            :         uint16_t i;
     273                 :            : 
     274         [ #  # ]:          0 :         for (i = 0; i < port->paused_flows_len; i++) {
     275                 :          0 :                 struct dsw_queue_flow *qf = &port->paused_flows[i];
     276                 :            : 
     277         [ #  # ]:          0 :                 if (qf->queue_id == target_qf->queue_id &&
     278         [ #  # ]:          0 :                     qf->flow_hash == target_qf->flow_hash) {
     279                 :          0 :                         uint16_t last_idx = port->paused_flows_len-1;
     280         [ #  # ]:          0 :                         if (i != last_idx)
     281                 :          0 :                                 port->paused_flows[i] =
     282                 :          0 :                                         port->paused_flows[last_idx];
     283                 :          0 :                         port->paused_flows_len--;
     284                 :            : 
     285                 :            :                         DSW_LOG_DP_PORT(DEBUG, port->id,
     286                 :            :                                         "Unpausing queue_id %d flow_hash %d.\n",
     287                 :            :                                         target_qf->queue_id,
     288                 :            :                                         target_qf->flow_hash);
     289                 :            : 
     290                 :          0 :                         return;
     291                 :            :                 }
     292                 :            :         }
     293                 :            : 
     294                 :          0 :         DSW_LOG_DP_PORT(ERR, port->id,
     295                 :            :                         "Failed to unpause queue_id %d flow_hash %d.\n",
     296                 :            :                         target_qf->queue_id, target_qf->flow_hash);
     297                 :            : }
     298                 :            : 
     299                 :            : static void
     300                 :            : dsw_port_remove_paused_flows(struct dsw_port *port,
     301                 :            :                              struct dsw_queue_flow *qfs, uint8_t qfs_len)
     302                 :            : {
     303                 :            :         uint8_t i;
     304                 :            : 
     305   [ #  #  #  # ]:          0 :         for (i = 0; i < qfs_len; i++)
     306                 :          0 :                 dsw_port_remove_paused_flow(port, &qfs[i]);
     307                 :            : }
     308                 :            : 
     309                 :            : static void
     310                 :            : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
     311                 :            : 
     312                 :            : static void
     313                 :          0 : dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
     314                 :            :                             uint8_t originating_port_id,
     315                 :            :                             struct dsw_queue_flow *paused_qfs,
     316                 :            :                             uint8_t qfs_len)
     317                 :            : {
     318                 :          0 :         struct dsw_ctl_msg cfm = {
     319                 :            :                 .type = DSW_CTL_CFM,
     320                 :          0 :                 .originating_port_id = port->id
     321                 :            :         };
     322                 :            : 
     323                 :            :         /* There might be already-scheduled events belonging to the
     324                 :            :          * paused flow in the output buffers.
     325                 :            :          */
     326                 :            :         dsw_port_flush_out_buffers(dsw, port);
     327                 :            : 
     328                 :            :         dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
     329                 :            : 
     330                 :            :         /* Make sure any stores to the original port's in_ring is seen
     331                 :            :          * before the ctl message.
     332                 :            :          */
     333                 :          0 :         rte_smp_wmb();
     334                 :            : 
     335                 :          0 :         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
     336                 :          0 : }
     337                 :            : 
     338                 :            : struct dsw_queue_flow_burst {
     339                 :            :         struct dsw_queue_flow queue_flow;
     340                 :            :         uint16_t count;
     341                 :            : };
     342                 :            : 
     343                 :            : #define DSW_QF_TO_INT(_qf)                                      \
     344                 :            :         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
     345                 :            : 
     346                 :            : static inline int
     347                 :          0 : dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
     348                 :            : {
     349                 :            :         const struct dsw_queue_flow *qf_a = v_qf_a;
     350                 :            :         const struct dsw_queue_flow *qf_b = v_qf_b;
     351                 :            : 
     352                 :          0 :         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
     353                 :            : }
     354                 :            : 
     355                 :            : static uint16_t
     356                 :          0 : dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
     357                 :            :                        struct dsw_queue_flow_burst *bursts)
     358                 :            : {
     359                 :            :         uint16_t i;
     360                 :            :         struct dsw_queue_flow_burst *current_burst = NULL;
     361                 :            :         uint16_t num_bursts = 0;
     362                 :            : 
     363                 :            :         /* We don't need the stable property, and the list is likely
     364                 :            :          * large enough for qsort() to outperform dsw_stable_sort(),
     365                 :            :          * so we use qsort() here.
     366                 :            :          */
     367                 :          0 :         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
     368                 :            : 
     369                 :            :         /* arrange the (now-consecutive) events into bursts */
     370         [ #  # ]:          0 :         for (i = 0; i < qfs_len; i++) {
     371         [ #  # ]:          0 :                 if (i == 0 ||
     372         [ #  # ]:          0 :                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
     373                 :          0 :                         current_burst = &bursts[num_bursts];
     374                 :          0 :                         current_burst->queue_flow = qfs[i];
     375                 :          0 :                         current_burst->count = 0;
     376                 :          0 :                         num_bursts++;
     377                 :            :                 }
     378                 :          0 :                 current_burst->count++;
     379                 :            :         }
     380                 :            : 
     381                 :          0 :         return num_bursts;
     382                 :            : }
     383                 :            : 
     384                 :            : static bool
     385                 :          0 : dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
     386                 :            :                         int16_t load_limit)
     387                 :            : {
     388                 :            :         bool below_limit = false;
     389                 :            :         uint16_t i;
     390                 :            : 
     391         [ #  # ]:          0 :         for (i = 0; i < dsw->num_ports; i++) {
     392                 :          0 :                 int16_t measured_load =
     393                 :          0 :                         __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
     394                 :          0 :                 int32_t immigration_load =
     395                 :          0 :                         __atomic_load_n(&dsw->ports[i].immigration_load,
     396                 :            :                                         __ATOMIC_RELAXED);
     397                 :          0 :                 int32_t load = measured_load + immigration_load;
     398                 :            : 
     399                 :          0 :                 load = RTE_MIN(load, DSW_MAX_LOAD);
     400                 :            : 
     401         [ #  # ]:          0 :                 if (load < load_limit)
     402                 :            :                         below_limit = true;
     403                 :          0 :                 port_loads[i] = load;
     404                 :            :         }
     405                 :          0 :         return below_limit;
     406                 :            : }
     407                 :            : 
     408                 :            : static int16_t
     409                 :            : dsw_flow_load(uint16_t num_events, int16_t port_load)
     410                 :            : {
     411                 :          0 :         return ((int32_t)port_load * (int32_t)num_events) /
     412                 :            :                 DSW_MAX_EVENTS_RECORDED;
     413                 :            : }
     414                 :            : 
     415                 :            : static int16_t
     416                 :            : dsw_evaluate_migration(int16_t source_load, int16_t target_load,
     417                 :            :                        int16_t flow_load)
     418                 :            : {
     419                 :            :         int32_t res_target_load;
     420                 :            :         int32_t imbalance;
     421                 :            : 
     422                 :          0 :         if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
     423                 :            :                 return -1;
     424                 :            : 
     425                 :          0 :         imbalance = source_load - target_load;
     426                 :            : 
     427         [ #  # ]:          0 :         if (imbalance < DSW_REBALANCE_THRESHOLD)
     428                 :            :                 return -1;
     429                 :            : 
     430                 :          0 :         res_target_load = target_load + flow_load;
     431                 :            : 
     432                 :            :         /* If the estimated load of the target port will be higher
     433                 :            :          * than the source port's load, it doesn't make sense to move
     434                 :            :          * the flow.
     435                 :            :          */
     436         [ #  # ]:          0 :         if (res_target_load > source_load)
     437                 :            :                 return -1;
     438                 :            : 
     439                 :            :         /* The more idle the target will be, the better. This will
     440                 :            :          * make migration prefer moving smaller flows, and flows to
     441                 :            :          * lightly loaded ports.
     442                 :            :          */
     443                 :          0 :         return DSW_MAX_LOAD - res_target_load;
     444                 :            : }
     445                 :            : 
     446                 :            : static bool
     447                 :            : dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
     448                 :            : {
     449                 :            :         struct dsw_queue *queue = &dsw->queues[queue_id];
     450                 :            :         uint16_t i;
     451                 :            : 
     452         [ #  # ]:          0 :         for (i = 0; i < queue->num_serving_ports; i++)
     453         [ #  # ]:          0 :                 if (queue->serving_ports[i] == port_id)
     454                 :            :                         return true;
     455                 :            : 
     456                 :            :         return false;
     457                 :            : }
     458                 :            : 
     459                 :            : static bool
     460                 :          0 : dsw_select_emigration_target(struct dsw_evdev *dsw,
     461                 :            :                              struct dsw_port *source_port,
     462                 :            :                              struct dsw_queue_flow_burst *bursts,
     463                 :            :                              uint16_t num_bursts,
     464                 :            :                              int16_t *port_loads, uint16_t num_ports,
     465                 :            :                              uint8_t *target_port_ids,
     466                 :            :                              struct dsw_queue_flow *target_qfs,
     467                 :            :                              uint8_t *targets_len)
     468                 :            : {
     469                 :          0 :         int16_t source_port_load = port_loads[source_port->id];
     470                 :            :         struct dsw_queue_flow *candidate_qf = NULL;
     471                 :            :         uint8_t candidate_port_id = 0;
     472                 :            :         int16_t candidate_weight = -1;
     473                 :            :         int16_t candidate_flow_load = -1;
     474                 :            :         uint16_t i;
     475                 :            : 
     476         [ #  # ]:          0 :         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
     477                 :            :                 return false;
     478                 :            : 
     479         [ #  # ]:          0 :         for (i = 0; i < num_bursts; i++) {
     480                 :          0 :                 struct dsw_queue_flow_burst *burst = &bursts[i];
     481                 :          0 :                 struct dsw_queue_flow *qf = &burst->queue_flow;
     482                 :            :                 int16_t flow_load;
     483                 :            :                 uint16_t port_id;
     484                 :            : 
     485         [ #  # ]:          0 :                 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
     486                 :          0 :                                              qf->queue_id, qf->flow_hash))
     487                 :          0 :                         continue;
     488                 :            : 
     489                 :          0 :                 flow_load = dsw_flow_load(burst->count, source_port_load);
     490                 :            : 
     491         [ #  # ]:          0 :                 for (port_id = 0; port_id < num_ports; port_id++) {
     492                 :            :                         int16_t weight;
     493                 :            : 
     494         [ #  # ]:          0 :                         if (port_id == source_port->id)
     495                 :          0 :                                 continue;
     496                 :            : 
     497         [ #  # ]:          0 :                         if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
     498                 :          0 :                                 continue;
     499                 :            : 
     500                 :          0 :                         weight = dsw_evaluate_migration(source_port_load,
     501         [ #  # ]:          0 :                                                         port_loads[port_id],
     502                 :            :                                                         flow_load);
     503                 :            : 
     504         [ #  # ]:          0 :                         if (weight > candidate_weight) {
     505                 :            :                                 candidate_qf = qf;
     506                 :            :                                 candidate_port_id = port_id;
     507                 :            :                                 candidate_weight = weight;
     508                 :            :                                 candidate_flow_load = flow_load;
     509                 :            :                         }
     510                 :            :                 }
     511                 :            :         }
     512                 :            : 
     513         [ #  # ]:          0 :         if (candidate_weight < 0)
     514                 :            :                 return false;
     515                 :            : 
     516                 :            :         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Selected queue_id %d "
     517                 :            :                         "flow_hash %d (with flow load %d) for migration "
     518                 :            :                         "to port %d.\n", candidate_qf->queue_id,
     519                 :            :                         candidate_qf->flow_hash,
     520                 :            :                         DSW_LOAD_TO_PERCENT(candidate_flow_load),
     521                 :            :                         candidate_port_id);
     522                 :            : 
     523                 :          0 :         port_loads[candidate_port_id] += candidate_flow_load;
     524                 :          0 :         port_loads[source_port->id] -= candidate_flow_load;
     525                 :            : 
     526                 :          0 :         target_port_ids[*targets_len] = candidate_port_id;
     527                 :          0 :         target_qfs[*targets_len] = *candidate_qf;
     528                 :          0 :         (*targets_len)++;
     529                 :            : 
     530                 :          0 :         __atomic_fetch_add(&dsw->ports[candidate_port_id].immigration_load,
     531                 :            :                            candidate_flow_load, __ATOMIC_RELAXED);
     532                 :            : 
     533                 :          0 :         return true;
     534                 :            : }
     535                 :            : 
     536                 :            : static void
     537                 :          0 : dsw_select_emigration_targets(struct dsw_evdev *dsw,
     538                 :            :                               struct dsw_port *source_port,
     539                 :            :                               struct dsw_queue_flow_burst *bursts,
     540                 :            :                               uint16_t num_bursts, int16_t *port_loads)
     541                 :            : {
     542                 :          0 :         struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
     543                 :          0 :         uint8_t *target_port_ids = source_port->emigration_target_port_ids;
     544                 :          0 :         uint8_t *targets_len = &source_port->emigration_targets_len;
     545                 :            :         uint16_t i;
     546                 :            : 
     547         [ #  # ]:          0 :         for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
     548                 :            :                 bool found;
     549                 :            : 
     550                 :          0 :                 found = dsw_select_emigration_target(dsw, source_port,
     551                 :            :                                                      bursts, num_bursts,
     552                 :          0 :                                                      port_loads, dsw->num_ports,
     553                 :            :                                                      target_port_ids,
     554                 :            :                                                      target_qfs,
     555                 :            :                                                      targets_len);
     556         [ #  # ]:          0 :                 if (!found)
     557                 :            :                         break;
     558                 :            :         }
     559                 :            : 
     560                 :            :         if (*targets_len == 0)
     561                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
     562                 :            :                                 "For the %d flows considered, no target port "
     563                 :            :                                 "was found.\n", num_bursts);
     564                 :          0 : }
     565                 :            : 
     566                 :            : static uint8_t
     567                 :            : dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
     568                 :            : {
     569                 :            :         struct dsw_queue *queue = &dsw->queues[queue_id];
     570                 :            :         uint8_t port_id;
     571                 :            : 
     572   [ #  #  #  # ]:          0 :         if (queue->num_serving_ports > 1)
     573                 :          0 :                 port_id = queue->flow_to_port_map[flow_hash];
     574                 :            :         else
     575                 :            :                 /* A single-link queue, or atomic/ordered/parallel but
     576                 :            :                  * with just a single serving port.
     577                 :            :                  */
     578                 :          0 :                 port_id = queue->serving_ports[0];
     579                 :            : 
     580                 :            :         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
     581                 :            :                    "to port %d.\n", queue_id, flow_hash, port_id);
     582                 :            : 
     583                 :            :         return port_id;
     584                 :            : }
     585                 :            : 
     586                 :            : static void
     587                 :          0 : dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
     588                 :            :                            uint8_t dest_port_id)
     589                 :            : {
     590                 :          0 :         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
     591                 :            :         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
     592                 :          0 :         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
     593                 :            : 
     594         [ #  # ]:          0 :         if (*buffer_len == 0)
     595                 :            :                 return;
     596                 :            : 
     597                 :            :         /* The rings are dimensioned to fit all in-flight events (even
     598                 :            :          * on a single ring), so looping will work.
     599                 :            :          */
     600   [ #  #  #  #  :          0 :         rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer, *buffer_len,
                      # ]
     601                 :            :                                     NULL);
     602                 :            : 
     603                 :          0 :         (*buffer_len) = 0;
     604                 :            : }
     605                 :            : 
     606                 :            : static uint16_t
     607                 :            : dsw_port_get_parallel_flow_id(struct dsw_port *port)
     608                 :            : {
     609                 :          0 :         uint16_t flow_id = port->next_parallel_flow_id;
     610                 :            : 
     611                 :          0 :         port->next_parallel_flow_id =
     612                 :          0 :                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
     613                 :            : 
     614                 :            :         return flow_id;
     615                 :            : }
     616                 :            : 
     617                 :            : static void
     618                 :            : dsw_port_buffer_paused(struct dsw_port *port,
     619                 :            :                        const struct rte_event *paused_event)
     620                 :            : {
     621                 :          0 :         port->paused_events[port->paused_events_len] = *paused_event;
     622                 :          0 :         port->paused_events_len++;
     623                 :          0 : }
     624                 :            : 
     625                 :            : 
     626                 :            : static void
     627                 :          0 : dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
     628                 :            :                            uint8_t dest_port_id, const struct rte_event *event)
     629                 :            : {
     630                 :          0 :         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
     631                 :            :         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
     632                 :            : 
     633         [ #  # ]:          0 :         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
     634                 :          0 :                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
     635                 :            : 
     636                 :          0 :         buffer[*buffer_len] = *event;
     637                 :            : 
     638                 :          0 :         (*buffer_len)++;
     639                 :          0 : }
     640                 :            : 
     641                 :            : #define DSW_FLOW_ID_BITS (24)
     642                 :            : static uint16_t
     643                 :            : dsw_flow_id_hash(uint32_t flow_id)
     644                 :            : {
     645                 :            :         uint16_t hash = 0;
     646                 :            :         uint16_t offset = 0;
     647                 :            : 
     648                 :            :         do {
     649                 :          0 :                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
     650                 :          0 :                 offset += DSW_MAX_FLOWS_BITS;
     651   [ #  #  #  #  :          0 :         } while (offset < DSW_FLOW_ID_BITS);
          #  #  #  #  #  
             #  #  #  #  
                      # ]
     652                 :            : 
     653                 :            :         return hash;
     654                 :            : }
     655                 :            : 
     656                 :            : static void
     657                 :          0 : dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
     658                 :            :                          struct rte_event event)
     659                 :            : {
     660                 :            :         uint8_t dest_port_id;
     661                 :            : 
     662                 :          0 :         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
     663                 :            : 
     664         [ #  # ]:          0 :         dest_port_id = dsw_schedule(dsw, event.queue_id,
     665                 :          0 :                                     dsw_flow_id_hash(event.flow_id));
     666                 :            : 
     667                 :          0 :         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
     668                 :          0 : }
     669                 :            : 
     670                 :            : static void
     671                 :          0 : dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
     672                 :            :                       const struct rte_event *event)
     673                 :            : {
     674                 :            :         uint16_t flow_hash;
     675                 :            :         uint8_t dest_port_id;
     676                 :            : 
     677         [ #  # ]:          0 :         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
     678                 :            :                      RTE_SCHED_TYPE_PARALLEL)) {
     679                 :          0 :                 dsw_port_buffer_parallel(dsw, source_port, *event);
     680                 :          0 :                 return;
     681                 :            :         }
     682                 :            : 
     683                 :          0 :         flow_hash = dsw_flow_id_hash(event->flow_id);
     684                 :            : 
     685         [ #  # ]:          0 :         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
     686                 :            :                                              flow_hash))) {
     687                 :            :                 dsw_port_buffer_paused(source_port, event);
     688                 :          0 :                 return;
     689                 :            :         }
     690                 :            : 
     691                 :            :         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
     692                 :            : 
     693                 :          0 :         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
     694                 :            : }
     695                 :            : 
     696                 :            : static void
     697                 :          0 : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
     698                 :            :                                        struct dsw_port *source_port)
     699                 :          0 : {
     700                 :          0 :         uint16_t paused_events_len = source_port->paused_events_len;
     701                 :          0 :         struct rte_event paused_events[paused_events_len];
     702                 :            :         uint16_t i;
     703                 :            : 
     704         [ #  # ]:          0 :         if (paused_events_len == 0)
     705                 :          0 :                 return;
     706                 :            : 
     707         [ #  # ]:          0 :         rte_memcpy(paused_events, source_port->paused_events,
     708                 :            :                    paused_events_len * sizeof(struct rte_event));
     709                 :            : 
     710                 :          0 :         source_port->paused_events_len = 0;
     711                 :            : 
     712         [ #  # ]:          0 :         for (i = 0; i < paused_events_len; i++) {
     713                 :          0 :                 struct rte_event *event = &paused_events[i];
     714                 :            :                 uint16_t flow_hash;
     715                 :            : 
     716                 :          0 :                 flow_hash = dsw_flow_id_hash(event->flow_id);
     717                 :            : 
     718         [ #  # ]:          0 :                 if (dsw_port_is_flow_paused(source_port, event->queue_id,
     719                 :            :                                             flow_hash))
     720                 :            :                         dsw_port_buffer_paused(source_port, event);
     721                 :            :                 else {
     722                 :            :                         uint8_t dest_port_id;
     723                 :            : 
     724                 :            :                         dest_port_id = dsw_schedule(dsw, event->queue_id,
     725                 :            :                                                     flow_hash);
     726                 :            : 
     727                 :          0 :                         dsw_port_buffer_non_paused(dsw, source_port,
     728                 :            :                                                    dest_port_id, event);
     729                 :            :                 }
     730                 :            :         }
     731                 :            : }
     732                 :            : 
     733                 :            : static void
     734                 :            : dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
     735                 :            : {
     736                 :            :         uint64_t flow_migration_latency;
     737                 :            : 
     738                 :          0 :         flow_migration_latency =
     739                 :          0 :                 (rte_get_timer_cycles() - port->emigration_start);
     740                 :          0 :         port->emigration_latency += (flow_migration_latency * finished);
     741                 :          0 :         port->emigrations += finished;
     742                 :          0 : }
     743                 :            : 
     744                 :            : static void
     745                 :          0 : dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
     746                 :            :                         uint8_t schedule_type)
     747                 :            : {
     748                 :            :         uint8_t i;
     749                 :            :         struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
     750                 :            :         uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
     751                 :            :         uint8_t left_qfs_len = 0;
     752                 :            :         uint8_t finished;
     753                 :            : 
     754         [ #  # ]:          0 :         for (i = 0; i < port->emigration_targets_len; i++) {
     755                 :          0 :                 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
     756                 :          0 :                 uint8_t queue_id = qf->queue_id;
     757                 :          0 :                 uint8_t queue_schedule_type =
     758                 :          0 :                         dsw->queues[queue_id].schedule_type;
     759                 :            :                 uint16_t flow_hash = qf->flow_hash;
     760                 :            : 
     761         [ #  # ]:          0 :                 if (queue_schedule_type != schedule_type) {
     762                 :          0 :                         left_port_ids[left_qfs_len] =
     763                 :          0 :                                 port->emigration_target_port_ids[i];
     764                 :          0 :                         left_qfs[left_qfs_len] = *qf;
     765                 :          0 :                         left_qfs_len++;
     766                 :          0 :                         continue;
     767                 :            :                 }
     768                 :            : 
     769                 :            :                 DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
     770                 :            :                                 "queue_id %d flow_hash %d.\n", queue_id,
     771                 :            :                                 flow_hash);
     772                 :            :         }
     773                 :            : 
     774                 :          0 :         finished = port->emigration_targets_len - left_qfs_len;
     775                 :            : 
     776         [ #  # ]:          0 :         if (finished > 0)
     777                 :            :                 dsw_port_emigration_stats(port, finished);
     778                 :            : 
     779         [ #  # ]:          0 :         for (i = 0; i < left_qfs_len; i++) {
     780                 :          0 :                 port->emigration_target_port_ids[i] = left_port_ids[i];
     781                 :          0 :                 port->emigration_target_qfs[i] = left_qfs[i];
     782                 :            :         }
     783                 :          0 :         port->emigration_targets_len = left_qfs_len;
     784                 :            : 
     785         [ #  # ]:          0 :         if (port->emigration_targets_len == 0) {
     786                 :          0 :                 port->migration_state = DSW_MIGRATION_STATE_IDLE;
     787                 :          0 :                 port->seen_events_len = 0;
     788                 :            :         }
     789                 :          0 : }
     790                 :            : 
     791                 :            : static void
     792                 :          0 : dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
     793                 :            :                              struct dsw_port *source_port)
     794                 :            : {
     795                 :            :         uint8_t i;
     796                 :            : 
     797         [ #  # ]:          0 :         for (i = 0; i < source_port->emigration_targets_len; i++) {
     798                 :            :                 struct dsw_queue_flow *qf =
     799                 :          0 :                         &source_port->emigration_target_qfs[i];
     800                 :          0 :                 uint8_t queue_id = qf->queue_id;
     801                 :            : 
     802         [ #  # ]:          0 :                 if (dsw->queues[queue_id].schedule_type ==
     803                 :            :                     RTE_SCHED_TYPE_PARALLEL) {
     804                 :          0 :                         uint8_t dest_port_id =
     805                 :            :                                 source_port->emigration_target_port_ids[i];
     806                 :          0 :                         uint16_t flow_hash = qf->flow_hash;
     807                 :            : 
     808                 :            :                         /* Single byte-sized stores are always atomic. */
     809                 :          0 :                         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
     810                 :            :                                 dest_port_id;
     811                 :            :                 }
     812                 :            :         }
     813                 :            : 
     814                 :          0 :         rte_smp_wmb();
     815                 :            : 
     816                 :          0 :         dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
     817                 :          0 : }
     818                 :            : 
     819                 :            : static void
     820                 :          0 : dsw_port_consider_emigration(struct dsw_evdev *dsw,
     821                 :            :                              struct dsw_port *source_port,
     822                 :            :                              uint64_t now)
     823                 :          0 : {
     824                 :            :         bool any_port_below_limit;
     825                 :          0 :         struct dsw_queue_flow *seen_events = source_port->seen_events;
     826                 :          0 :         uint16_t seen_events_len = source_port->seen_events_len;
     827                 :            :         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
     828                 :            :         uint16_t num_bursts;
     829                 :            :         int16_t source_port_load;
     830                 :          0 :         int16_t port_loads[dsw->num_ports];
     831                 :            : 
     832         [ #  # ]:          0 :         if (now < source_port->next_emigration)
     833                 :          0 :                 return;
     834                 :            : 
     835         [ #  # ]:          0 :         if (dsw->num_ports == 1)
     836                 :            :                 return;
     837                 :            : 
     838                 :            :         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
     839                 :            : 
     840         [ #  # ]:          0 :         if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
     841                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
     842                 :            :                                 "are recorded to allow for a migration.\n");
     843                 :            :                 return;
     844                 :            :         }
     845                 :            : 
     846                 :            :         /* A flow migration cannot be initiated if there are paused
     847                 :            :          * events, since some/all of those events may be have been
     848                 :            :          * produced as a result of processing the flow(s) selected for
     849                 :            :          * migration. Moving such a flow would potentially introduced
     850                 :            :          * reordering, since processing the migrated flow on the
     851                 :            :          * receiving flow may commence before the to-be-enqueued-to
     852                 :            : 
     853                 :            :          * flows are unpaused, leading to paused events on the second
     854                 :            :          * port as well, destined for the same paused flow(s). When
     855                 :            :          * those flows are unpaused, the resulting events are
     856                 :            :          * delivered the owning port in an undefined order.
     857                 :            :          */
     858         [ #  # ]:          0 :         if (source_port->paused_events_len > 0) {
     859                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
     860                 :            :                                 "events in the paus buffer.\n");
     861                 :            :                 return;
     862                 :            :         }
     863                 :            : 
     864                 :            :         /* Randomize interval to avoid having all threads considering
     865                 :            :          * emigration at the same in point in time, which might lead
     866                 :            :          * to all choosing the same target port.
     867                 :            :          */
     868                 :          0 :         source_port->next_emigration = now +
     869                 :          0 :                 source_port->migration_interval / 2 +
     870                 :          0 :                 rte_rand() % source_port->migration_interval;
     871                 :            : 
     872         [ #  # ]:          0 :         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
     873                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
     874                 :            :                                 "Emigration already in progress.\n");
     875                 :            :                 return;
     876                 :            :         }
     877                 :            : 
     878                 :            :         /* For simplicity, avoid migration in the unlikely case there
     879                 :            :          * is still events to consume in the in_buffer (from the last
     880                 :            :          * emigration).
     881                 :            :          */
     882         [ #  # ]:          0 :         if (source_port->in_buffer_len > 0) {
     883                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
     884                 :            :                                 "events in the input buffer.\n");
     885                 :            :                 return;
     886                 :            :         }
     887                 :            : 
     888                 :          0 :         source_port_load =
     889                 :          0 :                 __atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
     890         [ #  # ]:          0 :         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
     891                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
     892                 :            :                       "Load %d is below threshold level %d.\n",
     893                 :            :                       DSW_LOAD_TO_PERCENT(source_port_load),
     894                 :            :                       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
     895                 :            :                 return;
     896                 :            :         }
     897                 :            : 
     898                 :            :         /* Avoid starting any expensive operations (sorting etc), in
     899                 :            :          * case of a scenario with all ports above the load limit.
     900                 :            :          */
     901                 :            :         any_port_below_limit =
     902                 :          0 :                 dsw_retrieve_port_loads(dsw, port_loads,
     903                 :            :                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
     904         [ #  # ]:          0 :         if (!any_port_below_limit) {
     905                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
     906                 :            :                                 "Candidate target ports are all too highly "
     907                 :            :                                 "loaded.\n");
     908                 :            :                 return;
     909                 :            :         }
     910                 :            : 
     911                 :          0 :         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
     912                 :            :                                             bursts);
     913                 :            : 
     914                 :            :         /* For non-big-little systems, there's no point in moving the
     915                 :            :          * only (known) flow.
     916                 :            :          */
     917         [ #  # ]:          0 :         if (num_bursts < 2) {
     918                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
     919                 :            :                                 "queue_id %d flow_hash %d has been seen.\n",
     920                 :            :                                 bursts[0].queue_flow.queue_id,
     921                 :            :                                 bursts[0].queue_flow.flow_hash);
     922                 :            :                 return;
     923                 :            :         }
     924                 :            : 
     925                 :          0 :         dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
     926                 :            :                                       port_loads);
     927                 :            : 
     928         [ #  # ]:          0 :         if (source_port->emigration_targets_len == 0)
     929                 :            :                 return;
     930                 :            : 
     931                 :          0 :         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
     932                 :          0 :         source_port->emigration_start = rte_get_timer_cycles();
     933                 :            : 
     934                 :            :         /* No need to go through the whole pause procedure for
     935                 :            :          * parallel queues, since atomic/ordered semantics need not to
     936                 :            :          * be maintained.
     937                 :            :          */
     938                 :          0 :         dsw_port_move_parallel_flows(dsw, source_port);
     939                 :            : 
     940                 :            :         /* All flows were on PARALLEL queues. */
     941         [ #  # ]:          0 :         if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
     942                 :            :                 return;
     943                 :            : 
     944                 :            :         /* There might be 'loopback' events already scheduled in the
     945                 :            :          * output buffers.
     946                 :            :          */
     947                 :            :         dsw_port_flush_out_buffers(dsw, source_port);
     948                 :            : 
     949                 :          0 :         dsw_port_add_paused_flows(source_port,
     950                 :          0 :                                   source_port->emigration_target_qfs,
     951                 :          0 :                                   source_port->emigration_targets_len);
     952                 :            : 
     953                 :          0 :         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
     954                 :            :                                source_port->emigration_target_qfs,
     955                 :            :                                source_port->emigration_targets_len);
     956                 :          0 :         source_port->cfm_cnt = 0;
     957                 :            : }
     958                 :            : 
     959                 :            : static void
     960                 :            : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
     961                 :            :                                        struct dsw_port *source_port);
     962                 :            : 
     963                 :            : static void
     964                 :          0 : dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
     965                 :            :                               uint8_t originating_port_id,
     966                 :            :                               struct dsw_queue_flow *paused_qfs,
     967                 :            :                               uint8_t qfs_len)
     968                 :            : {
     969                 :            :         uint16_t i;
     970                 :          0 :         struct dsw_ctl_msg cfm = {
     971                 :            :                 .type = DSW_CTL_CFM,
     972                 :          0 :                 .originating_port_id = port->id
     973                 :            :         };
     974                 :            : 
     975                 :            :         dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
     976                 :            : 
     977                 :          0 :         rte_smp_rmb();
     978                 :            : 
     979                 :          0 :         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
     980                 :            : 
     981         [ #  # ]:          0 :         for (i = 0; i < qfs_len; i++) {
     982                 :          0 :                 struct dsw_queue_flow *qf = &paused_qfs[i];
     983                 :            : 
     984   [ #  #  #  # ]:          0 :                 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
     985                 :          0 :                         port->immigrations++;
     986                 :            :         }
     987                 :            : 
     988                 :          0 :         dsw_port_flush_no_longer_paused_events(dsw, port);
     989                 :          0 : }
     990                 :            : 
     991                 :            : static void
     992                 :            : dsw_port_buffer_in_buffer(struct dsw_port *port,
     993                 :            :                           const struct rte_event *event)
     994                 :            : 
     995                 :            : {
     996                 :            :         RTE_ASSERT(port->in_buffer_start == 0);
     997                 :            : 
     998                 :          0 :         port->in_buffer[port->in_buffer_len] = *event;
     999                 :          0 :         port->in_buffer_len++;
    1000                 :          0 : }
    1001                 :            : 
    1002                 :            : static void
    1003                 :          0 : dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
    1004                 :            :                                  struct dsw_port *source_port,
    1005                 :            :                                  struct rte_event *event)
    1006                 :            : {
    1007                 :            :         uint16_t i;
    1008                 :            : 
    1009         [ #  # ]:          0 :         for (i = 0; i < source_port->emigration_targets_len; i++) {
    1010                 :            :                 struct dsw_queue_flow *qf =
    1011                 :          0 :                         &source_port->emigration_target_qfs[i];
    1012                 :          0 :                 uint8_t dest_port_id =
    1013                 :            :                         source_port->emigration_target_port_ids[i];
    1014                 :          0 :                 struct dsw_port *dest_port = &dsw->ports[dest_port_id];
    1015                 :            : 
    1016         [ #  # ]:          0 :                 if (event->queue_id == qf->queue_id &&
    1017         [ #  # ]:          0 :                     dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
    1018                 :            :                         /* No need to care about bursting forwarded
    1019                 :            :                          * events (to the destination port's in_ring),
    1020                 :            :                          * since migration doesn't happen very often,
    1021                 :            :                          * and also the majority of the dequeued
    1022                 :            :                          * events will likely *not* be forwarded.
    1023                 :            :                          */
    1024   [ #  #  #  #  :          0 :                         while (rte_event_ring_enqueue_burst(dest_port->in_ring,
                      # ]
    1025                 :            :                                                             event, 1,
    1026         [ #  # ]:          0 :                                                             NULL) != 1)
    1027                 :            :                                 rte_pause();
    1028                 :            :                         return;
    1029                 :            :                 }
    1030                 :            :         }
    1031                 :            : 
    1032                 :            :         /* Event did not belong to the emigrated flows */
    1033                 :            :         dsw_port_buffer_in_buffer(source_port, event);
    1034                 :            : }
    1035                 :            : 
    1036                 :            : static void
    1037                 :            : dsw_port_stash_migrating_event(struct dsw_port *port,
    1038                 :            :                                const struct rte_event *event)
    1039                 :            : {
    1040                 :          0 :         port->emigrating_events[port->emigrating_events_len] = *event;
    1041                 :          0 :         port->emigrating_events_len++;
    1042                 :          0 : }
    1043                 :            : 
    1044                 :            : #define DRAIN_DEQUEUE_BURST_SIZE (32)
    1045                 :            : 
    1046                 :            : static void
    1047                 :          0 : dsw_port_drain_in_ring(struct dsw_port *source_port)
    1048                 :            : {
    1049                 :            :         uint16_t num_events;
    1050                 :            :         uint16_t dequeued;
    1051                 :            : 
    1052                 :            :         /* Control ring message should been seen before the ring count
    1053                 :            :          * is read on the port's in_ring.
    1054                 :            :          */
    1055                 :          0 :         rte_smp_rmb();
    1056                 :            : 
    1057                 :          0 :         num_events = rte_event_ring_count(source_port->in_ring);
    1058                 :            : 
    1059         [ #  # ]:          0 :         for (dequeued = 0; dequeued < num_events; ) {
    1060                 :          0 :                 uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
    1061                 :            :                                               num_events - dequeued);
    1062                 :          0 :                 struct rte_event events[burst_size];
    1063                 :            :                 uint16_t len;
    1064                 :            :                 uint16_t i;
    1065                 :            : 
    1066   [ #  #  #  #  :          0 :                 len = rte_event_ring_dequeue_burst(source_port->in_ring,
                      # ]
    1067                 :            :                                                    events, burst_size,
    1068                 :            :                                                    NULL);
    1069                 :            : 
    1070         [ #  # ]:          0 :                 for (i = 0; i < len; i++) {
    1071                 :          0 :                         struct rte_event *event = &events[i];
    1072                 :            :                         uint16_t flow_hash;
    1073                 :            : 
    1074                 :          0 :                         flow_hash = dsw_flow_id_hash(event->flow_id);
    1075                 :            : 
    1076         [ #  # ]:          0 :                         if (unlikely(dsw_port_is_flow_migrating(source_port,
    1077                 :            :                                                                 event->queue_id,
    1078                 :            :                                                                 flow_hash)))
    1079                 :            :                                 dsw_port_stash_migrating_event(source_port,
    1080                 :            :                                                                event);
    1081                 :            :                         else
    1082                 :            :                                 dsw_port_buffer_in_buffer(source_port, event);
    1083                 :            :                 }
    1084                 :            : 
    1085                 :          0 :                 dequeued += len;
    1086                 :            :         }
    1087                 :          0 : }
    1088                 :            : 
    1089                 :            : static void
    1090                 :            : dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
    1091                 :            :                                  struct dsw_port *source_port)
    1092                 :            : {
    1093                 :            :         uint16_t i;
    1094                 :            : 
    1095         [ #  # ]:          0 :         for (i = 0; i < source_port->emigrating_events_len; i++) {
    1096                 :          0 :                 struct rte_event *event = &source_port->emigrating_events[i];
    1097                 :            : 
    1098                 :          0 :                 dsw_port_forward_emigrated_event(dsw, source_port, event);
    1099                 :            :         }
    1100                 :          0 :         source_port->emigrating_events_len = 0;
    1101                 :            : }
    1102                 :            : 
    1103                 :            : static void
    1104                 :          0 : dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
    1105                 :            :                                struct dsw_port *source_port)
    1106                 :            : {
    1107                 :            :         uint8_t i;
    1108                 :            : 
    1109                 :            :         dsw_port_flush_out_buffers(dsw, source_port);
    1110                 :            : 
    1111         [ #  # ]:          0 :         for (i = 0; i < source_port->emigration_targets_len; i++) {
    1112                 :            :                 struct dsw_queue_flow *qf =
    1113                 :          0 :                         &source_port->emigration_target_qfs[i];
    1114                 :          0 :                 uint8_t dest_port_id =
    1115                 :            :                         source_port->emigration_target_port_ids[i];
    1116                 :            : 
    1117                 :          0 :                 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
    1118                 :            :                     dest_port_id;
    1119                 :            :         }
    1120                 :            : 
    1121                 :          0 :         rte_smp_wmb();
    1122                 :            : 
    1123                 :          0 :         dsw_port_drain_in_ring(source_port);
    1124                 :            :         dsw_port_forward_emigrated_flows(dsw, source_port);
    1125                 :            : 
    1126                 :            :         dsw_port_remove_paused_flows(source_port,
    1127                 :          0 :                                      source_port->emigration_target_qfs,
    1128                 :          0 :                                      source_port->emigration_targets_len);
    1129                 :            : 
    1130                 :          0 :         dsw_port_flush_no_longer_paused_events(dsw, source_port);
    1131                 :            : 
    1132                 :            :         /* Flow table update and migration destination port's enqueues
    1133                 :            :          * must be seen before the control message.
    1134                 :            :          */
    1135                 :          0 :         rte_smp_wmb();
    1136                 :            : 
    1137                 :          0 :         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
    1138                 :            :                                source_port->emigration_target_qfs,
    1139                 :          0 :                                source_port->emigration_targets_len);
    1140                 :          0 :         source_port->cfm_cnt = 0;
    1141                 :          0 :         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
    1142                 :          0 : }
    1143                 :            : 
    1144                 :            : static void
    1145                 :          0 : dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
    1146                 :            : {
    1147                 :          0 :         port->cfm_cnt++;
    1148                 :            : 
    1149         [ #  # ]:          0 :         if (port->cfm_cnt == (dsw->num_ports-1)) {
    1150      [ #  #  # ]:          0 :                 switch (port->migration_state) {
    1151                 :          0 :                 case DSW_MIGRATION_STATE_PAUSING:
    1152                 :          0 :                         dsw_port_move_emigrating_flows(dsw, port);
    1153                 :          0 :                         break;
    1154                 :          0 :                 case DSW_MIGRATION_STATE_UNPAUSING:
    1155                 :          0 :                         dsw_port_end_emigration(dsw, port,
    1156                 :            :                                                 RTE_SCHED_TYPE_ATOMIC);
    1157                 :          0 :                         break;
    1158                 :            :                 default:
    1159                 :            :                         RTE_ASSERT(0);
    1160                 :            :                         break;
    1161                 :            :                 }
    1162                 :            :         }
    1163                 :          0 : }
    1164                 :            : 
    1165                 :            : static void
    1166                 :          0 : dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
    1167                 :            : {
    1168                 :            :         struct dsw_ctl_msg msg;
    1169                 :            : 
    1170         [ #  # ]:          0 :         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
    1171   [ #  #  #  # ]:          0 :                 switch (msg.type) {
    1172                 :          0 :                 case DSW_CTL_PAUS_REQ:
    1173                 :          0 :                         dsw_port_handle_pause_flows(dsw, port,
    1174                 :          0 :                                                     msg.originating_port_id,
    1175                 :          0 :                                                     msg.qfs, msg.qfs_len);
    1176                 :          0 :                         break;
    1177                 :          0 :                 case DSW_CTL_UNPAUS_REQ:
    1178                 :          0 :                         dsw_port_handle_unpause_flows(dsw, port,
    1179                 :          0 :                                                       msg.originating_port_id,
    1180                 :          0 :                                                       msg.qfs, msg.qfs_len);
    1181                 :          0 :                         break;
    1182                 :          0 :                 case DSW_CTL_CFM:
    1183                 :          0 :                         dsw_port_handle_confirm(dsw, port);
    1184                 :          0 :                         break;
    1185                 :            :                 }
    1186                 :            :         }
    1187                 :          0 : }
    1188                 :            : 
    1189                 :            : static void
    1190                 :            : dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
    1191                 :            : {
    1192                 :          0 :         port->ops_since_bg_task += (num_events+1);
    1193                 :          0 : }
    1194                 :            : 
    1195                 :            : static void
    1196                 :          0 : dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
    1197                 :            : {
    1198                 :            :         /* For simplicity (in the migration logic), avoid all
    1199                 :            :          * background processing in case event processing is in
    1200                 :            :          * progress.
    1201                 :            :          */
    1202         [ #  # ]:          0 :         if (port->pending_releases > 0)
    1203                 :            :                 return;
    1204                 :            : 
    1205                 :            :         /* Polling the control ring is relatively inexpensive, and
    1206                 :            :          * polling it often helps bringing down migration latency, so
    1207                 :            :          * do this for every iteration.
    1208                 :            :          */
    1209                 :          0 :         dsw_port_ctl_process(dsw, port);
    1210                 :            : 
    1211                 :            :         /* To avoid considering migration and flushing output buffers
    1212                 :            :          * on every dequeue/enqueue call, the scheduler only performs
    1213                 :            :          * such 'background' tasks every nth
    1214                 :            :          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
    1215                 :            :          */
    1216         [ #  # ]:          0 :         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
    1217                 :            :                 uint64_t now;
    1218                 :            : 
    1219                 :            :                 now = rte_get_timer_cycles();
    1220                 :            : 
    1221                 :          0 :                 port->last_bg = now;
    1222                 :            : 
    1223                 :            :                 /* Logic to avoid having events linger in the output
    1224                 :            :                  * buffer too long.
    1225                 :            :                  */
    1226                 :            :                 dsw_port_flush_out_buffers(dsw, port);
    1227                 :            : 
    1228                 :            :                 dsw_port_consider_load_update(port, now);
    1229                 :            : 
    1230                 :          0 :                 dsw_port_consider_emigration(dsw, port, now);
    1231                 :            : 
    1232                 :          0 :                 port->ops_since_bg_task = 0;
    1233                 :            :         }
    1234                 :            : }
    1235                 :            : 
    1236                 :            : static void
    1237                 :            : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
    1238                 :            : {
    1239                 :            :         uint16_t dest_port_id;
    1240                 :            : 
    1241   [ #  #  #  #  :          0 :         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
          #  #  #  #  #  
          #  #  #  #  #  
             #  #  #  # ]
    1242                 :          0 :                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
    1243                 :            : }
    1244                 :            : 
    1245                 :            : uint16_t
    1246                 :          0 : dsw_event_enqueue(void *port, const struct rte_event *ev)
    1247                 :            : {
    1248                 :          0 :         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
    1249                 :            : }
    1250                 :            : 
    1251                 :            : static __rte_always_inline uint16_t
    1252                 :            : dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
    1253                 :            :                                 const struct rte_event events[],
    1254                 :            :                                 uint16_t events_len, bool op_types_known,
    1255                 :            :                                 uint16_t num_new, uint16_t num_release,
    1256                 :            :                                 uint16_t num_non_release)
    1257                 :            : {
    1258                 :          0 :         struct dsw_evdev *dsw = source_port->dsw;
    1259                 :            :         bool enough_credits;
    1260                 :            :         uint16_t i;
    1261                 :            : 
    1262                 :            :         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
    1263                 :            :                         "events.\n", events_len);
    1264                 :            : 
    1265                 :          0 :         dsw_port_bg_process(dsw, source_port);
    1266                 :            : 
    1267                 :            :         /* XXX: For performance (=ring efficiency) reasons, the
    1268                 :            :          * scheduler relies on internal non-ring buffers instead of
    1269                 :            :          * immediately sending the event to the destination ring. For
    1270                 :            :          * a producer that doesn't intend to produce or consume any
    1271                 :            :          * more events, the scheduler provides a way to flush the
    1272                 :            :          * buffer, by means of doing an enqueue of zero events. In
    1273                 :            :          * addition, a port cannot be left "unattended" (e.g. unused)
    1274                 :            :          * for long periods of time, since that would stall
    1275                 :            :          * migration. Eventdev API extensions to provide a cleaner way
    1276                 :            :          * to archive both of these functions should be
    1277                 :            :          * considered.
    1278                 :            :          */
    1279   [ #  #  #  #  :          0 :         if (unlikely(events_len == 0)) {
                   #  # ]
    1280                 :            :                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
    1281                 :            :                 dsw_port_flush_out_buffers(dsw, source_port);
    1282                 :            :                 return 0;
    1283                 :            :         }
    1284                 :            : 
    1285                 :            :         dsw_port_note_op(source_port, events_len);
    1286                 :            : 
    1287                 :            :         if (!op_types_known)
    1288         [ #  # ]:          0 :                 for (i = 0; i < events_len; i++) {
    1289      [ #  #  # ]:          0 :                         switch (events[i].op) {
    1290                 :          0 :                         case RTE_EVENT_OP_RELEASE:
    1291                 :          0 :                                 num_release++;
    1292                 :          0 :                                 break;
    1293                 :          0 :                         case RTE_EVENT_OP_NEW:
    1294                 :          0 :                                 num_new++;
    1295                 :            :                                 /* Falls through. */
    1296                 :          0 :                         default:
    1297                 :          0 :                                 num_non_release++;
    1298                 :          0 :                                 break;
    1299                 :            :                         }
    1300                 :            :                 }
    1301                 :            : 
    1302                 :            :         /* Technically, we could allow the non-new events up to the
    1303                 :            :          * first new event in the array into the system, but for
    1304                 :            :          * simplicity reasons, we deny the whole burst if the port is
    1305                 :            :          * above the water mark.
    1306                 :            :          */
    1307   [ #  #  #  #  :          0 :         if (unlikely(num_new > 0 &&
             #  #  #  # ]
    1308                 :            :                      __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
    1309                 :            :                      source_port->new_event_threshold))
    1310                 :            :                 return 0;
    1311                 :            : 
    1312                 :          0 :         enough_credits = dsw_port_acquire_credits(dsw, source_port,
    1313                 :            :                                                   num_non_release);
    1314   [ #  #  #  #  :          0 :         if (unlikely(!enough_credits))
                   #  # ]
    1315                 :            :                 return 0;
    1316                 :            : 
    1317                 :          0 :         source_port->pending_releases -= num_release;
    1318                 :            : 
    1319                 :            :         dsw_port_enqueue_stats(source_port, num_new,
    1320                 :          0 :                                num_non_release-num_new, num_release);
    1321                 :            : 
    1322   [ #  #  #  #  :          0 :         for (i = 0; i < events_len; i++) {
                   #  # ]
    1323                 :          0 :                 const struct rte_event *event = &events[i];
    1324                 :            : 
    1325   [ #  #  #  # ]:          0 :                 if (likely(num_release == 0 ||
    1326                 :            :                            event->op != RTE_EVENT_OP_RELEASE))
    1327                 :          0 :                         dsw_port_buffer_event(dsw, source_port, event);
    1328                 :          0 :                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
    1329                 :            :         }
    1330                 :            : 
    1331                 :            :         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
    1332                 :            :                         "accepted.\n", num_non_release);
    1333                 :            : 
    1334                 :          0 :         return (num_non_release + num_release);
    1335                 :            : }
    1336                 :            : 
    1337                 :            : uint16_t
    1338                 :          0 : dsw_event_enqueue_burst(void *port, const struct rte_event events[],
    1339                 :            :                         uint16_t events_len)
    1340                 :            : {
    1341                 :            :         struct dsw_port *source_port = port;
    1342                 :            : 
    1343         [ #  # ]:          0 :         if (unlikely(events_len > source_port->enqueue_depth))
    1344                 :            :                 events_len = source_port->enqueue_depth;
    1345                 :            : 
    1346                 :          0 :         return dsw_event_enqueue_burst_generic(source_port, events,
    1347                 :            :                                                events_len, false, 0, 0, 0);
    1348                 :            : }
    1349                 :            : 
    1350                 :            : uint16_t
    1351                 :          0 : dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
    1352                 :            :                             uint16_t events_len)
    1353                 :            : {
    1354                 :            :         struct dsw_port *source_port = port;
    1355                 :            : 
    1356         [ #  # ]:          0 :         if (unlikely(events_len > source_port->enqueue_depth))
    1357                 :            :                 events_len = source_port->enqueue_depth;
    1358                 :            : 
    1359                 :          0 :         return dsw_event_enqueue_burst_generic(source_port, events,
    1360                 :            :                                                events_len, true, events_len,
    1361                 :            :                                                0, events_len);
    1362                 :            : }
    1363                 :            : 
    1364                 :            : uint16_t
    1365                 :          0 : dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
    1366                 :            :                                 uint16_t events_len)
    1367                 :            : {
    1368                 :            :         struct dsw_port *source_port = port;
    1369                 :            : 
    1370         [ #  # ]:          0 :         if (unlikely(events_len > source_port->enqueue_depth))
    1371                 :            :                 events_len = source_port->enqueue_depth;
    1372                 :            : 
    1373                 :          0 :         return dsw_event_enqueue_burst_generic(source_port, events,
    1374                 :            :                                                events_len, true, 0, 0,
    1375                 :            :                                                events_len);
    1376                 :            : }
    1377                 :            : 
    1378                 :            : uint16_t
    1379                 :          0 : dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
    1380                 :            : {
    1381                 :          0 :         return dsw_event_dequeue_burst(port, events, 1, wait);
    1382                 :            : }
    1383                 :            : 
    1384                 :            : static void
    1385                 :          0 : dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
    1386                 :            :                             uint16_t num)
    1387                 :            : {
    1388                 :            :         uint16_t i;
    1389                 :            : 
    1390                 :          0 :         dsw_port_dequeue_stats(port, num);
    1391                 :            : 
    1392         [ #  # ]:          0 :         for (i = 0; i < num; i++) {
    1393                 :          0 :                 uint16_t l_idx = port->seen_events_idx;
    1394                 :          0 :                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
    1395                 :          0 :                 struct rte_event *event = &events[i];
    1396                 :          0 :                 qf->queue_id = event->queue_id;
    1397                 :          0 :                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
    1398                 :            : 
    1399                 :          0 :                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
    1400                 :            : 
    1401                 :          0 :                 dsw_port_queue_dequeued_stats(port, event->queue_id);
    1402                 :            :         }
    1403                 :            : 
    1404         [ #  # ]:          0 :         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
    1405                 :          0 :                 port->seen_events_len =
    1406                 :          0 :                         RTE_MIN(port->seen_events_len + num,
    1407                 :            :                                 DSW_MAX_EVENTS_RECORDED);
    1408                 :          0 : }
    1409                 :            : 
    1410                 :            : #ifdef DSW_SORT_DEQUEUED
    1411                 :            : 
    1412                 :            : #define DSW_EVENT_TO_INT(_event)                                \
    1413                 :            :         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
    1414                 :            : 
    1415                 :            : static inline int
    1416                 :            : dsw_cmp_event(const void *v_event_a, const void *v_event_b)
    1417                 :            : {
    1418                 :            :         const struct rte_event *event_a = v_event_a;
    1419                 :            :         const struct rte_event *event_b = v_event_b;
    1420                 :            : 
    1421                 :            :         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
    1422                 :            : }
    1423                 :            : #endif
    1424                 :            : 
    1425                 :            : static uint16_t
    1426                 :          0 : dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
    1427                 :            :                        uint16_t num)
    1428                 :            : {
    1429         [ #  # ]:          0 :         if (unlikely(port->in_buffer_len > 0)) {
    1430                 :          0 :                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
    1431                 :            : 
    1432         [ #  # ]:          0 :                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
    1433                 :            :                            dequeued * sizeof(struct rte_event));
    1434                 :            : 
    1435                 :          0 :                 port->in_buffer_start += dequeued;
    1436                 :          0 :                 port->in_buffer_len -= dequeued;
    1437                 :            : 
    1438         [ #  # ]:          0 :                 if (port->in_buffer_len == 0)
    1439                 :          0 :                         port->in_buffer_start = 0;
    1440                 :            : 
    1441                 :          0 :                 return dequeued;
    1442                 :            :         }
    1443                 :            : 
    1444   [ #  #  #  #  :          0 :         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
                      # ]
    1445                 :            : }
    1446                 :            : 
    1447                 :            : static void
    1448                 :          0 : dsw_port_stash_migrating_events(struct dsw_port *port,
    1449                 :            :                                 struct rte_event *events, uint16_t *num)
    1450                 :            : {
    1451                 :            :         uint16_t i;
    1452                 :            : 
    1453                 :            :         /* The assumption here - performance-wise - is that events
    1454                 :            :          * belonging to migrating flows are relatively rare.
    1455                 :            :          */
    1456         [ #  # ]:          0 :         for (i = 0; i < (*num); ) {
    1457                 :          0 :                 struct rte_event *event = &events[i];
    1458                 :            :                 uint16_t flow_hash;
    1459                 :            : 
    1460                 :          0 :                 flow_hash = dsw_flow_id_hash(event->flow_id);
    1461                 :            : 
    1462         [ #  # ]:          0 :                 if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
    1463                 :            :                                                         flow_hash))) {
    1464                 :            :                         uint16_t left;
    1465                 :            : 
    1466                 :            :                         dsw_port_stash_migrating_event(port, event);
    1467                 :            : 
    1468                 :          0 :                         (*num)--;
    1469                 :          0 :                         left = *num - i;
    1470                 :            : 
    1471         [ #  # ]:          0 :                         if (left > 0)
    1472                 :          0 :                                 memmove(event, event + 1,
    1473                 :            :                                         left * sizeof(struct rte_event));
    1474                 :            :                 } else
    1475                 :          0 :                         i++;
    1476                 :            :         }
    1477                 :          0 : }
    1478                 :            : 
    1479                 :            : uint16_t
    1480                 :          0 : dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
    1481                 :            :                         uint64_t wait __rte_unused)
    1482                 :            : {
    1483                 :            :         struct dsw_port *source_port = port;
    1484                 :          0 :         struct dsw_evdev *dsw = source_port->dsw;
    1485                 :            :         uint16_t dequeued;
    1486                 :            : 
    1487                 :          0 :         source_port->pending_releases = 0;
    1488                 :            : 
    1489                 :          0 :         dsw_port_bg_process(dsw, source_port);
    1490                 :            : 
    1491         [ #  # ]:          0 :         if (unlikely(num > source_port->dequeue_depth))
    1492                 :            :                 num = source_port->dequeue_depth;
    1493                 :            : 
    1494                 :          0 :         dequeued = dsw_port_dequeue_burst(source_port, events, num);
    1495                 :            : 
    1496         [ #  # ]:          0 :         if (unlikely(source_port->migration_state ==
    1497                 :            :                      DSW_MIGRATION_STATE_PAUSING))
    1498                 :          0 :                 dsw_port_stash_migrating_events(source_port, events,
    1499                 :            :                                                 &dequeued);
    1500                 :            : 
    1501                 :          0 :         source_port->pending_releases = dequeued;
    1502                 :            : 
    1503         [ #  # ]:          0 :         dsw_port_load_record(source_port, dequeued);
    1504                 :            : 
    1505                 :          0 :         dsw_port_note_op(source_port, dequeued);
    1506                 :            : 
    1507         [ #  # ]:          0 :         if (dequeued > 0) {
    1508                 :            :                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
    1509                 :            :                                 dequeued);
    1510                 :            : 
    1511                 :            :                 dsw_port_return_credits(dsw, source_port, dequeued);
    1512                 :            : 
    1513                 :            :                 /* One potential optimization one might think of is to
    1514                 :            :                  * add a migration state (prior to 'pausing'), and
    1515                 :            :                  * only record seen events when the port is in this
    1516                 :            :                  * state (and transit to 'pausing' when enough events
    1517                 :            :                  * have been gathered). However, that schema doesn't
    1518                 :            :                  * seem to improve performance.
    1519                 :            :                  */
    1520                 :          0 :                 dsw_port_record_seen_events(port, events, dequeued);
    1521                 :            :         } else /* Zero-size dequeue means a likely idle port, and thus
    1522                 :            :                 * we can afford trading some efficiency for a slightly
    1523                 :            :                 * reduced event wall-time latency.
    1524                 :            :                 */
    1525                 :            :                 dsw_port_flush_out_buffers(dsw, port);
    1526                 :            : 
    1527                 :            : #ifdef DSW_SORT_DEQUEUED
    1528                 :            :         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
    1529                 :            : #endif
    1530                 :            : 
    1531                 :          0 :         return dequeued;
    1532                 :            : }
    1533                 :            : 
    1534                 :          0 : void dsw_event_maintain(void *port, int op)
    1535                 :            : {
    1536                 :            :         struct dsw_port *source_port = port;
    1537                 :          0 :         struct dsw_evdev *dsw = source_port->dsw;
    1538                 :            : 
    1539                 :            :         dsw_port_note_op(source_port, 0);
    1540                 :          0 :         dsw_port_bg_process(dsw, source_port);
    1541                 :            : 
    1542         [ #  # ]:          0 :         if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH)
    1543                 :            :                 dsw_port_flush_out_buffers(dsw, source_port);
    1544                 :          0 : }

Generated by: LCOV version 1.14