/*	$NetBSD: pktqueue.c,v 1.8 2014/07/04 01:50:22 ozaki-r Exp $	*/

/*-
 * Copyright (c) 2014 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Mindaugas Rasiukevicius.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * The packet queue (pktqueue) interface is a lockless IP input queue
 * which also abstracts and handles network ISR scheduling.  It provides
 * a mechanism to enable receiver-side packet steering (RPS).
 */
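/*
 * Illustrative overview sketch (assumptions, not part of this file):
 * a protocol creates one pktqueue at initialisation time, passing the
 * softint handler that will later dequeue packets, e.g.:
 *
 *	proto_pktq = pktq_create(IFQ_MAXLEN, proto_intr, NULL);
 *	if (proto_pktq == NULL)
 *		panic("pktq_create failed");
 *
 * proto_pktq and proto_intr are hypothetical names; IFQ_MAXLEN is the
 * traditional interface queue limit, but any positive limit would do.
 */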

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.8 2014/07/04 01:50:22 ozaki-r Exp $");

#include <sys/param.h>
#include <sys/types.h>

#include <sys/atomic.h>
#include <sys/cpu.h>
#include <sys/kmem.h>
#include <sys/pcq.h>
#include <sys/intr.h>
#include <sys/mbuf.h>
#include <sys/proc.h>
#include <sys/percpu.h>
#include <sys/sysctl.h>

#include <net/pktqueue.h>

/*
 * WARNING: update this if struct pktqueue changes.
 */
#define	PKTQ_CLPAD	\
    MAX(COHERENCY_UNIT, COHERENCY_UNIT - sizeof(kmutex_t) - sizeof(u_int))

struct pktqueue {
	/*
	 * The lock used for a barrier mechanism.  The barrier counter,
	 * as well as the drop counter, are managed atomically though.
	 * Ensure this group is in a separate cache line.
	 */
	kmutex_t	pq_lock;
	volatile u_int	pq_barrier;
	uint8_t		_pad[PKTQ_CLPAD];

	/* The size of the queue, counters and the interrupt handler. */
	u_int		pq_maxlen;
	percpu_t *	pq_counters;
	void *		pq_sih;

	/* Finally, per-CPU queues. */
	pcq_t *		pq_queue[];
};

/* The counters of the packet queue. */
#define	PQCNT_ENQUEUE	0
#define	PQCNT_DEQUEUE	1
#define	PQCNT_DROP	2
#define	PQCNT_NCOUNTERS	3

typedef struct {
	uint64_t	count[PQCNT_NCOUNTERS];
} pktq_counters_t;

/* Special marker value used by pktq_barrier() mechanism. */
#define	PKTQ_MARKER	((void *)(~0ULL))

/*
 * The total size of pktqueue_t which depends on the number of CPUs.
 */
#define	PKTQUEUE_STRUCT_LEN(ncpu)	\
    roundup2(offsetof(pktqueue_t, pq_queue[ncpu]), coherency_unit)
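/*
 * Illustrative sizing sketch (assumption: a hypothetical 4-CPU machine):
 * the length is the offset of the slot one past the last per-CPU queue
 * pointer, rounded up to the cache line size, e.g.:
 *
 *	len = roundup2(offsetof(pktqueue_t, pq_queue[4]), coherency_unit);
 *	pq = kmem_zalloc(len, KM_SLEEP);
 *
 * so the flexible pq_queue[] array gets one pcq_t pointer per CPU and
 * the whole structure remains cache-line aligned.
 */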

pktqueue_t *
pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
{
	const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
	pktqueue_t *pq;
	percpu_t *pc;
	void *sih;

	if ((pc = percpu_alloc(sizeof(pktq_counters_t))) == NULL) {
		return NULL;
	}
	if ((sih = softint_establish(sflags, intrh, sc)) == NULL) {
		percpu_free(pc, sizeof(pktq_counters_t));
		return NULL;
	}

	pq = kmem_zalloc(len, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		pq->pq_queue[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
	pq->pq_maxlen = maxlen;
	pq->pq_counters = pc;
	pq->pq_sih = sih;

	return pq;
}

void
pktq_destroy(pktqueue_t *pq)
{
	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];
		KASSERT(pcq_peek(q) == NULL);
		pcq_destroy(q);
	}
	percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
	softint_disestablish(pq->pq_sih);
	mutex_destroy(&pq->pq_lock);
	kmem_free(pq, len);
}

/*
 * - pktq_inc_count: increment the counter given an ID.
 * - pktq_collect_counts: handler to sum up the counts from each CPU.
 * - pktq_get_count: return the effective count given an ID.
 */

static inline void
pktq_inc_count(pktqueue_t *pq, u_int i)
{
	percpu_t *pc = pq->pq_counters;
	pktq_counters_t *c;

	c = percpu_getref(pc);
	c->count[i]++;
	percpu_putref(pc);
}

static void
pktq_collect_counts(void *mem, void *arg, struct cpu_info *ci)
{
	const pktq_counters_t *c = mem;
	pktq_counters_t *sum = arg;

	for (u_int i = 0; i < PQCNT_NCOUNTERS; i++) {
		sum->count[i] += c->count[i];
	}
}

uint64_t
pktq_get_count(pktqueue_t *pq, pktq_count_t c)
{
	pktq_counters_t sum;

	if (c != PKTQ_MAXLEN) {
		memset(&sum, 0, sizeof(sum));
		percpu_foreach(pq->pq_counters, pktq_collect_counts, &sum);
	}
	switch (c) {
	case PKTQ_NITEMS:
		return sum.count[PQCNT_ENQUEUE] - sum.count[PQCNT_DEQUEUE];
	case PKTQ_DROPS:
		return sum.count[PQCNT_DROP];
	case PKTQ_MAXLEN:
		return pq->pq_maxlen;
	}
	return 0;
}

uint32_t
pktq_rps_hash(const struct mbuf *m __unused)
{
	/*
	 * XXX: No distribution yet; the softnet_lock contention
	 * XXX: must be eliminated first.
	 */
	return 0;
}

/*
 * pktq_enqueue: inject the packet into the end of the queue.
 *
 * => Must be called from interrupt context or with preemption disabled.
 * => Consumes the packet and returns true on success.
 * => Returns false on failure; the caller is responsible for freeing the packet.
 */
bool
pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
{
#if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
	const unsigned cpuid = curcpu()->ci_index;
#else
	const unsigned cpuid = hash % ncpu;
#endif

	KASSERT(kpreempt_disabled());

	if (__predict_false(!pcq_put(pq->pq_queue[cpuid], m))) {
		pktq_inc_count(pq, PQCNT_DROP);
		return false;
	}
	softint_schedule_cpu(pq->pq_sih, cpu_lookup(cpuid));
	pktq_inc_count(pq, PQCNT_ENQUEUE);
	return true;
}
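
/*
 * Illustrative caller sketch (assumptions, not part of this file): a
 * protocol input path, which already runs in interrupt or softint
 * context with preemption disabled, hashes the packet and frees the
 * mbuf itself when the queue is full, since a failed enqueue does not
 * consume the packet:
 *
 *	void
 *	proto_input(struct mbuf *m)
 *	{
 *		const uint32_t h = pktq_rps_hash(m);
 *
 *		KASSERT(kpreempt_disabled());
 *		if (__predict_false(!pktq_enqueue(proto_pktq, m, h))) {
 *			m_freem(m);
 *		}
 *	}
 *
 * proto_pktq and proto_input are hypothetical names for illustration.
 */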

/*
 * pktq_dequeue: take a packet from the queue.
 *
 * => Must be called with preemption disabled.
 * => Must ensure there are no concurrent dequeue calls.
 */
struct mbuf *
pktq_dequeue(pktqueue_t *pq)
{
	const struct cpu_info *ci = curcpu();
	const unsigned cpuid = cpu_index(ci);
	struct mbuf *m;

	m = pcq_get(pq->pq_queue[cpuid]);
	if (__predict_false(m == PKTQ_MARKER)) {
		/* Note the marker entry. */
		atomic_inc_uint(&pq->pq_barrier);
		return NULL;
	}
	if (__predict_true(m != NULL)) {
		pktq_inc_count(pq, PQCNT_DEQUEUE);
	}
	return m;
}
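
/*
 * Illustrative handler sketch (assumptions, not part of this file): the
 * softint handler established via pktq_create() typically drains the
 * current CPU's queue until pktq_dequeue() returns NULL, e.g.:
 *
 *	static void
 *	proto_intr(void *arg)
 *	{
 *		struct mbuf *m;
 *
 *		while ((m = pktq_dequeue(proto_pktq)) != NULL) {
 *			proto_process(m);
 *		}
 *	}
 *
 * proto_pktq, proto_intr and proto_process are hypothetical names; the
 * handler runs in softint context, so preemption is already disabled
 * and no other thread dequeues from this CPU's queue concurrently.
 */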

/*
 * pktq_barrier: waits for a grace period during which all packets
 * enqueued at the moment of calling this routine will have been
 * processed.  This is used to ensure that e.g. packets referencing
 * some interface have been drained.
 */
void
pktq_barrier(pktqueue_t *pq)
{
	u_int pending = 0;

	mutex_enter(&pq->pq_lock);
	KASSERT(pq->pq_barrier == 0);

	for (u_int i = 0; i < ncpu; i++) {
		pcq_t *q = pq->pq_queue[i];

		/* If the queue is empty - nothing to do. */
		if (pcq_peek(q) == NULL) {
			continue;
		}
		/* Otherwise, insert the marker entry and kick the handler. */
		while (!pcq_put(q, PKTQ_MARKER)) {
			kpause("pktqsync", false, 1, NULL);
		}
		kpreempt_disable();
		softint_schedule_cpu(pq->pq_sih, cpu_lookup(i));
		kpreempt_enable();
		pending++;
	}

	/* Wait for each queue to process the markers. */
	while (pq->pq_barrier != pending) {
		kpause("pktqsync", false, 1, NULL);
	}
	pq->pq_barrier = 0;
	mutex_exit(&pq->pq_lock);
}
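
/*
 * Illustrative usage sketch (assumptions, not part of this file): a
 * detach path could first stop enqueueing packets that reference the
 * object being destroyed, then wait for those already queued, e.g.:
 *
 *	// stop producing new packets that reference the interface
 *	pktq_barrier(proto_pktq);
 *	// every packet enqueued before the call has now been dequeued
 *
 * proto_pktq is a hypothetical name for illustration.
 */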

/*
 * pktq_flush: free mbufs in all queues.
 *
 * => The caller must ensure there are no concurrent writers or flush calls.
 */
void
pktq_flush(pktqueue_t *pq)
{
	struct mbuf *m;

	for (u_int i = 0; i < ncpu; i++) {
		while ((m = pcq_get(pq->pq_queue[i])) != NULL) {
			pktq_inc_count(pq, PQCNT_DEQUEUE);
			m_freem(m);
		}
	}
}

/*
 * pktq_set_maxlen: create per-CPU queues using a new size and replace
 * the existing queues without losing any packets.
 */
int
pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
{
	const u_int slotbytes = ncpu * sizeof(pcq_t *);
	pcq_t **qs;

	if (!maxlen || maxlen > PCQ_MAXLEN)
		return EINVAL;
	if (pq->pq_maxlen == maxlen)
		return 0;

	/* First, allocate the new queues and replace them. */
	qs = kmem_zalloc(slotbytes, KM_SLEEP);
	for (u_int i = 0; i < ncpu; i++) {
		qs[i] = pcq_create(maxlen, KM_SLEEP);
	}
	mutex_enter(&pq->pq_lock);
	for (u_int i = 0; i < ncpu; i++) {
		/* Swap: store of a word is atomic. */
		pcq_t *q = pq->pq_queue[i];
		pq->pq_queue[i] = qs[i];
		qs[i] = q;
	}
	pq->pq_maxlen = maxlen;
	mutex_exit(&pq->pq_lock);

	/*
	 * At this point, the new packets are flowing into the new
	 * queues.  However, the old queues may have some packets
	 * present which are no longer being processed.  We are going
	 * to re-enqueue them.  This may change the order of packet
	 * arrival, but it is not considered an issue.
	 *
	 * There may be in-flight interrupts calling pktq_dequeue()
	 * which reference the old queues.  Issue a barrier to ensure
	 * that we are going to be the only pcq_get() callers on the
	 * old queues.
	 */
	pktq_barrier(pq);

	for (u_int i = 0; i < ncpu; i++) {
		struct mbuf *m;

		while ((m = pcq_get(qs[i])) != NULL) {
			while (!pcq_put(pq->pq_queue[i], m)) {
				kpause("pktqrenq", false, 1, NULL);
			}
		}
		pcq_destroy(qs[i]);
	}

	/* Well, that was fun. */
	kmem_free(qs, slotbytes);
	return 0;
}

int
sysctl_pktq_maxlen(SYSCTLFN_ARGS, pktqueue_t *pq)
{
	u_int nmaxlen = pktq_get_count(pq, PKTQ_MAXLEN);
	struct sysctlnode node = *rnode;
	int error;

	node.sysctl_data = &nmaxlen;
	error = sysctl_lookup(SYSCTLFN_CALL(&node));
	if (error || newp == NULL)
		return error;
	return pktq_set_maxlen(pq, nmaxlen);
}

int
sysctl_pktq_count(SYSCTLFN_ARGS, pktqueue_t *pq, u_int count_id)
{
	int count = pktq_get_count(pq, count_id);
	struct sysctlnode node = *rnode;
	node.sysctl_data = &count;
	return sysctl_lookup(SYSCTLFN_CALL(&node));
}
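/*
 * Illustrative wiring sketch (assumptions, not part of this file): a
 * protocol would typically expose the queue limit through a small
 * SYSCTLFN_ARGS wrapper that supplies its own pktqueue_t pointer, e.g.:
 *
 *	static int
 *	sysctl_proto_pktq_maxlen(SYSCTLFN_ARGS)
 *	{
 *		return sysctl_pktq_maxlen(SYSCTLFN_CALL(rnode), proto_pktq);
 *	}
 *
 * and register that wrapper with sysctl_createv() as a read/write
 * CTLTYPE_INT node.  proto_pktq and sysctl_proto_pktq_maxlen are
 * hypothetical names for illustration.
 */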