drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73         struct drbd_conf *mdev;
74
75         md_io = (struct drbd_md_io *)bio->bi_private;
76         mdev = container_of(md_io, struct drbd_conf, md_io);
77
78         md_io->error = error;
79
80         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
81          * to timeout on the lower level device, and eventually detach from it.
82          * If this io completion runs after that timeout expired, this
83          * drbd_md_put_buffer() may allow us to finally try and re-attach.
84          * During normal operation, this only puts that extra reference
85          * down to 1 again.
86          * Make sure we first drop the reference, and only then signal
87          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
88          * next drbd_md_sync_page_io(), that we trigger the
89          * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
90          */
91         drbd_md_put_buffer(mdev);
92         md_io->done = 1;
93         wake_up(&mdev->misc_wait);
94         bio_put(bio);
95         put_ldev(mdev);
96 }
97
98 /* reads on behalf of the partner,
99  * "submitted" by the receiver
100  */
101 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
102 {
103         unsigned long flags = 0;
104         struct drbd_conf *mdev = e->mdev;
105
106         D_ASSERT(e->block_id != ID_VACANT);
107
108         spin_lock_irqsave(&mdev->req_lock, flags);
109         mdev->read_cnt += e->size >> 9;
110         list_del(&e->w.list);
111         if (list_empty(&mdev->read_ee))
112                 wake_up(&mdev->ee_wait);
113         if (test_bit(__EE_WAS_ERROR, &e->flags))
114                 __drbd_chk_io_error(mdev, false);
115         spin_unlock_irqrestore(&mdev->req_lock, flags);
116
117         drbd_queue_work(&mdev->data.work, &e->w);
118         put_ldev(mdev);
119 }
120
121 /* writes on behalf of the partner, or resync writes,
122  * "submitted" by the receiver, final stage.  */
123 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
124 {
125         unsigned long flags = 0;
126         struct drbd_conf *mdev = e->mdev;
127         sector_t e_sector;
128         int do_wake;
129         int is_syncer_req;
130         int do_al_complete_io;
131
132         D_ASSERT(e->block_id != ID_VACANT);
133
134         /* after we moved e to done_ee,
135          * we may no longer access it,
136          * it may be freed/reused already!
137          * (as soon as we release the req_lock) */
138         e_sector = e->sector;
139         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
140         is_syncer_req = is_syncer_block_id(e->block_id);
141
142         spin_lock_irqsave(&mdev->req_lock, flags);
143         mdev->writ_cnt += e->size >> 9;
144         list_del(&e->w.list); /* has been on active_ee or sync_ee */
145         list_add_tail(&e->w.list, &mdev->done_ee);
146
147         /* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
148          * neither did we wake possibly waiting conflicting requests.
149          * done from "drbd_process_done_ee" within the appropriate w.cb
150          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
151
152         do_wake = is_syncer_req
153                 ? list_empty(&mdev->sync_ee)
154                 : list_empty(&mdev->active_ee);
155
156         if (test_bit(__EE_WAS_ERROR, &e->flags))
157                 __drbd_chk_io_error(mdev, false);
158         spin_unlock_irqrestore(&mdev->req_lock, flags);
159
160         if (is_syncer_req)
161                 drbd_rs_complete_io(mdev, e_sector);
162
163         if (do_wake)
164                 wake_up(&mdev->ee_wait);
165
166         if (do_al_complete_io)
167                 drbd_al_complete_io(mdev, e_sector);
168
169         wake_asender(mdev);
170         put_ldev(mdev);
171 }
172
173 /* writes on behalf of the partner, or resync writes,
174  * "submitted" by the receiver.
175  */
176 void drbd_endio_sec(struct bio *bio, int error)
177 {
178         struct drbd_epoch_entry *e = bio->bi_private;
179         struct drbd_conf *mdev = e->mdev;
180         int uptodate = bio_flagged(bio, BIO_UPTODATE);
181         int is_write = bio_data_dir(bio) == WRITE;
182
183         if (error && __ratelimit(&drbd_ratelimit_state))
184                 dev_warn(DEV, "%s: error=%d s=%llus\n",
185                                 is_write ? "write" : "read", error,
186                                 (unsigned long long)e->sector);
187         if (!error && !uptodate) {
188                 if (__ratelimit(&drbd_ratelimit_state))
189                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
190                                         is_write ? "write" : "read",
191                                         (unsigned long long)e->sector);
192                 /* strange behavior of some lower level drivers...
193                  * fail the request by clearing the uptodate flag,
194                  * but do not return any error?! */
195                 error = -EIO;
196         }
197
198         if (error)
199                 set_bit(__EE_WAS_ERROR, &e->flags);
200
201         bio_put(bio); /* no need for the bio anymore */
202         if (atomic_dec_and_test(&e->pending_bios)) {
203                 if (is_write)
204                         drbd_endio_write_sec_final(e);
205                 else
206                         drbd_endio_read_sec_final(e);
207         }
208 }
209
210 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
211  */
212 void drbd_endio_pri(struct bio *bio, int error)
213 {
214         unsigned long flags;
215         struct drbd_request *req = bio->bi_private;
216         struct drbd_conf *mdev = req->mdev;
217         struct bio_and_error m;
218         enum drbd_req_event what;
219         int uptodate = bio_flagged(bio, BIO_UPTODATE);
220
221         if (!error && !uptodate) {
222                 dev_warn(DEV, "p %s: setting error to -EIO\n",
223                          bio_data_dir(bio) == WRITE ? "write" : "read");
224                 /* strange behavior of some lower level drivers...
225                  * fail the request by clearing the uptodate flag,
226                  * but do not return any error?! */
227                 error = -EIO;
228         }
229
230         /* to avoid recursion in __req_mod */
231         if (unlikely(error)) {
232                 what = (bio_data_dir(bio) == WRITE)
233                         ? write_completed_with_error
234                         : (bio_rw(bio) == READ)
235                           ? read_completed_with_error
236                           : read_ahead_completed_with_error;
237         } else
238                 what = completed_ok;
239
240         bio_put(req->private_bio);
241         req->private_bio = ERR_PTR(error);
242
243         /* not req_mod(), we need irqsave here! */
244         spin_lock_irqsave(&mdev->req_lock, flags);
245         __req_mod(req, what, &m);
246         spin_unlock_irqrestore(&mdev->req_lock, flags);
247         put_ldev(mdev);
248
249         if (m.bio)
250                 complete_master_bio(mdev, &m);
251 }
252
253 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255         struct drbd_request *req = container_of(w, struct drbd_request, w);
256
257         /* We should not detach for read io-error,
258          * but try to WRITE the P_DATA_REPLY to the failed location,
259          * to give the disk the chance to relocate that block */
260
261         spin_lock_irq(&mdev->req_lock);
262         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
263                 _req_mod(req, read_retry_remote_canceled);
264                 spin_unlock_irq(&mdev->req_lock);
265                 return 1;
266         }
267         spin_unlock_irq(&mdev->req_lock);
268
269         return w_send_read_req(mdev, w, 0);
270 }
271
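/* Compute a digest over an epoch entry's page chain: all pages except the
 * last are hashed in full, the last one only up to e->size modulo PAGE_SIZE
 * (or in full, if the size is page aligned). */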
272 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
273 {
274         struct hash_desc desc;
275         struct scatterlist sg;
276         struct page *page = e->pages;
277         struct page *tmp;
278         unsigned len;
279
280         desc.tfm = tfm;
281         desc.flags = 0;
282
283         sg_init_table(&sg, 1);
284         crypto_hash_init(&desc);
285
286         while ((tmp = page_chain_next(page))) {
287                 /* all but the last page will be fully used */
288                 sg_set_page(&sg, page, PAGE_SIZE, 0);
289                 crypto_hash_update(&desc, &sg, sg.length);
290                 page = tmp;
291         }
292         /* and now the last, possibly only partially used page */
293         len = e->size & (PAGE_SIZE - 1);
294         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
295         crypto_hash_update(&desc, &sg, sg.length);
296         crypto_hash_final(&desc, digest);
297 }
298
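/* Same as drbd_csum_ee(), but for a struct bio: hash each bio_vec segment. */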
299 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
300 {
301         struct hash_desc desc;
302         struct scatterlist sg;
303         struct bio_vec *bvec;
304         int i;
305
306         desc.tfm = tfm;
307         desc.flags = 0;
308
309         sg_init_table(&sg, 1);
310         crypto_hash_init(&desc);
311
312         bio_for_each_segment(bvec, bio, i) {
313                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
314                 crypto_hash_update(&desc, &sg, sg.length);
315         }
316         crypto_hash_final(&desc, digest);
317 }
318
319 /* TODO merge common code with w_e_end_ov_req */
320 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
321 {
322         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
323         int digest_size;
324         void *digest;
325         int ok = 1;
326
327         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
328
329         if (unlikely(cancel))
330                 goto out;
331
332                 if (unlikely((e->flags & EE_WAS_ERROR) != 0))
333                 goto out;
334
335         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
336         digest = kmalloc(digest_size, GFP_NOIO);
337         if (digest) {
338                 sector_t sector = e->sector;
339                 unsigned int size = e->size;
340                 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
341                 /* Free e and pages before send.
342                  * In case we block on congestion, we could otherwise run into
343                  * some distributed deadlock, if the other side blocks on
344                  * congestion as well, because our receiver blocks in
345                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
346                 drbd_free_ee(mdev, e);
347                 e = NULL;
348                 inc_rs_pending(mdev);
349                 ok = drbd_send_drequest_csum(mdev, sector, size,
350                                              digest, digest_size,
351                                              P_CSUM_RS_REQUEST);
352                 kfree(digest);
353         } else {
354                 dev_err(DEV, "kmalloc() of digest failed.\n");
355                 ok = 0;
356         }
357
358 out:
359         if (e)
360                 drbd_free_ee(mdev, e);
361
362         if (unlikely(!ok))
363                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
364         return ok;
365 }
366
367 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
368
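/* Submit a local read for the checksum based resync path: allocate an epoch
 * entry with w_e_send_csum as its work callback and submit the read.
 * Returns 0 on success, -EAGAIN if this should be retried later (congestion,
 * allocation or submit failure), -EIO if the local disk is gone. */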
369 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
370 {
371         struct drbd_epoch_entry *e;
372
373         if (!get_ldev(mdev))
374                 return -EIO;
375
376         if (drbd_rs_should_slow_down(mdev, sector))
377                 goto defer;
378
379         /* GFP_TRY, because if there is no memory available right now, this may
380          * be rescheduled for later. It is "only" background resync, after all. */
381         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
382         if (!e)
383                 goto defer;
384
385         e->w.cb = w_e_send_csum;
386         spin_lock_irq(&mdev->req_lock);
387         list_add(&e->w.list, &mdev->read_ee);
388         spin_unlock_irq(&mdev->req_lock);
389
390         atomic_add(size >> 9, &mdev->rs_sect_ev);
391         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
392                 return 0;
393
394         /* If it failed because of ENOMEM, retry should help.  If it failed
395          * because bio_add_page failed (probably broken lower level driver),
396          * retry may or may not help.
397          * If it does not, you may need to force disconnect. */
398         spin_lock_irq(&mdev->req_lock);
399         list_del(&e->w.list);
400         spin_unlock_irq(&mdev->req_lock);
401
402         drbd_free_ee(mdev, e);
403 defer:
404         put_ldev(mdev);
405         return -EAGAIN;
406 }
407
408 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
409 {
410         switch (mdev->state.conn) {
411         case C_VERIFY_S:
412                 w_make_ov_request(mdev, w, cancel);
413                 break;
414         case C_SYNC_TARGET:
415                 w_make_resync_request(mdev, w, cancel);
416                 break;
417         }
418
419         return 1;
420 }
421
422 void resync_timer_fn(unsigned long data)
423 {
424         struct drbd_conf *mdev = (struct drbd_conf *) data;
425
426         if (list_empty(&mdev->resync_work.list))
427                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
428 }
429
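/* Helpers for the rs_plan_s ring buffer used by the resync controller:
 * fifo_set() initializes every slot, fifo_push() returns the value at the
 * head slot while replacing it and advancing the head, and fifo_add_val()
 * adds a value to every slot. */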
430 static void fifo_set(struct fifo_buffer *fb, int value)
431 {
432         int i;
433
434         for (i = 0; i < fb->size; i++)
435                 fb->values[i] = value;
436 }
437
438 static int fifo_push(struct fifo_buffer *fb, int value)
439 {
440         int ov;
441
442         ov = fb->values[fb->head_index];
443         fb->values[fb->head_index++] = value;
444
445         if (fb->head_index >= fb->size)
446                 fb->head_index = 0;
447
448         return ov;
449 }
450
451 static void fifo_add_val(struct fifo_buffer *fb, int value)
452 {
453         int i;
454
455         for (i = 0; i < fb->size; i++)
456                 fb->values[i] += value;
457 }
458
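/* drbd_rs_controller() plans how many sectors to request during the next
 * SLEEP_TIME slot.  Based on the sectors that came in since the last call
 * (sect_in), it computes the desired amount of data in flight ("want"),
 * either the configured c_fill_target or derived from the incoming rate and
 * c_delay_target.  The difference to what is in flight plus already planned
 * is spread over the rs_plan_s fifo, so corrections are applied gradually
 * over "steps" future slots; the result is clamped to c_max_rate. */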
459 static int drbd_rs_controller(struct drbd_conf *mdev)
460 {
461         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
462         unsigned int want;     /* The number of sectors we want in the proxy */
463         int req_sect; /* Number of sectors to request in this turn */
464         int correction; /* Number of sectors more we need in the proxy*/
465         int cps; /* correction per invocation of drbd_rs_controller() */
466         int steps; /* Number of time steps to plan ahead */
467         int curr_corr;
468         int max_sect;
469
470         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
471         mdev->rs_in_flight -= sect_in;
472
473         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
474
475         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
476
477         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
478                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
479         } else { /* normal path */
480                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
481                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
482         }
483
484         correction = want - mdev->rs_in_flight - mdev->rs_planed;
485
486         /* Plan ahead */
487         cps = correction / steps;
488         fifo_add_val(&mdev->rs_plan_s, cps);
489         mdev->rs_planed += cps * steps;
490
491         /* What we do in this step */
492         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
493         spin_unlock(&mdev->peer_seq_lock);
494         mdev->rs_planed -= curr_corr;
495
496         req_sect = sect_in + curr_corr;
497         if (req_sect < 0)
498                 req_sect = 0;
499
500         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
501         if (req_sect > max_sect)
502                 req_sect = max_sect;
503
504         /*
505         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
506                  sect_in, mdev->rs_in_flight, want, correction,
507                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
508         */
509
510         return req_sect;
511 }
512
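/* Translate the controller's sector budget for one SLEEP_TIME slot into a
 * number of BM_BLOCK_SIZE sized resync requests.  Without a plan fifo
 * (rs_plan_s.size == 0) fall back to the fixed sync_conf.rate; in both cases
 * mdev->c_sync_rate is updated to the currently effective rate. */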
513 static int drbd_rs_number_requests(struct drbd_conf *mdev)
514 {
515         int number;
516         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
517                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
518                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
519         } else {
520                 mdev->c_sync_rate = mdev->sync_conf.rate;
521                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
522         }
523
524         /* ignore the amount of pending requests, the resync controller should
525          * throttle down to incoming reply rate soon enough anyways. */
526         return number;
527 }
528
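/* Generate the next batch of resync requests: walk the bitmap from
 * bm_resync_fo, merge adjacent dirty bits up to max_bio_size, and either
 * start a checksum based request (read_for_csum, if csums_tfm is configured
 * and the peer speaks protocol 89+) or send a plain P_RS_DATA_REQUEST.
 * Throttled by the send buffer fill level, drbd_rs_should_slow_down() and
 * the per-slot budget from drbd_rs_number_requests(); the resync timer is
 * re-armed so we get called again after SLEEP_TIME. */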
529 static int w_make_resync_request(struct drbd_conf *mdev,
530                                  struct drbd_work *w, int cancel)
531 {
532         unsigned long bit;
533         sector_t sector;
534         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
535         int max_bio_size;
536         int number, rollback_i, size;
537         int align, queued, sndbuf;
538         int i = 0;
539
540         if (unlikely(cancel))
541                 return 1;
542
543         if (mdev->rs_total == 0) {
544                 /* empty resync? */
545                 drbd_resync_finished(mdev);
546                 return 1;
547         }
548
549         if (!get_ldev(mdev)) {
550                 /* Since we only need to access mdev->rsync, a
551                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
552                    continuing resync with a broken disk makes no sense at
553                    all */
554                 dev_err(DEV, "Disk broke down during resync!\n");
555                 return 1;
556         }
557
558         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
559         number = drbd_rs_number_requests(mdev);
560         if (number == 0)
561                 goto requeue;
562
563         for (i = 0; i < number; i++) {
564                 /* Stop generating RS requests, when half of the send buffer is filled */
565                 mutex_lock(&mdev->data.mutex);
566                 if (mdev->data.socket) {
567                         queued = mdev->data.socket->sk->sk_wmem_queued;
568                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
569                 } else {
570                         queued = 1;
571                         sndbuf = 0;
572                 }
573                 mutex_unlock(&mdev->data.mutex);
574                 if (queued > sndbuf / 2)
575                         goto requeue;
576
577 next_sector:
578                 size = BM_BLOCK_SIZE;
579                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
580
581                 if (bit == DRBD_END_OF_BITMAP) {
582                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
583                         put_ldev(mdev);
584                         return 1;
585                 }
586
587                 sector = BM_BIT_TO_SECT(bit);
588
589                 if (drbd_rs_should_slow_down(mdev, sector) ||
590                     drbd_try_rs_begin_io(mdev, sector)) {
591                         mdev->bm_resync_fo = bit;
592                         goto requeue;
593                 }
594                 mdev->bm_resync_fo = bit + 1;
595
596                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
597                         drbd_rs_complete_io(mdev, sector);
598                         goto next_sector;
599                 }
600
601 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
602                 /* try to find some adjacent bits.
603                  * we stop if we already have the maximum req size.
604                  *
605                  * Additionally always align bigger requests, in order to
606                  * be prepared for all stripe sizes of software RAIDs.
607                  */
608                 align = 1;
609                 rollback_i = i;
610                 for (;;) {
611                         if (size + BM_BLOCK_SIZE > max_bio_size)
612                                 break;
613
614                         /* Be always aligned */
615                         if (sector & ((1<<(align+3))-1))
616                                 break;
617
618                         /* do not cross extent boundaries */
619                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
620                                 break;
621                         /* now, is it actually dirty, after all?
622                          * caution, drbd_bm_test_bit is tri-state for some
623                          * obscure reason; ( b == 0 ) would get the out-of-band
624                          * only accidentally right because of the "oddly sized"
625                          * adjustment below */
626                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
627                                 break;
628                         bit++;
629                         size += BM_BLOCK_SIZE;
630                         if ((BM_BLOCK_SIZE << align) <= size)
631                                 align++;
632                         i++;
633                 }
634                 /* if we merged some,
635                  * reset the offset to start the next drbd_bm_find_next from */
636                 if (size > BM_BLOCK_SIZE)
637                         mdev->bm_resync_fo = bit + 1;
638 #endif
639
640                 /* adjust very last sectors, in case we are oddly sized */
641                 if (sector + (size>>9) > capacity)
642                         size = (capacity-sector)<<9;
643                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
644                         switch (read_for_csum(mdev, sector, size)) {
645                         case -EIO: /* Disk failure */
646                                 put_ldev(mdev);
647                                 return 0;
648                         case -EAGAIN: /* allocation failed, or ldev busy */
649                                 drbd_rs_complete_io(mdev, sector);
650                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
651                                 i = rollback_i;
652                                 goto requeue;
653                         case 0:
654                                 /* everything ok */
655                                 break;
656                         default:
657                                 BUG();
658                         }
659                 } else {
660                         inc_rs_pending(mdev);
661                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
662                                                sector, size, ID_SYNCER)) {
663                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
664                                 dec_rs_pending(mdev);
665                                 put_ldev(mdev);
666                                 return 0;
667                         }
668                 }
669         }
670
671         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
672                 /* last syncer _request_ was sent,
673                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
674                  * next sync group will resume), as soon as we receive the last
675                  * resync data block, and the last bit is cleared.
676                  * until then resync "work" is "inactive" ...
677                  */
678                 put_ldev(mdev);
679                 return 1;
680         }
681
682  requeue:
683         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
684         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
685         put_ldev(mdev);
686         return 1;
687 }
688
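/* Online verify counterpart of w_make_resync_request(): issue up to "number"
 * verify requests of BM_BLOCK_SIZE via drbd_send_ov_request(), advancing
 * ov_position, and re-arm the resync timer for the next batch. */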
689 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
690 {
691         int number, i, size;
692         sector_t sector;
693         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
694
695         if (unlikely(cancel))
696                 return 1;
697
698         number = drbd_rs_number_requests(mdev);
699
700         sector = mdev->ov_position;
701         for (i = 0; i < number; i++) {
702                 if (sector >= capacity) {
703                         return 1;
704                 }
705
706                 size = BM_BLOCK_SIZE;
707
708                 if (drbd_rs_should_slow_down(mdev, sector) ||
709                     drbd_try_rs_begin_io(mdev, sector)) {
710                         mdev->ov_position = sector;
711                         goto requeue;
712                 }
713
714                 if (sector + (size>>9) > capacity)
715                         size = (capacity-sector)<<9;
716
717                 inc_rs_pending(mdev);
718                 if (!drbd_send_ov_request(mdev, sector, size)) {
719                         dec_rs_pending(mdev);
720                         return 0;
721                 }
722                 sector += BM_SECT_PER_BIT;
723         }
724         mdev->ov_position = sector;
725
726  requeue:
727         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
728         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
729         return 1;
730 }
731
732
733 void start_resync_timer_fn(unsigned long data)
734 {
735         struct drbd_conf *mdev = (struct drbd_conf *) data;
736
737         drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
738 }
739
740 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
741 {
742         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
743                 dev_warn(DEV, "w_start_resync later...\n");
744                 mdev->start_resync_timer.expires = jiffies + HZ/10;
745                 add_timer(&mdev->start_resync_timer);
746                 return 1;
747         }
748
749         drbd_start_resync(mdev, C_SYNC_SOURCE);
750         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
751         return 1;
752 }
753
754 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756         kfree(w);
757         ov_oos_print(mdev);
758         drbd_resync_finished(mdev);
759
760         return 1;
761 }
762
763 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
764 {
765         kfree(w);
766
767         drbd_resync_finished(mdev);
768
769         return 1;
770 }
771
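/* Send a ping to the peer and wait until the ack arrives or the connection
 * drops below C_CONNECTED. */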
772 static void ping_peer(struct drbd_conf *mdev)
773 {
774         clear_bit(GOT_PING_ACK, &mdev->flags);
775         request_ping(mdev);
776         wait_event(mdev->misc_wait,
777                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
778 }
779
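/* Called when a resync or online verify run ends: clean up the resync LRU,
 * log the achieved throughput, pick the new disk/pdsk states (and update
 * UUIDs) depending on rs_failed and the direction of the sync, and possibly
 * trigger a user mode helper ("out-of-sync" / "after-resync-target"). */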
780 int drbd_resync_finished(struct drbd_conf *mdev)
781 {
782         unsigned long db, dt, dbdt;
783         unsigned long n_oos;
784         union drbd_state os, ns;
785         struct drbd_work *w;
786         char *khelper_cmd = NULL;
787         int verify_done = 0;
788
789         /* Remove all elements from the resync LRU. Since future actions
790          * might set bits in the (main) bitmap, then the entries in the
791          * resync LRU would be wrong. */
792         if (drbd_rs_del_all(mdev)) {
793                 /* In case this is not possible now, most probably because
794                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
795                  * queue (or even the read operations for those packets
796                  * are not finished by now). Retry in 100ms. */
797
798                 schedule_timeout_interruptible(HZ / 10);
799                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
800                 if (w) {
801                         w->cb = w_resync_finished;
802                         drbd_queue_work(&mdev->data.work, w);
803                         return 1;
804                 }
805                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
806         }
807
808         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
809         if (dt <= 0)
810                 dt = 1;
811         db = mdev->rs_total;
812         dbdt = Bit2KB(db/dt);
813         mdev->rs_paused /= HZ;
814
815         if (!get_ldev(mdev))
816                 goto out;
817
818         ping_peer(mdev);
819
820         spin_lock_irq(&mdev->req_lock);
821         os = mdev->state;
822
823         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
824
825         /* This protects us against multiple calls (that can happen in the presence
826            of application IO), and against connectivity loss just before we arrive here. */
827         if (os.conn <= C_CONNECTED)
828                 goto out_unlock;
829
830         ns = os;
831         ns.conn = C_CONNECTED;
832
833         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
834              verify_done ? "Online verify " : "Resync",
835              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
836
837         n_oos = drbd_bm_total_weight(mdev);
838
839         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
840                 if (n_oos) {
841                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
842                               n_oos, Bit2KB(1));
843                         khelper_cmd = "out-of-sync";
844                 }
845         } else {
846                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
847
848                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
849                         khelper_cmd = "after-resync-target";
850
851                 if (mdev->csums_tfm && mdev->rs_total) {
852                         const unsigned long s = mdev->rs_same_csum;
853                         const unsigned long t = mdev->rs_total;
854                         const int ratio =
855                                 (t == 0)     ? 0 :
856                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
857                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
858                              "transferred %luK total %luK\n",
859                              ratio,
860                              Bit2KB(mdev->rs_same_csum),
861                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
862                              Bit2KB(mdev->rs_total));
863                 }
864         }
865
866         if (mdev->rs_failed) {
867                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
868
869                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
870                         ns.disk = D_INCONSISTENT;
871                         ns.pdsk = D_UP_TO_DATE;
872                 } else {
873                         ns.disk = D_UP_TO_DATE;
874                         ns.pdsk = D_INCONSISTENT;
875                 }
876         } else {
877                 ns.disk = D_UP_TO_DATE;
878                 ns.pdsk = D_UP_TO_DATE;
879
880                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
881                         if (mdev->p_uuid) {
882                                 int i;
883                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
884                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
885                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
886                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
887                         } else {
888                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
889                         }
890                 }
891
892                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
893                         /* for verify runs, we don't update uuids here,
894                          * so there would be nothing to report. */
895                         drbd_uuid_set_bm(mdev, 0UL);
896                         drbd_print_uuids(mdev, "updated UUIDs");
897                         if (mdev->p_uuid) {
898                                 /* Now the two UUID sets are equal, update what we
899                                  * know of the peer. */
900                                 int i;
901                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
902                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
903                         }
904                 }
905         }
906
907         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
908 out_unlock:
909         spin_unlock_irq(&mdev->req_lock);
910         put_ldev(mdev);
911 out:
912         mdev->rs_total  = 0;
913         mdev->rs_failed = 0;
914         mdev->rs_paused = 0;
915         if (verify_done)
916                 mdev->ov_start_sector = 0;
917
918         drbd_md_sync(mdev);
919
920         if (khelper_cmd)
921                 drbd_khelper(mdev, khelper_cmd);
922
923         return 1;
924 }
925
926 /* helper */
927 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
928 {
929         if (drbd_ee_has_active_page(e)) {
930                 /* This might happen if sendpage() has not finished */
931                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
932                 atomic_add(i, &mdev->pp_in_use_by_net);
933                 atomic_sub(i, &mdev->pp_in_use);
934                 spin_lock_irq(&mdev->req_lock);
935                 list_add_tail(&e->w.list, &mdev->net_ee);
936                 spin_unlock_irq(&mdev->req_lock);
937                 wake_up(&drbd_pp_wait);
938         } else
939                 drbd_free_ee(mdev, e);
940 }
941
942 /**
943  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
944  * @mdev:       DRBD device.
945  * @w:          work object.
946  * @cancel:     The connection will be closed anyways
947  */
948 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
949 {
950         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
951         int ok;
952
953         if (unlikely(cancel)) {
954                 drbd_free_ee(mdev, e);
955                 dec_unacked(mdev);
956                 return 1;
957         }
958
959         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
960                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
961         } else {
962                 if (__ratelimit(&drbd_ratelimit_state))
963                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
964                             (unsigned long long)e->sector);
965
966                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
967         }
968
969         dec_unacked(mdev);
970
971         move_to_net_ee_or_free(mdev, e);
972
973         if (unlikely(!ok))
974                 dev_err(DEV, "drbd_send_block() failed\n");
975         return ok;
976 }
977
978 /**
979  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
980  * @mdev:       DRBD device.
981  * @w:          work object.
982  * @cancel:     The connection will be closed anyways
983  */
984 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
985 {
986         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
987         int ok;
988
989         if (unlikely(cancel)) {
990                 drbd_free_ee(mdev, e);
991                 dec_unacked(mdev);
992                 return 1;
993         }
994
995         if (get_ldev_if_state(mdev, D_FAILED)) {
996                 drbd_rs_complete_io(mdev, e->sector);
997                 put_ldev(mdev);
998         }
999
1000         if (mdev->state.conn == C_AHEAD) {
1001                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
1002         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1003                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1004                         inc_rs_pending(mdev);
1005                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1006                 } else {
1007                         if (__ratelimit(&drbd_ratelimit_state))
1008                                 dev_err(DEV, "Not sending RSDataReply, "
1009                                     "partner DISKLESS!\n");
1010                         ok = 1;
1011                 }
1012         } else {
1013                 if (__ratelimit(&drbd_ratelimit_state))
1014                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1015                             (unsigned long long)e->sector);
1016
1017                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1018
1019                 /* update resync data with failure */
1020                 drbd_rs_failed_io(mdev, e->sector, e->size);
1021         }
1022
1023         dec_unacked(mdev);
1024
1025         move_to_net_ee_or_free(mdev, e);
1026
1027         if (unlikely(!ok))
1028                 dev_err(DEV, "drbd_send_block() failed\n");
1029         return ok;
1030 }
1031
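/* Compare the digest the peer sent along with its checksum based resync
 * request against a locally computed one: on a match, acknowledge with
 * P_RS_IS_IN_SYNC and mark the block in sync; otherwise send the full block
 * as P_RS_DATA_REPLY. */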
1032 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1033 {
1034         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1035         struct digest_info *di;
1036         int digest_size;
1037         void *digest = NULL;
1038         int ok, eq = 0;
1039
1040         if (unlikely(cancel)) {
1041                 drbd_free_ee(mdev, e);
1042                 dec_unacked(mdev);
1043                 return 1;
1044         }
1045
1046         if (get_ldev(mdev)) {
1047                 drbd_rs_complete_io(mdev, e->sector);
1048                 put_ldev(mdev);
1049         }
1050
1051         di = e->digest;
1052
1053         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1054                 /* quick hack to try to avoid a race against reconfiguration.
1055                  * a real fix would be much more involved,
1056                  * introducing more locking mechanisms */
1057                 if (mdev->csums_tfm) {
1058                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1059                         D_ASSERT(digest_size == di->digest_size);
1060                         digest = kmalloc(digest_size, GFP_NOIO);
1061                 }
1062                 if (digest) {
1063                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1064                         eq = !memcmp(digest, di->digest, digest_size);
1065                         kfree(digest);
1066                 }
1067
1068                 if (eq) {
1069                         drbd_set_in_sync(mdev, e->sector, e->size);
1070                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1071                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1072                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1073                 } else {
1074                         inc_rs_pending(mdev);
1075                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1076                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1077                         kfree(di);
1078                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1079                 }
1080         } else {
1081                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1082                 if (__ratelimit(&drbd_ratelimit_state))
1083                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1084         }
1085
1086         dec_unacked(mdev);
1087         move_to_net_ee_or_free(mdev, e);
1088
1089         if (unlikely(!ok))
1090                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1091         return ok;
1092 }
1093
1094 /* TODO merge common code with w_e_send_csum */
1095 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1096 {
1097         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1098         sector_t sector = e->sector;
1099         unsigned int size = e->size;
1100         int digest_size;
1101         void *digest;
1102         int ok = 1;
1103
1104         if (unlikely(cancel))
1105                 goto out;
1106
1107         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1108         digest = kmalloc(digest_size, GFP_NOIO);
1109         if (!digest) {
1110                 ok = 0; /* terminate the connection in case the allocation failed */
1111                 goto out;
1112         }
1113
1114         if (likely(!(e->flags & EE_WAS_ERROR)))
1115                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1116         else
1117                 memset(digest, 0, digest_size);
1118
1119         /* Free e and pages before send.
1120          * In case we block on congestion, we could otherwise run into
1121          * some distributed deadlock, if the other side blocks on
1122          * congestion as well, because our receiver blocks in
1123          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1124         drbd_free_ee(mdev, e);
1125         e = NULL;
1126         inc_rs_pending(mdev);
1127         ok = drbd_send_drequest_csum(mdev, sector, size,
1128                                      digest, digest_size,
1129                                      P_OV_REPLY);
1130         if (!ok)
1131                 dec_rs_pending(mdev);
1132         kfree(digest);
1133
1134 out:
1135         if (e)
1136                 drbd_free_ee(mdev, e);
1137         dec_unacked(mdev);
1138         return ok;
1139 }
1140
1141 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1142 {
1143         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1144                 mdev->ov_last_oos_size += size>>9;
1145         } else {
1146                 mdev->ov_last_oos_start = sector;
1147                 mdev->ov_last_oos_size = size>>9;
1148         }
1149         drbd_set_out_of_sync(mdev, sector, size);
1150 }
1151
1152 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1153 {
1154         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1155         struct digest_info *di;
1156         void *digest;
1157         sector_t sector = e->sector;
1158         unsigned int size = e->size;
1159         int digest_size;
1160         int ok, eq = 0;
1161
1162         if (unlikely(cancel)) {
1163                 drbd_free_ee(mdev, e);
1164                 dec_unacked(mdev);
1165                 return 1;
1166         }
1167
1168         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1169          * the resync lru has been cleaned up already */
1170         if (get_ldev(mdev)) {
1171                 drbd_rs_complete_io(mdev, e->sector);
1172                 put_ldev(mdev);
1173         }
1174
1175         di = e->digest;
1176
1177         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1178                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1179                 digest = kmalloc(digest_size, GFP_NOIO);
1180                 if (digest) {
1181                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1182
1183                         D_ASSERT(digest_size == di->digest_size);
1184                         eq = !memcmp(digest, di->digest, digest_size);
1185                         kfree(digest);
1186                 }
1187         }
1188
1189         /* Free e and pages before send.
1190          * In case we block on congestion, we could otherwise run into
1191          * some distributed deadlock, if the other side blocks on
1192          * congestion as well, because our receiver blocks in
1193          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1194         drbd_free_ee(mdev, e);
1195         if (!eq)
1196                 drbd_ov_oos_found(mdev, sector, size);
1197         else
1198                 ov_oos_print(mdev);
1199
1200         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1201                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1202
1203         dec_unacked(mdev);
1204
1205         --mdev->ov_left;
1206
1207         /* let's advance progress step marks only for every other megabyte */
1208         if ((mdev->ov_left & 0x200) == 0x200)
1209                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1210
1211         if (mdev->ov_left == 0) {
1212                 ov_oos_print(mdev);
1213                 drbd_resync_finished(mdev);
1214         }
1215
1216         return ok;
1217 }
1218
1219 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1220 {
1221         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1222         complete(&b->done);
1223         return 1;
1224 }
1225
1226 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1227 {
1228         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1229         struct p_barrier *p = &mdev->data.sbuf.barrier;
1230         int ok = 1;
1231
1232         /* really avoid racing with tl_clear.  w.cb may have been referenced
1233          * just before it was reassigned and re-queued, so double check that.
1234          * actually, this race was harmless, since we only try to send the
1235          * barrier packet here, and otherwise do nothing with the object.
1236          * but compare with the head of w_clear_epoch */
1237         spin_lock_irq(&mdev->req_lock);
1238         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1239                 cancel = 1;
1240         spin_unlock_irq(&mdev->req_lock);
1241         if (cancel)
1242                 return 1;
1243
1244         if (!drbd_get_data_sock(mdev))
1245                 return 0;
1246         p->barrier = b->br_number;
1247         /* inc_ap_pending was done where this was queued.
1248          * dec_ap_pending will be done in got_BarrierAck
1249          * or (on connection loss) in w_clear_epoch.  */
1250         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1251                                 (struct p_header80 *)p, sizeof(*p), 0);
1252         drbd_put_data_sock(mdev);
1253
1254         return ok;
1255 }
1256
1257 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1258 {
1259         if (cancel)
1260                 return 1;
1261         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1262 }
1263
1264 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1265 {
1266         struct drbd_request *req = container_of(w, struct drbd_request, w);
1267         int ok;
1268
1269         if (unlikely(cancel)) {
1270                 req_mod(req, send_canceled);
1271                 return 1;
1272         }
1273
1274         ok = drbd_send_oos(mdev, req);
1275         req_mod(req, oos_handed_to_network);
1276
1277         return ok;
1278 }
1279
1280 /**
1281  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1282  * @mdev:       DRBD device.
1283  * @w:          work object.
1284  * @cancel:     The connection will be closed anyways
1285  */
1286 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1287 {
1288         struct drbd_request *req = container_of(w, struct drbd_request, w);
1289         int ok;
1290
1291         if (unlikely(cancel)) {
1292                 req_mod(req, send_canceled);
1293                 return 1;
1294         }
1295
1296         ok = drbd_send_dblock(mdev, req);
1297         req_mod(req, ok ? handed_over_to_network : send_failed);
1298
1299         return ok;
1300 }
1301
1302 /**
1303  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1304  * @mdev:       DRBD device.
1305  * @w:          work object.
1306  * @cancel:     The connection will be closed anyways
1307  */
1308 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1309 {
1310         struct drbd_request *req = container_of(w, struct drbd_request, w);
1311         int ok;
1312
1313         if (unlikely(cancel)) {
1314                 req_mod(req, send_canceled);
1315                 return 1;
1316         }
1317
1318         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1319                                 (unsigned long)req);
1320
1321         if (!ok) {
1322                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1323                  * so this is probably redundant */
1324                 if (mdev->state.conn >= C_CONNECTED)
1325                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1326         }
1327         req_mod(req, ok ? handed_over_to_network : send_failed);
1328
1329         return ok;
1330 }
1331
1332 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1333 {
1334         struct drbd_request *req = container_of(w, struct drbd_request, w);
1335
1336         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1337                 drbd_al_begin_io(mdev, req->sector);
1338         /* Calling drbd_al_begin_io() out of the worker might deadlock
1339            theoretically. Practically it cannot deadlock, since this is
1340            only used when unfreezing IOs. All the extents of the requests
1341            that made it into the TL are already active */
1342
1343         drbd_req_make_private_bio(req, req->master_bio);
1344         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1345         generic_make_request(req->private_bio);
1346
1347         return 1;
1348 }
1349
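/* Walk the sync-after dependency chain: return 0 if any device we depend on
 * is currently (paused-)resyncing or has one of the *_isp flags set,
 * 1 otherwise. */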
1350 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1351 {
1352         struct drbd_conf *odev = mdev;
1353
1354         while (1) {
1355                 if (odev->sync_conf.after == -1)
1356                         return 1;
1357                 odev = minor_to_mdev(odev->sync_conf.after);
1358                 ERR_IF(!odev) return 1;
1359                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1360                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1361                     odev->state.aftr_isp || odev->state.peer_isp ||
1362                     odev->state.user_isp)
1363                         return 0;
1364         }
1365 }
1366
1367 /**
1368  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1369  * @mdev:       DRBD device.
1370  *
1371  * Called from process context only (admin command and after_state_ch).
1372  */
1373 static int _drbd_pause_after(struct drbd_conf *mdev)
1374 {
1375         struct drbd_conf *odev;
1376         int i, rv = 0;
1377
1378         for (i = 0; i < minor_count; i++) {
1379                 odev = minor_to_mdev(i);
1380                 if (!odev)
1381                         continue;
1382                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1383                         continue;
1384                 if (!_drbd_may_sync_now(odev))
1385                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1386                                != SS_NOTHING_TO_DO);
1387         }
1388
1389         return rv;
1390 }
1391
1392 /**
1393  * _drbd_resume_next() - Resume resync on all devices that may resync now
1394  * @mdev:       DRBD device.
1395  *
1396  * Called from process context only (admin command and worker).
1397  */
1398 static int _drbd_resume_next(struct drbd_conf *mdev)
1399 {
1400         struct drbd_conf *odev;
1401         int i, rv = 0;
1402
1403         for (i = 0; i < minor_count; i++) {
1404                 odev = minor_to_mdev(i);
1405                 if (!odev)
1406                         continue;
1407                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1408                         continue;
1409                 if (odev->state.aftr_isp) {
1410                         if (_drbd_may_sync_now(odev))
1411                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1412                                                         CS_HARD, NULL)
1413                                        != SS_NOTHING_TO_DO) ;
1414                 }
1415         }
1416         return rv;
1417 }
1418
1419 void resume_next_sg(struct drbd_conf *mdev)
1420 {
1421         write_lock_irq(&global_state_lock);
1422         _drbd_resume_next(mdev);
1423         write_unlock_irq(&global_state_lock);
1424 }
1425
1426 void suspend_other_sg(struct drbd_conf *mdev)
1427 {
1428         write_lock_irq(&global_state_lock);
1429         _drbd_pause_after(mdev);
1430         write_unlock_irq(&global_state_lock);
1431 }
1432
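/* Validate a new sync-after minor: reject unknown minors (ERR_SYNC_AFTER) and
 * dependency cycles (ERR_SYNC_AFTER_CYCLE), following the chain until it ends
 * or loops back to mdev. */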
1433 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1434 {
1435         struct drbd_conf *odev;
1436
1437         if (o_minor == -1)
1438                 return NO_ERROR;
1439         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1440                 return ERR_SYNC_AFTER;
1441
1442         /* check for loops */
1443         odev = minor_to_mdev(o_minor);
1444         while (1) {
1445                 if (odev == mdev)
1446                         return ERR_SYNC_AFTER_CYCLE;
1447
1448                 /* dependency chain ends here, no cycles. */
1449                 if (odev->sync_conf.after == -1)
1450                         return NO_ERROR;
1451
1452                 /* follow the dependency chain */
1453                 odev = minor_to_mdev(odev->sync_conf.after);
1454         }
1455 }
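/* Example, for illustration: if minor 1 is already configured to sync after
 * minor 0 and an admin now tries to make minor 0 sync after minor 1,
 * sync_after_error() starts at minor 1, follows its dependency to minor 0,
 * finds the device it is validating, and returns ERR_SYNC_AFTER_CYCLE, so
 * the cyclic configuration is rejected. */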
1456
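/* Usage sketch for drbd_alter_sa() (illustrative only; "na" stands for the
 * sync-after minor requested by the admin, or -1 for "no dependency"):
 *
 *	retcode = drbd_alter_sa(mdev, na);
 *	if (retcode != NO_ERROR)
 *		return retcode;
 *
 * The non-NO_ERROR return values are ERR_SYNC_AFTER (unknown minor) and
 * ERR_SYNC_AFTER_CYCLE (the new dependency would form a loop).  Note that
 * drbd_alter_sa() takes global_state_lock itself, so the caller must not
 * hold it. */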
1457 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1458 {
1459         int changes;
1460         int retcode;
1461
1462         write_lock_irq(&global_state_lock);
1463         retcode = sync_after_error(mdev, na);
1464         if (retcode == NO_ERROR) {
1465                 mdev->sync_conf.after = na;
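                /* Re-evaluate pausing and resuming until nothing changes any
                 * more: one pass may set or clear aftr_isp on a device, which
                 * in turn changes what _drbd_may_sync_now() reports for the
                 * devices depending on it. */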
1466                 do {
1467                         changes  = _drbd_pause_after(mdev);
1468                         changes |= _drbd_resume_next(mdev);
1469                 } while (changes);
1470         }
1471         write_unlock_irq(&global_state_lock);
1472         return retcode;
1473 }
1474
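/* Reset the bookkeeping used by the resync rate controller (sectors in
 * flight, incoming sector counters and the plan FIFO) so that a new resync
 * run starts from a clean state. */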
1475 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1476 {
1477         atomic_set(&mdev->rs_sect_in, 0);
1478         atomic_set(&mdev->rs_sect_ev, 0);
1479         mdev->rs_in_flight = 0;
1480         mdev->rs_planed = 0;
1481         spin_lock(&mdev->peer_seq_lock);
1482         fifo_set(&mdev->rs_plan_s, 0);
1483         spin_unlock(&mdev->peer_seq_lock);
1484 }
1485
1486 /**
1487  * drbd_start_resync() - Start the resync process
1488  * @mdev:       DRBD device.
1489  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1490  *
1491  * This function might bring you directly into one of the
1492  * C_PAUSED_SYNC_* states.
1493  */
1494 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1495 {
1496         union drbd_state ns;
1497         int r;
1498
1499         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1500                 dev_err(DEV, "Resync already running!\n");
1501                 return;
1502         }
1503
1504         if (mdev->state.conn < C_AHEAD) {
1505                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1506                 drbd_rs_cancel_all(mdev);
1507                 /* This should rather be done when we abort the resync.  We definitely
1508                    do not want this for connections going back and forth between
1509                    Ahead/Behind and SyncSource/SyncTarget. */
1510         }
1511
1512         if (side == C_SYNC_TARGET) {
1513                 /* Since application IO was locked out during C_WF_BITMAP_T and C_WF_SYNC_UUID,
1514                    our data is still unmodified.  Before going to C_SYNC_TARGET, ask the
1515                    before-resync-target handler whether we may make the data inconsistent. */
1516                 r = drbd_khelper(mdev, "before-resync-target");
1517                 r = (r >> 8) & 0xff;
1518                 if (r > 0) {
1519                         dev_info(DEV, "before-resync-target handler returned %d, "
1520                              "dropping connection.\n", r);
1521                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1522                         return;
1523                 }
1524         } else /* C_SYNC_SOURCE */ {
1525                 r = drbd_khelper(mdev, "before-resync-source");
1526                 r = (r >> 8) & 0xff;
1527                 if (r > 0) {
1528                         if (r == 3) {
1529                                 dev_info(DEV, "before-resync-source handler returned %d, "
1530                                          "ignoring. Old userland tools?\n", r);
1531                         } else {
1532                                 dev_info(DEV, "before-resync-source handler returned %d, "
1533                                          "dropping connection.\n", r);
1534                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1535                                 return;
1536                         }
1537                 }
1538         }
1539
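        /* Hold the device state lock and the global state lock across the
         * state change below: _drbd_may_sync_now() and _drbd_pause_after()
         * look at the state of every other device and need those states to
         * stay stable while we decide whether to start paused. */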
1540         drbd_state_lock(mdev);
1541         write_lock_irq(&global_state_lock);
1542         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1543                 write_unlock_irq(&global_state_lock);
1544                 drbd_state_unlock(mdev);
1545                 return;
1546         }
1547
1548         ns.i = mdev->state.i;
1549
1550         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1551
1552         ns.conn = side;
1553
1554         if (side == C_SYNC_TARGET)
1555                 ns.disk = D_INCONSISTENT;
1556         else /* side == C_SYNC_SOURCE */
1557                 ns.pdsk = D_INCONSISTENT;
1558
1559         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1560         ns = mdev->state;
1561
1562         if (ns.conn < C_CONNECTED)
1563                 r = SS_UNKNOWN_ERROR;
1564
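        /* If the state change to our new sync state succeeded, initialize the
         * resync bookkeeping (total weight, progress marks) while still
         * holding global_state_lock, and let _drbd_pause_after() pause any
         * other device that now has to wait for us. */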
1565         if (r == SS_SUCCESS) {
1566                 unsigned long tw = drbd_bm_total_weight(mdev);
1567                 unsigned long now = jiffies;
1568                 int i;
1569
1570                 mdev->rs_failed    = 0;
1571                 mdev->rs_paused    = 0;
1572                 mdev->rs_same_csum = 0;
1573                 mdev->rs_last_events = 0;
1574                 mdev->rs_last_sect_ev = 0;
1575                 mdev->rs_total     = tw;
1576                 mdev->rs_start     = now;
1577                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1578                         mdev->rs_mark_left[i] = tw;
1579                         mdev->rs_mark_time[i] = now;
1580                 }
1581                 _drbd_pause_after(mdev);
1582         }
1583         write_unlock_irq(&global_state_lock);
1584
1585         if (r == SS_SUCCESS) {
1586                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1587                      drbd_conn_str(ns.conn),
1588                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1589                      (unsigned long) mdev->rs_total);
1590                 if (side == C_SYNC_TARGET)
1591                         mdev->bm_resync_fo = 0;
1592
1593                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1594                  * with w_send_oos, or the sync target will get confused as to
1595                  * how many bits to resync.  We cannot always do that, because for an
1596                  * empty resync and protocol < 95, we need to do it here, as we call
1597                  * drbd_resync_finished from here in that case.
1598                  * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1599                  * and from after_state_ch otherwise. */
1600                 if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1601                         drbd_gen_and_send_sync_uuid(mdev);
1602
1603                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1604                         /* This still has a race (about when exactly the peers
1605                          * detect connection loss) that can lead to a full sync
1606                          * on next handshake. In 8.3.9 we fixed this with explicit
1607                          * resync-finished notifications, but the fix
1608                          * introduces a protocol change.  Sleeping for some
1609                          * time longer than the ping interval + timeout on the
1610                          * SyncSource, to give the SyncTarget the chance to
1611                          * detect connection loss, then waiting for a ping
1612                          * response (implicit in drbd_resync_finished) reduces
1613                          * the race considerably, but does not solve it. */
1614                         if (side == C_SYNC_SOURCE)
1615                                 schedule_timeout_interruptible(
1616                                         mdev->net_conf->ping_int * HZ +
1617                                         mdev->net_conf->ping_timeo*HZ/9);
1618                         drbd_resync_finished(mdev);
1619                 }
1620
1621                 drbd_rs_controller_reset(mdev);
1622                 /* ns.conn may already be != mdev->state.conn,
1623                  * we may have been paused in between, or become paused until
1624                  * the timer triggers.
1625                  * No matter, that is handled in resync_timer_fn() */
1626                 if (ns.conn == C_SYNC_TARGET)
1627                         mod_timer(&mdev->resync_timer, jiffies);
1628
1629                 drbd_md_sync(mdev);
1630         }
1631         put_ldev(mdev);
1632         drbd_state_unlock(mdev);
1633 }
1634
1635 int drbd_worker(struct drbd_thread *thi)
1636 {
1637         struct drbd_conf *mdev = thi->mdev;
1638         struct drbd_work *w = NULL;
1639         LIST_HEAD(work_list);
1640         int intr = 0, i;
1641
1642         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1643
1644         while (get_t_state(thi) == Running) {
1645                 drbd_thread_current_set_cpu(mdev);
1646
1647                 if (down_trylock(&mdev->data.work.s)) {
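                        /* No work queued right now: uncork the data socket so
                         * any pending packets go out, sleep until new work is
                         * queued (or a signal arrives), then cork again so
                         * the following packets can be batched. */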
1648                         mutex_lock(&mdev->data.mutex);
1649                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1650                                 drbd_tcp_uncork(mdev->data.socket);
1651                         mutex_unlock(&mdev->data.mutex);
1652
1653                         intr = down_interruptible(&mdev->data.work.s);
1654
1655                         mutex_lock(&mdev->data.mutex);
1656                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1657                                 drbd_tcp_cork(mdev->data.socket);
1658                         mutex_unlock(&mdev->data.mutex);
1659                 }
1660
1661                 if (intr) {
1662                         D_ASSERT(intr == -EINTR);
1663                         flush_signals(current);
1664                         ERR_IF (get_t_state(thi) == Running)
1665                                 continue;
1666                         break;
1667                 }
1668
1669                 if (get_t_state(thi) != Running)
1670                         break;
1671                 /* With this break, we have done a down() but not consumed
1672                    the entry from the list. The cleanup code takes care of
1673                    this...   */
1674
1675                 w = NULL;
1676                 spin_lock_irq(&mdev->data.work.q_lock);
1677                 ERR_IF(list_empty(&mdev->data.work.q)) {
1678                         /* something terribly wrong in our logic.
1679                          * we were able to down() the semaphore,
1680                          * but the list is empty... doh.
1681                          *
1682                          * what is the best thing to do now?
1683                          * try again from scratch, restarting the receiver,
1684                          * asender, whatnot? could break even more ugly,
1685                          * e.g. when we are primary, but no good local data.
1686                          *
1687                          * I'll try to get away just starting over this loop.
1688                          */
1689                         spin_unlock_irq(&mdev->data.work.q_lock);
1690                         continue;
1691                 }
1692                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1693                 list_del_init(&w->list);
1694                 spin_unlock_irq(&mdev->data.work.q_lock);
1695
1696                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1697                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1698                         if (mdev->state.conn >= C_CONNECTED)
1699                                 drbd_force_state(mdev,
1700                                                 NS(conn, C_NETWORK_FAILURE));
1701                 }
1702         }
1703         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1704         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1705
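        /* Drain whatever is still on the work queue: splice the entries off
         * under the lock, then run each callback with its cancel argument
         * set (the final 1) so it only cleans up instead of doing real
         * work. */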
1706         spin_lock_irq(&mdev->data.work.q_lock);
1707         i = 0;
1708         while (!list_empty(&mdev->data.work.q)) {
1709                 list_splice_init(&mdev->data.work.q, &work_list);
1710                 spin_unlock_irq(&mdev->data.work.q_lock);
1711
1712                 while (!list_empty(&work_list)) {
1713                         w = list_entry(work_list.next, struct drbd_work, list);
1714                         list_del_init(&w->list);
1715                         w->cb(mdev, w, 1);
1716                         i++; /* dead debugging code */
1717                 }
1718
1719                 spin_lock_irq(&mdev->data.work.q_lock);
1720         }
1721         sema_init(&mdev->data.work.s, 0);
1722         /* DANGEROUS race: if someone queued work while holding the spinlock,
1723          * but called up() outside of it, we could get an up() on the
1724          * semaphore without a corresponding list entry.
1725          * So don't do that.
1726          */
1727         spin_unlock_irq(&mdev->data.work.q_lock);
1728
1729         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1730         /* _drbd_set_state only uses stop_nowait.
1731          * wait here for the Exiting receiver. */
1732         drbd_thread_stop(&mdev->receiver);
1733         drbd_mdev_cleanup(mdev);
1734
1735         dev_info(DEV, "worker terminated\n");
1736
1737         clear_bit(DEVICE_DYING, &mdev->flags);
1738         clear_bit(CONFIG_PENDING, &mdev->flags);
1739         wake_up(&mdev->state_wait);
1740
1741         return 0;
1742 }