1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h>        /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
17 #include <linux/fs.h>
18
19 #include "trace.h"
20
21 /*
22  * A fast way to enable or disable all ring buffers is to
23  * call tracing_on or tracing_off. Turning off the ring buffers
24  * prevents all ring buffers from being recorded to.
25  * Turning this switch on makes it OK to write to the
26  * ring buffer, if the ring buffer is enabled itself.
27  *
28  * There are three layers that must be on in order to write
29  * to the ring buffer.
30  *
31  * 1) This global flag must be set.
32  * 2) The ring buffer must be enabled for recording.
33  * 3) The per cpu buffer must be enabled for recording.
34  *
35  * In case of an anomaly, this global flag has a bit set that
36  * will permanently disable all ring buffers.
37  */
38
39 /*
40  * Global flag to disable all recording to ring buffers
41  *  This has two bits: ON, DISABLED
42  *
43  *  ON   DISABLED
44  * ---- ----------
45  *   0      0        : ring buffers are off
46  *   1      0        : ring buffers are on
47  *   X      1        : ring buffers are permanently disabled
48  */
49
50 enum {
51         RB_BUFFERS_ON_BIT       = 0,
52         RB_BUFFERS_DISABLED_BIT = 1,
53 };
54
55 enum {
56         RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
57         RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
58 };
59
60 static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
61
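/*
 * Illustrative sketch (not part of the original file): how the two
 * flag bits combine.  The helper name rb_recording_allowed() is
 * hypothetical.
 */
#if 0
static int rb_recording_allowed(void)
{
        /* DISABLED wins: once that bit is set, recording is off for good */
        if (test_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags))
                return 0;
        return test_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
#endif
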
62 /**
63  * tracing_on - enable all tracing buffers
64  *
65  * This function enables all tracing buffers that may have been
66  * disabled with tracing_off.
67  */
68 void tracing_on(void)
69 {
70         set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
71 }
72
73 /**
74  * tracing_off - turn off all tracing buffers
75  *
76  * This function stops all tracing buffers from recording data.
77  * It does not disable any overhead the tracers themselves may
78  * be causing. This function simply causes all recording to
79  * the ring buffers to fail.
80  */
81 void tracing_off(void)
82 {
83         clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
84 }
85
86 /**
87  * tracing_off_permanent - permanently disable ring buffers
88  *
89  * This function, once called, will disable all ring buffers
90  * permanently.
91  */
92 void tracing_off_permanent(void)
93 {
94         set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
95 }
96
98
99 /* Up this if you want to test the TIME_EXTENTS and normalization */
100 #define DEBUG_SHIFT 0
101
102 /* FIXME!!! */
103 u64 ring_buffer_time_stamp(int cpu)
104 {
105         u64 time;
106
107         preempt_disable_notrace();
108         /* shift to debug/test normalization and TIME_EXTENTS */
109         time = sched_clock() << DEBUG_SHIFT;
110         preempt_enable_no_resched_notrace();
111
112         return time;
113 }
114
115 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
116 {
117         /* Just stupid testing the normalize function and deltas */
118         *ts >>= DEBUG_SHIFT;
119 }
120
121 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
122 #define RB_ALIGNMENT_SHIFT      2
123 #define RB_ALIGNMENT            (1 << RB_ALIGNMENT_SHIFT)
124 #define RB_MAX_SMALL_DATA       28
125
126 enum {
127         RB_LEN_TIME_EXTEND = 8,
128         RB_LEN_TIME_STAMP = 16,
129 };
130
131 /* inline for ring buffer fast paths */
132 static inline unsigned
133 rb_event_length(struct ring_buffer_event *event)
134 {
135         unsigned length;
136
137         switch (event->type) {
138         case RINGBUF_TYPE_PADDING:
139                 /* undefined */
140                 return -1;
141
142         case RINGBUF_TYPE_TIME_EXTEND:
143                 return RB_LEN_TIME_EXTEND;
144
145         case RINGBUF_TYPE_TIME_STAMP:
146                 return RB_LEN_TIME_STAMP;
147
148         case RINGBUF_TYPE_DATA:
149                 if (event->len)
150                         length = event->len << RB_ALIGNMENT_SHIFT;
151                 else
152                         length = event->array[0];
153                 return length + RB_EVNT_HDR_SIZE;
154         default:
155                 BUG();
156         }
157         /* not hit */
158         return 0;
159 }
160
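/*
 * Worked example (sketch, not in the original): the len field counts
 * 4-byte words, rounded up; payloads above RB_MAX_SMALL_DATA set
 * len = 0 and keep the byte count in array[0] instead.
 */
#if 0
static void rb_length_example(struct ring_buffer_event *event)
{
        /* a 10-byte payload is stored as len = (10 + 3) >> 2 = 3 words */
        event->type = RINGBUF_TYPE_DATA;
        event->len = 3;
        /* (3 << 2) + RB_EVNT_HDR_SIZE: 12 data bytes plus the header */
        WARN_ON(rb_event_length(event) != 12 + RB_EVNT_HDR_SIZE);
}
#endif
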
161 /**
162  * ring_buffer_event_length - return the length of the event
163  * @event: the event to get the length of
164  */
165 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
166 {
167         return rb_event_length(event);
168 }
169
170 /* inline for ring buffer fast paths */
171 static inline void *
172 rb_event_data(struct ring_buffer_event *event)
173 {
174         BUG_ON(event->type != RINGBUF_TYPE_DATA);
175         /* If length is in len field, then array[0] has the data */
176         if (event->len)
177                 return (void *)&event->array[0];
178         /* Otherwise length is in array[0] and array[1] has the data */
179         return (void *)&event->array[1];
180 }
181
182 /**
183  * ring_buffer_event_data - return the data of the event
184  * @event: the event to get the data from
185  */
186 void *ring_buffer_event_data(struct ring_buffer_event *event)
187 {
188         return rb_event_data(event);
189 }
190
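/*
 * Usage sketch (hypothetical, not in the original): given an event
 * obtained from a read-side API, fetch its length and payload.
 */
#if 0
static void rb_consume_example(struct ring_buffer_event *event)
{
        unsigned length = ring_buffer_event_length(event);
        void *body = ring_buffer_event_data(event);

        /* 'length' includes the event header; 'body' is the payload */
        pr_debug("event %p: %u bytes\n", body, length);
}
#endif
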
191 #define for_each_buffer_cpu(buffer, cpu)                \
192         for_each_cpu_mask(cpu, buffer->cpumask)
193
194 #define TS_SHIFT        27
195 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
196 #define TS_DELTA_TEST   (~TS_MASK)
197
198 struct buffer_data_page {
199         u64              time_stamp;    /* page time stamp */
200         local_t          commit;        /* write committed index */
201         unsigned char    data[];        /* data of buffer page */
202 };
203
204 struct buffer_page {
205         local_t          write;         /* index for next write */
206         unsigned         read;          /* index for next read */
207         struct list_head list;          /* list of free pages */
208         struct buffer_data_page *page;  /* Actual data page */
209 };
210
211 static void rb_init_page(struct buffer_data_page *bpage)
212 {
213         local_set(&bpage->commit, 0);
214 }
215
216 /*
217  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
218  * this issue out.
219  */
220 static inline void free_buffer_page(struct buffer_page *bpage)
221 {
222         if (bpage->page)
223                 free_page((unsigned long)bpage->page);
224         kfree(bpage);
225 }
226
227 /*
228  * We need to fit the time_stamp delta into 27 bits.
229  */
230 static inline int test_time_stamp(u64 delta)
231 {
232         if (delta & TS_DELTA_TEST)
233                 return 1;
234         return 0;
235 }
236
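/*
 * Sketch (not part of the original): deltas of 2^27 or more no longer
 * fit in the 27-bit time_delta field, so a TIME_EXTEND event must be
 * inserted; anything smaller is stored inline.
 */
#if 0
static void rb_ts_example(void)
{
        WARN_ON(test_time_stamp((1ULL << TS_SHIFT) - 1));  /* fits */
        WARN_ON(!test_time_stamp(1ULL << TS_SHIFT));       /* too big */
}
#endif
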
237 #define BUF_PAGE_SIZE (PAGE_SIZE - sizeof(struct buffer_data_page))
238
239 /*
240  * head_page == tail_page && head == tail then buffer is empty.
241  */
242 struct ring_buffer_per_cpu {
243         int                             cpu;
244         struct ring_buffer              *buffer;
245         spinlock_t                      reader_lock; /* serialize readers */
246         raw_spinlock_t                  lock;
247         struct lock_class_key           lock_key;
248         struct list_head                pages;
249         struct buffer_page              *head_page;     /* read from head */
250         struct buffer_page              *tail_page;     /* write to tail */
251         struct buffer_page              *commit_page;   /* committed pages */
252         struct buffer_page              *reader_page;
253         unsigned long                   overrun;
254         unsigned long                   entries;
255         u64                             write_stamp;
256         u64                             read_stamp;
257         atomic_t                        record_disabled;
258 };
259
260 struct ring_buffer {
261         unsigned                        pages;
262         unsigned                        flags;
263         int                             cpus;
264         cpumask_t                       cpumask;
265         atomic_t                        record_disabled;
266
267         struct mutex                    mutex;
268
269         struct ring_buffer_per_cpu      **buffers;
270 };
271
272 struct ring_buffer_iter {
273         struct ring_buffer_per_cpu      *cpu_buffer;
274         unsigned long                   head;
275         struct buffer_page              *head_page;
276         u64                             read_stamp;
277 };
278
279 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
280 #define RB_WARN_ON(buffer, cond)                                \
281         ({                                                      \
282                 int _____ret = unlikely(cond);                  \
283                 if (_____ret) {                                 \
284                         atomic_inc(&buffer->record_disabled);   \
285                         WARN_ON(1);                             \
286                 }                                               \
287                 _____ret;                                       \
288         })
289
290 /**
291  * rb_check_pages - integrity check of buffer pages
292  * @cpu_buffer: CPU buffer with pages to test
293  *
294  * As a safety measure we check to make sure the data pages have not
295  * been corrupted.
296  */
297 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
298 {
299         struct list_head *head = &cpu_buffer->pages;
300         struct buffer_page *bpage, *tmp;
301
302         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
303                 return -1;
304         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
305                 return -1;
306
307         list_for_each_entry_safe(bpage, tmp, head, list) {
308                 if (RB_WARN_ON(cpu_buffer,
309                                bpage->list.next->prev != &bpage->list))
310                         return -1;
311                 if (RB_WARN_ON(cpu_buffer,
312                                bpage->list.prev->next != &bpage->list))
313                         return -1;
314         }
315
316         return 0;
317 }
318
319 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
320                              unsigned nr_pages)
321 {
322         struct list_head *head = &cpu_buffer->pages;
323         struct buffer_page *bpage, *tmp;
324         unsigned long addr;
325         LIST_HEAD(pages);
326         unsigned i;
327
328         for (i = 0; i < nr_pages; i++) {
329                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
330                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
331                 if (!bpage)
332                         goto free_pages;
333                 list_add(&bpage->list, &pages);
334
335                 addr = __get_free_page(GFP_KERNEL);
336                 if (!addr)
337                         goto free_pages;
338                 bpage->page = (void *)addr;
339                 rb_init_page(bpage->page);
340         }
341
342         list_splice(&pages, head);
343
344         rb_check_pages(cpu_buffer);
345
346         return 0;
347
348  free_pages:
349         list_for_each_entry_safe(bpage, tmp, &pages, list) {
350                 list_del_init(&bpage->list);
351                 free_buffer_page(bpage);
352         }
353         return -ENOMEM;
354 }
355
356 static struct ring_buffer_per_cpu *
357 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
358 {
359         struct ring_buffer_per_cpu *cpu_buffer;
360         struct buffer_page *bpage;
361         unsigned long addr;
362         int ret;
363
364         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
365                                   GFP_KERNEL, cpu_to_node(cpu));
366         if (!cpu_buffer)
367                 return NULL;
368
369         cpu_buffer->cpu = cpu;
370         cpu_buffer->buffer = buffer;
371         spin_lock_init(&cpu_buffer->reader_lock);
372         cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
373         INIT_LIST_HEAD(&cpu_buffer->pages);
374
375         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
376                             GFP_KERNEL, cpu_to_node(cpu));
377         if (!bpage)
378                 goto fail_free_buffer;
379
380         cpu_buffer->reader_page = bpage;
381         addr = __get_free_page(GFP_KERNEL);
382         if (!addr)
383                 goto fail_free_reader;
384         bpage->page = (void *)addr;
385         rb_init_page(bpage->page);
386
387         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
388
389         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
390         if (ret < 0)
391                 goto fail_free_reader;
392
393         cpu_buffer->head_page
394                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
395         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
396
397         return cpu_buffer;
398
399  fail_free_reader:
400         free_buffer_page(cpu_buffer->reader_page);
401
402  fail_free_buffer:
403         kfree(cpu_buffer);
404         return NULL;
405 }
406
407 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
408 {
409         struct list_head *head = &cpu_buffer->pages;
410         struct buffer_page *bpage, *tmp;
411
412         list_del_init(&cpu_buffer->reader_page->list);
413         free_buffer_page(cpu_buffer->reader_page);
414
415         list_for_each_entry_safe(bpage, tmp, head, list) {
416                 list_del_init(&bpage->list);
417                 free_buffer_page(bpage);
418         }
419         kfree(cpu_buffer);
420 }
421
422 /*
423  * Causes compile errors if the struct buffer_page gets bigger
424  * than the struct page.
425  */
426 extern int ring_buffer_page_too_big(void);
427
428 /**
429  * ring_buffer_alloc - allocate a new ring_buffer
430  * @size: the size in bytes that is needed.
431  * @flags: attributes to set for the ring buffer.
432  *
433  * Currently the only flag that is available is the RB_FL_OVERWRITE
434  * flag. This flag means that the buffer will overwrite old data
435  * when the buffer wraps. If this flag is not set, the buffer will
436  * drop data when the tail hits the head.
437  */
438 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
439 {
440         struct ring_buffer *buffer;
441         int bsize;
442         int cpu;
443
444         /* Paranoid! Optimizes out when all is well */
445         if (sizeof(struct buffer_page) > sizeof(struct page))
446                 ring_buffer_page_too_big();
447
448
449         /* keep it in its own cache line */
450         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
451                          GFP_KERNEL);
452         if (!buffer)
453                 return NULL;
454
455         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
456         buffer->flags = flags;
457
458         /* need at least two pages */
459         if (buffer->pages == 1)
460                 buffer->pages++;
461
462         buffer->cpumask = cpu_possible_map;
463         buffer->cpus = nr_cpu_ids;
464
465         bsize = sizeof(void *) * nr_cpu_ids;
466         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
467                                   GFP_KERNEL);
468         if (!buffer->buffers)
469                 goto fail_free_buffer;
470
471         for_each_buffer_cpu(buffer, cpu) {
472                 buffer->buffers[cpu] =
473                         rb_allocate_cpu_buffer(buffer, cpu);
474                 if (!buffer->buffers[cpu])
475                         goto fail_free_buffers;
476         }
477
478         mutex_init(&buffer->mutex);
479
480         return buffer;
481
482  fail_free_buffers:
483         for_each_buffer_cpu(buffer, cpu) {
484                 if (buffer->buffers[cpu])
485                         rb_free_cpu_buffer(buffer->buffers[cpu]);
486         }
487         kfree(buffer->buffers);
488
489  fail_free_buffer:
490         kfree(buffer);
491         return NULL;
492 }
493
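/*
 * Usage sketch (hypothetical caller, not in the original file):
 * allocate a roughly 1 MiB buffer that overwrites old data when it
 * wraps, then free it again.
 */
#if 0
static int rb_alloc_example(void)
{
        struct ring_buffer *rb;

        rb = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
        if (!rb)
                return -ENOMEM;
        /* ... use the buffer ... */
        ring_buffer_free(rb);
        return 0;
}
#endif
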
494 /**
495  * ring_buffer_free - free a ring buffer.
496  * @buffer: the buffer to free.
497  */
498 void
499 ring_buffer_free(struct ring_buffer *buffer)
500 {
501         int cpu;
502
503         for_each_buffer_cpu(buffer, cpu)
504                 rb_free_cpu_buffer(buffer->buffers[cpu]);
505
506         kfree(buffer);
507 }
508
509 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
510
511 static void
512 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
513 {
514         struct buffer_page *bpage;
515         struct list_head *p;
516         unsigned i;
517
518         atomic_inc(&cpu_buffer->record_disabled);
519         synchronize_sched();
520
521         for (i = 0; i < nr_pages; i++) {
522                 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
523                         return;
524                 p = cpu_buffer->pages.next;
525                 bpage = list_entry(p, struct buffer_page, list);
526                 list_del_init(&bpage->list);
527                 free_buffer_page(bpage);
528         }
529         if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
530                 return;
531
532         rb_reset_cpu(cpu_buffer);
533
534         rb_check_pages(cpu_buffer);
535
536         atomic_dec(&cpu_buffer->record_disabled);
537
538 }
539
540 static void
541 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
542                 struct list_head *pages, unsigned nr_pages)
543 {
544         struct buffer_page *bpage;
545         struct list_head *p;
546         unsigned i;
547
548         atomic_inc(&cpu_buffer->record_disabled);
549         synchronize_sched();
550
551         for (i = 0; i < nr_pages; i++) {
552                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
553                         return;
554                 p = pages->next;
555                 bpage = list_entry(p, struct buffer_page, list);
556                 list_del_init(&bpage->list);
557                 list_add_tail(&bpage->list, &cpu_buffer->pages);
558         }
559         rb_reset_cpu(cpu_buffer);
560
561         rb_check_pages(cpu_buffer);
562
563         atomic_dec(&cpu_buffer->record_disabled);
564 }
565
566 /**
567  * ring_buffer_resize - resize the ring buffer
568  * @buffer: the buffer to resize.
569  * @size: the new size.
570  *
571  * The tracer is responsible for making sure that the buffer is
572  * not being used while changing the size.
573  * Note: We may be able to change the above requirement by using
574  *  RCU synchronizations.
575  *
576  * Minimum size is 2 * BUF_PAGE_SIZE.
577  *
578  * Returns -1 on failure.
579  */
580 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
581 {
582         struct ring_buffer_per_cpu *cpu_buffer;
583         unsigned nr_pages, rm_pages, new_pages;
584         struct buffer_page *bpage, *tmp;
585         unsigned long buffer_size;
586         unsigned long addr;
587         LIST_HEAD(pages);
588         int i, cpu;
589
590         /*
591          * Always succeed at resizing a non-existent buffer:
592          */
593         if (!buffer)
594                 return size;
595
596         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
597         size *= BUF_PAGE_SIZE;
598         buffer_size = buffer->pages * BUF_PAGE_SIZE;
599
600         /* we need a minimum of two pages */
601         if (size < BUF_PAGE_SIZE * 2)
602                 size = BUF_PAGE_SIZE * 2;
603
604         if (size == buffer_size)
605                 return size;
606
607         mutex_lock(&buffer->mutex);
608
609         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
610
611         if (size < buffer_size) {
612
613                 /* easy case, just free pages */
614                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
615                         mutex_unlock(&buffer->mutex);
616                         return -1;
617                 }
618
619                 rm_pages = buffer->pages - nr_pages;
620
621                 for_each_buffer_cpu(buffer, cpu) {
622                         cpu_buffer = buffer->buffers[cpu];
623                         rb_remove_pages(cpu_buffer, rm_pages);
624                 }
625                 goto out;
626         }
627
628         /*
629          * This is a bit more difficult. We only want to add pages
630          * when we can allocate enough for all CPUs. We do this
631          * by allocating all the pages and storing them on a local
632          * linked list. If we succeed in our allocation, then we
633          * add these pages to the cpu_buffers. Otherwise we just free
634          * them all and return -ENOMEM.
635          */
636         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
637                 mutex_unlock(&buffer->mutex);
638                 return -1;
639         }
640
641         new_pages = nr_pages - buffer->pages;
642
643         for_each_buffer_cpu(buffer, cpu) {
644                 for (i = 0; i < new_pages; i++) {
645                         bpage = kzalloc_node(ALIGN(sizeof(*bpage),
646                                                   cache_line_size()),
647                                             GFP_KERNEL, cpu_to_node(cpu));
648                         if (!bpage)
649                                 goto free_pages;
650                         list_add(&bpage->list, &pages);
651                         addr = __get_free_page(GFP_KERNEL);
652                         if (!addr)
653                                 goto free_pages;
654                         bpage->page = (void *)addr;
655                         rb_init_page(bpage->page);
656                 }
657         }
658
659         for_each_buffer_cpu(buffer, cpu) {
660                 cpu_buffer = buffer->buffers[cpu];
661                 rb_insert_pages(cpu_buffer, &pages, new_pages);
662         }
663
664         if (RB_WARN_ON(buffer, !list_empty(&pages))) {
665                 mutex_unlock(&buffer->mutex);
666                 return -1;
667         }
668
669  out:
670         buffer->pages = nr_pages;
671         mutex_unlock(&buffer->mutex);
672
673         return size;
674
675  free_pages:
676         list_for_each_entry_safe(bpage, tmp, &pages, list) {
677                 list_del_init(&bpage->list);
678                 free_buffer_page(bpage);
679         }
680         mutex_unlock(&buffer->mutex);
681         return -ENOMEM;
682 }
683
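/*
 * Usage sketch (hypothetical, not in the original): the return value
 * is the new page-rounded size on success, -1 or -ENOMEM on failure.
 */
#if 0
static int rb_resize_example(struct ring_buffer *rb)
{
        int ret = ring_buffer_resize(rb, 2 << 20);

        if (ret < 0)
                return ret;     /* -1 on bad request, -ENOMEM if OOM */
        return 0;               /* ret holds the actual size in bytes */
}
#endif
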
684 static inline int rb_null_event(struct ring_buffer_event *event)
685 {
686         return event->type == RINGBUF_TYPE_PADDING;
687 }
688
689 static inline void *
690 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
691 {
692         return bpage->data + index;
693 }
694
695 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
696 {
697         return bpage->page->data + index;
698 }
699
700 static inline struct ring_buffer_event *
701 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
702 {
703         return __rb_page_index(cpu_buffer->reader_page,
704                                cpu_buffer->reader_page->read);
705 }
706
707 static inline struct ring_buffer_event *
708 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
709 {
710         return __rb_page_index(cpu_buffer->head_page,
711                                cpu_buffer->head_page->read);
712 }
713
714 static inline struct ring_buffer_event *
715 rb_iter_head_event(struct ring_buffer_iter *iter)
716 {
717         return __rb_page_index(iter->head_page, iter->head);
718 }
719
720 static inline unsigned rb_page_write(struct buffer_page *bpage)
721 {
722         return local_read(&bpage->write);
723 }
724
725 static inline unsigned rb_page_commit(struct buffer_page *bpage)
726 {
727         return local_read(&bpage->page->commit);
728 }
729
730 /* Size is determined by what has been committed */
731 static inline unsigned rb_page_size(struct buffer_page *bpage)
732 {
733         return rb_page_commit(bpage);
734 }
735
736 static inline unsigned
737 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
738 {
739         return rb_page_commit(cpu_buffer->commit_page);
740 }
741
742 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
743 {
744         return rb_page_commit(cpu_buffer->head_page);
745 }
746
747 /*
748  * When the tail hits the head and the buffer is in overwrite mode,
749  * the head jumps to the next page and all content on the previous
750  * page is discarded. But before doing so, we update the overrun
751  * variable of the buffer.
752  */
753 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
754 {
755         struct ring_buffer_event *event;
756         unsigned long head;
757
758         for (head = 0; head < rb_head_size(cpu_buffer);
759              head += rb_event_length(event)) {
760
761                 event = __rb_page_index(cpu_buffer->head_page, head);
762                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
763                         return;
764                 /* Only count data entries */
765                 if (event->type != RINGBUF_TYPE_DATA)
766                         continue;
767                 cpu_buffer->overrun++;
768                 cpu_buffer->entries--;
769         }
770 }
771
772 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
773                                struct buffer_page **bpage)
774 {
775         struct list_head *p = (*bpage)->list.next;
776
777         if (p == &cpu_buffer->pages)
778                 p = p->next;
779
780         *bpage = list_entry(p, struct buffer_page, list);
781 }
782
783 static inline unsigned
784 rb_event_index(struct ring_buffer_event *event)
785 {
786         unsigned long addr = (unsigned long)event;
787
788         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
789 }
790
791 static inline int
792 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
793              struct ring_buffer_event *event)
794 {
795         unsigned long addr = (unsigned long)event;
796         unsigned long index;
797
798         index = rb_event_index(event);
799         addr &= PAGE_MASK;
800
801         return cpu_buffer->commit_page->page == (void *)addr &&
802                 rb_commit_index(cpu_buffer) == index;
803 }
804
805 static inline void
806 rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
807                     struct ring_buffer_event *event)
808 {
809         unsigned long addr = (unsigned long)event;
810         unsigned long index;
811
812         index = rb_event_index(event);
813         addr &= PAGE_MASK;
814
815         while (cpu_buffer->commit_page->page != (void *)addr) {
816                 if (RB_WARN_ON(cpu_buffer,
817                           cpu_buffer->commit_page == cpu_buffer->tail_page))
818                         return;
819                 cpu_buffer->commit_page->page->commit =
820                         cpu_buffer->commit_page->write;
821                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
822                 cpu_buffer->write_stamp =
823                         cpu_buffer->commit_page->page->time_stamp;
824         }
825
826         /* Now set the commit to the event's index */
827         local_set(&cpu_buffer->commit_page->page->commit, index);
828 }
829
830 static inline void
831 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
832 {
833         /*
834          * We only race with interrupts and NMIs on this CPU.
835          * If we own the commit event, then we can commit
836          * all others that interrupted us, since the interruptions
837          * are in stack format (they finish before they come
838          * back to us). This allows us to do a simple loop to
839          * assign the commit to the tail.
840          */
841         while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
842                 cpu_buffer->commit_page->page->commit =
843                         cpu_buffer->commit_page->write;
844                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
845                 cpu_buffer->write_stamp =
846                         cpu_buffer->commit_page->page->time_stamp;
847                 /* add barrier to keep gcc from optimizing too much */
848                 barrier();
849         }
850         while (rb_commit_index(cpu_buffer) !=
851                rb_page_write(cpu_buffer->commit_page)) {
852                 cpu_buffer->commit_page->page->commit =
853                         cpu_buffer->commit_page->write;
854                 barrier();
855         }
856 }
857
858 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
859 {
860         cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
861         cpu_buffer->reader_page->read = 0;
862 }
863
864 static inline void rb_inc_iter(struct ring_buffer_iter *iter)
865 {
866         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
867
868         /*
869          * The iterator could be on the reader page (it starts there).
870          * But the head could have moved, since the reader was
871          * found. Check for this case and assign the iterator
872          * to the head page instead of next.
873          */
874         if (iter->head_page == cpu_buffer->reader_page)
875                 iter->head_page = cpu_buffer->head_page;
876         else
877                 rb_inc_page(cpu_buffer, &iter->head_page);
878
879         iter->read_stamp = iter->head_page->page->time_stamp;
880         iter->head = 0;
881 }
882
883 /**
884  * ring_buffer_update_event - update event type and data
885  * @event: the event to update
886  * @type: the type of event
887  * @length: the size of the event field in the ring buffer
888  *
889  * Update the type and data fields of the event. The length
890  * is the actual size that is written to the ring buffer,
891  * and with this, we can determine what to place into the
892  * data field.
893  */
894 static inline void
895 rb_update_event(struct ring_buffer_event *event,
896                          unsigned type, unsigned length)
897 {
898         event->type = type;
899
900         switch (type) {
901
902         case RINGBUF_TYPE_PADDING:
903                 break;
904
905         case RINGBUF_TYPE_TIME_EXTEND:
906                 event->len =
907                         (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
908                         >> RB_ALIGNMENT_SHIFT;
909                 break;
910
911         case RINGBUF_TYPE_TIME_STAMP:
912                 event->len =
913                         (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
914                         >> RB_ALIGNMENT_SHIFT;
915                 break;
916
917         case RINGBUF_TYPE_DATA:
918                 length -= RB_EVNT_HDR_SIZE;
919                 if (length > RB_MAX_SMALL_DATA) {
920                         event->len = 0;
921                         event->array[0] = length;
922                 } else
923                         event->len =
924                                 (length + (RB_ALIGNMENT-1))
925                                 >> RB_ALIGNMENT_SHIFT;
926                 break;
927         default:
928                 BUG();
929         }
930 }
931
932 static inline unsigned rb_calculate_event_length(unsigned length)
933 {
934         struct ring_buffer_event event; /* Used only for sizeof array */
935
936         /* zero length can cause confusion */
937         if (!length)
938                 length = 1;
939
940         if (length > RB_MAX_SMALL_DATA)
941                 length += sizeof(event.array[0]);
942
943         length += RB_EVNT_HDR_SIZE;
944         length = ALIGN(length, RB_ALIGNMENT);
945
946         return length;
947 }
948
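/*
 * Worked example (sketch, not in the original): a 10-byte payload
 * reserves ALIGN(10 + RB_EVNT_HDR_SIZE, RB_ALIGNMENT) bytes; payloads
 * above RB_MAX_SMALL_DATA also reserve array[0] for the byte count.
 */
#if 0
static void rb_calc_example(void)
{
        /* small payload: header + data, rounded up to RB_ALIGNMENT */
        WARN_ON(rb_calculate_event_length(10) !=
                ALIGN(10 + RB_EVNT_HDR_SIZE, RB_ALIGNMENT));
}
#endif
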
949 static struct ring_buffer_event *
950 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
951                   unsigned type, unsigned long length, u64 *ts)
952 {
953         struct buffer_page *tail_page, *head_page, *reader_page;
954         unsigned long tail, write;
955         struct ring_buffer *buffer = cpu_buffer->buffer;
956         struct ring_buffer_event *event;
957         unsigned long flags;
958
959         tail_page = cpu_buffer->tail_page;
960         write = local_add_return(length, &tail_page->write);
961         tail = write - length;
962
963         /* See if we shot past the end of this buffer page */
964         if (write > BUF_PAGE_SIZE) {
965                 struct buffer_page *next_page = tail_page;
966
967                 local_irq_save(flags);
968                 __raw_spin_lock(&cpu_buffer->lock);
969
970                 rb_inc_page(cpu_buffer, &next_page);
971
972                 head_page = cpu_buffer->head_page;
973                 reader_page = cpu_buffer->reader_page;
974
975                 /* we grabbed the lock before incrementing */
976                 if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
977                         goto out_unlock;
978
979                 /*
980                  * If for some reason, we had an interrupt storm that made
981                  * it all the way around the buffer, bail, and warn
982                  * about it.
983                  */
984                 if (unlikely(next_page == cpu_buffer->commit_page)) {
985                         WARN_ON_ONCE(1);
986                         goto out_unlock;
987                 }
988
989                 if (next_page == head_page) {
990                         if (!(buffer->flags & RB_FL_OVERWRITE)) {
991                                 /* reset write */
992                                 if (tail <= BUF_PAGE_SIZE)
993                                         local_set(&tail_page->write, tail);
994                                 goto out_unlock;
995                         }
996
997                         /* tail_page has not moved yet? */
998                         if (tail_page == cpu_buffer->tail_page) {
999                                 /* count overflows */
1000                                 rb_update_overflow(cpu_buffer);
1001
1002                                 rb_inc_page(cpu_buffer, &head_page);
1003                                 cpu_buffer->head_page = head_page;
1004                                 cpu_buffer->head_page->read = 0;
1005                         }
1006                 }
1007
1008                 /*
1009                  * If the tail page is still the same as what we think
1010                  * it is, then it is up to us to update the tail
1011                  * pointer.
1012                  */
1013                 if (tail_page == cpu_buffer->tail_page) {
1014                         local_set(&next_page->write, 0);
1015                         local_set(&next_page->page->commit, 0);
1016                         cpu_buffer->tail_page = next_page;
1017
1018                         /* reread the time stamp */
1019                         *ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1020                         cpu_buffer->tail_page->page->time_stamp = *ts;
1021                 }
1022
1023                 /*
1024                  * The actual tail page has moved forward.
1025                  */
1026                 if (tail < BUF_PAGE_SIZE) {
1027                         /* Mark the rest of the page with padding */
1028                         event = __rb_page_index(tail_page, tail);
1029                         event->type = RINGBUF_TYPE_PADDING;
1030                 }
1031
1032                 if (tail <= BUF_PAGE_SIZE)
1033                         /* Set the write back to the previous setting */
1034                         local_set(&tail_page->write, tail);
1035
1036                 /*
1037                  * If this was a commit entry that failed,
1038                  * increment that too
1039                  */
1040                 if (tail_page == cpu_buffer->commit_page &&
1041                     tail == rb_commit_index(cpu_buffer)) {
1042                         rb_set_commit_to_write(cpu_buffer);
1043                 }
1044
1045                 __raw_spin_unlock(&cpu_buffer->lock);
1046                 local_irq_restore(flags);
1047
1048                 /* fail and let the caller try again */
1049                 return ERR_PTR(-EAGAIN);
1050         }
1051
1052         /* We reserved something on the buffer */
1053
1054         if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
1055                 return NULL;
1056
1057         event = __rb_page_index(tail_page, tail);
1058         rb_update_event(event, type, length);
1059
1060         /*
1061          * If this is a commit and the tail is zero, then update
1062          * this page's time stamp.
1063          */
1064         if (!tail && rb_is_commit(cpu_buffer, event))
1065                 cpu_buffer->commit_page->page->time_stamp = *ts;
1066
1067         return event;
1068
1069  out_unlock:
1070         __raw_spin_unlock(&cpu_buffer->lock);
1071         local_irq_restore(flags);
1072         return NULL;
1073 }
1074
1075 static int
1076 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1077                   u64 *ts, u64 *delta)
1078 {
1079         struct ring_buffer_event *event;
1080         static int once;
1081         int ret;
1082
1083         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1084                 printk(KERN_WARNING "Delta way too big! %llu"
1085                        " ts=%llu write stamp = %llu\n",
1086                        (unsigned long long)*delta,
1087                        (unsigned long long)*ts,
1088                        (unsigned long long)cpu_buffer->write_stamp);
1089                 WARN_ON(1);
1090         }
1091
1092         /*
1093          * The delta is too big, we need to add a
1094          * new timestamp.
1095          */
1096         event = __rb_reserve_next(cpu_buffer,
1097                                   RINGBUF_TYPE_TIME_EXTEND,
1098                                   RB_LEN_TIME_EXTEND,
1099                                   ts);
1100         if (!event)
1101                 return -EBUSY;
1102
1103         if (PTR_ERR(event) == -EAGAIN)
1104                 return -EAGAIN;
1105
1106         /* Only a committed time event can update the write stamp */
1107         if (rb_is_commit(cpu_buffer, event)) {
1108                 /*
1109                  * If this is the first on the page, then we need to
1110                  * update the page itself, and just put in a zero.
1111                  */
1112                 if (rb_event_index(event)) {
1113                         event->time_delta = *delta & TS_MASK;
1114                         event->array[0] = *delta >> TS_SHIFT;
1115                 } else {
1116                         cpu_buffer->commit_page->page->time_stamp = *ts;
1117                         event->time_delta = 0;
1118                         event->array[0] = 0;
1119                 }
1120                 cpu_buffer->write_stamp = *ts;
1121                 /* let the caller know this was the commit */
1122                 ret = 1;
1123         } else {
1124                 /* Darn, this is just wasted space */
1125                 event->time_delta = 0;
1126                 event->array[0] = 0;
1127                 ret = 0;
1128         }
1129
1130         *delta = 0;
1131
1132         return ret;
1133 }
1134
1135 static struct ring_buffer_event *
1136 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1137                       unsigned type, unsigned long length)
1138 {
1139         struct ring_buffer_event *event;
1140         u64 ts, delta;
1141         int commit = 0;
1142         int nr_loops = 0;
1143
1144  again:
1145         /*
1146          * We allow for interrupts to reenter here and do a trace.
1147          * If one does, it will cause this original code to loop
1148          * back here. Even with heavy interrupts happening, this
1149          * should only happen a few times in a row. If this happens
1150          * 1000 times in a row, there must be either an interrupt
1151          * storm or we have something buggy.
1152          * Bail!
1153          */
1154         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1155                 return NULL;
1156
1157         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1158
1159         /*
1160          * Only the first commit can update the timestamp.
1161          * Yes there is a race here. If an interrupt comes in
1162          * just after the conditional and it traces too, then it
1163          * will also check the deltas. More than one timestamp may
1164          * also be made. But only the entry that did the actual
1165          * commit will be something other than zero.
1166          */
1167         if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
1168             rb_page_write(cpu_buffer->tail_page) ==
1169             rb_commit_index(cpu_buffer)) {
1170
1171                 delta = ts - cpu_buffer->write_stamp;
1172
1173                 /* make sure this delta is calculated here */
1174                 barrier();
1175
1176                 /* Did the write stamp get updated already? */
1177                 if (unlikely(ts < cpu_buffer->write_stamp))
1178                         delta = 0;
1179
1180                 if (test_time_stamp(delta)) {
1181
1182                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1183
1184                         if (commit == -EBUSY)
1185                                 return NULL;
1186
1187                         if (commit == -EAGAIN)
1188                                 goto again;
1189
1190                         RB_WARN_ON(cpu_buffer, commit < 0);
1191                 }
1192         } else
1193                 /* Non commits have zero deltas */
1194                 delta = 0;
1195
1196         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
1197         if (PTR_ERR(event) == -EAGAIN)
1198                 goto again;
1199
1200         if (!event) {
1201                 if (unlikely(commit))
1202                         /*
1203          * Ouch! We needed a timestamp and it was committed. But
1204                          * we didn't get our event reserved.
1205                          */
1206                         rb_set_commit_to_write(cpu_buffer);
1207                 return NULL;
1208         }
1209
1210         /*
1211          * If the timestamp was committed, make the commit our entry
1212          * now so that we will update it when needed.
1213          */
1214         if (commit)
1215                 rb_set_commit_event(cpu_buffer, event);
1216         else if (!rb_is_commit(cpu_buffer, event))
1217                 delta = 0;
1218
1219         event->time_delta = delta;
1220
1221         return event;
1222 }
1223
1224 static DEFINE_PER_CPU(int, rb_need_resched);
1225
1226 /**
1227  * ring_buffer_lock_reserve - reserve a part of the buffer
1228  * @buffer: the ring buffer to reserve from
1229  * @length: the length of the data to reserve (excluding event header)
1230  * @flags: a pointer to save the interrupt flags
1231  *
1232  * Returns a reserved event on the ring buffer to copy directly into.
1233  * The user of this interface will need to get the body to write into
1234  * and can use the ring_buffer_event_data() interface.
1235  *
1236  * The length is the length of the data needed, not the event length
1237  * which also includes the event header.
1238  *
1239  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1240  * If NULL is returned, then nothing has been allocated or locked.
1241  */
1242 struct ring_buffer_event *
1243 ring_buffer_lock_reserve(struct ring_buffer *buffer,
1244                          unsigned long length,
1245                          unsigned long *flags)
1246 {
1247         struct ring_buffer_per_cpu *cpu_buffer;
1248         struct ring_buffer_event *event;
1249         int cpu, resched;
1250
1251         if (ring_buffer_flags != RB_BUFFERS_ON)
1252                 return NULL;
1253
1254         if (atomic_read(&buffer->record_disabled))
1255                 return NULL;
1256
1257         /* If we are tracing schedule, we don't want to recurse */
1258         resched = ftrace_preempt_disable();
1259
1260         cpu = raw_smp_processor_id();
1261
1262         if (!cpu_isset(cpu, buffer->cpumask))
1263                 goto out;
1264
1265         cpu_buffer = buffer->buffers[cpu];
1266
1267         if (atomic_read(&cpu_buffer->record_disabled))
1268                 goto out;
1269
1270         length = rb_calculate_event_length(length);
1271         if (length > BUF_PAGE_SIZE)
1272                 goto out;
1273
1274         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
1275         if (!event)
1276                 goto out;
1277
1278         /*
1279          * Need to store resched state on this cpu.
1280          * Only the first needs to.
1281          */
1282
1283         if (preempt_count() == 1)
1284                 per_cpu(rb_need_resched, cpu) = resched;
1285
1286         return event;
1287
1288  out:
1289         ftrace_preempt_enable(resched);
1290         return NULL;
1291 }
1292
1293 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1294                       struct ring_buffer_event *event)
1295 {
1296         cpu_buffer->entries++;
1297
1298         /* Only process further if we own the commit */
1299         if (!rb_is_commit(cpu_buffer, event))
1300                 return;
1301
1302         cpu_buffer->write_stamp += event->time_delta;
1303
1304         rb_set_commit_to_write(cpu_buffer);
1305 }
1306
1307 /**
1308  * ring_buffer_unlock_commit - commit a reserved event
1309  * @buffer: The buffer to commit to
1310  * @event: The event pointer to commit.
1311  * @flags: the interrupt flags received from ring_buffer_lock_reserve.
1312  *
1313  * This commits the data to the ring buffer, and releases any locks held.
1314  *
1315  * Must be paired with ring_buffer_lock_reserve.
1316  */
1317 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1318                               struct ring_buffer_event *event,
1319                               unsigned long flags)
1320 {
1321         struct ring_buffer_per_cpu *cpu_buffer;
1322         int cpu = raw_smp_processor_id();
1323
1324         cpu_buffer = buffer->buffers[cpu];
1325
1326         rb_commit(cpu_buffer, event);
1327
1328         /*
1329          * Only the last preempt count needs to restore preemption.
1330          */
1331         if (preempt_count() == 1)
1332                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1333         else
1334                 preempt_enable_no_resched_notrace();
1335
1336         return 0;
1337 }
1338
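/*
 * Usage sketch (hypothetical, not in the original): the canonical
 * reserve -> fill -> commit sequence the two functions above pair up.
 */
#if 0
static int rb_reserve_example(struct ring_buffer *buffer, int value)
{
        struct ring_buffer_event *event;
        unsigned long flags;
        int *body;

        event = ring_buffer_lock_reserve(buffer, sizeof(*body), &flags);
        if (!event)
                return -EBUSY;  /* recording off or buffer full */

        body = ring_buffer_event_data(event);
        *body = value;

        return ring_buffer_unlock_commit(buffer, event, flags);
}
#endif
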
1339 /**
1340  * ring_buffer_write - write data to the buffer without reserving
1341  * @buffer: The ring buffer to write to.
1342  * @length: The length of the data being written (excluding the event header)
1343  * @data: The data to write to the buffer.
1344  *
1345  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1346  * one function. If you already have the data to write to the buffer, it
1347  * may be easier to simply call this function.
1348  *
1349  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1350  * and not the length of the event which would hold the header.
1351  */
1352 int ring_buffer_write(struct ring_buffer *buffer,
1353                         unsigned long length,
1354                         void *data)
1355 {
1356         struct ring_buffer_per_cpu *cpu_buffer;
1357         struct ring_buffer_event *event;
1358         unsigned long event_length;
1359         void *body;
1360         int ret = -EBUSY;
1361         int cpu, resched;
1362
1363         if (ring_buffer_flags != RB_BUFFERS_ON)
1364                 return -EBUSY;
1365
1366         if (atomic_read(&buffer->record_disabled))
1367                 return -EBUSY;
1368
1369         resched = ftrace_preempt_disable();
1370
1371         cpu = raw_smp_processor_id();
1372
1373         if (!cpu_isset(cpu, buffer->cpumask))
1374                 goto out;
1375
1376         cpu_buffer = buffer->buffers[cpu];
1377
1378         if (atomic_read(&cpu_buffer->record_disabled))
1379                 goto out;
1380
1381         event_length = rb_calculate_event_length(length);
1382         event = rb_reserve_next_event(cpu_buffer,
1383                                       RINGBUF_TYPE_DATA, event_length);
1384         if (!event)
1385                 goto out;
1386
1387         body = rb_event_data(event);
1388
1389         memcpy(body, data, length);
1390
1391         rb_commit(cpu_buffer, event);
1392
1393         ret = 0;
1394  out:
1395         ftrace_preempt_enable(resched);
1396
1397         return ret;
1398 }
1399
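/*
 * Usage sketch (hypothetical, not in the original): the one-call
 * variant of reserve+commit shown above.
 */
#if 0
static int rb_write_example(struct ring_buffer *buffer)
{
        u32 val = 42;

        /* returns 0 on success, -EBUSY if recording is disabled */
        return ring_buffer_write(buffer, sizeof(val), &val);
}
#endif
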
1400 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1401 {
1402         struct buffer_page *reader = cpu_buffer->reader_page;
1403         struct buffer_page *head = cpu_buffer->head_page;
1404         struct buffer_page *commit = cpu_buffer->commit_page;
1405
1406         return reader->read == rb_page_commit(reader) &&
1407                 (commit == reader ||
1408                  (commit == head &&
1409                   head->read == rb_page_commit(commit)));
1410 }
1411
1412 /**
1413  * ring_buffer_record_disable - stop all writes into the buffer
1414  * @buffer: The ring buffer to stop writes to.
1415  *
1416  * This prevents all writes to the buffer. Any attempt to write
1417  * to the buffer after this will fail and return NULL.
1418  *
1419  * The caller should call synchronize_sched() after this.
1420  */
1421 void ring_buffer_record_disable(struct ring_buffer *buffer)
1422 {
1423         atomic_inc(&buffer->record_disabled);
1424 }
1425
1426 /**
1427  * ring_buffer_record_enable - enable writes to the buffer
1428  * @buffer: The ring buffer to enable writes
1429  *
1430  * Note, multiple disables will need the same number of enables
1431  * to truly enable the writing (much like preempt_disable).
1432  */
1433 void ring_buffer_record_enable(struct ring_buffer *buffer)
1434 {
1435         atomic_dec(&buffer->record_disabled);
1436 }
1437
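/*
 * Sketch of the pattern the comments above describe (hypothetical,
 * not in the original): quiesce all writers before reading the buffer,
 * then re-enable recording.
 */
#if 0
static void rb_quiesce_example(struct ring_buffer *buffer)
{
        ring_buffer_record_disable(buffer);
        synchronize_sched();    /* wait for in-flight writers */
        /* ... read the buffer safely here ... */
        ring_buffer_record_enable(buffer);
}
#endif
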
1438 /**
1439  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1440  * @buffer: The ring buffer to stop writes to.
1441  * @cpu: The CPU buffer to stop
1442  *
1443  * This prevents all writes to the buffer. Any attempt to write
1444  * to the buffer after this will fail and return NULL.
1445  *
1446  * The caller should call synchronize_sched() after this.
1447  */
1448 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1449 {
1450         struct ring_buffer_per_cpu *cpu_buffer;
1451
1452         if (!cpu_isset(cpu, buffer->cpumask))
1453                 return;
1454
1455         cpu_buffer = buffer->buffers[cpu];
1456         atomic_inc(&cpu_buffer->record_disabled);
1457 }
1458
1459 /**
1460  * ring_buffer_record_enable_cpu - enable writes to the buffer
1461  * @buffer: The ring buffer to enable writes
1462  * @cpu: The CPU to enable.
1463  *
1464  * Note, multiple disables will need the same number of enables
1465  * to truly enable the writing (much like preempt_disable).
1466  */
1467 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1468 {
1469         struct ring_buffer_per_cpu *cpu_buffer;
1470
1471         if (!cpu_isset(cpu, buffer->cpumask))
1472                 return;
1473
1474         cpu_buffer = buffer->buffers[cpu];
1475         atomic_dec(&cpu_buffer->record_disabled);
1476 }
1477
1478 /**
1479  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1480  * @buffer: The ring buffer
1481  * @cpu: The per CPU buffer to get the entries from.
1482  */
1483 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1484 {
1485         struct ring_buffer_per_cpu *cpu_buffer;
1486
1487         if (!cpu_isset(cpu, buffer->cpumask))
1488                 return 0;
1489
1490         cpu_buffer = buffer->buffers[cpu];
1491         return cpu_buffer->entries;
1492 }
1493
1494 /**
1495  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1496  * @buffer: The ring buffer
1497  * @cpu: The per CPU buffer to get the number of overruns from
1498  */
1499 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1500 {
1501         struct ring_buffer_per_cpu *cpu_buffer;
1502
1503         if (!cpu_isset(cpu, buffer->cpumask))
1504                 return 0;
1505
1506         cpu_buffer = buffer->buffers[cpu];
1507         return cpu_buffer->overrun;
1508 }
1509
1510 /**
1511  * ring_buffer_entries - get the number of entries in a buffer
1512  * @buffer: The ring buffer
1513  *
1514  * Returns the total number of entries in the ring buffer
1515  * (all CPU entries)
1516  */
1517 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1518 {
1519         struct ring_buffer_per_cpu *cpu_buffer;
1520         unsigned long entries = 0;
1521         int cpu;
1522
1523         /* if you care about this being correct, lock the buffer */
1524         for_each_buffer_cpu(buffer, cpu) {
1525                 cpu_buffer = buffer->buffers[cpu];
1526                 entries += cpu_buffer->entries;
1527         }
1528
1529         return entries;
1530 }
1531
1532 /**
1533  * ring_buffer_overruns - get the number of overruns in the buffer
1534  * @buffer: The ring buffer
1535  *
1536  * Returns the total number of overruns in the ring buffer
1537  * (all CPU entries)
1538  */
1539 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1540 {
1541         struct ring_buffer_per_cpu *cpu_buffer;
1542         unsigned long overruns = 0;
1543         int cpu;
1544
1545         /* if you care about this being correct, lock the buffer */
1546         for_each_buffer_cpu(buffer, cpu) {
1547                 cpu_buffer = buffer->buffers[cpu];
1548                 overruns += cpu_buffer->overrun;
1549         }
1550
1551         return overruns;
1552 }
1553
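/*
 * Usage sketch (hypothetical, not in the original): aggregate the
 * per-CPU statistics; lock the buffer first if exactness matters,
 * as the comments above note.
 */
#if 0
static void rb_stats_example(struct ring_buffer *buffer)
{
        pr_info("entries=%lu overruns=%lu\n",
                ring_buffer_entries(buffer),
                ring_buffer_overruns(buffer));
}
#endif
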
1554 static void rb_iter_reset(struct ring_buffer_iter *iter)
1555 {
1556         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1557
1558         /* Iterator usage is expected to have record disabled */
1559         if (list_empty(&cpu_buffer->reader_page->list)) {
1560                 iter->head_page = cpu_buffer->head_page;
1561                 iter->head = cpu_buffer->head_page->read;
1562         } else {
1563                 iter->head_page = cpu_buffer->reader_page;
1564                 iter->head = cpu_buffer->reader_page->read;
1565         }
1566         if (iter->head)
1567                 iter->read_stamp = cpu_buffer->read_stamp;
1568         else
1569                 iter->read_stamp = iter->head_page->page->time_stamp;
1570 }
1571
1572 /**
1573  * ring_buffer_iter_reset - reset an iterator
1574  * @iter: The iterator to reset
1575  *
1576  * Resets the iterator, so that it will start from the beginning
1577  * again.
1578  */
1579 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1580 {
1581         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1582         unsigned long flags;
1583
1584         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1585         rb_iter_reset(iter);
1586         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1587 }
1588
1589 /**
1590  * ring_buffer_iter_empty - check if an iterator has no more to read
1591  * @iter: The iterator to check
1592  */
1593 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1594 {
1595         struct ring_buffer_per_cpu *cpu_buffer;
1596
1597         cpu_buffer = iter->cpu_buffer;
1598
1599         return iter->head_page == cpu_buffer->commit_page &&
1600                 iter->head == rb_commit_index(cpu_buffer);
1601 }
1602
1603 static void
1604 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1605                      struct ring_buffer_event *event)
1606 {
1607         u64 delta;
1608
1609         switch (event->type) {
1610         case RINGBUF_TYPE_PADDING:
1611                 return;
1612
1613         case RINGBUF_TYPE_TIME_EXTEND:
1614                 delta = event->array[0];
1615                 delta <<= TS_SHIFT;
1616                 delta += event->time_delta;
1617                 cpu_buffer->read_stamp += delta;
1618                 return;
1619
1620         case RINGBUF_TYPE_TIME_STAMP:
1621                 /* FIXME: not implemented */
1622                 return;
1623
1624         case RINGBUF_TYPE_DATA:
1625                 cpu_buffer->read_stamp += event->time_delta;
1626                 return;
1627
1628         default:
1629                 BUG();
1630         }
1631         return;
1632 }
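
/*
 * Worked example (a sketch under the definitions above, not code from
 * this file): a RINGBUF_TYPE_TIME_EXTEND event carries a timestamp
 * delta too large for the event's time_delta field, split across two
 * fields on the writer side roughly as:
 *
 *      event->time_delta = delta & ((1ULL << TS_SHIFT) - 1);
 *      event->array[0]   = delta >> TS_SHIFT;
 *
 * The switch above reassembles the full delta with the inverse
 * shift-and-add before applying it to the running read_stamp.
 */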

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
                          struct ring_buffer_event *event)
{
        u64 delta;

        switch (event->type) {
        case RINGBUF_TYPE_PADDING:
                return;

        case RINGBUF_TYPE_TIME_EXTEND:
                delta = event->array[0];
                delta <<= TS_SHIFT;
                delta += event->time_delta;
                iter->read_stamp += delta;
                return;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                return;

        case RINGBUF_TYPE_DATA:
                iter->read_stamp += event->time_delta;
                return;

        default:
                BUG();
        }
}

static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct buffer_page *reader = NULL;
        unsigned long flags;
        int nr_loops = 0;

        local_irq_save(flags);
        __raw_spin_lock(&cpu_buffer->lock);

 again:
        /*
         * This should normally only loop twice. But because the
         * start of the reader inserts an empty page, it causes
         * a case where we will loop three times. There should be no
         * reason to loop four times (that I know of).
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
                reader = NULL;
                goto out;
        }

        reader = cpu_buffer->reader_page;

        /* If there's more to read, return this page */
        if (cpu_buffer->reader_page->read < rb_page_size(reader))
                goto out;

        /* Never should we have an index greater than the size */
        if (RB_WARN_ON(cpu_buffer,
                       cpu_buffer->reader_page->read > rb_page_size(reader)))
                goto out;

        /* check if we caught up to the tail */
        reader = NULL;
        if (cpu_buffer->commit_page == cpu_buffer->reader_page)
                goto out;

        /*
         * Splice the empty reader page into the list around the head.
         * Reset the reader page to size zero.
         */

        reader = cpu_buffer->head_page;
        cpu_buffer->reader_page->list.next = reader->list.next;
        cpu_buffer->reader_page->list.prev = reader->list.prev;

        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);

        /* Make the reader page now replace the head */
        reader->list.prev->next = &cpu_buffer->reader_page->list;
        reader->list.next->prev = &cpu_buffer->reader_page->list;

        /*
         * If the tail is on the reader, then we must set the head
         * to the inserted page, otherwise we set it one before.
         */
        cpu_buffer->head_page = cpu_buffer->reader_page;

        if (cpu_buffer->commit_page != reader)
                rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

        /* Finally update the reader page to the new head */
        cpu_buffer->reader_page = reader;
        rb_reset_reader_page(cpu_buffer);

        goto again;

 out:
        __raw_spin_unlock(&cpu_buffer->lock);
        local_irq_restore(flags);

        return reader;
}
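
/*
 * A rough sketch of the swap rb_get_reader_page() does when the reader
 * has caught up to the head page (simplified; the page list is
 * circular and doubly linked):
 *
 *      before:  reader (empty)      head -> A -> B -> ...
 *      after:   reader = old head page (full of data to read),
 *               the emptied reader page spliced in where head was,
 *               head advanced one page unless the commit page was
 *               the old head.
 *
 * The code then loops so the new reader page goes through the same
 * validation as the original one.
 */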

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct ring_buffer_event *event;
        struct buffer_page *reader;
        unsigned length;

        reader = rb_get_reader_page(cpu_buffer);

        /* This function should not be called when buffer is empty */
        if (RB_WARN_ON(cpu_buffer, !reader))
                return;

        event = rb_reader_event(cpu_buffer);

        if (event->type == RINGBUF_TYPE_DATA)
                cpu_buffer->entries--;

        rb_update_read_stamp(cpu_buffer, event);

        length = rb_event_length(event);
        cpu_buffer->reader_page->read += length;
}

static void rb_advance_iter(struct ring_buffer_iter *iter)
{
        struct ring_buffer *buffer;
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        unsigned length;

        cpu_buffer = iter->cpu_buffer;
        buffer = cpu_buffer->buffer;

        /*
         * Check if we are at the end of the buffer.
         */
        if (iter->head >= rb_page_size(iter->head_page)) {
                if (RB_WARN_ON(buffer,
                               iter->head_page == cpu_buffer->commit_page))
                        return;
                rb_inc_iter(iter);
                return;
        }

        event = rb_iter_head_event(iter);

        length = rb_event_length(event);

        /*
         * This should not be called to advance the iterator if we are
         * at the tail of the buffer.
         */
        if (RB_WARN_ON(cpu_buffer,
                       (iter->head_page == cpu_buffer->commit_page) &&
                       (iter->head + length > rb_commit_index(cpu_buffer))))
                return;

        rb_update_iter_read_stamp(iter, event);

        iter->head += length;

        /* check for end of page padding */
        if ((iter->head >= rb_page_size(iter->head_page)) &&
            (iter->head_page != cpu_buffer->commit_page))
                rb_advance_iter(iter);
}

static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        struct buffer_page *reader;
        int nr_loops = 0;

        if (!cpu_isset(cpu, buffer->cpumask))
                return NULL;

        cpu_buffer = buffer->buffers[cpu];

 again:
        /*
         * We repeat when a timestamp is encountered. It is possible
         * to get multiple timestamps from an interrupt entering just
         * as one timestamp is about to be written. The max times
         * that this can happen is the number of nested interrupts we
         * can have.  Nesting 10 deep of interrupts is clearly
         * an anomaly.
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
                return NULL;

        reader = rb_get_reader_page(cpu_buffer);
        if (!reader)
                return NULL;

        event = rb_reader_event(cpu_buffer);

        switch (event->type) {
        case RINGBUF_TYPE_PADDING:
                RB_WARN_ON(cpu_buffer, 1);
                rb_advance_reader(cpu_buffer);
                return NULL;

        case RINGBUF_TYPE_TIME_EXTEND:
                /* Internal data, OK to advance */
                rb_advance_reader(cpu_buffer);
                goto again;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                rb_advance_reader(cpu_buffer);
                goto again;

        case RINGBUF_TYPE_DATA:
                if (ts) {
                        *ts = cpu_buffer->read_stamp + event->time_delta;
                        ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
                }
                return event;

        default:
                BUG();
        }

        return NULL;
}

static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
        struct ring_buffer *buffer;
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        int nr_loops = 0;

        if (ring_buffer_iter_empty(iter))
                return NULL;

        cpu_buffer = iter->cpu_buffer;
        buffer = cpu_buffer->buffer;

 again:
        /*
         * We repeat when a timestamp is encountered. It is possible
         * to get multiple timestamps from an interrupt entering just
         * as one timestamp is about to be written. The max times
         * that this can happen is the number of nested interrupts we
         * can have. Nesting 10 deep of interrupts is clearly
         * an anomaly.
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
                return NULL;

        if (rb_per_cpu_empty(cpu_buffer))
                return NULL;

        event = rb_iter_head_event(iter);

        switch (event->type) {
        case RINGBUF_TYPE_PADDING:
                rb_inc_iter(iter);
                goto again;

        case RINGBUF_TYPE_TIME_EXTEND:
                /* Internal data, OK to advance */
                rb_advance_iter(iter);
                goto again;

        case RINGBUF_TYPE_TIME_STAMP:
                /* FIXME: not implemented */
                rb_advance_iter(iter);
                goto again;

        case RINGBUF_TYPE_DATA:
                if (ts) {
                        *ts = iter->read_stamp + event->time_delta;
                        ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
                }
                return event;

        default:
                BUG();
        }

        return NULL;
}

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        unsigned long flags;

        /* the per cpu buffer may not exist for this cpu */
        if (!cpu_isset(cpu, buffer->cpumask))
                return NULL;

        cpu_buffer = buffer->buffers[cpu];

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_buffer_peek(buffer, cpu, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return event;
}
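
/*
 * Example (sketch, not part of this file): peek to decide whether an
 * event is interesting before consuming it. "my_buffer" and
 * "want_event()" are hypothetical.
 *
 *      u64 ts;
 *      struct ring_buffer_event *event;
 *
 *      event = ring_buffer_peek(my_buffer, cpu, &ts);
 *      if (event && want_event(event))
 *              ring_buffer_consume(my_buffer, cpu, &ts);
 */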

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        struct ring_buffer_event *event;
        unsigned long flags;

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_iter_peek(iter, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return event;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The cpu buffer to consume from
 * @ts: The timestamp counter of this event.
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        unsigned long flags;

        if (!cpu_isset(cpu, buffer->cpumask))
                return NULL;

        cpu_buffer = buffer->buffers[cpu];

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

        event = rb_buffer_peek(buffer, cpu, ts);
        if (!event)
                goto out;

        rb_advance_reader(cpu_buffer);

 out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return event;
}
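
/*
 * Example (sketch): drain one CPU's events. ring_buffer_event_data()
 * returns the payload of a data event; "process()" is hypothetical.
 *
 *      u64 ts;
 *      struct ring_buffer_event *event;
 *
 *      while ((event = ring_buffer_consume(buffer, cpu, &ts)))
 *              process(ring_buffer_event_data(event), ts);
 */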

/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_iter *iter;
        unsigned long flags;

        if (!cpu_isset(cpu, buffer->cpumask))
                return NULL;

        iter = kmalloc(sizeof(*iter), GFP_KERNEL);
        if (!iter)
                return NULL;

        cpu_buffer = buffer->buffers[cpu];

        iter->cpu_buffer = cpu_buffer;

        atomic_inc(&cpu_buffer->record_disabled);
        synchronize_sched();

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        __raw_spin_lock(&cpu_buffer->lock);
        rb_iter_reset(iter);
        __raw_spin_unlock(&cpu_buffer->lock);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return iter;
}

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

        atomic_dec(&cpu_buffer->record_disabled);
        kfree(iter);
}

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
        struct ring_buffer_event *event;
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        unsigned long flags;

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_iter_peek(iter, ts);
        if (!event)
                goto out;

        rb_advance_iter(iter);
 out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return event;
}
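
/*
 * Example (sketch): a full non-consuming pass over one CPU buffer,
 * tying the three iterator calls together. "report()" is hypothetical.
 *
 *      u64 ts;
 *      struct ring_buffer_event *event;
 *      struct ring_buffer_iter *iter;
 *
 *      iter = ring_buffer_read_start(buffer, cpu);
 *      if (!iter)
 *              return;
 *      while ((event = ring_buffer_read(iter, &ts)))
 *              report(event, ts);
 *      ring_buffer_read_finish(iter);
 */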

/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
        return BUF_PAGE_SIZE * buffer->pages;
}

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
        cpu_buffer->head_page
                = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
        local_set(&cpu_buffer->head_page->write, 0);
        local_set(&cpu_buffer->head_page->page->commit, 0);

        cpu_buffer->head_page->read = 0;

        cpu_buffer->tail_page = cpu_buffer->head_page;
        cpu_buffer->commit_page = cpu_buffer->head_page;

        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);
        cpu_buffer->reader_page->read = 0;

        cpu_buffer->overrun = 0;
        cpu_buffer->entries = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        unsigned long flags;

        if (!cpu_isset(cpu, buffer->cpumask))
                return;

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

        __raw_spin_lock(&cpu_buffer->lock);

        rb_reset_cpu(cpu_buffer);

        __raw_spin_unlock(&cpu_buffer->lock);

        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
        int cpu;

        for_each_buffer_cpu(buffer, cpu)
                ring_buffer_reset_cpu(buffer, cpu);
}
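
/*
 * Example (sketch): a tracer restarting a trace run would typically
 * stop recording, reset, then re-enable recording with the global
 * tracing_on()/tracing_off() switches:
 *
 *      tracing_off();
 *      ring_buffer_reset(buffer);
 *      tracing_on();
 */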

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        int cpu;

        /* yes this is racy, but if you don't like the race, lock the buffer */
        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                if (!rb_per_cpu_empty(cpu_buffer))
                        return 0;
        }
        return 1;
}

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer;

        if (!cpu_isset(cpu, buffer->cpumask))
                return 1;

        cpu_buffer = buffer->buffers[cpu];
        return rb_per_cpu_empty(cpu_buffer);
}

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: the CPU of the buffers to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
                         struct ring_buffer *buffer_b, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer_a;
        struct ring_buffer_per_cpu *cpu_buffer_b;

        if (!cpu_isset(cpu, buffer_a->cpumask) ||
            !cpu_isset(cpu, buffer_b->cpumask))
                return -EINVAL;

        /* At least make sure the two buffers are somewhat the same */
        if (buffer_a->pages != buffer_b->pages)
                return -EINVAL;

        cpu_buffer_a = buffer_a->buffers[cpu];
        cpu_buffer_b = buffer_b->buffers[cpu];

        /*
         * We can't do a synchronize_sched here because this
         * function can be called in atomic context.
         * Normally this will be called from the same CPU as cpu.
         * If not it's up to the caller to protect this.
         */
        atomic_inc(&cpu_buffer_a->record_disabled);
        atomic_inc(&cpu_buffer_b->record_disabled);

        buffer_a->buffers[cpu] = cpu_buffer_b;
        buffer_b->buffers[cpu] = cpu_buffer_a;

        cpu_buffer_b->buffer = buffer_a;
        cpu_buffer_a->buffer = buffer_b;

        atomic_dec(&cpu_buffer_a->record_disabled);
        atomic_dec(&cpu_buffer_b->record_disabled);

        return 0;
}
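
/*
 * Example (sketch): the snapshot pattern described above. "snap_buffer"
 * is a hypothetical spare ring buffer allocated with the same number of
 * pages as the live one; after the swaps it holds the captured data.
 *
 *      int cpu;
 *
 *      for_each_online_cpu(cpu)
 *              ring_buffer_swap_cpu(snap_buffer, live_buffer, cpu);
 */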

static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
                              struct buffer_data_page *bpage)
{
        struct ring_buffer_event *event;
        unsigned long head;

        __raw_spin_lock(&cpu_buffer->lock);
        for (head = 0; head < local_read(&bpage->commit);
             head += rb_event_length(event)) {

                event = __rb_data_page_index(bpage, head);
                /* break, not return: the lock must be released below */
                if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
                        break;
                /* Only count data entries */
                if (event->type != RINGBUF_TYPE_DATA)
                        continue;
                cpu_buffer->entries--;
        }
        __raw_spin_unlock(&cpu_buffer->lock);
}

/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
        unsigned long addr;
        struct buffer_data_page *bpage;

        addr = __get_free_page(GFP_KERNEL);
        if (!addr)
                return NULL;

        bpage = (void *)addr;

        return bpage;
}

/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
        free_page((unsigned long)data);
}

/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *      rpage = ring_buffer_alloc_read_page(buffer);
 *      if (!rpage)
 *              return error;
 *      ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
 *      if (ret)
 *              process_page(rpage);
 *
 * When @full is set, the function will not return 1 unless
 * the writer is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  1 if data has been transferred
 *  0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
                            void **data_page, int cpu, int full)
{
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
        struct ring_buffer_event *event;
        struct buffer_data_page *bpage;
        unsigned long flags;
        int ret = 0;

        if (!data_page)
                return 0;

        bpage = *data_page;
        if (!bpage)
                return 0;

        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

        /*
         * rb_buffer_peek will get the next ring buffer if
         * the current reader page is empty.
         */
        event = rb_buffer_peek(buffer, cpu, NULL);
        if (!event)
                goto out;

        /* check for data */
        if (!local_read(&cpu_buffer->reader_page->page->commit))
                goto out;
        /*
         * If the writer is already off of the read page, then simply
         * switch the read page with the given page. Otherwise
         * we need to copy the data from the reader to the writer.
         */
        if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
                struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
                unsigned int read = cpu_buffer->reader_page->read;
                unsigned int commit = local_read(&rpage->commit);

                if (full)
                        goto out;
                /*
                 * The writer is still on the reader page, so we must
                 * copy the unread events into the caller's page
                 * instead of swapping pages.
                 */
                rb_init_page(bpage);
                memcpy(bpage->data, rpage->data + read, commit - read);
                /* the copied events start at offset 0 of the caller's page */
                local_set(&bpage->commit, commit - read);

                /* consume what was read */
                cpu_buffer->reader_page->read = commit;
        } else {
                /* swap the pages */
                rb_init_page(bpage);
                bpage = cpu_buffer->reader_page->page;
                cpu_buffer->reader_page->page = *data_page;
                cpu_buffer->reader_page->read = 0;
                *data_page = bpage;
        }
        ret = 1;

        /* update the entry counter */
        rb_remove_entries(cpu_buffer, bpage);
 out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

        return ret;
}
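
/*
 * Example (sketch): the alloc/read/free cycle the kernel-doc above
 * describes, in one place. "consume_page()" is hypothetical.
 *
 *      void *page = ring_buffer_alloc_read_page(buffer);
 *
 *      if (page) {
 *              if (ring_buffer_read_page(buffer, &page, cpu, 0))
 *                      consume_page(page);
 *              ring_buffer_free_read_page(buffer, page);
 *      }
 */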

static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
               size_t cnt, loff_t *ppos)
{
        long *p = filp->private_data;
        char buf[64];
        int r;

        if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
                r = sprintf(buf, "permanently disabled\n");
        else
                r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

        return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
                size_t cnt, loff_t *ppos)
{
        long *p = filp->private_data;
        char buf[64];
        long val;
        int ret;

        if (cnt >= sizeof(buf))
                return -EINVAL;

        if (copy_from_user(&buf, ubuf, cnt))
                return -EFAULT;

        buf[cnt] = 0;

        ret = strict_strtoul(buf, 10, &val);
        if (ret < 0)
                return ret;

        if (val)
                set_bit(RB_BUFFERS_ON_BIT, p);
        else
                clear_bit(RB_BUFFERS_ON_BIT, p);

        (*ppos)++;

        return cnt;
}

static struct file_operations rb_simple_fops = {
        .open           = tracing_open_generic,
        .read           = rb_simple_read,
        .write          = rb_simple_write,
};

static __init int rb_init_debugfs(void)
{
        struct dentry *d_tracer;
        struct dentry *entry;

        d_tracer = tracing_init_dentry();

        entry = debugfs_create_file("tracing_on", 0644, d_tracer,
                                    &ring_buffer_flags, &rb_simple_fops);
        if (!entry)
                pr_warning("Could not create debugfs 'tracing_on' entry\n");

        return 0;
}

fs_initcall(rb_init_debugfs);