drbd: fix potential data corruption and protocol error
[cascardo/linux.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameters, defined here */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
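/*
 * Illustrative sketch (hypothetical helpers, names are not part of DRBD):
 * how a page is pushed onto and popped off this pool, chaining through the
 * page's private member as described above.  A real caller would hold
 * drbd_pp_lock around these operations.
 */
static inline void drbd_pp_push_example(struct page *page)
{
	set_page_private(page, (unsigned long)drbd_pp_pool); /* next = old head */
	drbd_pp_pool = page;
	drbd_pp_vacant++;
}

static inline struct page *drbd_pp_pop_example(void)
{
	struct page *page = drbd_pp_pool;

	if (page) {
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	return page;
}
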
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
198  * attached.
199  */
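/*
 * Illustrative sketch (hypothetical helper): walking the transfer log from
 * oldest to newest epoch and visiting every request attached to each epoch,
 * using the fields described above.  A real caller would hold
 * tconn->req_lock while doing this.
 */
static inline unsigned int tl_count_requests_example(struct drbd_tconn *tconn)
{
	struct drbd_tl_epoch *b;
	struct drbd_request *req;
	unsigned int n = 0;

	for (b = tconn->oldest_tle; b; b = b->next)	/* singly linked epochs */
		list_for_each_entry(req, &b->requests, tl_requests)
			n++;	/* circular, doubly linked requests per epoch */
	return n;
}
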
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218         INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruption of the
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violate write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_splice_init(&b->requests, &tconn->barrier_acked_requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL, b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle already points to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         if (rv & MR_WRITE)
372                                 n_writes++;
373                         if (rv & MR_READ)
374                                 n_reads++;
375                 }
376                 tmp = b->next;
377
378                 if (n_writes) {
379                         if (what == RESEND) {
380                                 b->n_writes = n_writes;
381                                 if (b->w.cb == NULL) {
382                                         b->w.cb = w_send_barrier;
383                                         inc_ap_pending(b->w.mdev);
384                                         set_bit(CREATE_BARRIER, &tconn->flags);
385                                 }
386
387                                 drbd_queue_work(&tconn->data.work, &b->w);
388                         }
389                         pn = &b->next;
390                 } else {
391                         if (n_reads)
392                                 list_add(&carry_reads, &b->requests);
393                         /* there could still be requests on that ring list,
394                          * in case local io is still pending */
395                         list_del(&b->requests);
396
397                         /* dec_ap_pending corresponding to queue_barrier.
398                          * the newest barrier may not have been queued yet,
399                          * in which case w.cb is still NULL. */
400                         if (b->w.cb != NULL)
401                                 dec_ap_pending(b->w.mdev);
402
403                         if (b == tconn->newest_tle) {
404                                 /* recycle, but reinit! */
405                                 if (tmp != NULL)
406                         conn_err(tconn, "ASSERT FAILED tmp == NULL\n");
407                                 INIT_LIST_HEAD(&b->requests);
408                                 list_splice(&carry_reads, &b->requests);
409                                 INIT_LIST_HEAD(&b->w.list);
410                                 b->w.cb = NULL;
411                                 b->br_number = net_random();
412                                 b->n_writes = 0;
413
414                                 *pn = b;
415                                 break;
416                         }
417                         *pn = tmp;
418                         kfree(b);
419                 }
420                 b = tmp;
421                 list_splice(&carry_reads, &b->requests);
422         }
423
424         /* Actions operating on the disk state, also want to work on
425            requests that got barrier acked. */
426         switch (what) {
427         case FAIL_FROZEN_DISK_IO:
428         case RESTART_FROZEN_DISK_IO:
429                 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
430                         req = list_entry(le, struct drbd_request, tl_requests);
431                         _req_mod(req, what);
432                 }
433         case CONNECTION_LOST_WHILE_PENDING:
434         case RESEND:
435                 break;
436         default:
437                 conn_err(tconn, "what = %d in _tl_restart()\n", what);
438         }
439 }
440
441 /**
442  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
443  * @tconn:      DRBD connection.
444  *
445  * This is called after the connection to the peer was lost. The storage covered
446  * by the requests on the transfer log gets marked as out of sync. Called from the
447  * receiver thread and the worker thread.
448  */
449 void tl_clear(struct drbd_tconn *tconn)
450 {
451         struct list_head *le, *tle;
452         struct drbd_request *r;
453
454         spin_lock_irq(&tconn->req_lock);
455
456         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
457
458         /* we expect this list to be empty. */
459         if (!list_empty(&tconn->out_of_sequence_requests))
460                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
461
462         /* but just in case, clean it up anyways! */
463         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
464                 r = list_entry(le, struct drbd_request, tl_requests);
465                 /* It would be nice to complete outside of spinlock.
466                  * But this is easier for now. */
467                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
468         }
469
470         /* ensure bit indicating barrier is required is clear */
471         clear_bit(CREATE_BARRIER, &tconn->flags);
472
473         spin_unlock_irq(&tconn->req_lock);
474 }
475
476 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
477 {
478         spin_lock_irq(&tconn->req_lock);
479         _tl_restart(tconn, what);
480         spin_unlock_irq(&tconn->req_lock);
481 }
482
483 /**
484  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
485  * @mdev:       DRBD device.
486  */
487 void tl_abort_disk_io(struct drbd_conf *mdev)
488 {
489         struct drbd_tconn *tconn = mdev->tconn;
490         struct drbd_tl_epoch *b;
491         struct list_head *le, *tle;
492         struct drbd_request *req;
493
494         spin_lock_irq(&tconn->req_lock);
495         b = tconn->oldest_tle;
496         while (b) {
497                 list_for_each_safe(le, tle, &b->requests) {
498                         req = list_entry(le, struct drbd_request, tl_requests);
499                         if (!(req->rq_state & RQ_LOCAL_PENDING))
500                                 continue;
501                         if (req->w.mdev == mdev)
502                                 _req_mod(req, ABORT_DISK_IO);
503                 }
504                 b = b->next;
505         }
506
507         list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
508                 req = list_entry(le, struct drbd_request, tl_requests);
509                 if (!(req->rq_state & RQ_LOCAL_PENDING))
510                         continue;
511                 if (req->w.mdev == mdev)
512                         _req_mod(req, ABORT_DISK_IO);
513         }
514
515         spin_unlock_irq(&tconn->req_lock);
516 }
517
518 static int drbd_thread_setup(void *arg)
519 {
520         struct drbd_thread *thi = (struct drbd_thread *) arg;
521         struct drbd_tconn *tconn = thi->tconn;
522         unsigned long flags;
523         int retval;
524
525         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
526                  thi->name[0], thi->tconn->name);
527
528 restart:
529         retval = thi->function(thi);
530
531         spin_lock_irqsave(&thi->t_lock, flags);
532
533         /* if the receiver has been "EXITING", the last thing it did
534          * was set the conn state to "StandAlone",
535          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
536          * and receiver thread will be "started".
537          * drbd_thread_start needs to set "RESTARTING" in that case.
538          * t_state check and assignment needs to be within the same spinlock,
539          * so either thread_start sees EXITING, and can remap to RESTARTING,
540          * or thread_start see NONE, and can proceed as normal.
541          */
542
543         if (thi->t_state == RESTARTING) {
544                 conn_info(tconn, "Restarting %s thread\n", thi->name);
545                 thi->t_state = RUNNING;
546                 spin_unlock_irqrestore(&thi->t_lock, flags);
547                 goto restart;
548         }
549
550         thi->task = NULL;
551         thi->t_state = NONE;
552         smp_mb();
553         complete_all(&thi->stop);
554         spin_unlock_irqrestore(&thi->t_lock, flags);
555
556         conn_info(tconn, "Terminating %s\n", current->comm);
557
558         /* Release mod reference taken when thread was started */
559
560         kref_put(&tconn->kref, &conn_destroy);
561         module_put(THIS_MODULE);
562         return retval;
563 }
564
565 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
566                              int (*func) (struct drbd_thread *), char *name)
567 {
568         spin_lock_init(&thi->t_lock);
569         thi->task    = NULL;
570         thi->t_state = NONE;
571         thi->function = func;
572         thi->tconn = tconn;
573         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
574 }
575
576 int drbd_thread_start(struct drbd_thread *thi)
577 {
578         struct drbd_tconn *tconn = thi->tconn;
579         struct task_struct *nt;
580         unsigned long flags;
581
582         /* is used from state engine doing drbd_thread_stop_nowait,
583          * while holding the req lock irqsave */
584         spin_lock_irqsave(&thi->t_lock, flags);
585
586         switch (thi->t_state) {
587         case NONE:
588                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
589                          thi->name, current->comm, current->pid);
590
591                 /* Get ref on module for thread - this is released when thread exits */
592                 if (!try_module_get(THIS_MODULE)) {
593                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
594                         spin_unlock_irqrestore(&thi->t_lock, flags);
595                         return false;
596                 }
597
598                 kref_get(&thi->tconn->kref);
599
600                 init_completion(&thi->stop);
601                 thi->reset_cpu_mask = 1;
602                 thi->t_state = RUNNING;
603                 spin_unlock_irqrestore(&thi->t_lock, flags);
604                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
605
606                 nt = kthread_create(drbd_thread_setup, (void *) thi,
607                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
608
609                 if (IS_ERR(nt)) {
610                         conn_err(tconn, "Couldn't start thread\n");
611
612                         kref_put(&tconn->kref, &conn_destroy);
613                         module_put(THIS_MODULE);
614                         return false;
615                 }
616                 spin_lock_irqsave(&thi->t_lock, flags);
617                 thi->task = nt;
618                 thi->t_state = RUNNING;
619                 spin_unlock_irqrestore(&thi->t_lock, flags);
620                 wake_up_process(nt);
621                 break;
622         case EXITING:
623                 thi->t_state = RESTARTING;
624                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
625                                 thi->name, current->comm, current->pid);
626                 /* fall through */
627         case RUNNING:
628         case RESTARTING:
629         default:
630                 spin_unlock_irqrestore(&thi->t_lock, flags);
631                 break;
632         }
633
634         return true;
635 }
636
637
638 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
639 {
640         unsigned long flags;
641
642         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
643
644         /* may be called from state engine, holding the req lock irqsave */
645         spin_lock_irqsave(&thi->t_lock, flags);
646
647         if (thi->t_state == NONE) {
648                 spin_unlock_irqrestore(&thi->t_lock, flags);
649                 if (restart)
650                         drbd_thread_start(thi);
651                 return;
652         }
653
654         if (thi->t_state != ns) {
655                 if (thi->task == NULL) {
656                         spin_unlock_irqrestore(&thi->t_lock, flags);
657                         return;
658                 }
659
660                 thi->t_state = ns;
661                 smp_mb();
662                 init_completion(&thi->stop);
663                 if (thi->task != current)
664                         force_sig(DRBD_SIGKILL, thi->task);
665         }
666
667         spin_unlock_irqrestore(&thi->t_lock, flags);
668
669         if (wait)
670                 wait_for_completion(&thi->stop);
671 }
672
673 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
674 {
675         struct drbd_thread *thi =
676                 task == tconn->receiver.task ? &tconn->receiver :
677                 task == tconn->asender.task  ? &tconn->asender :
678                 task == tconn->worker.task   ? &tconn->worker : NULL;
679
680         return thi;
681 }
682
683 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
684 {
685         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
686         return thi ? thi->name : task->comm;
687 }
688
689 int conn_lowest_minor(struct drbd_tconn *tconn)
690 {
691         struct drbd_conf *mdev;
692         int vnr = 0, m;
693
694         rcu_read_lock();
695         mdev = idr_get_next(&tconn->volumes, &vnr);
696         m = mdev ? mdev_to_minor(mdev) : -1;
697         rcu_read_unlock();
698
699         return m;
700 }
701
702 #ifdef CONFIG_SMP
703 /**
704  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
705  * @tconn:      DRBD connection.
706  *
707  * Forces all threads of a connection onto the same CPU. This is beneficial for
708  * DRBD's performance. May be overwritten by user's configuration.
709  */
710 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
711 {
712         int ord, cpu;
713
714         /* user override. */
715         if (cpumask_weight(tconn->cpu_mask))
716                 return;
717
718         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
719         for_each_online_cpu(cpu) {
720                 if (ord-- == 0) {
721                         cpumask_set_cpu(cpu, tconn->cpu_mask);
722                         return;
723                 }
724         }
725         /* should not be reached */
726         cpumask_setall(tconn->cpu_mask);
727 }
728
729 /**
730  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
731  * @thi:        drbd_thread object
733  *
734  * Call this in the "main loop" of _all_ threads; no need for any mutex, current won't die
735  * prematurely.
736  */
737 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
738 {
739         struct task_struct *p = current;
740
741         if (!thi->reset_cpu_mask)
742                 return;
743         thi->reset_cpu_mask = 0;
744         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
745 }
746 #endif
747
748 /**
749  * drbd_header_size  -  size of a packet header
750  *
751  * The header size is a multiple of 8, so any payload following the header is
752  * word aligned on 64-bit architectures.  (The bitmap send and receive code
753  * relies on this.)
754  */
755 unsigned int drbd_header_size(struct drbd_tconn *tconn)
756 {
757         if (tconn->agreed_pro_version >= 100) {
758                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
759                 return sizeof(struct p_header100);
760         } else {
761                 BUILD_BUG_ON(sizeof(struct p_header80) !=
762                              sizeof(struct p_header95));
763                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
764                 return sizeof(struct p_header80);
765         }
766 }
767
768 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
769 {
770         h->magic   = cpu_to_be32(DRBD_MAGIC);
771         h->command = cpu_to_be16(cmd);
772         h->length  = cpu_to_be16(size);
773         return sizeof(struct p_header80);
774 }
775
776 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
777 {
778         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
779         h->command = cpu_to_be16(cmd);
780         h->length = cpu_to_be32(size);
781         return sizeof(struct p_header95);
782 }
783
784 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
785                                       int size, int vnr)
786 {
787         h->magic = cpu_to_be32(DRBD_MAGIC_100);
788         h->volume = cpu_to_be16(vnr);
789         h->command = cpu_to_be16(cmd);
790         h->length = cpu_to_be32(size);
791         h->pad = 0;
792         return sizeof(struct p_header100);
793 }
794
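/* Pick the on-the-wire header format the peer understands: agreed protocol
 * version 100 and newer get the h100 header (which carries the volume
 * number); versions 95..99 get the h95 header, but only when the payload is
 * too large for the 16-bit length field of the h80 header; everything else
 * uses the classic h80 header. */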
795 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
796                                    void *buffer, enum drbd_packet cmd, int size)
797 {
798         if (tconn->agreed_pro_version >= 100)
799                 return prepare_header100(buffer, cmd, size, vnr);
800         else if (tconn->agreed_pro_version >= 95 &&
801                  size > DRBD_MAX_SIZE_H80_PACKET)
802                 return prepare_header95(buffer, cmd, size);
803         else
804                 return prepare_header80(buffer, cmd, size);
805 }
806
807 static void *__conn_prepare_command(struct drbd_tconn *tconn,
808                                     struct drbd_socket *sock)
809 {
810         if (!sock->socket)
811                 return NULL;
812         return sock->sbuf + drbd_header_size(tconn);
813 }
814
815 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
816 {
817         void *p;
818
819         mutex_lock(&sock->mutex);
820         p = __conn_prepare_command(tconn, sock);
821         if (!p)
822                 mutex_unlock(&sock->mutex);
823
824         return p;
825 }
826
827 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
828 {
829         return conn_prepare_command(mdev->tconn, sock);
830 }
831
832 static int __send_command(struct drbd_tconn *tconn, int vnr,
833                           struct drbd_socket *sock, enum drbd_packet cmd,
834                           unsigned int header_size, void *data,
835                           unsigned int size)
836 {
837         int msg_flags;
838         int err;
839
840         /*
841          * Called with @data == NULL and the size of the data blocks in @size
842          * for commands that send data blocks.  For those commands, omit the
843          * MSG_MORE flag: this will increase the likelihood that data blocks
844          * which are page aligned on the sender will end up page aligned on the
845          * receiver.
846          */
847         msg_flags = data ? MSG_MORE : 0;
848
849         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
850                                       header_size + size);
851         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
852                             msg_flags);
853         if (data && !err)
854                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
855         return err;
856 }
857
858 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
859                                enum drbd_packet cmd, unsigned int header_size,
860                                void *data, unsigned int size)
861 {
862         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
863 }
864
865 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
866                       enum drbd_packet cmd, unsigned int header_size,
867                       void *data, unsigned int size)
868 {
869         int err;
870
871         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
872         mutex_unlock(&sock->mutex);
873         return err;
874 }
875
876 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
877                       enum drbd_packet cmd, unsigned int header_size,
878                       void *data, unsigned int size)
879 {
880         int err;
881
882         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
883                              data, size);
884         mutex_unlock(&sock->mutex);
885         return err;
886 }
887
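/* Locking convention for the command helpers above: conn_prepare_command()
 * and drbd_prepare_command() take sock->mutex and return a pointer into the
 * send buffer just behind the packet header (or NULL, releasing the mutex
 * again on failure); conn_send_command() and drbd_send_command() transmit
 * the packet and drop the mutex.  drbd_send_ping() below is the simplest
 * user of this pattern. */
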
888 int drbd_send_ping(struct drbd_tconn *tconn)
889 {
890         struct drbd_socket *sock;
891
892         sock = &tconn->meta;
893         if (!conn_prepare_command(tconn, sock))
894                 return -EIO;
895         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
896 }
897
898 int drbd_send_ping_ack(struct drbd_tconn *tconn)
899 {
900         struct drbd_socket *sock;
901
902         sock = &tconn->meta;
903         if (!conn_prepare_command(tconn, sock))
904                 return -EIO;
905         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
906 }
907
908 int drbd_send_sync_param(struct drbd_conf *mdev)
909 {
910         struct drbd_socket *sock;
911         struct p_rs_param_95 *p;
912         int size;
913         const int apv = mdev->tconn->agreed_pro_version;
914         enum drbd_packet cmd;
915         struct net_conf *nc;
916         struct disk_conf *dc;
917
918         sock = &mdev->tconn->data;
919         p = drbd_prepare_command(mdev, sock);
920         if (!p)
921                 return -EIO;
922
923         rcu_read_lock();
924         nc = rcu_dereference(mdev->tconn->net_conf);
925
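	/* The packet layout grew with the protocol version: before 88 only the
	 * fixed p_rs_param is sent, 88 appends the verify_alg name, 89..94 use
	 * the fixed-size _89 layout, and 95+ the _95 layout with the c_*
	 * settings filled in below. */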
926         size = apv <= 87 ? sizeof(struct p_rs_param)
927                 : apv == 88 ? sizeof(struct p_rs_param)
928                         + strlen(nc->verify_alg) + 1
929                 : apv <= 94 ? sizeof(struct p_rs_param_89)
930                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
931
932         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
933
934         /* initialize verify_alg and csums_alg */
935         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
936
937         if (get_ldev(mdev)) {
938                 dc = rcu_dereference(mdev->ldev->disk_conf);
939                 p->resync_rate = cpu_to_be32(dc->resync_rate);
940                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
941                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
942                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
943                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
944                 put_ldev(mdev);
945         } else {
946                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
947                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
948                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
949                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
950                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
951         }
952
953         if (apv >= 88)
954                 strcpy(p->verify_alg, nc->verify_alg);
955         if (apv >= 89)
956                 strcpy(p->csums_alg, nc->csums_alg);
957         rcu_read_unlock();
958
959         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
960 }
961
962 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
963 {
964         struct drbd_socket *sock;
965         struct p_protocol *p;
966         struct net_conf *nc;
967         int size, cf;
968
969         sock = &tconn->data;
970         p = __conn_prepare_command(tconn, sock);
971         if (!p)
972                 return -EIO;
973
974         rcu_read_lock();
975         nc = rcu_dereference(tconn->net_conf);
976
977         if (nc->tentative && tconn->agreed_pro_version < 92) {
978                 rcu_read_unlock();
979                 mutex_unlock(&sock->mutex);
980                 conn_err(tconn, "--dry-run is not supported by peer");
981                 return -EOPNOTSUPP;
982         }
983
984         size = sizeof(*p);
985         if (tconn->agreed_pro_version >= 87)
986                 size += strlen(nc->integrity_alg) + 1;
987
988         p->protocol      = cpu_to_be32(nc->wire_protocol);
989         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
990         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
991         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
992         p->two_primaries = cpu_to_be32(nc->two_primaries);
993         cf = 0;
994         if (nc->discard_my_data)
995                 cf |= CF_DISCARD_MY_DATA;
996         if (nc->tentative)
997                 cf |= CF_DRY_RUN;
998         p->conn_flags    = cpu_to_be32(cf);
999
1000         if (tconn->agreed_pro_version >= 87)
1001                 strcpy(p->integrity_alg, nc->integrity_alg);
1002         rcu_read_unlock();
1003
1004         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1005 }
1006
1007 int drbd_send_protocol(struct drbd_tconn *tconn)
1008 {
1009         int err;
1010
1011         mutex_lock(&tconn->data.mutex);
1012         err = __drbd_send_protocol(tconn, P_PROTOCOL);
1013         mutex_unlock(&tconn->data.mutex);
1014
1015         return err;
1016 }
1017
1018 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1019 {
1020         struct drbd_socket *sock;
1021         struct p_uuids *p;
1022         int i;
1023
1024         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1025                 return 0;
1026
1027         sock = &mdev->tconn->data;
1028         p = drbd_prepare_command(mdev, sock);
1029         if (!p) {
1030                 put_ldev(mdev);
1031                 return -EIO;
1032         }
1033         for (i = UI_CURRENT; i < UI_SIZE; i++)
1034                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1035
1036         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1037         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
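	/* uuid_flags bits, as set below: 1 = discard_my_data is set in our net
	 * config, 2 = we were a crashed primary, 4 = new_state_tmp.disk is
	 * D_INCONSISTENT. */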
1038         rcu_read_lock();
1039         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1040         rcu_read_unlock();
1041         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1042         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1043         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1044
1045         put_ldev(mdev);
1046         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1047 }
1048
1049 int drbd_send_uuids(struct drbd_conf *mdev)
1050 {
1051         return _drbd_send_uuids(mdev, 0);
1052 }
1053
1054 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1055 {
1056         return _drbd_send_uuids(mdev, 8);
1057 }
1058
1059 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1060 {
1061         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1062                 u64 *uuid = mdev->ldev->md.uuid;
1063                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1064                      text,
1065                      (unsigned long long)uuid[UI_CURRENT],
1066                      (unsigned long long)uuid[UI_BITMAP],
1067                      (unsigned long long)uuid[UI_HISTORY_START],
1068                      (unsigned long long)uuid[UI_HISTORY_END]);
1069                 put_ldev(mdev);
1070         } else {
1071                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1072                                 text,
1073                                 (unsigned long long)mdev->ed_uuid);
1074         }
1075 }
1076
1077 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1078 {
1079         struct drbd_socket *sock;
1080         struct p_rs_uuid *p;
1081         u64 uuid;
1082
1083         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1084
1085         uuid = mdev->ldev->md.uuid[UI_BITMAP];
1086         if (uuid && uuid != UUID_JUST_CREATED)
1087                 uuid = uuid + UUID_NEW_BM_OFFSET;
1088         else
1089                 get_random_bytes(&uuid, sizeof(u64));
1090         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1091         drbd_print_uuids(mdev, "updated sync UUID");
1092         drbd_md_sync(mdev);
1093
1094         sock = &mdev->tconn->data;
1095         p = drbd_prepare_command(mdev, sock);
1096         if (p) {
1097                 p->uuid = cpu_to_be64(uuid);
1098                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1099         }
1100 }
1101
1102 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1103 {
1104         struct drbd_socket *sock;
1105         struct p_sizes *p;
1106         sector_t d_size, u_size;
1107         int q_order_type, max_bio_size;
1108
1109         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1110                 D_ASSERT(mdev->ldev->backing_bdev);
1111                 d_size = drbd_get_max_capacity(mdev->ldev);
1112                 rcu_read_lock();
1113                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1114                 rcu_read_unlock();
1115                 q_order_type = drbd_queue_order_type(mdev);
1116                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1117                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1118                 put_ldev(mdev);
1119         } else {
1120                 d_size = 0;
1121                 u_size = 0;
1122                 q_order_type = QUEUE_ORDERED_NONE;
1123                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1124         }
1125
1126         sock = &mdev->tconn->data;
1127         p = drbd_prepare_command(mdev, sock);
1128         if (!p)
1129                 return -EIO;
1130
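	/* Older peers cannot handle arbitrarily large requests: up to protocol
	 * 94 the limit is DRBD_MAX_SIZE_H80_PACKET (the h80 header has only a
	 * 16-bit length field), and before protocol 100 the smaller
	 * DRBD_MAX_BIO_SIZE_P95 applies. */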
1131         if (mdev->tconn->agreed_pro_version <= 94)
1132                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1133         else if (mdev->tconn->agreed_pro_version < 100)
1134                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
1135
1136         p->d_size = cpu_to_be64(d_size);
1137         p->u_size = cpu_to_be64(u_size);
1138         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1139         p->max_bio_size = cpu_to_be32(max_bio_size);
1140         p->queue_order_type = cpu_to_be16(q_order_type);
1141         p->dds_flags = cpu_to_be16(flags);
1142         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1143 }
1144
1145 /**
1146  * drbd_send_current_state() - Sends the drbd state to the peer
1147  * @mdev:       DRBD device.
1148  */
1149 int drbd_send_current_state(struct drbd_conf *mdev)
1150 {
1151         struct drbd_socket *sock;
1152         struct p_state *p;
1153
1154         sock = &mdev->tconn->data;
1155         p = drbd_prepare_command(mdev, sock);
1156         if (!p)
1157                 return -EIO;
1158         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1159         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1160 }
1161
1162 /**
1163  * drbd_send_state() - After a state change, sends the new state to the peer
1164  * @mdev:      DRBD device.
1165  * @state:     the state to send, not necessarily the current state.
1166  *
1167  * Each state change queues an "after_state_ch" work, which will eventually
1168  * send the resulting new state to the peer. If more state changes happen
1169  * between queuing and processing of the after_state_ch work, we still
1170  * want to send each intermediary state in the order it occurred.
1171  */
1172 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
1173 {
1174         struct drbd_socket *sock;
1175         struct p_state *p;
1176
1177         sock = &mdev->tconn->data;
1178         p = drbd_prepare_command(mdev, sock);
1179         if (!p)
1180                 return -EIO;
1181         p->state = cpu_to_be32(state.i); /* Within the send mutex */
1182         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1183 }
1184
1185 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1186 {
1187         struct drbd_socket *sock;
1188         struct p_req_state *p;
1189
1190         sock = &mdev->tconn->data;
1191         p = drbd_prepare_command(mdev, sock);
1192         if (!p)
1193                 return -EIO;
1194         p->mask = cpu_to_be32(mask.i);
1195         p->val = cpu_to_be32(val.i);
1196         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1197 }
1198
1199 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1200 {
1201         enum drbd_packet cmd;
1202         struct drbd_socket *sock;
1203         struct p_req_state *p;
1204
1205         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1206         sock = &tconn->data;
1207         p = conn_prepare_command(tconn, sock);
1208         if (!p)
1209                 return -EIO;
1210         p->mask = cpu_to_be32(mask.i);
1211         p->val = cpu_to_be32(val.i);
1212         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1213 }
1214
1215 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1216 {
1217         struct drbd_socket *sock;
1218         struct p_req_state_reply *p;
1219
1220         sock = &mdev->tconn->meta;
1221         p = drbd_prepare_command(mdev, sock);
1222         if (p) {
1223                 p->retcode = cpu_to_be32(retcode);
1224                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1225         }
1226 }
1227
1228 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1229 {
1230         struct drbd_socket *sock;
1231         struct p_req_state_reply *p;
1232         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1233
1234         sock = &tconn->meta;
1235         p = conn_prepare_command(tconn, sock);
1236         if (p) {
1237                 p->retcode = cpu_to_be32(retcode);
1238                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1239         }
1240 }
1241
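/* Layout of the "encoding" byte of a compressed bitmap packet, as set by the
 * helpers below: bits 0..3 hold the encoding code, bits 4..6 the number of
 * pad bits in the last byte, and bit 7 whether the first run describes set
 * bits. */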
1242 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1243 {
1244         BUG_ON(code & ~0xf);
1245         p->encoding = (p->encoding & ~0xf) | code;
1246 }
1247
1248 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1249 {
1250         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1251 }
1252
1253 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1254 {
1255         BUG_ON(n & ~0x7);
1256         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1257 }
1258
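/* Returns the number of code bytes written into p->code when RLE+VLI could
 * compress the chunk, 0 when the caller should fall back to sending plain
 * bitmap words instead, and a negative value on error. */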
1259 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1260                          struct p_compressed_bm *p,
1261                          unsigned int size,
1262                          struct bm_xfer_ctx *c)
1263 {
1264         struct bitstream bs;
1265         unsigned long plain_bits;
1266         unsigned long tmp;
1267         unsigned long rl;
1268         unsigned len;
1269         unsigned toggle;
1270         int bits, use_rle;
1271
1272         /* may we use this feature? */
1273         rcu_read_lock();
1274         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1275         rcu_read_unlock();
1276         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1277                 return 0;
1278
1279         if (c->bit_offset >= c->bm_bits)
1280                 return 0; /* nothing to do. */
1281
1282         /* use at most thus many bytes */
1283         bitstream_init(&bs, p->code, size, 0);
1284         memset(p->code, 0, size);
1285         /* plain bits covered in this code string */
1286         plain_bits = 0;
1287
1288         /* p->encoding & 0x80 stores whether the first run length is set.
1289          * bit offset is implicit.
1290          * start with toggle == 2 to be able to tell the first iteration */
1291         toggle = 2;
1292
1293         /* see how many plain bits we can stuff into one packet
1294          * using RLE and VLI. */
1295         do {
1296                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1297                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1298                 if (tmp == -1UL)
1299                         tmp = c->bm_bits;
1300                 rl = tmp - c->bit_offset;
1301
1302                 if (toggle == 2) { /* first iteration */
1303                         if (rl == 0) {
1304                                 /* the first checked bit was set,
1305                                  * store start value, */
1306                                 dcbp_set_start(p, 1);
1307                                 /* but skip encoding of zero run length */
1308                                 toggle = !toggle;
1309                                 continue;
1310                         }
1311                         dcbp_set_start(p, 0);
1312                 }
1313
1314                 /* paranoia: catch zero runlength.
1315                  * can only happen if bitmap is modified while we scan it. */
1316                 if (rl == 0) {
1317                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1318                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1319                         return -1;
1320                 }
1321
1322                 bits = vli_encode_bits(&bs, rl);
1323                 if (bits == -ENOBUFS) /* buffer full */
1324                         break;
1325                 if (bits <= 0) {
1326                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1327                         return 0;
1328                 }
1329
1330                 toggle = !toggle;
1331                 plain_bits += rl;
1332                 c->bit_offset = tmp;
1333         } while (c->bit_offset < c->bm_bits);
1334
1335         len = bs.cur.b - p->code + !!bs.cur.bit;
1336
1337         if (plain_bits < (len << 3)) {
1338                 /* incompressible with this method.
1339                  * we need to rewind both word and bit position. */
1340                 c->bit_offset -= plain_bits;
1341                 bm_xfer_ctx_bit_to_word_offset(c);
1342                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1343                 return 0;
1344         }
1345
1346         /* RLE + VLI was able to compress it just fine.
1347          * update c->word_offset. */
1348         bm_xfer_ctx_bit_to_word_offset(c);
1349
1350         /* store pad_bits */
1351         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1352
1353         return len;
1354 }
1355
1356 /**
1357  * send_bitmap_rle_or_plain
1358  *
1359  * Return 0 when done, 1 when another iteration is needed, and a negative error
1360  * code upon failure.
1361  */
1362 static int
1363 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1364 {
1365         struct drbd_socket *sock = &mdev->tconn->data;
1366         unsigned int header_size = drbd_header_size(mdev->tconn);
1367         struct p_compressed_bm *p = sock->sbuf + header_size;
1368         int len, err;
1369
1370         len = fill_bitmap_rle_bits(mdev, p,
1371                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1372         if (len < 0)
1373                 return -EIO;
1374
1375         if (len) {
1376                 dcbp_set_code(p, RLE_VLI_Bits);
1377                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1378                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1379                                      NULL, 0);
1380                 c->packets[0]++;
1381                 c->bytes[0] += header_size + sizeof(*p) + len;
1382
1383                 if (c->bit_offset >= c->bm_bits)
1384                         len = 0; /* DONE */
1385         } else {
1386                 /* was not compressible.
1387                  * send a buffer full of plain text bits instead. */
1388                 unsigned int data_size;
1389                 unsigned long num_words;
1390                 unsigned long *p = sock->sbuf + header_size;
1391
1392                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1393                 num_words = min_t(size_t, data_size / sizeof(*p),
1394                                   c->bm_words - c->word_offset);
1395                 len = num_words * sizeof(*p);
1396                 if (len)
1397                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1398                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1399                 c->word_offset += num_words;
1400                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1401
1402                 c->packets[1]++;
1403                 c->bytes[1] += header_size + len;
1404
1405                 if (c->bit_offset > c->bm_bits)
1406                         c->bit_offset = c->bm_bits;
1407         }
1408         if (!err) {
1409                 if (len == 0) {
1410                         INFO_bm_xfer_stats(mdev, "send", c);
1411                         return 0;
1412                 } else
1413                         return 1;
1414         }
1415         return -EIO;
1416 }
1417
1418 /* See the comment at receive_bitmap() */
1419 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1420 {
1421         struct bm_xfer_ctx c;
1422         int err;
1423
1424         if (!expect(mdev->bitmap))
1425                 return false;
1426
1427         if (get_ldev(mdev)) {
1428                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1429                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1430                         drbd_bm_set_all(mdev);
1431                         if (drbd_bm_write(mdev)) {
1432                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1433                                  * but otherwise process as per normal - need to tell other
1434                                  * side that a full resync is required! */
1435                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1436                         } else {
1437                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1438                                 drbd_md_sync(mdev);
1439                         }
1440                 }
1441                 put_ldev(mdev);
1442         }
1443
1444         c = (struct bm_xfer_ctx) {
1445                 .bm_bits = drbd_bm_bits(mdev),
1446                 .bm_words = drbd_bm_words(mdev),
1447         };
1448
1449         do {
1450                 err = send_bitmap_rle_or_plain(mdev, &c);
1451         } while (err > 0);
1452
1453         return err == 0;
1454 }
1455
1456 int drbd_send_bitmap(struct drbd_conf *mdev)
1457 {
1458         struct drbd_socket *sock = &mdev->tconn->data;
1459         int err = -1;
1460
1461         mutex_lock(&sock->mutex);
1462         if (sock->socket)
1463                 err = !_drbd_send_bitmap(mdev);
1464         mutex_unlock(&sock->mutex);
1465         return err;
1466 }
1467
1468 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1469 {
1470         struct drbd_socket *sock;
1471         struct p_barrier_ack *p;
1472
1473         if (mdev->state.conn < C_CONNECTED)
1474                 return;
1475
1476         sock = &mdev->tconn->meta;
1477         p = drbd_prepare_command(mdev, sock);
1478         if (!p)
1479                 return;
1480         p->barrier = barrier_nr;
1481         p->set_size = cpu_to_be32(set_size);
1482         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1483 }
1484
1485 /**
1486  * _drbd_send_ack() - Sends an ack packet
1487  * @mdev:       DRBD device.
1488  * @cmd:        Packet command code.
1489  * @sector:     sector, needs to be in big endian byte order
1490  * @blksize:    size in byte, needs to be in big endian byte order
1491  * @block_id:   Id, big endian byte order
1492  */
1493 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1494                           u64 sector, u32 blksize, u64 block_id)
1495 {
1496         struct drbd_socket *sock;
1497         struct p_block_ack *p;
1498
1499         if (mdev->state.conn < C_CONNECTED)
1500                 return -EIO;
1501
1502         sock = &mdev->tconn->meta;
1503         p = drbd_prepare_command(mdev, sock);
1504         if (!p)
1505                 return -EIO;
1506         p->sector = sector;
1507         p->block_id = block_id;
1508         p->blksize = blksize;
1509         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1510         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1511 }
1512
1513 /* dp->sector and dp->block_id already/still in network byte order,
1514  * data_size is payload size according to dp->head,
1515  * and may need to be corrected for digest size. */
1516 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1517                       struct p_data *dp, int data_size)
1518 {
1519         if (mdev->tconn->peer_integrity_tfm)
1520                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1521         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1522                        dp->block_id);
1523 }
1524
1525 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1526                       struct p_block_req *rp)
1527 {
1528         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1529 }
1530
1531 /**
1532  * drbd_send_ack() - Sends an ack packet
1533  * @mdev:       DRBD device
1534  * @cmd:        packet command code
1535  * @peer_req:   peer request
1536  */
1537 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1538                   struct drbd_peer_request *peer_req)
1539 {
1540         return _drbd_send_ack(mdev, cmd,
1541                               cpu_to_be64(peer_req->i.sector),
1542                               cpu_to_be32(peer_req->i.size),
1543                               peer_req->block_id);
1544 }
1545
1546 /* This function misuses the block_id field to signal if the blocks
1547  * are in sync or not. */
1548 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1549                      sector_t sector, int blksize, u64 block_id)
1550 {
1551         return _drbd_send_ack(mdev, cmd,
1552                               cpu_to_be64(sector),
1553                               cpu_to_be32(blksize),
1554                               cpu_to_be64(block_id));
1555 }
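/*
 * Minimal caller sketch for the block_id overloading described above.  It
 * assumes the P_OV_RESULT command and the ID_IN_SYNC/ID_OUT_OF_SYNC magic
 * values declared elsewhere in this tree; the helper name itself is purely
 * illustrative and not part of DRBD.
 */
#if 0
static int example_report_verify_result(struct drbd_conf *mdev, sector_t sector,
                                        int size, bool in_sync)
{
        /* the peer looks at block_id only; the packet carries no payload */
        return drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
                                in_sync ? ID_IN_SYNC : ID_OUT_OF_SYNC);
}
#endif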
1556
1557 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1558                        sector_t sector, int size, u64 block_id)
1559 {
1560         struct drbd_socket *sock;
1561         struct p_block_req *p;
1562
1563         sock = &mdev->tconn->data;
1564         p = drbd_prepare_command(mdev, sock);
1565         if (!p)
1566                 return -EIO;
1567         p->sector = cpu_to_be64(sector);
1568         p->block_id = block_id;
1569         p->blksize = cpu_to_be32(size);
1570         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1571 }
1572
1573 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1574                             void *digest, int digest_size, enum drbd_packet cmd)
1575 {
1576         struct drbd_socket *sock;
1577         struct p_block_req *p;
1578
1579         /* FIXME: Put the digest into the preallocated socket buffer.  */
1580
1581         sock = &mdev->tconn->data;
1582         p = drbd_prepare_command(mdev, sock);
1583         if (!p)
1584                 return -EIO;
1585         p->sector = cpu_to_be64(sector);
1586         p->block_id = ID_SYNCER /* unused */;
1587         p->blksize = cpu_to_be32(size);
1588         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1589                                  digest, digest_size);
1590 }
1591
1592 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1593 {
1594         struct drbd_socket *sock;
1595         struct p_block_req *p;
1596
1597         sock = &mdev->tconn->data;
1598         p = drbd_prepare_command(mdev, sock);
1599         if (!p)
1600                 return -EIO;
1601         p->sector = cpu_to_be64(sector);
1602         p->block_id = ID_SYNCER /* unused */;
1603         p->blksize = cpu_to_be32(size);
1604         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1605 }
1606
1607 /* called on sndtimeo
1608  * returns false if we should retry,
1609  * true if we think connection is dead
1610  */
1611 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1612 {
1613         int drop_it;
1614         /* long elapsed = (long)(jiffies - mdev->last_received); */
1615
1616         drop_it =   tconn->meta.socket == sock
1617                 || !tconn->asender.task
1618                 || get_t_state(&tconn->asender) != RUNNING
1619                 || tconn->cstate < C_WF_REPORT_PARAMS;
1620
1621         if (drop_it)
1622                 return true;
1623
1624         drop_it = !--tconn->ko_count;
1625         if (!drop_it) {
1626                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1627                          current->comm, current->pid, tconn->ko_count);
1628                 request_ping(tconn);
1629         }
1630
1631         return drop_it; /* && (mdev->state == R_PRIMARY) */
1632 }
1633
1634 static void drbd_update_congested(struct drbd_tconn *tconn)
1635 {
1636         struct sock *sk = tconn->data.socket->sk;
1637         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1638                 set_bit(NET_CONGESTED, &tconn->flags);
1639 }
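/* Example: with a send buffer (sk_sndbuf) of 128 KiB, NET_CONGESTED gets set
 * once more than four fifths of it (about 102 KiB) is queued but not yet
 * sent; the flag is cleared again in _drbd_send_page() and drbd_send(). */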
1640
1641 /* The idea of sendpage seems to be to put some kind of reference
1642  * to the page into the skb, and to hand it over to the NIC. In
1643  * this process get_page() gets called.
1644  *
1645  * As soon as the page was really sent over the network put_page()
1646  * gets called by some part of the network layer. [ NIC driver? ]
1647  *
1648  * [ get_page() / put_page() increment/decrement the count. If count
1649  *   reaches 0 the page will be freed. ]
1650  *
1651  * This works nicely with pages from FSs.
1652  * But this means that in protocol A we might signal IO completion too early!
1653  *
1654  * In order not to corrupt data during a resync we must make sure
1655  * that we do not reuse our own buffer pages (EEs) too early, therefore
1656  * we have the net_ee list.
1657  *
1658  * XFS seems to have problems, still, it submits pages with page_count == 0!
1659  * As a workaround, we disable sendpage on pages
1660  * with page_count == 0 or PageSlab.
1661  */
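/*
 * Minimal sketch of the guard applied below before handing a page to
 * ->sendpage(); the helper name is illustrative only, the real test lives
 * inline in _drbd_send_page():
 */
#if 0
static bool example_sendpage_ok(struct page *page)
{
        /* slab pages and pages with a zero reference count must be copied */
        return !disable_sendpage && page_count(page) >= 1 && !PageSlab(page);
}
#endif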
1662 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1663                               int offset, size_t size, unsigned msg_flags)
1664 {
1665         struct socket *socket;
1666         void *addr;
1667         int err;
1668
1669         socket = mdev->tconn->data.socket;
1670         addr = kmap(page) + offset;
1671         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1672         kunmap(page);
1673         if (!err)
1674                 mdev->send_cnt += size >> 9;
1675         return err;
1676 }
1677
1678 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1679                     int offset, size_t size, unsigned msg_flags)
1680 {
1681         struct socket *socket = mdev->tconn->data.socket;
1682         mm_segment_t oldfs = get_fs();
1683         int len = size;
1684         int err = -EIO;
1685
1686         /* e.g. XFS meta- & log-data is in slab pages, which have a
1687          * page_count of 0 and/or have PageSlab() set.
1688          * we cannot use send_page for those, as that does get_page();
1689          * put_page(); and would cause either a VM_BUG directly, or
1690          * __page_cache_release a page that would actually still be referenced
1691          * by someone, leading to some obscure delayed Oops somewhere else. */
1692         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1693                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1694
1695         msg_flags |= MSG_NOSIGNAL;
1696         drbd_update_congested(mdev->tconn);
1697         set_fs(KERNEL_DS);
1698         do {
1699                 int sent;
1700
1701                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1702                 if (sent <= 0) {
1703                         if (sent == -EAGAIN) {
1704                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1705                                         break;
1706                                 continue;
1707                         }
1708                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1709                              __func__, (int)size, len, sent);
1710                         if (sent < 0)
1711                                 err = sent;
1712                         break;
1713                 }
1714                 len    -= sent;
1715                 offset += sent;
1716         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1717         set_fs(oldfs);
1718         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1719
1720         if (len == 0) {
1721                 err = 0;
1722                 mdev->send_cnt += size >> 9;
1723         }
1724         return err;
1725 }
1726
1727 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1728 {
1729         struct bio_vec *bvec;
1730         int i;
1731         /* hint all but last page with MSG_MORE */
1732         bio_for_each_segment(bvec, bio, i) {
1733                 int err;
1734
1735                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1736                                          bvec->bv_offset, bvec->bv_len,
1737                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1738                 if (err)
1739                         return err;
1740         }
1741         return 0;
1742 }
1743
1744 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1745 {
1746         struct bio_vec *bvec;
1747         int i;
1748         /* hint all but last page with MSG_MORE */
1749         bio_for_each_segment(bvec, bio, i) {
1750                 int err;
1751
1752                 err = _drbd_send_page(mdev, bvec->bv_page,
1753                                       bvec->bv_offset, bvec->bv_len,
1754                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1755                 if (err)
1756                         return err;
1757         }
1758         return 0;
1759 }
1760
1761 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1762                             struct drbd_peer_request *peer_req)
1763 {
1764         struct page *page = peer_req->pages;
1765         unsigned len = peer_req->i.size;
1766         int err;
1767
1768         /* hint all but last page with MSG_MORE */
1769         page_chain_for_each(page) {
1770                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1771
1772                 err = _drbd_send_page(mdev, page, 0, l,
1773                                       page_chain_next(page) ? MSG_MORE : 0);
1774                 if (err)
1775                         return err;
1776                 len -= l;
1777         }
1778         return 0;
1779 }
1780
1781 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1782 {
1783         if (mdev->tconn->agreed_pro_version >= 95)
1784                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1785                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1786                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1787                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1788         else
1789                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1790 }
1791
1792 /* Used to send write requests
1793  * R_PRIMARY -> Peer    (P_DATA)
1794  */
1795 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1796 {
1797         struct drbd_socket *sock;
1798         struct p_data *p;
1799         unsigned int dp_flags = 0;
1800         int dgs;
1801         int err;
1802
1803         sock = &mdev->tconn->data;
1804         p = drbd_prepare_command(mdev, sock);
1805         dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1806
1807         if (!p)
1808                 return -EIO;
1809         p->sector = cpu_to_be64(req->i.sector);
1810         p->block_id = (unsigned long)req;
1811         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1812         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1813         if (mdev->state.conn >= C_SYNC_SOURCE &&
1814             mdev->state.conn <= C_PAUSED_SYNC_T)
1815                 dp_flags |= DP_MAY_SET_IN_SYNC;
1816         if (mdev->tconn->agreed_pro_version >= 100) {
1817                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1818                         dp_flags |= DP_SEND_RECEIVE_ACK;
1819                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1820                         dp_flags |= DP_SEND_WRITE_ACK;
1821         }
1822         p->dp_flags = cpu_to_be32(dp_flags);
1823         if (dgs)
1824                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1825         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1826         if (!err) {
1827                 /* For protocol A, we have to memcpy the payload into
1828                  * socket buffers, as we may complete right away
1829                  * as soon as we handed it over to tcp, at which point the data
1830                  * pages may become invalid.
1831                  *
1832                  * When data integrity is enabled, we copy it as well, so we can be
1833                  * sure that even if the bio pages are still being modified, this
1834                  * won't change the data on the wire; thus, if the digest checks
1835                  * out OK after sending on this side but does not match on the
1836                  * receiving side, we have certainly detected corruption elsewhere.
1837                  */
1838                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1839                         err = _drbd_send_bio(mdev, req->master_bio);
1840                 else
1841                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1842
1843                 /* double check digest, sometimes buffers have been modified in flight. */
1844                 if (dgs > 0 && dgs <= 64) {
1845                         /* 64 bytes (512 bits) is the largest digest size
1846                          * currently supported by the kernel crypto API. */
1847                         unsigned char digest[64];
1848                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1849                         if (memcmp(p + 1, digest, dgs)) {
1850                                 dev_warn(DEV,
1851                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1852                                         (unsigned long long)req->i.sector, req->i.size);
1853                         }
1854                 } /* else if (dgs > 64) {
1855                      ... Be noisy about digest too large ...
1856                 } */
1857         }
1858         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1859
1860         return err;
1861 }
1862
1863 /* answer packet, used to send data back for read requests:
1864  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1865  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1866  */
1867 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1868                     struct drbd_peer_request *peer_req)
1869 {
1870         struct drbd_socket *sock;
1871         struct p_data *p;
1872         int err;
1873         int dgs;
1874
1875         sock = &mdev->tconn->data;
1876         p = drbd_prepare_command(mdev, sock);
1877
1878         dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1879
1880         if (!p)
1881                 return -EIO;
1882         p->sector = cpu_to_be64(peer_req->i.sector);
1883         p->block_id = peer_req->block_id;
1884         p->seq_num = 0;  /* unused */
1885         p->dp_flags = 0;
1886         if (dgs)
1887                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1888         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1889         if (!err)
1890                 err = _drbd_send_zc_ee(mdev, peer_req);
1891         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1892
1893         return err;
1894 }
1895
1896 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1897 {
1898         struct drbd_socket *sock;
1899         struct p_block_desc *p;
1900
1901         sock = &mdev->tconn->data;
1902         p = drbd_prepare_command(mdev, sock);
1903         if (!p)
1904                 return -EIO;
1905         p->sector = cpu_to_be64(req->i.sector);
1906         p->blksize = cpu_to_be32(req->i.size);
1907         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1908 }
1909
1910 /*
1911   drbd_send distinguishes two cases:
1912
1913   Packets sent via the data socket "sock"
1914   and packets sent via the meta data socket "msock"
1915
1916                     sock                      msock
1917   -----------------+-------------------------+------------------------------
1918   timeout           conf.timeout / 2          conf.timeout / 2
1919   timeout action    send a ping via msock     Abort communication
1920                                               and close all sockets
1921 */
1922
1923 /*
1924  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1925  */
1926 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1927               void *buf, size_t size, unsigned msg_flags)
1928 {
1929         struct kvec iov;
1930         struct msghdr msg;
1931         int rv, sent = 0;
1932
1933         if (!sock)
1934                 return -EBADR;
1935
1936         /* THINK  if (signal_pending) return ... ? */
1937
1938         iov.iov_base = buf;
1939         iov.iov_len  = size;
1940
1941         msg.msg_name       = NULL;
1942         msg.msg_namelen    = 0;
1943         msg.msg_control    = NULL;
1944         msg.msg_controllen = 0;
1945         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1946
1947         if (sock == tconn->data.socket) {
1948                 rcu_read_lock();
1949                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1950                 rcu_read_unlock();
1951                 drbd_update_congested(tconn);
1952         }
1953         do {
1954                 /* STRANGE
1955                  * tcp_sendmsg does _not_ use its size parameter at all ?
1956                  *
1957                  * -EAGAIN on timeout, -EINTR on signal.
1958                  */
1959 /* THINK
1960  * do we need to block DRBD_SIG if sock == &meta.socket ??
1961  * otherwise wake_asender() might interrupt some send_*Ack !
1962  */
1963                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1964                 if (rv == -EAGAIN) {
1965                         if (we_should_drop_the_connection(tconn, sock))
1966                                 break;
1967                         else
1968                                 continue;
1969                 }
1970                 if (rv == -EINTR) {
1971                         flush_signals(current);
1972                         rv = 0;
1973                 }
1974                 if (rv < 0)
1975                         break;
1976                 sent += rv;
1977                 iov.iov_base += rv;
1978                 iov.iov_len  -= rv;
1979         } while (sent < size);
1980
1981         if (sock == tconn->data.socket)
1982                 clear_bit(NET_CONGESTED, &tconn->flags);
1983
1984         if (rv <= 0) {
1985                 if (rv != -EAGAIN) {
1986                         conn_err(tconn, "%s_sendmsg returned %d\n",
1987                                  sock == tconn->meta.socket ? "msock" : "sock",
1988                                  rv);
1989                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1990                 } else
1991                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1992         }
1993
1994         return sent;
1995 }
1996
1997 /**
1998  * drbd_send_all  -  Send an entire buffer
1999  *
2000  * Returns 0 upon success and a negative error value otherwise.
2001  */
2002 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
2003                   size_t size, unsigned msg_flags)
2004 {
2005         int err;
2006
2007         err = drbd_send(tconn, sock, buffer, size, msg_flags);
2008         if (err < 0)
2009                 return err;
2010         if (err != size)
2011                 return -EIO;
2012         return 0;
2013 }
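/*
 * Usage sketch for the locking contract spelled out above drbd_send(): the
 * caller holds the socket's mutex and re-checks that the socket still exists,
 * just like drbd_send_bitmap() does.  The function name is illustrative only.
 */
#if 0
static int example_send_raw(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err = -EIO;

        mutex_lock(&tconn->data.mutex);
        if (tconn->data.socket)
                err = drbd_send_all(tconn, tconn->data.socket, buf, size, 0);
        mutex_unlock(&tconn->data.mutex);
        return err;
}
#endif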
2014
2015 static int drbd_open(struct block_device *bdev, fmode_t mode)
2016 {
2017         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2018         unsigned long flags;
2019         int rv = 0;
2020
2021         mutex_lock(&drbd_main_mutex);
2022         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2023         /* to have a stable mdev->state.role
2024          * and no race with updating open_cnt */
2025
2026         if (mdev->state.role != R_PRIMARY) {
2027                 if (mode & FMODE_WRITE)
2028                         rv = -EROFS;
2029                 else if (!allow_oos)
2030                         rv = -EMEDIUMTYPE;
2031         }
2032
2033         if (!rv)
2034                 mdev->open_cnt++;
2035         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2036         mutex_unlock(&drbd_main_mutex);
2037
2038         return rv;
2039 }
2040
2041 static int drbd_release(struct gendisk *gd, fmode_t mode)
2042 {
2043         struct drbd_conf *mdev = gd->private_data;
2044         mutex_lock(&drbd_main_mutex);
2045         mdev->open_cnt--;
2046         mutex_unlock(&drbd_main_mutex);
2047         return 0;
2048 }
2049
2050 static void drbd_set_defaults(struct drbd_conf *mdev)
2051 {
2052         /* Beware! The actual layout differs
2053          * between big endian and little endian */
2054         mdev->state = (union drbd_dev_state) {
2055                 { .role = R_SECONDARY,
2056                   .peer = R_UNKNOWN,
2057                   .conn = C_STANDALONE,
2058                   .disk = D_DISKLESS,
2059                   .pdsk = D_UNKNOWN,
2060                 } };
2061 }
2062
2063 void drbd_init_set_defaults(struct drbd_conf *mdev)
2064 {
2065         /* the memset(,0,) did most of this.
2066          * note: only assignments, no allocation in here */
2067
2068         drbd_set_defaults(mdev);
2069
2070         atomic_set(&mdev->ap_bio_cnt, 0);
2071         atomic_set(&mdev->ap_pending_cnt, 0);
2072         atomic_set(&mdev->rs_pending_cnt, 0);
2073         atomic_set(&mdev->unacked_cnt, 0);
2074         atomic_set(&mdev->local_cnt, 0);
2075         atomic_set(&mdev->pp_in_use_by_net, 0);
2076         atomic_set(&mdev->rs_sect_in, 0);
2077         atomic_set(&mdev->rs_sect_ev, 0);
2078         atomic_set(&mdev->ap_in_flight, 0);
2079         atomic_set(&mdev->md_io_in_use, 0);
2080
2081         mutex_init(&mdev->own_state_mutex);
2082         mdev->state_mutex = &mdev->own_state_mutex;
2083
2084         spin_lock_init(&mdev->al_lock);
2085         spin_lock_init(&mdev->peer_seq_lock);
2086
2087         INIT_LIST_HEAD(&mdev->active_ee);
2088         INIT_LIST_HEAD(&mdev->sync_ee);
2089         INIT_LIST_HEAD(&mdev->done_ee);
2090         INIT_LIST_HEAD(&mdev->read_ee);
2091         INIT_LIST_HEAD(&mdev->net_ee);
2092         INIT_LIST_HEAD(&mdev->resync_reads);
2093         INIT_LIST_HEAD(&mdev->resync_work.list);
2094         INIT_LIST_HEAD(&mdev->unplug_work.list);
2095         INIT_LIST_HEAD(&mdev->go_diskless.list);
2096         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2097         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2098         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2099
2100         mdev->resync_work.cb  = w_resync_timer;
2101         mdev->unplug_work.cb  = w_send_write_hint;
2102         mdev->go_diskless.cb  = w_go_diskless;
2103         mdev->md_sync_work.cb = w_md_sync;
2104         mdev->bm_io_work.w.cb = w_bitmap_io;
2105         mdev->start_resync_work.cb = w_start_resync;
2106
2107         mdev->resync_work.mdev  = mdev;
2108         mdev->unplug_work.mdev  = mdev;
2109         mdev->go_diskless.mdev  = mdev;
2110         mdev->md_sync_work.mdev = mdev;
2111         mdev->bm_io_work.w.mdev = mdev;
2112         mdev->start_resync_work.mdev = mdev;
2113
2114         init_timer(&mdev->resync_timer);
2115         init_timer(&mdev->md_sync_timer);
2116         init_timer(&mdev->start_resync_timer);
2117         init_timer(&mdev->request_timer);
2118         mdev->resync_timer.function = resync_timer_fn;
2119         mdev->resync_timer.data = (unsigned long) mdev;
2120         mdev->md_sync_timer.function = md_sync_timer_fn;
2121         mdev->md_sync_timer.data = (unsigned long) mdev;
2122         mdev->start_resync_timer.function = start_resync_timer_fn;
2123         mdev->start_resync_timer.data = (unsigned long) mdev;
2124         mdev->request_timer.function = request_timer_fn;
2125         mdev->request_timer.data = (unsigned long) mdev;
2126
2127         init_waitqueue_head(&mdev->misc_wait);
2128         init_waitqueue_head(&mdev->state_wait);
2129         init_waitqueue_head(&mdev->ee_wait);
2130         init_waitqueue_head(&mdev->al_wait);
2131         init_waitqueue_head(&mdev->seq_wait);
2132
2133         mdev->resync_wenr = LC_FREE;
2134         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2135         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2136 }
2137
2138 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2139 {
2140         int i;
2141         if (mdev->tconn->receiver.t_state != NONE)
2142                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2143                                 mdev->tconn->receiver.t_state);
2144
2145         mdev->al_writ_cnt  =
2146         mdev->bm_writ_cnt  =
2147         mdev->read_cnt     =
2148         mdev->recv_cnt     =
2149         mdev->send_cnt     =
2150         mdev->writ_cnt     =
2151         mdev->p_size       =
2152         mdev->rs_start     =
2153         mdev->rs_total     =
2154         mdev->rs_failed    = 0;
2155         mdev->rs_last_events = 0;
2156         mdev->rs_last_sect_ev = 0;
2157         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2158                 mdev->rs_mark_left[i] = 0;
2159                 mdev->rs_mark_time[i] = 0;
2160         }
2161         D_ASSERT(mdev->tconn->net_conf == NULL);
2162
2163         drbd_set_my_capacity(mdev, 0);
2164         if (mdev->bitmap) {
2165                 /* maybe never allocated. */
2166                 drbd_bm_resize(mdev, 0, 1);
2167                 drbd_bm_cleanup(mdev);
2168         }
2169
2170         drbd_free_bc(mdev->ldev);
2171         mdev->ldev = NULL;
2172
2173         clear_bit(AL_SUSPENDED, &mdev->flags);
2174
2175         D_ASSERT(list_empty(&mdev->active_ee));
2176         D_ASSERT(list_empty(&mdev->sync_ee));
2177         D_ASSERT(list_empty(&mdev->done_ee));
2178         D_ASSERT(list_empty(&mdev->read_ee));
2179         D_ASSERT(list_empty(&mdev->net_ee));
2180         D_ASSERT(list_empty(&mdev->resync_reads));
2181         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2182         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2183         D_ASSERT(list_empty(&mdev->resync_work.list));
2184         D_ASSERT(list_empty(&mdev->unplug_work.list));
2185         D_ASSERT(list_empty(&mdev->go_diskless.list));
2186
2187         drbd_set_defaults(mdev);
2188 }
2189
2190
2191 static void drbd_destroy_mempools(void)
2192 {
2193         struct page *page;
2194
2195         while (drbd_pp_pool) {
2196                 page = drbd_pp_pool;
2197                 drbd_pp_pool = (struct page *)page_private(page);
2198                 __free_page(page);
2199                 drbd_pp_vacant--;
2200         }
2201
2202         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2203
2204         if (drbd_md_io_bio_set)
2205                 bioset_free(drbd_md_io_bio_set);
2206         if (drbd_md_io_page_pool)
2207                 mempool_destroy(drbd_md_io_page_pool);
2208         if (drbd_ee_mempool)
2209                 mempool_destroy(drbd_ee_mempool);
2210         if (drbd_request_mempool)
2211                 mempool_destroy(drbd_request_mempool);
2212         if (drbd_ee_cache)
2213                 kmem_cache_destroy(drbd_ee_cache);
2214         if (drbd_request_cache)
2215                 kmem_cache_destroy(drbd_request_cache);
2216         if (drbd_bm_ext_cache)
2217                 kmem_cache_destroy(drbd_bm_ext_cache);
2218         if (drbd_al_ext_cache)
2219                 kmem_cache_destroy(drbd_al_ext_cache);
2220
2221         drbd_md_io_bio_set   = NULL;
2222         drbd_md_io_page_pool = NULL;
2223         drbd_ee_mempool      = NULL;
2224         drbd_request_mempool = NULL;
2225         drbd_ee_cache        = NULL;
2226         drbd_request_cache   = NULL;
2227         drbd_bm_ext_cache    = NULL;
2228         drbd_al_ext_cache    = NULL;
2229
2230         return;
2231 }
2232
2233 static int drbd_create_mempools(void)
2234 {
2235         struct page *page;
2236         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2237         int i;
2238
2239         /* prepare our caches and mempools */
2240         drbd_request_mempool = NULL;
2241         drbd_ee_cache        = NULL;
2242         drbd_request_cache   = NULL;
2243         drbd_bm_ext_cache    = NULL;
2244         drbd_al_ext_cache    = NULL;
2245         drbd_pp_pool         = NULL;
2246         drbd_md_io_page_pool = NULL;
2247         drbd_md_io_bio_set   = NULL;
2248
2249         /* caches */
2250         drbd_request_cache = kmem_cache_create(
2251                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2252         if (drbd_request_cache == NULL)
2253                 goto Enomem;
2254
2255         drbd_ee_cache = kmem_cache_create(
2256                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2257         if (drbd_ee_cache == NULL)
2258                 goto Enomem;
2259
2260         drbd_bm_ext_cache = kmem_cache_create(
2261                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2262         if (drbd_bm_ext_cache == NULL)
2263                 goto Enomem;
2264
2265         drbd_al_ext_cache = kmem_cache_create(
2266                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2267         if (drbd_al_ext_cache == NULL)
2268                 goto Enomem;
2269
2270         /* mempools */
2271         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2272         if (drbd_md_io_bio_set == NULL)
2273                 goto Enomem;
2274
2275         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2276         if (drbd_md_io_page_pool == NULL)
2277                 goto Enomem;
2278
2279         drbd_request_mempool = mempool_create(number,
2280                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2281         if (drbd_request_mempool == NULL)
2282                 goto Enomem;
2283
2284         drbd_ee_mempool = mempool_create(number,
2285                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2286         if (drbd_ee_mempool == NULL)
2287                 goto Enomem;
2288
2289         /* drbd's page pool */
2290         spin_lock_init(&drbd_pp_lock);
2291
2292         for (i = 0; i < number; i++) {
2293                 page = alloc_page(GFP_HIGHUSER);
2294                 if (!page)
2295                         goto Enomem;
2296                 set_page_private(page, (unsigned long)drbd_pp_pool);
2297                 drbd_pp_pool = page;
2298         }
2299         drbd_pp_vacant = number;
2300
2301         return 0;
2302
2303 Enomem:
2304         drbd_destroy_mempools(); /* in case we allocated some */
2305         return -ENOMEM;
2306 }
2307
2308 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2309         void *unused)
2310 {
2311         /* just so we have it.  you never know what interesting things we
2312          * might want to do here some day...
2313          */
2314
2315         return NOTIFY_DONE;
2316 }
2317
2318 static struct notifier_block drbd_notifier = {
2319         .notifier_call = drbd_notify_sys,
2320 };
2321
2322 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2323 {
2324         int rr;
2325
2326         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2327         if (rr)
2328                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2329
2330         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2331         if (rr)
2332                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2333
2334         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2335         if (rr)
2336                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2337
2338         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2339         if (rr)
2340                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2341
2342         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2343         if (rr)
2344                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2345 }
2346
2347 /* caution. no locking. */
2348 void drbd_minor_destroy(struct kref *kref)
2349 {
2350         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2351         struct drbd_tconn *tconn = mdev->tconn;
2352
2353         del_timer_sync(&mdev->request_timer);
2354
2355         /* paranoia asserts */
2356         D_ASSERT(mdev->open_cnt == 0);
2357         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2358         /* end paranoia asserts */
2359
2360         /* cleanup stuff that may have been allocated during
2361          * device (re-)configuration or state changes */
2362
2363         if (mdev->this_bdev)
2364                 bdput(mdev->this_bdev);
2365
2366         drbd_free_bc(mdev->ldev);
2367         mdev->ldev = NULL;
2368
2369         drbd_release_all_peer_reqs(mdev);
2370
2371         lc_destroy(mdev->act_log);
2372         lc_destroy(mdev->resync);
2373
2374         kfree(mdev->p_uuid);
2375         /* mdev->p_uuid = NULL; */
2376
2377         if (mdev->bitmap) /* should no longer be there. */
2378                 drbd_bm_cleanup(mdev);
2379         __free_page(mdev->md_io_page);
2380         put_disk(mdev->vdisk);
2381         blk_cleanup_queue(mdev->rq_queue);
2382         kfree(mdev->rs_plan_s);
2383         kfree(mdev);
2384
2385         kref_put(&tconn->kref, &conn_destroy);
2386 }
2387
2388 static void drbd_cleanup(void)
2389 {
2390         unsigned int i;
2391         struct drbd_conf *mdev;
2392         struct drbd_tconn *tconn, *tmp;
2393
2394         unregister_reboot_notifier(&drbd_notifier);
2395
2396         /* first remove proc,
2397          * drbdsetup uses its presence to detect
2398          * whether DRBD is loaded.
2399          * If we got stuck in proc removal,
2400          * but had netlink already deregistered,
2401          * some drbdsetup commands may wait forever
2402          * for an answer.
2403          */
2404         if (drbd_proc)
2405                 remove_proc_entry("drbd", NULL);
2406
2407         drbd_genl_unregister();
2408
2409         idr_for_each_entry(&minors, mdev, i) {
2410                 idr_remove(&minors, mdev_to_minor(mdev));
2411                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2412                 del_gendisk(mdev->vdisk);
2413                 /* synchronize_rcu(); No other threads running at this point */
2414                 kref_put(&mdev->kref, &drbd_minor_destroy);
2415         }
2416
2417         /* not _rcu, since there is no other updater anymore; genl is already unregistered */
2418         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2419                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2420                 /* synchronize_rcu(); */
2421                 kref_put(&tconn->kref, &conn_destroy);
2422         }
2423
2424         drbd_destroy_mempools();
2425         unregister_blkdev(DRBD_MAJOR, "drbd");
2426
2427         idr_destroy(&minors);
2428
2429         printk(KERN_INFO "drbd: module cleanup done.\n");
2430 }
2431
2432 /**
2433  * drbd_congested() - Callback for pdflush
2434  * @congested_data:     User data
2435  * @bdi_bits:           Bits pdflush is currently interested in
2436  *
2437  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2438  */
2439 static int drbd_congested(void *congested_data, int bdi_bits)
2440 {
2441         struct drbd_conf *mdev = congested_data;
2442         struct request_queue *q;
2443         char reason = '-';
2444         int r = 0;
2445
2446         if (!may_inc_ap_bio(mdev)) {
2447                 /* DRBD has frozen IO */
2448                 r = bdi_bits;
2449                 reason = 'd';
2450                 goto out;
2451         }
2452
2453         if (get_ldev(mdev)) {
2454                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2455                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2456                 put_ldev(mdev);
2457                 if (r)
2458                         reason = 'b';
2459         }
2460
2461         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2462                 r |= (1 << BDI_async_congested);
2463                 reason = reason == 'b' ? 'a' : 'n';
2464         }
2465
2466 out:
2467         mdev->congestion_reason = reason;
2468         return r;
2469 }
2470
2471 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2472 {
2473         sema_init(&wq->s, 0);
2474         spin_lock_init(&wq->q_lock);
2475         INIT_LIST_HEAD(&wq->q);
2476 }
2477
2478 struct drbd_tconn *conn_get_by_name(const char *name)
2479 {
2480         struct drbd_tconn *tconn;
2481
2482         if (!name || !name[0])
2483                 return NULL;
2484
2485         rcu_read_lock();
2486         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2487                 if (!strcmp(tconn->name, name)) {
2488                         kref_get(&tconn->kref);
2489                         goto found;
2490                 }
2491         }
2492         tconn = NULL;
2493 found:
2494         rcu_read_unlock();
2495         return tconn;
2496 }
2497
2498 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2499                                      void *peer_addr, int peer_addr_len)
2500 {
2501         struct drbd_tconn *tconn;
2502
2503         rcu_read_lock();
2504         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2505                 if (tconn->my_addr_len == my_addr_len &&
2506                     tconn->peer_addr_len == peer_addr_len &&
2507                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2508                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2509                         kref_get(&tconn->kref);
2510                         goto found;
2511                 }
2512         }
2513         tconn = NULL;
2514 found:
2515         rcu_read_unlock();
2516         return tconn;
2517 }
2518
2519 static int drbd_alloc_socket(struct drbd_socket *socket)
2520 {
2521         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2522         if (!socket->rbuf)
2523                 return -ENOMEM;
2524         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2525         if (!socket->sbuf)
2526                 return -ENOMEM;
2527         return 0;
2528 }
2529
2530 static void drbd_free_socket(struct drbd_socket *socket)
2531 {
2532         free_page((unsigned long) socket->sbuf);
2533         free_page((unsigned long) socket->rbuf);
2534 }
2535
2536 void conn_free_crypto(struct drbd_tconn *tconn)
2537 {
2538         drbd_free_sock(tconn);
2539
2540         crypto_free_hash(tconn->csums_tfm);
2541         crypto_free_hash(tconn->verify_tfm);
2542         crypto_free_hash(tconn->cram_hmac_tfm);
2543         crypto_free_hash(tconn->integrity_tfm);
2544         crypto_free_hash(tconn->peer_integrity_tfm);
2545         kfree(tconn->int_dig_in);
2546         kfree(tconn->int_dig_vv);
2547
2548         tconn->csums_tfm = NULL;
2549         tconn->verify_tfm = NULL;
2550         tconn->cram_hmac_tfm = NULL;
2551         tconn->integrity_tfm = NULL;
2552         tconn->peer_integrity_tfm = NULL;
2553         tconn->int_dig_in = NULL;
2554         tconn->int_dig_vv = NULL;
2555 }
2556
2557 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2558 {
2559         cpumask_var_t new_cpu_mask;
2560         int err;
2561
2562         if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2563                 return -ENOMEM;
2564                 /*
2565                 retcode = ERR_NOMEM;
2566                 drbd_msg_put_info("unable to allocate cpumask");
2567                 */
2568
2569         /* silently ignore cpu mask on UP kernel */
2570         if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2571                 /* FIXME: Get rid of constant 32 here */
2572                 err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
2573                                 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2574                 if (err) {
2575                         conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
2576                         /* retcode = ERR_CPU_MASK_PARSE; */
2577                         goto fail;
2578                 }
2579         }
2580         tconn->res_opts = *res_opts;
2581         if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2582                 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2583                 drbd_calc_cpu_mask(tconn);
2584                 tconn->receiver.reset_cpu_mask = 1;
2585                 tconn->asender.reset_cpu_mask = 1;
2586                 tconn->worker.reset_cpu_mask = 1;
2587         }
2588         err = 0;
2589
2590 fail:
2591         free_cpumask_var(new_cpu_mask);
2592         return err;
2593
2594 }
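/* Example: a resource cpu-mask of "3" is parsed by __bitmap_parse() as the
 * hexadecimal mask 0x3, so the receiver, asender and worker threads of this
 * connection get pinned to CPUs 0 and 1 (silently ignored on UP kernels). */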
2595
2596 /* caller must be under genl_lock() */
2597 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2598 {
2599         struct drbd_tconn *tconn;
2600
2601         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2602         if (!tconn)
2603                 return NULL;
2604
2605         tconn->name = kstrdup(name, GFP_KERNEL);
2606         if (!tconn->name)
2607                 goto fail;
2608
2609         if (drbd_alloc_socket(&tconn->data))
2610                 goto fail;
2611         if (drbd_alloc_socket(&tconn->meta))
2612                 goto fail;
2613
2614         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2615                 goto fail;
2616
2617         if (set_resource_options(tconn, res_opts))
2618                 goto fail;
2619
2620         if (!tl_init(tconn))
2621                 goto fail;
2622
2623         tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2624         if (!tconn->current_epoch)
2625                 goto fail;
2626         INIT_LIST_HEAD(&tconn->current_epoch->list);
2627         tconn->epochs = 1;
2628         spin_lock_init(&tconn->epoch_lock);
2629         tconn->write_ordering = WO_bdev_flush;
2630
2631         tconn->cstate = C_STANDALONE;
2632         mutex_init(&tconn->cstate_mutex);
2633         spin_lock_init(&tconn->req_lock);
2634         mutex_init(&tconn->conf_update);
2635         init_waitqueue_head(&tconn->ping_wait);
2636         idr_init(&tconn->volumes);
2637
2638         drbd_init_workqueue(&tconn->data.work);
2639         mutex_init(&tconn->data.mutex);
2640
2641         drbd_init_workqueue(&tconn->meta.work);
2642         mutex_init(&tconn->meta.mutex);
2643
2644         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2645         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2646         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2647
2648         kref_init(&tconn->kref);
2649         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2650
2651         return tconn;
2652
2653 fail:
2654         kfree(tconn->current_epoch);
2655         tl_cleanup(tconn);
2656         free_cpumask_var(tconn->cpu_mask);
2657         drbd_free_socket(&tconn->meta);
2658         drbd_free_socket(&tconn->data);
2659         kfree(tconn->name);
2660         kfree(tconn);
2661
2662         return NULL;
2663 }
2664
2665 void conn_destroy(struct kref *kref)
2666 {
2667         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2668
2669         if (atomic_read(&tconn->current_epoch->epoch_size) !=  0)
2670                 conn_err(tconn, "epoch_size:%d\n", atomic_read(&tconn->current_epoch->epoch_size));
2671         kfree(tconn->current_epoch);
2672
2673         idr_destroy(&tconn->volumes);
2674
2675         free_cpumask_var(tconn->cpu_mask);
2676         drbd_free_socket(&tconn->meta);
2677         drbd_free_socket(&tconn->data);
2678         kfree(tconn->name);
2679         kfree(tconn->int_dig_in);
2680         kfree(tconn->int_dig_vv);
2681         kfree(tconn);
2682 }
2683
2684 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2685 {
2686         struct drbd_conf *mdev;
2687         struct gendisk *disk;
2688         struct request_queue *q;
2689         int vnr_got = vnr;
2690         int minor_got = minor;
2691         enum drbd_ret_code err = ERR_NOMEM;
2692
2693         mdev = minor_to_mdev(minor);
2694         if (mdev)
2695                 return ERR_MINOR_EXISTS;
2696
2697         /* GFP_KERNEL, we are outside of all write-out paths */
2698         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2699         if (!mdev)
2700                 return ERR_NOMEM;
2701
2702         kref_get(&tconn->kref);
2703         mdev->tconn = tconn;
2704
2705         mdev->minor = minor;
2706         mdev->vnr = vnr;
2707
2708         drbd_init_set_defaults(mdev);
2709
2710         q = blk_alloc_queue(GFP_KERNEL);
2711         if (!q)
2712                 goto out_no_q;
2713         mdev->rq_queue = q;
2714         q->queuedata   = mdev;
2715
2716         disk = alloc_disk(1);
2717         if (!disk)
2718                 goto out_no_disk;
2719         mdev->vdisk = disk;
2720
2721         set_disk_ro(disk, true);
2722
2723         disk->queue = q;
2724         disk->major = DRBD_MAJOR;
2725         disk->first_minor = minor;
2726         disk->fops = &drbd_ops;
2727         sprintf(disk->disk_name, "drbd%d", minor);
2728         disk->private_data = mdev;
2729
2730         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2731         /* we have no partitions. we contain only ourselves. */
2732         mdev->this_bdev->bd_contains = mdev->this_bdev;
2733
2734         q->backing_dev_info.congested_fn = drbd_congested;
2735         q->backing_dev_info.congested_data = mdev;
2736
2737         blk_queue_make_request(q, drbd_make_request);
2738         /* Setting the max_hw_sectors to an odd value of 8 KiB here
2739            triggers a max_bio_size message upon first attach or connect. */
2740         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2741         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2742         blk_queue_merge_bvec(q, drbd_merge_bvec);
2743         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2744
2745         mdev->md_io_page = alloc_page(GFP_KERNEL);
2746         if (!mdev->md_io_page)
2747                 goto out_no_io_page;
2748
2749         if (drbd_bm_init(mdev))
2750                 goto out_no_bitmap;
2751         mdev->read_requests = RB_ROOT;
2752         mdev->write_requests = RB_ROOT;
2753
2754         if (!idr_pre_get(&minors, GFP_KERNEL))
2755                 goto out_no_minor_idr;
2756         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2757                 goto out_no_minor_idr;
2758         if (minor_got != minor) {
2759                 err = ERR_MINOR_EXISTS;
2760                 drbd_msg_put_info("requested minor exists already");
2761                 goto out_idr_remove_minor;
2762         }
2763
2764         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2765                 goto out_idr_remove_minor;
2766         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2767                 goto out_idr_remove_minor;
2768         if (vnr_got != vnr) {
2769                 err = ERR_INVALID_REQUEST;
2770                 drbd_msg_put_info("requested volume exists already");
2771                 goto out_idr_remove_vol;
2772         }
2773         add_disk(disk);
2774         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2775
2776         /* inherit the connection state */
2777         mdev->state.conn = tconn->cstate;
2778         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2779                 drbd_connected(mdev);
2780
2781         return NO_ERROR;
2782
2783 out_idr_remove_vol:
2784         idr_remove(&tconn->volumes, vnr_got);
2785 out_idr_remove_minor:
2786         idr_remove(&minors, minor_got);
2787         synchronize_rcu();
2788 out_no_minor_idr:
2789         drbd_bm_cleanup(mdev);
2790 out_no_bitmap:
2791         __free_page(mdev->md_io_page);
2792 out_no_io_page:
2793         put_disk(disk);
2794 out_no_disk:
2795         blk_cleanup_queue(q);
2796 out_no_q:
2797         kfree(mdev);
2798         kref_put(&tconn->kref, &conn_destroy);
2799         return err;
2800 }
2801
2802 int __init drbd_init(void)
2803 {
2804         int err;
2805
2806         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2807                 printk(KERN_ERR
2808                        "drbd: invalid minor_count (%d)\n", minor_count);
2809 #ifdef MODULE
2810                 return -EINVAL;
2811 #else
2812                 minor_count = DRBD_MINOR_COUNT_DEF;
2813 #endif
2814         }
2815
2816         err = register_blkdev(DRBD_MAJOR, "drbd");
2817         if (err) {
2818                 printk(KERN_ERR
2819                        "drbd: unable to register block device major %d\n",
2820                        DRBD_MAJOR);
2821                 return err;
2822         }
2823
2824         err = drbd_genl_register();
2825         if (err) {
2826                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2827                 goto fail;
2828         }
2829
2830
2831         register_reboot_notifier(&drbd_notifier);
2832
2833         /*
2834          * allocate all necessary structs
2835          */
2836         err = -ENOMEM;
2837
2838         init_waitqueue_head(&drbd_pp_wait);
2839
2840         drbd_proc = NULL; /* play safe for drbd_cleanup */
2841         idr_init(&minors);
2842
2843         err = drbd_create_mempools();
2844         if (err)
2845                 goto fail;
2846
2847         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2848         if (!drbd_proc) {
2849                 printk(KERN_ERR "drbd: unable to register proc file\n");
2850                 goto fail;
2851         }
2852
2853         rwlock_init(&global_state_lock);
2854         INIT_LIST_HEAD(&drbd_tconns);
2855
2856         printk(KERN_INFO "drbd: initialized. "
2857                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2858                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2859         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2860         printk(KERN_INFO "drbd: registered as block device major %d\n",
2861                 DRBD_MAJOR);
2862
2863         return 0; /* Success! */
2864
2865 fail:
2866         drbd_cleanup();
2867         if (err == -ENOMEM)
2868                 /* currently always the case */
2869                 printk(KERN_ERR "drbd: ran out of memory\n");
2870         else
2871                 printk(KERN_ERR "drbd: initialization failure\n");
2872         return err;
2873 }
2874
2875 void drbd_free_bc(struct drbd_backing_dev *ldev)
2876 {
2877         if (ldev == NULL)
2878                 return;
2879
2880         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2881         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2882
2883         kfree(ldev);
2884 }
2885
2886 void drbd_free_sock(struct drbd_tconn *tconn)
2887 {
2888         if (tconn->data.socket) {
2889                 mutex_lock(&tconn->data.mutex);
2890                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2891                 sock_release(tconn->data.socket);
2892                 tconn->data.socket = NULL;
2893                 mutex_unlock(&tconn->data.mutex);
2894         }
2895         if (tconn->meta.socket) {
2896                 mutex_lock(&tconn->meta.mutex);
2897                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2898                 sock_release(tconn->meta.socket);
2899                 tconn->meta.socket = NULL;
2900                 mutex_unlock(&tconn->meta.mutex);
2901         }
2902 }
2903
2904 /* meta data management */
2905
2906 struct meta_data_on_disk {
2907         u64 la_size;           /* last agreed size. */
2908         u64 uuid[UI_SIZE];   /* UUIDs. */
2909         u64 device_uuid;
2910         u64 reserved_u64_1;
2911         u32 flags;             /* MDF */
2912         u32 magic;
2913         u32 md_size_sect;
2914         u32 al_offset;         /* offset to this block */
2915         u32 al_nr_extents;     /* important for restoring the AL */
2916               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2917         u32 bm_offset;         /* offset to the bitmap, from here */
2918         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2919         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2920         u32 reserved_u32[3];
2921
2922 } __packed;
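/*
 * The layout above is written as a single 512-byte sector (note the
 * memset(buffer, 0, 512) in drbd_md_sync() below), so it must never outgrow
 * one sector.  A compile-time guard, sketched here only, could read:
 */
#if 0
static inline void meta_data_on_disk_size_check(void)
{
        BUILD_BUG_ON(sizeof(struct meta_data_on_disk) > 512);
}
#endif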
2923
2924 /**
2925  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2926  * @mdev:       DRBD device.
2927  */
2928 void drbd_md_sync(struct drbd_conf *mdev)
2929 {
2930         struct meta_data_on_disk *buffer;
2931         sector_t sector;
2932         int i;
2933
2934         del_timer(&mdev->md_sync_timer);
2935         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2936         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2937                 return;
2938
2939         /* We use D_FAILED here and not D_ATTACHING because we try to write
2940          * metadata even if we detach due to a disk failure! */
2941         if (!get_ldev_if_state(mdev, D_FAILED))
2942                 return;
2943
2944         buffer = drbd_md_get_buffer(mdev);
2945         if (!buffer)
2946                 goto out;
2947
2948         memset(buffer, 0, 512);
2949
2950         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2951         for (i = UI_CURRENT; i < UI_SIZE; i++)
2952                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2953         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2954         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
2955
2956         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2957         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2958         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2959         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2960         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2961
2962         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2963         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2964
2965         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2966         sector = mdev->ldev->md.md_offset;
2967
2968         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2969                 /* this was a try anyways ... */
2970                 dev_err(DEV, "meta data update failed!\n");
2971                 drbd_chk_io_error(mdev, 1, true);
2972         }
2973
2974         /* Update mdev->ldev->md.la_size_sect,
2975          * since we just wrote it to the meta data. */
2976         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2977
2978         drbd_md_put_buffer(mdev);
2979 out:
2980         put_ldev(mdev);
2981 }
2982
2983 /**
2984  * drbd_md_read() - Reads in the meta data super block
2985  * @mdev:       DRBD device.
2986  * @bdev:       Device from which the meta data should be read in.
2987  *
2988  * Return NO_ERROR on success, and an enum drbd_ret_code error value in case
2989  * something goes wrong.
2990  */
2991 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2992 {
2993         struct meta_data_on_disk *buffer;
2994         u32 magic, flags;
2995         int i, rv = NO_ERROR;
2996
2997         if (!get_ldev_if_state(mdev, D_ATTACHING))
2998                 return ERR_IO_MD_DISK;
2999
3000         buffer = drbd_md_get_buffer(mdev);
3001         if (!buffer)
3002                 goto out;
3003
3004         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3005                 /* NOTE: can't do normal error processing here as this is
3006                    called BEFORE disk is attached */
3007                 dev_err(DEV, "Error while reading metadata.\n");
3008                 rv = ERR_IO_MD_DISK;
3009                 goto err;
3010         }
3011
3012         magic = be32_to_cpu(buffer->magic);
3013         flags = be32_to_cpu(buffer->flags);
3014         if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
3015             (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
3016                 /* btw: that's Activity Log clean, not "all" clean. */
3017                 dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
3018                 rv = ERR_MD_UNCLEAN;
3019                 goto err;
3020         }
3021         if (magic != DRBD_MD_MAGIC_08) {
3022                 if (magic == DRBD_MD_MAGIC_07)
3023                         dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3024                 else
3025                         dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3026                 rv = ERR_MD_INVALID;
3027                 goto err;
3028         }
3029         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3030                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3031                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3032                 rv = ERR_MD_INVALID;
3033                 goto err;
3034         }
3035         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3036                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3037                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3038                 rv = ERR_MD_INVALID;
3039                 goto err;
3040         }
3041         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3042                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3043                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3044                 rv = ERR_MD_INVALID;
3045                 goto err;
3046         }
3047
3048         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3049                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3050                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3051                 rv = ERR_MD_INVALID;
3052                 goto err;
3053         }
3054
3055         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3056         for (i = UI_CURRENT; i < UI_SIZE; i++)
3057                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3058         bdev->md.flags = be32_to_cpu(buffer->flags);
3059         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3060
3061         spin_lock_irq(&mdev->tconn->req_lock);
3062         if (mdev->state.conn < C_CONNECTED) {
3063                 int peer;
3064                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3065                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3066                 mdev->peer_max_bio_size = peer;
3067         }
3068         spin_unlock_irq(&mdev->tconn->req_lock);
3069
3070  err:
3071         drbd_md_put_buffer(mdev);
3072  out:
3073         put_ldev(mdev);
3074
3075         return rv;
3076 }
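/*
 * Editorial illustration, not part of the original file: a caller on the
 * attach path is expected to treat anything other than NO_ERROR as fatal
 * for the attach attempt.  The variable names (retcode, nbc) and the error
 * label are illustrative only:
 *
 *	retcode = drbd_md_read(mdev, nbc);
 *	if (retcode != NO_ERROR)
 *		goto release_bdev;
 */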
3077
3078 /**
3079  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3080  * @mdev:       DRBD device.
3081  *
3082  * Call this function if you change anything that should be written to
3083  * the meta-data super block. This function sets MD_DIRTY, and starts a
3084  * timer that ensures that drbd_md_sync() is called within five seconds.
3085  */
3086 #ifdef DEBUG
3087 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3088 {
3089         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3090                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3091                 mdev->last_md_mark_dirty.line = line;
3092                 mdev->last_md_mark_dirty.func = func;
3093         }
3094 }
3095 #else
3096 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3097 {
3098         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3099                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3100 }
3101 #endif
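/*
 * Editorial sketch, not part of the original file, of the intended usage
 * pattern: change something that lives in the on-disk super block while
 * holding a local-disk reference, then mark the meta data dirty; the
 * md_sync_timer/worker path (see md_sync_timer_fn() and w_md_sync() below)
 * or an explicit drbd_md_sync() gets it to stable storage.  The UUID and
 * flag helpers later in this file follow exactly this pattern; new_uuid is
 * an illustrative variable.
 *
 *	if (get_ldev(mdev)) {
 *		mdev->ldev->md.device_uuid = new_uuid;	// example change
 *		drbd_md_mark_dirty(mdev);
 *		drbd_md_sync(mdev);	// optional: flush immediately
 *		put_ldev(mdev);
 *	}
 */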
3102
3103 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3104 {
3105         int i;
3106
3107         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3108                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3109 }
3110
3111 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3112 {
3113         if (idx == UI_CURRENT) {
3114                 if (mdev->state.role == R_PRIMARY)
3115                         val |= 1;
3116                 else
3117                         val &= ~((u64)1);
3118
3119                 drbd_set_ed_uuid(mdev, val);
3120         }
3121
3122         mdev->ldev->md.uuid[idx] = val;
3123         drbd_md_mark_dirty(mdev);
3124 }
3125
3126
3127 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3128 {
3129         if (mdev->ldev->md.uuid[idx]) {
3130                 drbd_uuid_move_history(mdev);
3131                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3132         }
3133         _drbd_uuid_set(mdev, idx, val);
3134 }
3135
3136 /**
3137  * drbd_uuid_new_current() - Creates a new current UUID
3138  * @mdev:       DRBD device.
3139  *
3140  * Creates a new current UUID, and rotates the old current UUID into
3141  * the bitmap slot. Causes an incremental resync upon next connect.
3142  */
3143 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3144 {
3145         u64 val;
3146         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3147
3148         if (bm_uuid)
3149                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3150
3151         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3152
3153         get_random_bytes(&val, sizeof(u64));
3154         _drbd_uuid_set(mdev, UI_CURRENT, val);
3155         drbd_print_uuids(mdev, "new current UUID");
3156         /* get it to stable storage _now_ */
3157         drbd_md_sync(mdev);
3158 }
3159
3160 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3161 {
3162         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3163                 return;
3164
3165         if (val == 0) {
3166                 drbd_uuid_move_history(mdev);
3167                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3168                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3169         } else {
3170                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3171                 if (bm_uuid)
3172                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3173
3174                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3175         }
3176         drbd_md_mark_dirty(mdev);
3177 }
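/*
 * Editorial illustration, not part of the original file, of the slot
 * rotation done by the two functions above:
 *
 *	drbd_uuid_new_current():  CURRENT=C, BITMAP=0  ->  CURRENT=N, BITMAP=C
 *	                          (N is freshly generated random data)
 *	drbd_uuid_set_bm(.., 0):  BITMAP=C  ->  BITMAP=0, HISTORY_START=C
 *	                          (older history entries are pushed back by
 *	                           drbd_uuid_move_history())
 */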
3178
3179 /**
3180  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3181  * @mdev:       DRBD device.
3182  *
3183  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3184  */
3185 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3186 {
3187         int rv = -EIO;
3188
3189         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3190                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3191                 drbd_md_sync(mdev);
3192                 drbd_bm_set_all(mdev);
3193
3194                 rv = drbd_bm_write(mdev);
3195
3196                 if (!rv) {
3197                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3198                         drbd_md_sync(mdev);
3199                 }
3200
3201                 put_ldev(mdev);
3202         }
3203
3204         return rv;
3205 }
3206
3207 /**
3208  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3209  * @mdev:       DRBD device.
3210  *
3211  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3212  */
3213 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3214 {
3215         int rv = -EIO;
3216
3217         drbd_resume_al(mdev);
3218         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3219                 drbd_bm_clear_all(mdev);
3220                 rv = drbd_bm_write(mdev);
3221                 put_ldev(mdev);
3222         }
3223
3224         return rv;
3225 }
3226
3227 static int w_bitmap_io(struct drbd_work *w, int unused)
3228 {
3229         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3230         struct drbd_conf *mdev = w->mdev;
3231         int rv = -EIO;
3232
3233         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3234
3235         if (get_ldev(mdev)) {
3236                 drbd_bm_lock(mdev, work->why, work->flags);
3237                 rv = work->io_fn(mdev);
3238                 drbd_bm_unlock(mdev);
3239                 put_ldev(mdev);
3240         }
3241
3242         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3243         wake_up(&mdev->misc_wait);
3244
3245         if (work->done)
3246                 work->done(mdev, rv);
3247
3248         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3249         work->why = NULL;
3250         work->flags = 0;
3251
3252         return 0;
3253 }
3254
3255 void drbd_ldev_destroy(struct drbd_conf *mdev)
3256 {
3257         lc_destroy(mdev->resync);
3258         mdev->resync = NULL;
3259         lc_destroy(mdev->act_log);
3260         mdev->act_log = NULL;
3261         __no_warn(local,
3262                 drbd_free_bc(mdev->ldev);
3263                 mdev->ldev = NULL;);
3264
3265         clear_bit(GO_DISKLESS, &mdev->flags);
3266 }
3267
3268 static int w_go_diskless(struct drbd_work *w, int unused)
3269 {
3270         struct drbd_conf *mdev = w->mdev;
3271
3272         D_ASSERT(mdev->state.disk == D_FAILED);
3273         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3274          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3275          * the protected members anymore, though, so once put_ldev reaches zero
3276          * again, it will be safe to free them. */
3277         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3278         return 0;
3279 }
3280
3281 void drbd_go_diskless(struct drbd_conf *mdev)
3282 {
3283         D_ASSERT(mdev->state.disk == D_FAILED);
3284         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3285                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3286 }
3287
3288 /**
3289  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3290  * @mdev:       DRBD device.
3291  * @io_fn:      IO callback to be called when bitmap IO is possible
3292  * @done:       callback to be called after the bitmap IO was performed
3293  * @why:        Descriptive text of the reason for doing the IO
3294  *
3295  * While IO on the bitmap is in progress we freeze application IO, which
3296  * ensures that drbd_set_out_of_sync() can not be called. This function MAY
3297  * ONLY be called from worker context. It MUST NOT be used while a previous
3298  * such work is still pending!
3299  */
3300 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3301                           int (*io_fn)(struct drbd_conf *),
3302                           void (*done)(struct drbd_conf *, int),
3303                           char *why, enum bm_flag flags)
3304 {
3305         D_ASSERT(current == mdev->tconn->worker.task);
3306
3307         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3308         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3309         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3310         if (mdev->bm_io_work.why)
3311                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3312                         why, mdev->bm_io_work.why);
3313
3314         mdev->bm_io_work.io_fn = io_fn;
3315         mdev->bm_io_work.done = done;
3316         mdev->bm_io_work.why = why;
3317         mdev->bm_io_work.flags = flags;
3318
3319         spin_lock_irq(&mdev->tconn->req_lock);
3320         set_bit(BITMAP_IO, &mdev->flags);
3321         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3322                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3323                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3324         }
3325         spin_unlock_irq(&mdev->tconn->req_lock);
3326 }
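/*
 * Editorial example, not part of the original file: from worker context a
 * state change handler could queue a full "set all bits and write" pass.
 * The done callback name, the why string and the flag choice are
 * illustrative only:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     &after_full_sync_prep,	// hypothetical callback
 *			     "set_n_write (example)", BM_LOCKED_SET_ALLOWED);
 */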
3327
3328 /**
3329  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3330  * @mdev:       DRBD device.
3331  * @io_fn:      IO callback to be called when bitmap IO is possible
3332  * @why:        Descriptive text of the reason for doing the IO
3333  *
3334  * Freezes application IO while the actual IO operation runs. This
3335  * function MAY NOT be called from worker context.
3336  */
3337 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3338                 char *why, enum bm_flag flags)
3339 {
3340         int rv;
3341
3342         D_ASSERT(current != mdev->tconn->worker.task);
3343
3344         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3345                 drbd_suspend_io(mdev);
3346
3347         drbd_bm_lock(mdev, why, flags);
3348         rv = io_fn(mdev);
3349         drbd_bm_unlock(mdev);
3350
3351         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3352                 drbd_resume_io(mdev);
3353
3354         return rv;
3355 }
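/*
 * Editorial example, not part of the original file: outside of worker
 * context the synchronous variant pairs with the io_fn helpers defined
 * above; the why string and the flag choice are illustrative only:
 *
 *	if (drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *			   "clear_n_write (example)", BM_LOCKED_SET_ALLOWED))
 *		dev_err(DEV, "writing out the bitmap failed\n");
 */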
3356
3357 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3358 {
3359         if ((mdev->ldev->md.flags & flag) != flag) {
3360                 drbd_md_mark_dirty(mdev);
3361                 mdev->ldev->md.flags |= flag;
3362         }
3363 }
3364
3365 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3366 {
3367         if ((mdev->ldev->md.flags & flag) != 0) {
3368                 drbd_md_mark_dirty(mdev);
3369                 mdev->ldev->md.flags &= ~flag;
3370         }
3371 }
3372 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3373 {
3374         return (bdev->md.flags & flag) != 0;
3375 }
3376
3377 static void md_sync_timer_fn(unsigned long data)
3378 {
3379         struct drbd_conf *mdev = (struct drbd_conf *) data;
3380
3381         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3382 }
3383
3384 static int w_md_sync(struct drbd_work *w, int unused)
3385 {
3386         struct drbd_conf *mdev = w->mdev;
3387
3388         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3389 #ifdef DEBUG
3390         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3391                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3392 #endif
3393         drbd_md_sync(mdev);
3394         return 0;
3395 }
3396
3397 const char *cmdname(enum drbd_packet cmd)
3398 {
3399         /* THINK may need to become several global tables
3400          * when we want to support more than
3401          * one PRO_VERSION */
3402         static const char *cmdnames[] = {
3403                 [P_DATA]                = "Data",
3404                 [P_DATA_REPLY]          = "DataReply",
3405                 [P_RS_DATA_REPLY]       = "RSDataReply",
3406                 [P_BARRIER]             = "Barrier",
3407                 [P_BITMAP]              = "ReportBitMap",
3408                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3409                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3410                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3411                 [P_DATA_REQUEST]        = "DataRequest",
3412                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3413                 [P_SYNC_PARAM]          = "SyncParam",
3414                 [P_SYNC_PARAM89]        = "SyncParam89",
3415                 [P_PROTOCOL]            = "ReportProtocol",
3416                 [P_UUIDS]               = "ReportUUIDs",
3417                 [P_SIZES]               = "ReportSizes",
3418                 [P_STATE]               = "ReportState",
3419                 [P_SYNC_UUID]           = "ReportSyncUUID",
3420                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3421                 [P_AUTH_RESPONSE]       = "AuthResponse",
3422                 [P_PING]                = "Ping",
3423                 [P_PING_ACK]            = "PingAck",
3424                 [P_RECV_ACK]            = "RecvAck",
3425                 [P_WRITE_ACK]           = "WriteAck",
3426                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3427                 [P_DISCARD_WRITE]        = "DiscardWrite",
3428                 [P_NEG_ACK]             = "NegAck",
3429                 [P_NEG_DREPLY]          = "NegDReply",
3430                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3431                 [P_BARRIER_ACK]         = "BarrierAck",
3432                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3433                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3434                 [P_OV_REQUEST]          = "OVRequest",
3435                 [P_OV_REPLY]            = "OVReply",
3436                 [P_OV_RESULT]           = "OVResult",
3437                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3438                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3439                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3440                 [P_DELAY_PROBE]         = "DelayProbe",
3441                 [P_OUT_OF_SYNC]         = "OutOfSync",
3442                 [P_RETRY_WRITE]         = "RetryWrite",
3443                 [P_RS_CANCEL]           = "RSCancel",
3444                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3445                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3446                 [P_RETRY_WRITE]         = "retry_write",
3447                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3448
3449                 /* enum drbd_packet, but not commands - obsoleted flags:
3450                  *      P_MAY_IGNORE
3451                  *      P_MAX_OPT_CMD
3452                  */
3453         };
3454
3455         /* too big for the array: 0xfffX */
3456         if (cmd == P_INITIAL_META)
3457                 return "InitialMeta";
3458         if (cmd == P_INITIAL_DATA)
3459                 return "InitialData";
3460         if (cmd == P_CONNECTION_FEATURES)
3461                 return "ConnectionFeatures";
3462         if (cmd >= ARRAY_SIZE(cmdnames))
3463                 return "Unknown";
3464         return cmdnames[cmd];
3465 }
3466
3467 /**
3468  * drbd_wait_misc  -  wait for a request to make progress
3469  * @mdev:       device associated with the request
3470  * @i:          the struct drbd_interval embedded in struct drbd_request or
3471  *              struct drbd_peer_request
3472  */
3473 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3474 {
3475         struct net_conf *nc;
3476         DEFINE_WAIT(wait);
3477         long timeout;
3478
3479         rcu_read_lock();
3480         nc = rcu_dereference(mdev->tconn->net_conf);
3481         if (!nc) {
3482                 rcu_read_unlock();
3483                 return -ETIMEDOUT;
3484         }
3485         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3486         rcu_read_unlock();
3487
3488         /* Indicate that mdev->misc_wait should be woken up on progress.  */
3489         i->waiting = true;
3490         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3491         spin_unlock_irq(&mdev->tconn->req_lock);
3492         timeout = schedule_timeout(timeout);
3493         finish_wait(&mdev->misc_wait, &wait);
3494         spin_lock_irq(&mdev->tconn->req_lock);
3495         if (!timeout || mdev->state.conn < C_CONNECTED)
3496                 return -ETIMEDOUT;
3497         if (signal_pending(current))
3498                 return -ERESTARTSYS;
3499         return 0;
3500 }
3501
3502 #ifdef CONFIG_DRBD_FAULT_INJECTION
3503 /* Fault insertion support including random number generator shamelessly
3504  * stolen from kernel/rcutorture.c */
3505 struct fault_random_state {
3506         unsigned long state;
3507         unsigned long count;
3508 };
3509
3510 #define FAULT_RANDOM_MULT 39916801  /* prime */
3511 #define FAULT_RANDOM_ADD        479001701 /* prime */
3512 #define FAULT_RANDOM_REFRESH 10000
3513
3514 /*
3515  * Crude but fast random-number generator.  Uses a linear congruential
3516  * generator, with occasional help from get_random_bytes().
3517  */
3518 static unsigned long
3519 _drbd_fault_random(struct fault_random_state *rsp)
3520 {
3521         long refresh;
3522
3523         if (!rsp->count--) {
3524                 get_random_bytes(&refresh, sizeof(refresh));
3525                 rsp->state += refresh;
3526                 rsp->count = FAULT_RANDOM_REFRESH;
3527         }
3528         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3529         return swahw32(rsp->state);
3530 }
3531
3532 static char *
3533 _drbd_fault_str(unsigned int type) {
3534         static char *_faults[] = {
3535                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3536                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3537                 [DRBD_FAULT_RS_WR] = "Resync write",
3538                 [DRBD_FAULT_RS_RD] = "Resync read",
3539                 [DRBD_FAULT_DT_WR] = "Data write",
3540                 [DRBD_FAULT_DT_RD] = "Data read",
3541                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3542                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3543                 [DRBD_FAULT_AL_EE] = "EE allocation",
3544                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3545         };
3546
3547         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3548 }
3549
3550 unsigned int
3551 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3552 {
3553         static struct fault_random_state rrs = {0, 0};
3554
3555         unsigned int ret = (
3556                 (fault_devs == 0 ||
3557                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3558                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3559
3560         if (ret) {
3561                 fault_count++;
3562
3563                 if (__ratelimit(&drbd_ratelimit_state))
3564                         dev_warn(DEV, "***Simulating %s failure\n",
3565                                 _drbd_fault_str(type));
3566         }
3567
3568         return ret;
3569 }
3570 #endif
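/*
 * Editorial example, not part of the original file: call sites elsewhere in
 * drbd consult the fault machinery right before issuing real I/O and fake
 * an error when a fault is granted.  drbd_insert_fault() is assumed to be
 * the inline wrapper around _drbd_insert_fault(); the bio handling shown is
 * only a sketch:
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);
 *	else
 *		submit_bio(rw, bio);
 */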
3571
3572 const char *drbd_buildtag(void)
3573 {
3574         /* When DRBD is built from external sources, this holds a reference
3575            to the git hash of the source code. */
3576
3577         static char buildtag[38] = "\0uilt-in";
3578
3579         if (buildtag[0] == 0) {
3580 #ifdef CONFIG_MODULES
3581                 if (THIS_MODULE != NULL)
3582                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3583                 else
3584 #endif
3585                         buildtag[0] = 'b';
3586         }
3587
3588         return buildtag;
3589 }
3590
3591 module_init(drbd_init)
3592 module_exit(drbd_cleanup)
3593
3594 EXPORT_SYMBOL(drbd_conn_str);
3595 EXPORT_SYMBOL(drbd_role_str);
3596 EXPORT_SYMBOL(drbd_disk_str);
3597 EXPORT_SYMBOL(drbd_set_st_err_str);