include/linux/ptr_ring.h
/*
 *      Definitions for the 'struct ptr_ring' data structure.
 *
 *      Author:
 *              Michael S. Tsirkin <mst@redhat.com>
 *
 *      Copyright (C) 2016 Red Hat, Inc.
 *
 *      This program is free software; you can redistribute it and/or modify it
 *      under the terms of the GNU General Public License as published by the
 *      Free Software Foundation; either version 2 of the License, or (at your
 *      option) any later version.
 *
 *      This is a limited-size FIFO maintaining pointers in FIFO order, with
 *      one CPU producing entries and another CPU consuming them.
 *
 *      This implementation tries to minimize cache-contention when there is a
 *      single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer ____cacheline_aligned_in_smp;
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        void **queue;
};

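/*
 * Usage sketch (illustrative only, not part of this header): the ring is
 * normally embedded in a caller-owned structure; 'struct my_dev' below is
 * hypothetical.  A NULL slot in 'queue' marks a free entry, which is why
 * only non-NULL pointers may be stored in the ring.
 *
 *      struct my_dev {
 *              struct ptr_ring rx_ring;
 *      };
 */
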
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().  If ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

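/*
 * Example (illustrative sketch, not part of this header): the plain, _irq,
 * _any and _bh variants differ only in how producer_lock is taken, so
 * callers pick the one matching their context.  A process-context producer
 * might apply backpressure before enqueueing (my_stop_queue() is
 * hypothetical):
 *
 *      if (ptr_ring_full(&dev->rx_ring))
 *              my_stop_queue(dev);
 *
 * As noted above, anyone spinning on __ptr_ring_full() must put a compiler
 * barrier such as cpu_relax() in the loop so the slot is re-read on each
 * iteration.
 */
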
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        r->queue[r->producer++] = ptr;
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}

static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

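/*
 * Example (illustrative sketch, not part of this header): the slot at
 * 'producer' doubles as the occupied flag, so __ptr_ring_produce() returns
 * -ENOSPC while that slot still holds an unconsumed entry.  A typical caller
 * drops on overflow (my_entry_free() is hypothetical):
 *
 *      int err = ptr_ring_produce(&dev->rx_ring, entry);
 *
 *      if (err == -ENOSPC)
 *              my_entry_free(entry);
 *
 * NULL must never be produced: a NULL slot means "free", and a NULL return
 * from consume means "empty".
 */
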
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return r->queue[r->consumer];
        return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        r->queue[r->consumer++] = NULL;
        if (unlikely(r->consumer >= r->size))
                r->consumer = 0;
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

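/*
 * Example (illustrative sketch, not part of this header): consuming resets
 * the slot to NULL, which is what hands it back to the producer.  A consumer
 * typically drains the ring until ptr_ring_consume() returns NULL
 * (my_entry_handle() is hypothetical):
 *
 *      void *entry;
 *
 *      while ((entry = ptr_ring_consume(&dev->rx_ring)))
 *              my_entry_handle(entry);
 */
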
/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})

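/*
 * Example (illustrative sketch, not part of this header): the callback is
 * applied to whatever __ptr_ring_peek() returns, which is NULL when the ring
 * is empty, so it must accept a NULL argument (this is also why
 * typeof((f)(NULL)) works above).  A hypothetical my_entry_len() peeking at
 * the size of the next entry without consuming it:
 *
 *      static int my_entry_len(void *entry)
 *      {
 *              return entry ? ((struct my_entry *)entry)->len : 0;
 *      }
 *
 *      int next_len = PTR_RING_PEEK_CALL(&dev->rx_ring, my_entry_len);
 */
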
static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
{
        return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        r->size = size;
        r->producer = r->consumer = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}

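/*
 * Example (illustrative sketch, not part of this header): the queue array is
 * zero-allocated, so a freshly initialized ring is empty.  Whether
 * initialization may sleep is controlled entirely by the gfp flags:
 *
 *      int err = ptr_ring_init(&dev->rx_ring, 256, GFP_KERNEL);
 *
 *      if (err)
 *              return err;
 */
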
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        r->size = size;
        r->producer = producer;
        r->consumer = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->producer_lock, flags);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock_irqrestore(&(r)->producer_lock, flags);

        kfree(old);

        return 0;
}

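/*
 * Example (illustrative sketch, not part of this header): resizing allocates
 * a new queue, migrates the entries that still fit and hands any overflow to
 * the destroy callback.  Per the peek/empty notes above, consumers that may
 * run concurrently with a resize must hold consumer_lock.  Shrinking a ring
 * and freeing entries that no longer fit (my_entry_free() is hypothetical):
 *
 *      int err = ptr_ring_resize(&dev->rx_ring, 128, GFP_KERNEL,
 *                                my_entry_free);
 */
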
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings, int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc(nrings * sizeof *queues, gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->producer_lock, flags);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock_irqrestore(&(rings[i])->producer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

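/*
 * Example (illustrative sketch, not part of this header): the _multiple
 * variant allocates every replacement queue up front, so an -ENOMEM return
 * means no ring has been touched.  Resizing an array of ring pointers
 * ('dev->rings' of type struct ptr_ring ** and 'dev->nrings' are
 * hypothetical):
 *
 *      int err = ptr_ring_resize_multiple(dev->rings, dev->nrings,
 *                                         new_size, GFP_KERNEL,
 *                                         my_entry_free);
 */
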
static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kfree(r->queue);
}
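
/*
 * Example (illustrative sketch, not part of this header): cleanup should only
 * run once producers and consumers have stopped, since the queue array is
 * freed here.  Entries still queued are passed to the destroy callback; with
 * a NULL callback only the array itself is freed.  For plain kmalloc'ed
 * entries a small wrapper works (my_entry_free() is hypothetical):
 *
 *      static void my_entry_free(void *entry)
 *      {
 *              kfree(entry);
 *      }
 *
 *      ptr_ring_cleanup(&dev->rx_ring, my_entry_free);
 */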

#endif /* _LINUX_PTR_RING_H  */