drbd: Fixes from the drbd-8.3 branch
cascardo/linux.git: drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the resync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70         struct drbd_conf *mdev;
71
72         md_io = (struct drbd_md_io *)bio->bi_private;
73         mdev = container_of(md_io, struct drbd_conf, md_io);
74
75         md_io->error = error;
76
77         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
78          * to timeout on the lower level device, and eventually detach from it.
79          * If this io completion runs after that timeout expired, this
80          * drbd_md_put_buffer() may allow us to finally try and re-attach.
81          * During normal operation, this only puts that extra reference
82          * down to 1 again.
83          * Make sure we first drop the reference, and only then signal
84          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
85          * next drbd_md_sync_page_io() that we trigger the
86          * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
87          */
88         drbd_md_put_buffer(mdev);
89         md_io->done = 1;
90         wake_up(&mdev->misc_wait);
91         bio_put(bio);
92         put_ldev(mdev);
93 }
94
95 /* reads on behalf of the partner,
96  * "submitted" by the receiver
97  */
98 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 {
100         unsigned long flags = 0;
101         struct drbd_conf *mdev = peer_req->w.mdev;
102
103         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
104         mdev->read_cnt += peer_req->i.size >> 9;
105         list_del(&peer_req->w.list);
106         if (list_empty(&mdev->read_ee))
107                 wake_up(&mdev->ee_wait);
108         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109                 __drbd_chk_io_error(mdev, false);
110         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
111
112         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
113         put_ldev(mdev);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120         unsigned long flags = 0;
121         struct drbd_conf *mdev = peer_req->w.mdev;
122         struct drbd_interval i;
123         int do_wake;
124         u64 block_id;
125         int do_al_complete_io;
126
127         /* after we moved peer_req to done_ee,
128          * we may no longer access it,
129          * it may be freed/reused already!
130          * (as soon as we release the req_lock) */
131         i = peer_req->i;
132         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
133         block_id = peer_req->block_id;
134
135         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
136         mdev->writ_cnt += peer_req->i.size >> 9;
137         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
138         list_add_tail(&peer_req->w.list, &mdev->done_ee);
139
140         /*
141          * Do not remove from the write_requests tree here: we did not send the
142          * Ack yet and did not wake possibly waiting conflicting requests.
143          * It is removed from the tree in "drbd_process_done_ee" within the
144          * appropriate w.cb (e_end_block/e_end_resync_block) or from
145          * _drbd_clear_done_ee.
146          */
147
148         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
149
150         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
151                 __drbd_chk_io_error(mdev, false);
152         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
153
154         if (block_id == ID_SYNCER)
155                 drbd_rs_complete_io(mdev, i.sector);
156
157         if (do_wake)
158                 wake_up(&mdev->ee_wait);
159
160         if (do_al_complete_io)
161                 drbd_al_complete_io(mdev, &i);
162
163         wake_asender(mdev->tconn);
164         put_ldev(mdev);
165 }
166
167 /* writes on behalf of the partner, or resync writes,
168  * "submitted" by the receiver.
169  */
170 void drbd_peer_request_endio(struct bio *bio, int error)
171 {
172         struct drbd_peer_request *peer_req = bio->bi_private;
173         struct drbd_conf *mdev = peer_req->w.mdev;
174         int uptodate = bio_flagged(bio, BIO_UPTODATE);
175         int is_write = bio_data_dir(bio) == WRITE;
176
177         if (error && __ratelimit(&drbd_ratelimit_state))
178                 dev_warn(DEV, "%s: error=%d s=%llus\n",
179                                 is_write ? "write" : "read", error,
180                                 (unsigned long long)peer_req->i.sector);
181         if (!error && !uptodate) {
182                 if (__ratelimit(&drbd_ratelimit_state))
183                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
184                                         is_write ? "write" : "read",
185                                         (unsigned long long)peer_req->i.sector);
186                 /* strange behavior of some lower level drivers...
187                  * fail the request by clearing the uptodate flag,
188                  * but do not return any error?! */
189                 error = -EIO;
190         }
191
192         if (error)
193                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
194
195         bio_put(bio); /* no need for the bio anymore */
196         if (atomic_dec_and_test(&peer_req->pending_bios)) {
197                 if (is_write)
198                         drbd_endio_write_sec_final(peer_req);
199                 else
200                         drbd_endio_read_sec_final(peer_req);
201         }
202 }
203
204 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio, int error)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_conf *mdev = req->w.mdev;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213         int uptodate = bio_flagged(bio, BIO_UPTODATE);
214
215         if (!error && !uptodate) {
216                 dev_warn(DEV, "p %s: setting error to -EIO\n",
217                          bio_data_dir(bio) == WRITE ? "write" : "read");
218                 /* strange behavior of some lower level drivers...
219                  * fail the request by clearing the uptodate flag,
220                  * but do not return any error?! */
221                 error = -EIO;
222         }
223
224         /* to avoid recursion in __req_mod */
225         if (unlikely(error)) {
226                 what = (bio_data_dir(bio) == WRITE)
227                         ? WRITE_COMPLETED_WITH_ERROR
228                         : (bio_rw(bio) == READ)
229                           ? READ_COMPLETED_WITH_ERROR
230                           : READ_AHEAD_COMPLETED_WITH_ERROR;
231         } else
232                 what = COMPLETED_OK;
233
234         bio_put(req->private_bio);
235         req->private_bio = ERR_PTR(error);
236
237         /* not req_mod(), we need irqsave here! */
238         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
239         __req_mod(req, what, &m);
240         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
241
242         if (m.bio)
243                 complete_master_bio(mdev, &m);
244 }
245
246 int w_read_retry_remote(struct drbd_work *w, int cancel)
247 {
248         struct drbd_request *req = container_of(w, struct drbd_request, w);
249         struct drbd_conf *mdev = w->mdev;
250
251         /* We should not detach for read io-error,
252          * but try to WRITE the P_DATA_REPLY to the failed location,
253          * to give the disk the chance to relocate that block */
254
255         spin_lock_irq(&mdev->tconn->req_lock);
256         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
257                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
258                 spin_unlock_irq(&mdev->tconn->req_lock);
259                 return 0;
260         }
261         spin_unlock_irq(&mdev->tconn->req_lock);
262
263         return w_send_read_req(w, 0);
264 }
265
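/* Compute a digest over the data of a peer request with the given hash
 * transform: walk the page chain, hashing each full page, and finally
 * the last, possibly only partially used, page. */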
266 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
267                   struct drbd_peer_request *peer_req, void *digest)
268 {
269         struct hash_desc desc;
270         struct scatterlist sg;
271         struct page *page = peer_req->pages;
272         struct page *tmp;
273         unsigned len;
274
275         desc.tfm = tfm;
276         desc.flags = 0;
277
278         sg_init_table(&sg, 1);
279         crypto_hash_init(&desc);
280
281         while ((tmp = page_chain_next(page))) {
282                 /* all but the last page will be fully used */
283                 sg_set_page(&sg, page, PAGE_SIZE, 0);
284                 crypto_hash_update(&desc, &sg, sg.length);
285                 page = tmp;
286         }
287         /* and now the last, possibly only partially used page */
288         len = peer_req->i.size & (PAGE_SIZE - 1);
289         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
290         crypto_hash_update(&desc, &sg, sg.length);
291         crypto_hash_final(&desc, digest);
292 }
293
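/* Compute a digest over all data segments of a bio with the given hash
 * transform. */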
294 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
295 {
296         struct hash_desc desc;
297         struct scatterlist sg;
298         struct bio_vec *bvec;
299         int i;
300
301         desc.tfm = tfm;
302         desc.flags = 0;
303
304         sg_init_table(&sg, 1);
305         crypto_hash_init(&desc);
306
307         __bio_for_each_segment(bvec, bio, i, 0) {
308                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
309                 crypto_hash_update(&desc, &sg, sg.length);
310         }
311         crypto_hash_final(&desc, digest);
312 }
313
314 /* MAYBE merge common code with w_e_end_ov_req */
315 static int w_e_send_csum(struct drbd_work *w, int cancel)
316 {
317         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
318         struct drbd_conf *mdev = w->mdev;
319         int digest_size;
320         void *digest;
321         int err = 0;
322
323         if (unlikely(cancel))
324                 goto out;
325
326         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
327                 goto out;
328
329         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
330         digest = kmalloc(digest_size, GFP_NOIO);
331         if (digest) {
332                 sector_t sector = peer_req->i.sector;
333                 unsigned int size = peer_req->i.size;
334                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
335                 /* Free peer_req and pages before send.
336                  * In case we block on congestion, we could otherwise run into
337                  * some distributed deadlock, if the other side blocks on
338                  * congestion as well, because our receiver blocks in
339                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
340                 drbd_free_peer_req(mdev, peer_req);
341                 peer_req = NULL;
342                 inc_rs_pending(mdev);
343                 err = drbd_send_drequest_csum(mdev, sector, size,
344                                               digest, digest_size,
345                                               P_CSUM_RS_REQUEST);
346                 kfree(digest);
347         } else {
348                 dev_err(DEV, "kmalloc() of digest failed.\n");
349                 err = -ENOMEM;
350         }
351
352 out:
353         if (peer_req)
354                 drbd_free_peer_req(mdev, peer_req);
355
356         if (unlikely(err))
357                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
358         return err;
359 }
360
361 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
362
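/* Submit a local read for checksum based resync; on completion,
 * w_e_send_csum() sends the digest to the peer.
 * Returns 0 on success, -EIO if we have no local disk, or -EAGAIN if
 * this should be deferred (throttled, out of memory, or submit failed). */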
363 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
364 {
365         struct drbd_peer_request *peer_req;
366
367         if (!get_ldev(mdev))
368                 return -EIO;
369
370         if (drbd_rs_should_slow_down(mdev, sector))
371                 goto defer;
372
373         /* GFP_TRY, because if there is no memory available right now, this may
374          * be rescheduled for later. It is "only" background resync, after all. */
375         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
376                                        size, GFP_TRY);
377         if (!peer_req)
378                 goto defer;
379
380         peer_req->w.cb = w_e_send_csum;
381         spin_lock_irq(&mdev->tconn->req_lock);
382         list_add(&peer_req->w.list, &mdev->read_ee);
383         spin_unlock_irq(&mdev->tconn->req_lock);
384
385         atomic_add(size >> 9, &mdev->rs_sect_ev);
386         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
387                 return 0;
388
389         /* If it failed because of ENOMEM, retry should help.  If it failed
390          * because bio_add_page failed (probably broken lower level driver),
391          * retry may or may not help.
392          * If it does not, you may need to force disconnect. */
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_del(&peer_req->w.list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         drbd_free_peer_req(mdev, peer_req);
398 defer:
399         put_ldev(mdev);
400         return -EAGAIN;
401 }
402
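/* Resync timer work: depending on the connection state, generate new
 * online verify (C_VERIFY_S) or resync (C_SYNC_TARGET) requests. */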
403 int w_resync_timer(struct drbd_work *w, int cancel)
404 {
405         struct drbd_conf *mdev = w->mdev;
406         switch (mdev->state.conn) {
407         case C_VERIFY_S:
408                 w_make_ov_request(w, cancel);
409                 break;
410         case C_SYNC_TARGET:
411                 w_make_resync_request(w, cancel);
412                 break;
413         }
414
415         return 0;
416 }
417
418 void resync_timer_fn(unsigned long data)
419 {
420         struct drbd_conf *mdev = (struct drbd_conf *) data;
421
422         if (list_empty(&mdev->resync_work.list))
423                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
424 }
425
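/* Fill every slot of the plan fifo with the same value. */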
426 static void fifo_set(struct fifo_buffer *fb, int value)
427 {
428         int i;
429
430         for (i = 0; i < fb->size; i++)
431                 fb->values[i] = value;
432 }
433
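/* Ring-buffer style push: overwrite the slot at head_index with the new
 * value, advance the head with wrap-around, and return the value that
 * was dropped. */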
434 static int fifo_push(struct fifo_buffer *fb, int value)
435 {
436         int ov;
437
438         ov = fb->values[fb->head_index];
439         fb->values[fb->head_index++] = value;
440
441         if (fb->head_index >= fb->size)
442                 fb->head_index = 0;
443
444         return ov;
445 }
446
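/* Add value to every slot; used by drbd_rs_controller() to spread a
 * correction over all planned steps. */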
447 static void fifo_add_val(struct fifo_buffer *fb, int value)
448 {
449         int i;
450
451         for (i = 0; i < fb->size; i++)
452                 fb->values[i] += value;
453 }
454
455 struct fifo_buffer *fifo_alloc(int fifo_size)
456 {
457         struct fifo_buffer *fb;
458
459         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
460         if (!fb)
461                 return NULL;
462
463         fb->head_index = 0;
464         fb->size = fifo_size;
465         fb->total = 0;
466
467         return fb;
468 }
469
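/* Dynamic resync speed controller.
 * Based on the number of resync sectors that came in since the last
 * invocation, decide how many sectors to request during the next
 * SLEEP_TIME interval, so that the amount of data in flight approaches
 * c_fill_target (or the amount implied by c_delay_target).  Corrections
 * are spread over the plan fifo; the result is clamped to c_max_rate. */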
470 static int drbd_rs_controller(struct drbd_conf *mdev)
471 {
472         struct disk_conf *dc;
473         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
474         unsigned int want;     /* The number of sectors we want in the proxy */
475         int req_sect; /* Number of sectors to request in this turn */
476         int correction; /* Number of sectors more we need in the proxy*/
477         int cps; /* correction per invocation of drbd_rs_controller() */
478         int steps; /* Number of time steps to plan ahead */
479         int curr_corr;
480         int max_sect;
481         struct fifo_buffer *plan;
482
483         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
484         mdev->rs_in_flight -= sect_in;
485
486         dc = rcu_dereference(mdev->ldev->disk_conf);
487         plan = rcu_dereference(mdev->rs_plan_s);
488
489         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
490
491         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
492                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
493         } else { /* normal path */
494                 want = dc->c_fill_target ? dc->c_fill_target :
495                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
496         }
497
498         correction = want - mdev->rs_in_flight - plan->total;
499
500         /* Plan ahead */
501         cps = correction / steps;
502         fifo_add_val(plan, cps);
503         plan->total += cps * steps;
504
505         /* What we do in this step */
506         curr_corr = fifo_push(plan, 0);
507         plan->total -= curr_corr;
508
509         req_sect = sect_in + curr_corr;
510         if (req_sect < 0)
511                 req_sect = 0;
512
513         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
514         if (req_sect > max_sect)
515                 req_sect = max_sect;
516
517         /*
518         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
519                  sect_in, mdev->rs_in_flight, want, correction,
520                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
521         */
522
523         return req_sect;
524 }
525
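/* Number of BM_BLOCK_SIZE sized requests to issue during the next
 * SLEEP_TIME interval: taken from the dynamic controller above if a
 * plan is configured, otherwise derived from the static resync_rate. */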
526 static int drbd_rs_number_requests(struct drbd_conf *mdev)
527 {
528         int number;
529
530         rcu_read_lock();
531         if (rcu_dereference(mdev->rs_plan_s)->size) {
532                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
533                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
534         } else {
535                 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
536                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
537         }
538         rcu_read_unlock();
539
540         /* ignore the amount of pending requests, the resync controller should
541          * throttle down to the incoming reply rate soon enough anyway. */
542         return number;
543 }
544
545 int w_make_resync_request(struct drbd_work *w, int cancel)
546 {
547         struct drbd_conf *mdev = w->mdev;
548         unsigned long bit;
549         sector_t sector;
550         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
551         int max_bio_size;
552         int number, rollback_i, size;
553         int align, queued, sndbuf;
554         int i = 0;
555
556         if (unlikely(cancel))
557                 return 0;
558
559         if (mdev->rs_total == 0) {
560                 /* empty resync? */
561                 drbd_resync_finished(mdev);
562                 return 0;
563         }
564
565         if (!get_ldev(mdev)) {
566                 /* Since we only need to access mdev->rsync, a
567                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
568                    continuing resync with a broken disk makes no sense at
569                    all */
570                 dev_err(DEV, "Disk broke down during resync!\n");
571                 return 0;
572         }
573
574         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
575         number = drbd_rs_number_requests(mdev);
576         if (number == 0)
577                 goto requeue;
578
579         for (i = 0; i < number; i++) {
580                 /* Stop generating RS requests when half of the send buffer is filled */
581                 mutex_lock(&mdev->tconn->data.mutex);
582                 if (mdev->tconn->data.socket) {
583                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
584                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
585                 } else {
586                         queued = 1;
587                         sndbuf = 0;
588                 }
589                 mutex_unlock(&mdev->tconn->data.mutex);
590                 if (queued > sndbuf / 2)
591                         goto requeue;
592
593 next_sector:
594                 size = BM_BLOCK_SIZE;
595                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
596
597                 if (bit == DRBD_END_OF_BITMAP) {
598                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
599                         put_ldev(mdev);
600                         return 0;
601                 }
602
603                 sector = BM_BIT_TO_SECT(bit);
604
605                 if (drbd_rs_should_slow_down(mdev, sector) ||
606                     drbd_try_rs_begin_io(mdev, sector)) {
607                         mdev->bm_resync_fo = bit;
608                         goto requeue;
609                 }
610                 mdev->bm_resync_fo = bit + 1;
611
612                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
613                         drbd_rs_complete_io(mdev, sector);
614                         goto next_sector;
615                 }
616
617 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
618                 /* try to find some adjacent bits.
619                  * we stop if we already have the maximum req size.
620                  *
621                  * Additionally always align bigger requests, in order to
622                  * be prepared for all stripe sizes of software RAIDs.
623                  */
624                 align = 1;
625                 rollback_i = i;
626                 for (;;) {
627                         if (size + BM_BLOCK_SIZE > max_bio_size)
628                                 break;
629
630                         /* Be always aligned */
631                         if (sector & ((1<<(align+3))-1))
632                                 break;
633
634                         /* do not cross extent boundaries */
635                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
636                                 break;
637                         /* now, is it actually dirty, after all?
638                          * caution, drbd_bm_test_bit is tri-state for some
639                          * obscure reason; ( b == 0 ) would get the out-of-band
640                          * only accidentally right because of the "oddly sized"
641                          * adjustment below */
642                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
643                                 break;
644                         bit++;
645                         size += BM_BLOCK_SIZE;
646                         if ((BM_BLOCK_SIZE << align) <= size)
647                                 align++;
648                         i++;
649                 }
650                 /* if we merged some,
651                  * reset the offset to start the next drbd_bm_find_next from */
652                 if (size > BM_BLOCK_SIZE)
653                         mdev->bm_resync_fo = bit + 1;
654 #endif
655
656                 /* adjust very last sectors, in case we are oddly sized */
657                 if (sector + (size>>9) > capacity)
658                         size = (capacity-sector)<<9;
659                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
660                         switch (read_for_csum(mdev, sector, size)) {
661                         case -EIO: /* Disk failure */
662                                 put_ldev(mdev);
663                                 return -EIO;
664                         case -EAGAIN: /* allocation failed, or ldev busy */
665                                 drbd_rs_complete_io(mdev, sector);
666                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
667                                 i = rollback_i;
668                                 goto requeue;
669                         case 0:
670                                 /* everything ok */
671                                 break;
672                         default:
673                                 BUG();
674                         }
675                 } else {
676                         int err;
677
678                         inc_rs_pending(mdev);
679                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
680                                                  sector, size, ID_SYNCER);
681                         if (err) {
682                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
683                                 dec_rs_pending(mdev);
684                                 put_ldev(mdev);
685                                 return err;
686                         }
687                 }
688         }
689
690         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
691                 /* last syncer _request_ was sent,
692                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
693                  * next sync group will resume), as soon as we receive the last
694                  * resync data block, and the last bit is cleared.
695                  * until then resync "work" is "inactive" ...
696                  */
697                 put_ldev(mdev);
698                 return 0;
699         }
700
701  requeue:
702         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
703         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
704         put_ldev(mdev);
705         return 0;
706 }
707
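/* Online verify counterpart of w_make_resync_request(): send up to
 * drbd_rs_number_requests() online verify requests starting at
 * ov_position, then re-arm the resync timer. */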
708 static int w_make_ov_request(struct drbd_work *w, int cancel)
709 {
710         struct drbd_conf *mdev = w->mdev;
711         int number, i, size;
712         sector_t sector;
713         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
714
715         if (unlikely(cancel))
716                 return 1;
717
718         number = drbd_rs_number_requests(mdev);
719
720         sector = mdev->ov_position;
721         for (i = 0; i < number; i++) {
722                 if (sector >= capacity) {
723                         return 1;
724                 }
725
726                 size = BM_BLOCK_SIZE;
727
728                 if (drbd_rs_should_slow_down(mdev, sector) ||
729                     drbd_try_rs_begin_io(mdev, sector)) {
730                         mdev->ov_position = sector;
731                         goto requeue;
732                 }
733
734                 if (sector + (size>>9) > capacity)
735                         size = (capacity-sector)<<9;
736
737                 inc_rs_pending(mdev);
738                 if (drbd_send_ov_request(mdev, sector, size)) {
739                         dec_rs_pending(mdev);
740                         return 0;
741                 }
742                 sector += BM_SECT_PER_BIT;
743         }
744         mdev->ov_position = sector;
745
746  requeue:
747         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
748         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
749         return 1;
750 }
751
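/* One-shot work item for a finished online verify run: print the
 * out-of-sync summary, wrap up via drbd_resync_finished(), and free
 * the dynamically allocated work object. */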
752 int w_ov_finished(struct drbd_work *w, int cancel)
753 {
754         struct drbd_conf *mdev = w->mdev;
755         kfree(w);
756         ov_out_of_sync_print(mdev);
757         drbd_resync_finished(mdev);
758
759         return 0;
760 }
761
762 static int w_resync_finished(struct drbd_work *w, int cancel)
763 {
764         struct drbd_conf *mdev = w->mdev;
765         kfree(w);
766
767         drbd_resync_finished(mdev);
768
769         return 0;
770 }
771
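/* Send a ping to the peer and wait until the ack arrives, or until the
 * connection is lost. */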
772 static void ping_peer(struct drbd_conf *mdev)
773 {
774         struct drbd_tconn *tconn = mdev->tconn;
775
776         clear_bit(GOT_PING_ACK, &tconn->flags);
777         request_ping(tconn);
778         wait_event(tconn->ping_wait,
779                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
780 }
781
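/* Wrap up a resync or online verify run: flush the resync LRU, report
 * statistics, update disk states and UUIDs as appropriate, and possibly
 * invoke a userspace helper. */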
782 int drbd_resync_finished(struct drbd_conf *mdev)
783 {
784         unsigned long db, dt, dbdt;
785         unsigned long n_oos;
786         union drbd_state os, ns;
787         struct drbd_work *w;
788         char *khelper_cmd = NULL;
789         int verify_done = 0;
790
791         /* Remove all elements from the resync LRU. Since future actions
792          * might set bits in the (main) bitmap, then the entries in the
793          * resync LRU would be wrong. */
794         if (drbd_rs_del_all(mdev)) {
795                 /* In case this is not possible now, most probably because
796                  * there are P_RS_DATA_REPLY packets lingering on the worker's
797                  * queue (or even the read operations for those packets
798                  * are not finished by now).   Retry in 100ms. */
799
800                 schedule_timeout_interruptible(HZ / 10);
801                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
802                 if (w) {
803                         w->cb = w_resync_finished;
804                         w->mdev = mdev;
805                         drbd_queue_work(&mdev->tconn->data.work, w);
806                         return 1;
807                 }
808                 dev_err(DEV, "Failed to drbd_rs_del_all() and to kmalloc(w).\n");
809         }
810
811         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
812         if (dt <= 0)
813                 dt = 1;
814         db = mdev->rs_total;
815         dbdt = Bit2KB(db/dt);
816         mdev->rs_paused /= HZ;
817
818         if (!get_ldev(mdev))
819                 goto out;
820
821         ping_peer(mdev);
822
823         spin_lock_irq(&mdev->tconn->req_lock);
824         os = drbd_read_state(mdev);
825
826         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
827
828         /* This protects us against multiple calls (that can happen in the presence
829            of application IO), and against connectivity loss just before we arrive here. */
830         if (os.conn <= C_CONNECTED)
831                 goto out_unlock;
832
833         ns = os;
834         ns.conn = C_CONNECTED;
835
836         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
837              verify_done ? "Online verify " : "Resync",
838              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
839
840         n_oos = drbd_bm_total_weight(mdev);
841
842         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
843                 if (n_oos) {
844                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
845                               n_oos, Bit2KB(1));
846                         khelper_cmd = "out-of-sync";
847                 }
848         } else {
849                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
850
851                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
852                         khelper_cmd = "after-resync-target";
853
854                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
855                         const unsigned long s = mdev->rs_same_csum;
856                         const unsigned long t = mdev->rs_total;
857                         const int ratio =
858                                 (t == 0)     ? 0 :
859                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
860                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
861                              "transferred %luK total %luK\n",
862                              ratio,
863                              Bit2KB(mdev->rs_same_csum),
864                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
865                              Bit2KB(mdev->rs_total));
866                 }
867         }
868
869         if (mdev->rs_failed) {
870                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
871
872                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
873                         ns.disk = D_INCONSISTENT;
874                         ns.pdsk = D_UP_TO_DATE;
875                 } else {
876                         ns.disk = D_UP_TO_DATE;
877                         ns.pdsk = D_INCONSISTENT;
878                 }
879         } else {
880                 ns.disk = D_UP_TO_DATE;
881                 ns.pdsk = D_UP_TO_DATE;
882
883                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
884                         if (mdev->p_uuid) {
885                                 int i;
886                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
887                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
888                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
889                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
890                         } else {
891                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
892                         }
893                 }
894
895                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
896                         /* for verify runs, we don't update uuids here,
897                          * so there would be nothing to report. */
898                         drbd_uuid_set_bm(mdev, 0UL);
899                         drbd_print_uuids(mdev, "updated UUIDs");
900                         if (mdev->p_uuid) {
901                                 /* Now the two UUID sets are equal, update what we
902                                  * know of the peer. */
903                                 int i;
904                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
905                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
906                         }
907                 }
908         }
909
910         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
911 out_unlock:
912         spin_unlock_irq(&mdev->tconn->req_lock);
913         put_ldev(mdev);
914 out:
915         mdev->rs_total  = 0;
916         mdev->rs_failed = 0;
917         mdev->rs_paused = 0;
918         if (verify_done)
919                 mdev->ov_start_sector = 0;
920
921         drbd_md_sync(mdev);
922
923         if (khelper_cmd)
924                 drbd_khelper(mdev, khelper_cmd);
925
926         return 1;
927 }
928
929 /* helper */
930 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
931 {
932         if (drbd_peer_req_has_active_page(peer_req)) {
933                 /* This might happen if sendpage() has not finished */
934                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
935                 atomic_add(i, &mdev->pp_in_use_by_net);
936                 atomic_sub(i, &mdev->pp_in_use);
937                 spin_lock_irq(&mdev->tconn->req_lock);
938                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
939                 spin_unlock_irq(&mdev->tconn->req_lock);
940                 wake_up(&drbd_pp_wait);
941         } else
942                 drbd_free_peer_req(mdev, peer_req);
943 }
944
945 /**
946  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
947  * @mdev:       DRBD device.
948  * @w:          work object.
949  * @cancel:     The connection will be closed anyway
950  */
951 int w_e_end_data_req(struct drbd_work *w, int cancel)
952 {
953         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
954         struct drbd_conf *mdev = w->mdev;
955         int err;
956
957         if (unlikely(cancel)) {
958                 drbd_free_peer_req(mdev, peer_req);
959                 dec_unacked(mdev);
960                 return 0;
961         }
962
963         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
964                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
965         } else {
966                 if (__ratelimit(&drbd_ratelimit_state))
967                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
968                             (unsigned long long)peer_req->i.sector);
969
970                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
971         }
972
973         dec_unacked(mdev);
974
975         move_to_net_ee_or_free(mdev, peer_req);
976
977         if (unlikely(err))
978                 dev_err(DEV, "drbd_send_block() failed\n");
979         return err;
980 }
981
982 /**
983  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
984  * @mdev:       DRBD device.
985  * @w:          work object.
986  * @cancel:     The connection will be closed anyway
987  */
988 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
989 {
990         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
991         struct drbd_conf *mdev = w->mdev;
992         int err;
993
994         if (unlikely(cancel)) {
995                 drbd_free_peer_req(mdev, peer_req);
996                 dec_unacked(mdev);
997                 return 0;
998         }
999
1000         if (get_ldev_if_state(mdev, D_FAILED)) {
1001                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1002                 put_ldev(mdev);
1003         }
1004
1005         if (mdev->state.conn == C_AHEAD) {
1006                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1007         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1008                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1009                         inc_rs_pending(mdev);
1010                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1011                 } else {
1012                         if (__ratelimit(&drbd_ratelimit_state))
1013                                 dev_err(DEV, "Not sending RSDataReply, "
1014                                     "partner DISKLESS!\n");
1015                         err = 0;
1016                 }
1017         } else {
1018                 if (__ratelimit(&drbd_ratelimit_state))
1019                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1020                             (unsigned long long)peer_req->i.sector);
1021
1022                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1023
1024                 /* update resync data with failure */
1025                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1026         }
1027
1028         dec_unacked(mdev);
1029
1030         move_to_net_ee_or_free(mdev, peer_req);
1031
1032         if (unlikely(err))
1033                 dev_err(DEV, "drbd_send_block() failed\n");
1034         return err;
1035 }
1036
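/* Worker callback after the local read for a checksum based resync
 * request finished: compare the peer's digest with our own.  If they
 * match, mark the block in sync and send P_RS_IS_IN_SYNC; otherwise
 * send the whole block as P_RS_DATA_REPLY. */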
1037 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1038 {
1039         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1040         struct drbd_conf *mdev = w->mdev;
1041         struct digest_info *di;
1042         int digest_size;
1043         void *digest = NULL;
1044         int err, eq = 0;
1045
1046         if (unlikely(cancel)) {
1047                 drbd_free_peer_req(mdev, peer_req);
1048                 dec_unacked(mdev);
1049                 return 0;
1050         }
1051
1052         if (get_ldev(mdev)) {
1053                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1054                 put_ldev(mdev);
1055         }
1056
1057         di = peer_req->digest;
1058
1059         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1060                 /* quick hack to try to avoid a race against reconfiguration.
1061                  * a real fix would be much more involved,
1062                  * introducing more locking mechanisms */
1063                 if (mdev->tconn->csums_tfm) {
1064                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1065                         D_ASSERT(digest_size == di->digest_size);
1066                         digest = kmalloc(digest_size, GFP_NOIO);
1067                 }
1068                 if (digest) {
1069                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1070                         eq = !memcmp(digest, di->digest, digest_size);
1071                         kfree(digest);
1072                 }
1073
1074                 if (eq) {
1075                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1076                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1077                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1078                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1079                 } else {
1080                         inc_rs_pending(mdev);
1081                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1082                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1083                         kfree(di);
1084                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1085                 }
1086         } else {
1087                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1088                 if (__ratelimit(&drbd_ratelimit_state))
1089                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1090         }
1091
1092         dec_unacked(mdev);
1093         move_to_net_ee_or_free(mdev, peer_req);
1094
1095         if (unlikely(err))
1096                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1097         return err;
1098 }
1099
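/* Worker callback for an online verify request: compute the digest of
 * the locally read block and send it to the peer in a P_OV_REPLY. */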
1100 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1101 {
1102         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1103         struct drbd_conf *mdev = w->mdev;
1104         sector_t sector = peer_req->i.sector;
1105         unsigned int size = peer_req->i.size;
1106         int digest_size;
1107         void *digest;
1108         int err = 0;
1109
1110         if (unlikely(cancel))
1111                 goto out;
1112
1113         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1114         digest = kmalloc(digest_size, GFP_NOIO);
1115         if (!digest) {
1116                 err = 1;        /* terminate the connection in case the allocation failed */
1117                 goto out;
1118         }
1119
1120         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1121                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1122         else
1123                 memset(digest, 0, digest_size);
1124
1125         /* Free e and pages before send.
1126          * In case we block on congestion, we could otherwise run into
1127          * some distributed deadlock, if the other side blocks on
1128          * congestion as well, because our receiver blocks in
1129          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1130         drbd_free_peer_req(mdev, peer_req);
1131         peer_req = NULL;
1132         inc_rs_pending(mdev);
1133         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1134         if (err)
1135                 dec_rs_pending(mdev);
1136         kfree(digest);
1137
1138 out:
1139         if (peer_req)
1140                 drbd_free_peer_req(mdev, peer_req);
1141         dec_unacked(mdev);
1142         return err;
1143 }
1144
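/* Record an out-of-sync range found during online verify, merging it
 * with the previously found range if they are adjacent, and mark it
 * out of sync in the bitmap. */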
1145 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1146 {
1147         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1148                 mdev->ov_last_oos_size += size>>9;
1149         } else {
1150                 mdev->ov_last_oos_start = sector;
1151                 mdev->ov_last_oos_size = size>>9;
1152         }
1153         drbd_set_out_of_sync(mdev, sector, size);
1154 }
1155
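/* Worker callback for a P_OV_REPLY: compare the peer's digest with the
 * digest of the locally read block, report the result via P_OV_RESULT,
 * and finish the verify run once ov_left reaches zero. */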
1156 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1157 {
1158         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1159         struct drbd_conf *mdev = w->mdev;
1160         struct digest_info *di;
1161         void *digest;
1162         sector_t sector = peer_req->i.sector;
1163         unsigned int size = peer_req->i.size;
1164         int digest_size;
1165         int err, eq = 0;
1166
1167         if (unlikely(cancel)) {
1168                 drbd_free_peer_req(mdev, peer_req);
1169                 dec_unacked(mdev);
1170                 return 0;
1171         }
1172
1173         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1174          * the resync lru has been cleaned up already */
1175         if (get_ldev(mdev)) {
1176                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1177                 put_ldev(mdev);
1178         }
1179
1180         di = peer_req->digest;
1181
1182         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1183                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1184                 digest = kmalloc(digest_size, GFP_NOIO);
1185                 if (digest) {
1186                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1187
1188                         D_ASSERT(digest_size == di->digest_size);
1189                         eq = !memcmp(digest, di->digest, digest_size);
1190                         kfree(digest);
1191                 }
1192         }
1193
1194         /* Free peer_req and pages before send.
1195          * In case we block on congestion, we could otherwise run into
1196          * some distributed deadlock, if the other side blocks on
1197          * congestion as well, because our receiver blocks in
1198          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1199         drbd_free_peer_req(mdev, peer_req);
1200         if (!eq)
1201                 drbd_ov_out_of_sync_found(mdev, sector, size);
1202         else
1203                 ov_out_of_sync_print(mdev);
1204
1205         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1206                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1207
1208         dec_unacked(mdev);
1209
1210         --mdev->ov_left;
1211
1212         /* let's advance progress step marks only for every other megabyte */
1213         if ((mdev->ov_left & 0x200) == 0x200)
1214                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1215
1216         if (mdev->ov_left == 0) {
1217                 ov_out_of_sync_print(mdev);
1218                 drbd_resync_finished(mdev);
1219         }
1220
1221         return err;
1222 }
1223
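/* Work item used to flush the work queue: completing the embedded
 * completion signals that all previously queued work has been
 * processed. */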
1224 int w_prev_work_done(struct drbd_work *w, int cancel)
1225 {
1226         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1227
1228         complete(&b->done);
1229         return 0;
1230 }
1231
1232 int w_send_barrier(struct drbd_work *w, int cancel)
1233 {
1234         struct drbd_socket *sock;
1235         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1236         struct drbd_conf *mdev = w->mdev;
1237         struct p_barrier *p;
1238
1239         /* really avoid racing with tl_clear.  w.cb may have been referenced
1240          * just before it was reassigned and re-queued, so double check that.
1241          * actually, this race was harmless, since we only try to send the
1242          * barrier packet here, and otherwise do nothing with the object.
1243          * but compare with the head of w_clear_epoch */
1244         spin_lock_irq(&mdev->tconn->req_lock);
1245         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1246                 cancel = 1;
1247         spin_unlock_irq(&mdev->tconn->req_lock);
1248         if (cancel)
1249                 return 0;
1250
1251         sock = &mdev->tconn->data;
1252         p = drbd_prepare_command(mdev, sock);
1253         if (!p)
1254                 return -EIO;
1255         p->barrier = b->br_number;
1256         /* inc_ap_pending was done where this was queued.
1257          * dec_ap_pending will be done in got_BarrierAck
1258          * or (on connection loss) in w_clear_epoch.  */
1259         return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1260 }
1261
1262 int w_send_write_hint(struct drbd_work *w, int cancel)
1263 {
1264         struct drbd_conf *mdev = w->mdev;
1265         struct drbd_socket *sock;
1266
1267         if (cancel)
1268                 return 0;
1269         sock = &mdev->tconn->data;
1270         if (!drbd_prepare_command(mdev, sock))
1271                 return -EIO;
1272         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1273 }
1274
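/* Worker callback: instead of mirroring the data, only tell the peer
 * that the range of this request is out of sync, then mark the request
 * as handed over to the network. */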
1275 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1276 {
1277         struct drbd_request *req = container_of(w, struct drbd_request, w);
1278         struct drbd_conf *mdev = w->mdev;
1279         int err;
1280
1281         if (unlikely(cancel)) {
1282                 req_mod(req, SEND_CANCELED);
1283                 return 0;
1284         }
1285
1286         err = drbd_send_out_of_sync(mdev, req);
1287         req_mod(req, OOS_HANDED_TO_NETWORK);
1288
1289         return err;
1290 }
1291
1292 /**
1293  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1294  * @mdev:       DRBD device.
1295  * @w:          work object.
1296  * @cancel:     The connection will be closed anyway
1297  */
1298 int w_send_dblock(struct drbd_work *w, int cancel)
1299 {
1300         struct drbd_request *req = container_of(w, struct drbd_request, w);
1301         struct drbd_conf *mdev = w->mdev;
1302         int err;
1303
1304         if (unlikely(cancel)) {
1305                 req_mod(req, SEND_CANCELED);
1306                 return 0;
1307         }
1308
1309         err = drbd_send_dblock(mdev, req);
1310         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1311
1312         return err;
1313 }
1314
1315 /**
1316  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1317  * @mdev:       DRBD device.
1318  * @w:          work object.
1319  * @cancel:     The connection will be closed anyway
1320  */
1321 int w_send_read_req(struct drbd_work *w, int cancel)
1322 {
1323         struct drbd_request *req = container_of(w, struct drbd_request, w);
1324         struct drbd_conf *mdev = w->mdev;
1325         int err;
1326
1327         if (unlikely(cancel)) {
1328                 req_mod(req, SEND_CANCELED);
1329                 return 0;
1330         }
1331
1332         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1333                                  (unsigned long)req);
1334
1335         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1336
1337         return err;
1338 }
1339
1340 int w_restart_disk_io(struct drbd_work *w, int cancel)
1341 {
1342         struct drbd_request *req = container_of(w, struct drbd_request, w);
1343         struct drbd_conf *mdev = w->mdev;
1344
1345         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1346                 drbd_al_begin_io(mdev, &req->i);
1347
1348         drbd_req_make_private_bio(req, req->master_bio);
1349         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1350         generic_make_request(req->private_bio);
1351
1352         return 0;
1353 }
1354
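/* Follow the resync-after dependency chain and return whether this
 * device may resync now, i.e. no device it depends on is currently
 * syncing or has a suspended sync. */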
1355 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1356 {
1357         struct drbd_conf *odev = mdev;
1358         int resync_after;
1359
1360         while (1) {
1361                 if (!odev->ldev)
1362                         return 1;
1363                 rcu_read_lock();
1364                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1365                 rcu_read_unlock();
1366                 if (resync_after == -1)
1367                         return 1;
1368                 odev = minor_to_mdev(resync_after);
1369                 if (!expect(odev))
1370                         return 1;
1371                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1372                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1373                     odev->state.aftr_isp || odev->state.peer_isp ||
1374                     odev->state.user_isp)
1375                         return 0;
1376         }
1377 }
1378
1379 /**
1380  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1381  * @mdev:       DRBD device.
1382  *
1383  * Called from process context only (admin command and after_state_ch).
1384  */
1385 static int _drbd_pause_after(struct drbd_conf *mdev)
1386 {
1387         struct drbd_conf *odev;
1388         int i, rv = 0;
1389
1390         rcu_read_lock();
1391         idr_for_each_entry(&minors, odev, i) {
1392                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1393                         continue;
1394                 if (!_drbd_may_sync_now(odev))
1395                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1396                                != SS_NOTHING_TO_DO);
1397         }
1398         rcu_read_unlock();
1399
1400         return rv;
1401 }
1402
1403 /**
1404  * _drbd_resume_next() - Resume resync on all devices that may resync now
1405  * @mdev:       DRBD device.
1406  *
1407  * Called from process context only (admin command and worker).
1408  */
1409 static int _drbd_resume_next(struct drbd_conf *mdev)
1410 {
1411         struct drbd_conf *odev;
1412         int i, rv = 0;
1413
1414         rcu_read_lock();
1415         idr_for_each_entry(&minors, odev, i) {
1416                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1417                         continue;
1418                 if (odev->state.aftr_isp) {
1419                         if (_drbd_may_sync_now(odev))
1420                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1421                                                         CS_HARD, NULL)
1422                                        != SS_NOTHING_TO_DO) ;
1423                 }
1424         }
1425         rcu_read_unlock();
1426         return rv;
1427 }
1428
1429 void resume_next_sg(struct drbd_conf *mdev)
1430 {
1431         write_lock_irq(&global_state_lock);
1432         _drbd_resume_next(mdev);
1433         write_unlock_irq(&global_state_lock);
1434 }
1435
1436 void suspend_other_sg(struct drbd_conf *mdev)
1437 {
1438         write_lock_irq(&global_state_lock);
1439         _drbd_pause_after(mdev);
1440         write_unlock_irq(&global_state_lock);
1441 }
1442
1443 /* caller must hold global_state_lock */
1444 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1445 {
1446         struct drbd_conf *odev;
1447         int resync_after;
1448
1449         if (o_minor == -1)
1450                 return NO_ERROR;
1451         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1452                 return ERR_RESYNC_AFTER;
1453
1454         /* check for loops */
1455         odev = minor_to_mdev(o_minor);
1456         while (1) {
1457                 if (odev == mdev)
1458                         return ERR_RESYNC_AFTER_CYCLE;
1459
1460                 rcu_read_lock();
1461                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1462                 rcu_read_unlock();
1463                 /* dependency chain ends here, no cycles. */
1464                 if (resync_after == -1)
1465                         return NO_ERROR;
1466
1467                 /* follow the dependency chain */
1468                 odev = minor_to_mdev(resync_after);
1469         }
1470 }
1471
1472 /* caller must hold global_state_lock */
1473 void drbd_resync_after_changed(struct drbd_conf *mdev)
1474 {
1475         int changes;
1476
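	/* Pausing one device may allow another one to resume, and vice versa;
	 * each state change can affect further devices along the dependency
	 * chain, so iterate until nothing changes any more. */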
1477         do {
1478                 changes  = _drbd_pause_after(mdev);
1479                 changes |= _drbd_resume_next(mdev);
1480         } while (changes);
1481 }
1482
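/* Reset the input of the resync controller: clear the sector counters it is
 * fed with and empty the plan fifo (rs_plan_s), so a resync started now
 * begins from a clean slate. */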
1483 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1484 {
1485         struct fifo_buffer *plan;
1486
1487         atomic_set(&mdev->rs_sect_in, 0);
1488         atomic_set(&mdev->rs_sect_ev, 0);
1489         mdev->rs_in_flight = 0;
1490
1491         /* Updating the RCU protected object in place is necessary since
1492            this function gets called from atomic context.
1493            It is valid since all other updates also lead to a completely
1494            empty fifo. */
1495         rcu_read_lock();
1496         plan = rcu_dereference(mdev->rs_plan_s);
1497         plan->total = 0;
1498         fifo_set(plan, 0);
1499         rcu_read_unlock();
1500 }
1501
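/* Timer callback: the resync cannot be started from (atomic) timer context,
 * so hand it off to the worker by queueing start_resync_work. */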
1502 void start_resync_timer_fn(unsigned long data)
1503 {
1504         struct drbd_conf *mdev = (struct drbd_conf *) data;
1505
1506         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1507 }
1508
1509 int w_start_resync(struct drbd_work *w, int cancel)
1510 {
1511         struct drbd_conf *mdev = w->mdev;
1512
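	/* Postpone the resync start as long as ack or resync-reply traffic is
	 * still in flight (unacked_cnt / rs_pending_cnt); retry shortly via
	 * the start_resync timer. */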
1513         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1514                 dev_warn(DEV, "w_start_resync later...\n");
1515                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1516                 add_timer(&mdev->start_resync_timer);
1517                 return 0;
1518         }
1519
1520         drbd_start_resync(mdev, C_SYNC_SOURCE);
1521         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1522         return 0;
1523 }
1524
1525 /**
1526  * drbd_start_resync() - Start the resync process
1527  * @mdev:       DRBD device.
1528  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1529  *
1530  * This function might bring you directly into one of the
1531  * C_PAUSED_SYNC_* states.
1532  */
1533 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1534 {
1535         union drbd_state ns;
1536         int r;
1537
1538         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1539                 dev_err(DEV, "Resync already running!\n");
1540                 return;
1541         }
1542
1543         if (mdev->state.conn < C_AHEAD) {
1544                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1545                 drbd_rs_cancel_all(mdev);
1546                 /* This should really be done when we abort the resync; we definitely
1547                    do not want it for connections going back and forth between
1548                    Ahead/Behind and SyncSource/SyncTarget. */
1549         }
1550
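	/* Run the before-resync-{target,source} handler only once; B_RS_H_DONE
	 * is set further down if we have to back off and retry via the timer,
	 * so the retry does not invoke the handler again. */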
1551         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1552                 if (side == C_SYNC_TARGET) {
1553                         /* Since application IO was locked out during C_WF_BITMAP_T and
1554                            C_WF_SYNC_UUID, our data is still unmodified.  Before making it
1555                            inconsistent as C_SYNC_TARGET, let the handler below veto that. */
1556                         r = drbd_khelper(mdev, "before-resync-target");
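			/* drbd_khelper() returns a wait status; the handler's
			 * exit code is in bits 15..8. */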
1557                         r = (r >> 8) & 0xff;
1558                         if (r > 0) {
1559                                 dev_info(DEV, "before-resync-target handler returned %d, "
1560                                          "dropping connection.\n", r);
1561                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1562                                 return;
1563                         }
1564                 } else /* C_SYNC_SOURCE */ {
1565                         r = drbd_khelper(mdev, "before-resync-source");
1566                         r = (r >> 8) & 0xff;
1567                         if (r > 0) {
1568                                 if (r == 3) {
1569                                         dev_info(DEV, "before-resync-source handler returned %d, "
1570                                                  "ignoring. Old userland tools?\n", r);
1571                                 } else {
1572                                         dev_info(DEV, "before-resync-source handler returned %d, "
1573                                                  "dropping connection.\n", r);
1574                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1575                                         return;
1576                                 }
1577                         }
1578                 }
1579         }
1580
1581         if (current == mdev->tconn->worker.task) {
1582                 /* The worker should not sleep waiting for state_mutex,
1583                    as that can take a long time */
1584                 if (!mutex_trylock(mdev->state_mutex)) {
1585                         set_bit(B_RS_H_DONE, &mdev->flags);
1586                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1587                         add_timer(&mdev->start_resync_timer);
1588                         return;
1589                 }
1590         } else {
1591                 mutex_lock(mdev->state_mutex);
1592         }
1593         clear_bit(B_RS_H_DONE, &mdev->flags);
1594
1595         write_lock_irq(&global_state_lock);
1596         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1597                 write_unlock_irq(&global_state_lock);
1598                 mutex_unlock(mdev->state_mutex);
1599                 return;
1600         }
1601
1602         ns = drbd_read_state(mdev);
1603
1604         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1605
1606         ns.conn = side;
1607
1608         if (side == C_SYNC_TARGET)
1609                 ns.disk = D_INCONSISTENT;
1610         else /* side == C_SYNC_SOURCE */
1611                 ns.pdsk = D_INCONSISTENT;
1612
1613         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1614         ns = drbd_read_state(mdev);
1615
1616         if (ns.conn < C_CONNECTED)
1617                 r = SS_UNKNOWN_ERROR;
1618
1619         if (r == SS_SUCCESS) {
1620                 unsigned long tw = drbd_bm_total_weight(mdev);
1621                 unsigned long now = jiffies;
1622                 int i;
1623
1624                 mdev->rs_failed    = 0;
1625                 mdev->rs_paused    = 0;
1626                 mdev->rs_same_csum = 0;
1627                 mdev->rs_last_events = 0;
1628                 mdev->rs_last_sect_ev = 0;
1629                 mdev->rs_total     = tw;
1630                 mdev->rs_start     = now;
1631                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1632                         mdev->rs_mark_left[i] = tw;
1633                         mdev->rs_mark_time[i] = now;
1634                 }
1635                 _drbd_pause_after(mdev);
1636         }
1637         write_unlock_irq(&global_state_lock);
1638
1639         if (r == SS_SUCCESS) {
1640                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1641                      drbd_conn_str(ns.conn),
1642                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1643                      (unsigned long) mdev->rs_total);
1644                 if (side == C_SYNC_TARGET)
1645                         mdev->bm_resync_fo = 0;
1646
1647                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1648                  * with w_send_oos, or the sync target will get confused as to
1649                  * how many bits to resync.  We cannot always do that, because for an
1650                  * empty resync and protocol < 95, we need to do it here, as we call
1651                  * drbd_resync_finished from here in that case.
1652                  * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1653                  * and from after_state_ch otherwise. */
1654                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1655                         drbd_gen_and_send_sync_uuid(mdev);
1656
1657                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1658                         /* This still has a race (about when exactly the peers
1659                          * detect connection loss) that can lead to a full sync
1660                          * on next handshake. In 8.3.9 we fixed this with explicit
1661                          * resync-finished notifications, but the fix
1662                          * introduces a protocol change.  Sleeping for some
1663                          * time longer than the ping interval + timeout on the
1664                          * SyncSource, to give the SyncTarget the chance to
1665                          * detect connection loss, then waiting for a ping
1666                          * response (implicit in drbd_resync_finished) reduces
1667                          * the race considerably, but does not solve it. */
1668                         if (side == C_SYNC_SOURCE) {
1669                                 struct net_conf *nc;
1670                                 int timeo;
1671
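				/* ping_timeo is configured in tenths of a second;
				 * dividing by 9 instead of 10 intentionally makes
				 * the sleep a bit longer than ping-int + ping-timeout,
				 * as described above. */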
1672                                 rcu_read_lock();
1673                                 nc = rcu_dereference(mdev->tconn->net_conf);
1674                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1675                                 rcu_read_unlock();
1676                                 schedule_timeout_interruptible(timeo);
1677                         }
1678                         drbd_resync_finished(mdev);
1679                 }
1680
1681                 drbd_rs_controller_reset(mdev);
1682                 /* ns.conn may already be != mdev->state.conn,
1683                  * we may have been paused in between, or become paused until
1684                  * the timer triggers.
1685                  * No matter, that is handled in resync_timer_fn() */
1686                 if (ns.conn == C_SYNC_TARGET)
1687                         mod_timer(&mdev->resync_timer, jiffies);
1688
1689                 drbd_md_sync(mdev);
1690         }
1691         put_ldev(mdev);
1692         mutex_unlock(mdev->state_mutex);
1693 }
1694
1695 int drbd_worker(struct drbd_thread *thi)
1696 {
1697         struct drbd_tconn *tconn = thi->tconn;
1698         struct drbd_work *w = NULL;
1699         struct drbd_conf *mdev;
1700         struct net_conf *nc;
1701         LIST_HEAD(work_list);
1702         int vnr, intr = 0;
1703         int cork;
1704
1705         while (get_t_state(thi) == RUNNING) {
1706                 drbd_thread_current_set_cpu(thi);
1707
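		/* No work pending?  Then uncork the data socket so batched packets
		 * get sent before we block on the work semaphore, and cork it
		 * again once new work arrives. */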
1708                 if (down_trylock(&tconn->data.work.s)) {
1709                         mutex_lock(&tconn->data.mutex);
1710
1711                         rcu_read_lock();
1712                         nc = rcu_dereference(tconn->net_conf);
1713                         cork = nc ? nc->tcp_cork : 0;
1714                         rcu_read_unlock();
1715
1716                         if (tconn->data.socket && cork)
1717                                 drbd_tcp_uncork(tconn->data.socket);
1718                         mutex_unlock(&tconn->data.mutex);
1719
1720                         intr = down_interruptible(&tconn->data.work.s);
1721
1722                         mutex_lock(&tconn->data.mutex);
1723                         if (tconn->data.socket && cork)
1724                                 drbd_tcp_cork(tconn->data.socket);
1725                         mutex_unlock(&tconn->data.mutex);
1726                 }
1727
1728                 if (intr) {
1729                         flush_signals(current);
1730                         if (get_t_state(thi) == RUNNING) {
1731                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1732                                 continue;
1733                         }
1734                         break;
1735                 }
1736
1737                 if (get_t_state(thi) != RUNNING)
1738                         break;
1739                 /* With this break, we have done a down() but not consumed
1740                    the entry from the list. The cleanup code takes care of
1741                    this...   */
1742
1743                 w = NULL;
1744                 spin_lock_irq(&tconn->data.work.q_lock);
1745                 if (list_empty(&tconn->data.work.q)) {
1746                         /* something terribly wrong in our logic.
1747                          * we were able to down() the semaphore,
1748                          * but the list is empty... doh.
1749                          *
1750                          * what is the best thing to do now?
1751                          * try again from scratch, restarting the receiver,
1752                          * asender, whatnot? could break even more ugly,
1753                          * e.g. when we are primary, but no good local data.
1754                          *
1755                          * I'll try to get away just starting over this loop.
1756                          */
1757                         conn_warn(tconn, "Work list unexpectedly empty\n");
1758                         spin_unlock_irq(&tconn->data.work.q_lock);
1759                         continue;
1760                 }
1761                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1762                 list_del_init(&w->list);
1763                 spin_unlock_irq(&tconn->data.work.q_lock);
1764
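		/* Run the callback; the second argument requests cancellation when
		 * the connection is already below C_WF_REPORT_PARAMS.  A failing
		 * callback on an established connection is treated as a network
		 * failure. */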
1765                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1766                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1767                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1768                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1769                 }
1770         }
1771
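	/* We are asked to stop: drain the remaining work entries, calling each
	 * callback with the cancel flag set. */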
1772         spin_lock_irq(&tconn->data.work.q_lock);
1773         while (!list_empty(&tconn->data.work.q)) {
1774                 list_splice_init(&tconn->data.work.q, &work_list);
1775                 spin_unlock_irq(&tconn->data.work.q_lock);
1776
1777                 while (!list_empty(&work_list)) {
1778                         w = list_entry(work_list.next, struct drbd_work, list);
1779                         list_del_init(&w->list);
1780                         w->cb(w, 1);
1781                 }
1782
1783                 spin_lock_irq(&tconn->data.work.q_lock);
1784         }
1785         sema_init(&tconn->data.work.s, 0);
1786         /* DANGEROUS race: if someone queued work while holding the spinlock,
1787          * but called up() only after dropping it, we could get an up() on the
1788          * semaphore without a corresponding list entry.
1789          * So don't do that.
1790          */
1791         spin_unlock_irq(&tconn->data.work.q_lock);
1792
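	/* Finally clean up all volumes of this connection.  Hold a reference on
	 * each device while temporarily dropping the RCU read lock, since
	 * drbd_mdev_cleanup() must not run under it. */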
1793         rcu_read_lock();
1794         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1795                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1796                 kref_get(&mdev->kref);
1797                 rcu_read_unlock();
1798                 drbd_mdev_cleanup(mdev);
1799                 kref_put(&mdev->kref, &drbd_minor_destroy);
1800                 rcu_read_lock();
1801         }
1802         rcu_read_unlock();
1803
1804         return 0;
1805 }