1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59
60
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72         struct drbd_md_io *md_io;
73
74         md_io = (struct drbd_md_io *)bio->bi_private;
75         md_io->error = error;
76
77         complete(&md_io->event);
78 }
79
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
84 {
85         unsigned long flags = 0;
86         struct drbd_conf *mdev = peer_req->mdev;
87
88         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
89         mdev->read_cnt += peer_req->i.size >> 9;
90         list_del(&peer_req->w.list);
91         if (list_empty(&mdev->read_ee))
92                 wake_up(&mdev->ee_wait);
93         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
94                 __drbd_chk_io_error(mdev, false);
95         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
96
97         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
98         put_ldev(mdev);
99 }
100
101 /* writes on behalf of the partner, or resync writes,
102  * "submitted" by the receiver, final stage.  */
103 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
104 {
105         unsigned long flags = 0;
106         struct drbd_conf *mdev = peer_req->mdev;
107         sector_t e_sector;
108         int do_wake;
109         u64 block_id;
110         int do_al_complete_io;
111
112         /* after we moved peer_req to done_ee,
113          * we may no longer access it,
114          * it may be freed/reused already!
115          * (as soon as we release the req_lock) */
116         e_sector = peer_req->i.sector;
117         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
118         block_id = peer_req->block_id;
119
120         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
121         mdev->writ_cnt += peer_req->i.size >> 9;
122         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
123         list_add_tail(&peer_req->w.list, &mdev->done_ee);
124
125         /*
126          * Do not remove from the write_requests tree here: we did not send the
127          * Ack yet and did not wake possibly waiting conflicting requests.
128          * Removed from the tree from "drbd_process_done_ee" within the
129          * appropriate w.cb (e_end_block/e_end_resync_block) or from
130          * _drbd_clear_done_ee.
131          */
132
133         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
134
135         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
136                 __drbd_chk_io_error(mdev, false);
137         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
138
139         if (block_id == ID_SYNCER)
140                 drbd_rs_complete_io(mdev, e_sector);
141
142         if (do_wake)
143                 wake_up(&mdev->ee_wait);
144
145         if (do_al_complete_io)
146                 drbd_al_complete_io(mdev, e_sector);
147
148         wake_asender(mdev->tconn);
149         put_ldev(mdev);
150 }
151
152 /* writes on behalf of the partner, or resync writes,
153  * "submitted" by the receiver.
154  */
155 void drbd_endio_sec(struct bio *bio, int error)
156 {
157         struct drbd_peer_request *peer_req = bio->bi_private;
158         struct drbd_conf *mdev = peer_req->mdev;
159         int uptodate = bio_flagged(bio, BIO_UPTODATE);
160         int is_write = bio_data_dir(bio) == WRITE;
161
162         if (error && __ratelimit(&drbd_ratelimit_state))
163                 dev_warn(DEV, "%s: error=%d s=%llus\n",
164                                 is_write ? "write" : "read", error,
165                                 (unsigned long long)peer_req->i.sector);
166         if (!error && !uptodate) {
167                 if (__ratelimit(&drbd_ratelimit_state))
168                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
169                                         is_write ? "write" : "read",
170                                         (unsigned long long)peer_req->i.sector);
171                 /* strange behavior of some lower level drivers...
172                  * fail the request by clearing the uptodate flag,
173                  * but do not return any error?! */
174                 error = -EIO;
175         }
176
177         if (error)
178                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
179
180         bio_put(bio); /* no need for the bio anymore */
181         if (atomic_dec_and_test(&peer_req->pending_bios)) {
182                 if (is_write)
183                         drbd_endio_write_sec_final(peer_req);
184                 else
185                         drbd_endio_read_sec_final(peer_req);
186         }
187 }
188
189 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
190  */
191 void drbd_endio_pri(struct bio *bio, int error)
192 {
193         unsigned long flags;
194         struct drbd_request *req = bio->bi_private;
195         struct drbd_conf *mdev = req->mdev;
196         struct bio_and_error m;
197         enum drbd_req_event what;
198         int uptodate = bio_flagged(bio, BIO_UPTODATE);
199
200         if (!error && !uptodate) {
201                 dev_warn(DEV, "p %s: setting error to -EIO\n",
202                          bio_data_dir(bio) == WRITE ? "write" : "read");
203                 /* strange behavior of some lower level drivers...
204                  * fail the request by clearing the uptodate flag,
205                  * but do not return any error?! */
206                 error = -EIO;
207         }
208
209         /* to avoid recursion in __req_mod */
210         if (unlikely(error)) {
211                 what = (bio_data_dir(bio) == WRITE)
212                         ? WRITE_COMPLETED_WITH_ERROR
213                         : (bio_rw(bio) == READ)
214                           ? READ_COMPLETED_WITH_ERROR
215                           : READ_AHEAD_COMPLETED_WITH_ERROR;
216         } else
217                 what = COMPLETED_OK;
218
219         bio_put(req->private_bio);
220         req->private_bio = ERR_PTR(error);
221
222         /* not req_mod(), we need irqsave here! */
223         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
224         __req_mod(req, what, &m);
225         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
226
227         if (m.bio)
228                 complete_master_bio(mdev, &m);
229 }
230
231 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
232 {
233         struct drbd_request *req = container_of(w, struct drbd_request, w);
234
235         /* We should not detach for read io-error,
236          * but try to WRITE the P_DATA_REPLY to the failed location,
237          * to give the disk the chance to relocate that block */
238
239         spin_lock_irq(&mdev->tconn->req_lock);
240         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
242                 spin_unlock_irq(&mdev->tconn->req_lock);
243                 return 1;
244         }
245         spin_unlock_irq(&mdev->tconn->req_lock);
246
247         return w_send_read_req(mdev, w, 0);
248 }
249
250 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251                   struct drbd_peer_request *peer_req, void *digest)
252 {
253         struct hash_desc desc;
254         struct scatterlist sg;
255         struct page *page = peer_req->pages;
256         struct page *tmp;
257         unsigned len;
258
259         desc.tfm = tfm;
260         desc.flags = 0;
261
262         sg_init_table(&sg, 1);
263         crypto_hash_init(&desc);
264
265         while ((tmp = page_chain_next(page))) {
266                 /* all but the last page will be fully used */
267                 sg_set_page(&sg, page, PAGE_SIZE, 0);
268                 crypto_hash_update(&desc, &sg, sg.length);
269                 page = tmp;
270         }
271         /* and now the last, possibly only partially used page */
272         len = peer_req->i.size & (PAGE_SIZE - 1);
273         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274         crypto_hash_update(&desc, &sg, sg.length);
275         crypto_hash_final(&desc, digest);
276 }
277
278 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279 {
280         struct hash_desc desc;
281         struct scatterlist sg;
282         struct bio_vec *bvec;
283         int i;
284
285         desc.tfm = tfm;
286         desc.flags = 0;
287
288         sg_init_table(&sg, 1);
289         crypto_hash_init(&desc);
290
291         __bio_for_each_segment(bvec, bio, i, 0) {
292                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293                 crypto_hash_update(&desc, &sg, sg.length);
294         }
295         crypto_hash_final(&desc, digest);
296 }
297
298 /* TODO merge common code with w_e_end_ov_req */
299 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
300 {
301         struct drbd_peer_request *peer_req =
302                 container_of(w, struct drbd_peer_request, w);
303         int digest_size;
304         void *digest;
305         int ok = 1;
306
307         if (unlikely(cancel))
308                 goto out;
309
310         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
311                 goto out;
312
313         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
314         digest = kmalloc(digest_size, GFP_NOIO);
315         if (digest) {
316                 sector_t sector = peer_req->i.sector;
317                 unsigned int size = peer_req->i.size;
318                 drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
319                 /* Free e and pages before send.
320                  * In case we block on congestion, we could otherwise run into
321                  * some distributed deadlock, if the other side blocks on
322                  * congestion as well, because our receiver blocks in
323                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
324                 drbd_free_ee(mdev, peer_req);
325                 peer_req = NULL;
326                 inc_rs_pending(mdev);
327                 ok = drbd_send_drequest_csum(mdev, sector, size,
328                                              digest, digest_size,
329                                              P_CSUM_RS_REQUEST);
330                 kfree(digest);
331         } else {
332                 dev_err(DEV, "kmalloc() of digest failed.\n");
333                 ok = 0;
334         }
335
336 out:
337         if (peer_req)
338                 drbd_free_ee(mdev, peer_req);
339
340         if (unlikely(!ok))
341                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342         return ok;
343 }
344
345 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
346
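/* Submit a local read of [sector, sector+size) for checksum based resync:
 * allocate a peer request, queue w_e_send_csum() as its completion work and
 * hand the read to the lower level device.  Returns 0 on success, -EAGAIN
 * if the request should be retried later, -EIO if the local disk is gone. */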
347 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348 {
349         struct drbd_peer_request *peer_req;
350
351         if (!get_ldev(mdev))
352                 return -EIO;
353
354         if (drbd_rs_should_slow_down(mdev, sector))
355                 goto defer;
356
357         /* GFP_TRY, because if there is no memory available right now, this may
358          * be rescheduled for later. It is "only" background resync, after all. */
359         peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
360         if (!peer_req)
361                 goto defer;
362
363         peer_req->w.cb = w_e_send_csum;
364         spin_lock_irq(&mdev->tconn->req_lock);
365         list_add(&peer_req->w.list, &mdev->read_ee);
366         spin_unlock_irq(&mdev->tconn->req_lock);
367
368         atomic_add(size >> 9, &mdev->rs_sect_ev);
369         if (drbd_submit_ee(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
370                 return 0;
371
372         /* If it failed because of ENOMEM, retry should help.  If it failed
373          * because bio_add_page failed (probably broken lower level driver),
374          * retry may or may not help.
375          * If it does not, you may need to force disconnect. */
376         spin_lock_irq(&mdev->tconn->req_lock);
377         list_del(&peer_req->w.list);
378         spin_unlock_irq(&mdev->tconn->req_lock);
379
380         drbd_free_ee(mdev, peer_req);
381 defer:
382         put_ldev(mdev);
383         return -EAGAIN;
384 }
385
386 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
387 {
388         switch (mdev->state.conn) {
389         case C_VERIFY_S:
390                 w_make_ov_request(mdev, w, cancel);
391                 break;
392         case C_SYNC_TARGET:
393                 w_make_resync_request(mdev, w, cancel);
394                 break;
395         }
396
397         return 1;
398 }
399
400 void resync_timer_fn(unsigned long data)
401 {
402         struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404         if (list_empty(&mdev->resync_work.list))
405                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406 }
407
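/* Small helpers for the fixed-size plan FIFO (rs_plan_s) used by the resync
 * controller below: fifo_set() initializes every slot, fifo_push() returns
 * the oldest entry while storing a new one, and fifo_add_val() spreads a
 * correction over all planned steps. */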
408 static void fifo_set(struct fifo_buffer *fb, int value)
409 {
410         int i;
411
412         for (i = 0; i < fb->size; i++)
413                 fb->values[i] = value;
414 }
415
416 static int fifo_push(struct fifo_buffer *fb, int value)
417 {
418         int ov;
419
420         ov = fb->values[fb->head_index];
421         fb->values[fb->head_index++] = value;
422
423         if (fb->head_index >= fb->size)
424                 fb->head_index = 0;
425
426         return ov;
427 }
428
429 static void fifo_add_val(struct fifo_buffer *fb, int value)
430 {
431         int i;
432
433         for (i = 0; i < fb->size; i++)
434                 fb->values[i] += value;
435 }
436
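/* Dynamic resync speed controller.  Called once per SLEEP_TIME tick while a
 * plan (rs_plan_s) is configured: it looks at how many sectors came back
 * since the last tick (rs_sect_in), computes how much data we want in flight
 * ("want", either c_fill_target or derived from c_delay_target), spreads the
 * difference as a correction over the planned steps, and returns the number
 * of sectors to request in this tick, capped by c_max_rate. */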
437 static int drbd_rs_controller(struct drbd_conf *mdev)
438 {
439         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440         unsigned int want;     /* The number of sectors we want in the proxy */
441         int req_sect; /* Number of sectors to request in this turn */
442         int correction; /* Number of sectors more we need in the proxy*/
443         int cps; /* correction per invocation of drbd_rs_controller() */
444         int steps; /* Number of time steps to plan ahead */
445         int curr_corr;
446         int max_sect;
447
448         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449         mdev->rs_in_flight -= sect_in;
450
451         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
457         } else { /* normal path */
458                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
459                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
460         }
461
462         correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464         /* Plan ahead */
465         cps = correction / steps;
466         fifo_add_val(&mdev->rs_plan_s, cps);
467         mdev->rs_planed += cps * steps;
468
469         /* What we do in this step */
470         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471         spin_unlock(&mdev->peer_seq_lock);
472         mdev->rs_planed -= curr_corr;
473
474         req_sect = sect_in + curr_corr;
475         if (req_sect < 0)
476                 req_sect = 0;
477
478         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
479         if (req_sect > max_sect)
480                 req_sect = max_sect;
481
482         /*
483         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484                  sect_in, mdev->rs_in_flight, want, correction,
485                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
486         */
487
488         return req_sect;
489 }
490
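/* Translate the configured or controller-computed resync rate into the
 * number of BM_BLOCK_SIZE (4 KiB) requests to issue in this SLEEP_TIME
 * interval, and keep mdev->c_sync_rate (in KiB/s) updated as the currently
 * effective rate. */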
491 static int drbd_rs_number_requests(struct drbd_conf *mdev)
492 {
493         int number;
494         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
495                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497         } else {
498                 mdev->c_sync_rate = mdev->sync_conf.rate;
499                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500         }
501
502         /* ignore the amount of pending requests, the resync controller should
503          * throttle down to incoming reply rate soon enough anyways. */
504         return number;
505 }
506
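/* Worker callback that generates the next batch of resync requests: asks the
 * rate controller how many blocks to request, walks the bitmap for dirty
 * bits, merges adjacent bits into larger, aligned requests, and either sends
 * P_RS_DATA_REQUEST directly or, with checksum based resync (protocol >= 89
 * and csums_tfm configured), first reads the block locally via
 * read_for_csum().  Re-arms the resync timer when more work remains. */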
507 static int w_make_resync_request(struct drbd_conf *mdev,
508                                  struct drbd_work *w, int cancel)
509 {
510         unsigned long bit;
511         sector_t sector;
512         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513         int max_bio_size;
514         int number, rollback_i, size;
515         int align, queued, sndbuf;
516         int i = 0;
517
518         if (unlikely(cancel))
519                 return 1;
520
521         if (mdev->rs_total == 0) {
522                 /* empty resync? */
523                 drbd_resync_finished(mdev);
524                 return 1;
525         }
526
527         if (!get_ldev(mdev)) {
528                 /* Since we only need to access mdev->rsync, a
529                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
530                    continuing resync with a broken disk makes no sense at
531                    all */
532                 dev_err(DEV, "Disk broke down during resync!\n");
533                 return 1;
534         }
535
536         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
537         number = drbd_rs_number_requests(mdev);
538         if (number == 0)
539                 goto requeue;
540
541         for (i = 0; i < number; i++) {
542                 /* Stop generating RS requests when half of the send buffer is filled */
543                 mutex_lock(&mdev->tconn->data.mutex);
544                 if (mdev->tconn->data.socket) {
545                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
546                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
547                 } else {
548                         queued = 1;
549                         sndbuf = 0;
550                 }
551                 mutex_unlock(&mdev->tconn->data.mutex);
552                 if (queued > sndbuf / 2)
553                         goto requeue;
554
555 next_sector:
556                 size = BM_BLOCK_SIZE;
557                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
558
559                 if (bit == DRBD_END_OF_BITMAP) {
560                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
561                         put_ldev(mdev);
562                         return 1;
563                 }
564
565                 sector = BM_BIT_TO_SECT(bit);
566
567                 if (drbd_rs_should_slow_down(mdev, sector) ||
568                     drbd_try_rs_begin_io(mdev, sector)) {
569                         mdev->bm_resync_fo = bit;
570                         goto requeue;
571                 }
572                 mdev->bm_resync_fo = bit + 1;
573
574                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
575                         drbd_rs_complete_io(mdev, sector);
576                         goto next_sector;
577                 }
578
579 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
580                 /* try to find some adjacent bits.
581                  * we stop if we have already the maximum req size.
582                  *
583                  * Additionally always align bigger requests, in order to
584                  * be prepared for all stripe sizes of software RAIDs.
585                  */
586                 align = 1;
587                 rollback_i = i;
588                 for (;;) {
589                         if (size + BM_BLOCK_SIZE > max_bio_size)
590                                 break;
591
592                         /* Be always aligned */
593                         if (sector & ((1<<(align+3))-1))
594                                 break;
595
596                         /* do not cross extent boundaries */
597                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
598                                 break;
599                         /* now, is it actually dirty, after all?
600                          * caution, drbd_bm_test_bit is tri-state for some
601                          * obscure reason; ( b == 0 ) would get the out-of-band
602                          * only accidentally right because of the "oddly sized"
603                          * adjustment below */
604                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
605                                 break;
606                         bit++;
607                         size += BM_BLOCK_SIZE;
608                         if ((BM_BLOCK_SIZE << align) <= size)
609                                 align++;
610                         i++;
611                 }
612                 /* if we merged some,
613                  * reset the offset to start the next drbd_bm_find_next from */
614                 if (size > BM_BLOCK_SIZE)
615                         mdev->bm_resync_fo = bit + 1;
616 #endif
617
618                 /* adjust very last sectors, in case we are oddly sized */
619                 if (sector + (size>>9) > capacity)
620                         size = (capacity-sector)<<9;
621                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
622                         switch (read_for_csum(mdev, sector, size)) {
623                         case -EIO: /* Disk failure */
624                                 put_ldev(mdev);
625                                 return 0;
626                         case -EAGAIN: /* allocation failed, or ldev busy */
627                                 drbd_rs_complete_io(mdev, sector);
628                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
629                                 i = rollback_i;
630                                 goto requeue;
631                         case 0:
632                                 /* everything ok */
633                                 break;
634                         default:
635                                 BUG();
636                         }
637                 } else {
638                         inc_rs_pending(mdev);
639                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
640                                                sector, size, ID_SYNCER)) {
641                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
642                                 dec_rs_pending(mdev);
643                                 put_ldev(mdev);
644                                 return 0;
645                         }
646                 }
647         }
648
649         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
650                 /* last syncer _request_ was sent,
651                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
652                  * next sync group will resume), as soon as we receive the last
653                  * resync data block, and the last bit is cleared.
654                  * until then resync "work" is "inactive" ...
655                  */
656                 put_ldev(mdev);
657                 return 1;
658         }
659
660  requeue:
661         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
662         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
663         put_ldev(mdev);
664         return 1;
665 }
666
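/* Same idea for online verify: request up to drbd_rs_number_requests()
 * blocks starting at ov_position by calling drbd_send_ov_request(), throttle
 * via drbd_rs_should_slow_down()/drbd_try_rs_begin_io(), and re-arm the
 * resync timer for the next batch. */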
667 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
668 {
669         int number, i, size;
670         sector_t sector;
671         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
672
673         if (unlikely(cancel))
674                 return 1;
675
676         number = drbd_rs_number_requests(mdev);
677
678         sector = mdev->ov_position;
679         for (i = 0; i < number; i++) {
680                 if (sector >= capacity) {
681                         return 1;
682                 }
683
684                 size = BM_BLOCK_SIZE;
685
686                 if (drbd_rs_should_slow_down(mdev, sector) ||
687                     drbd_try_rs_begin_io(mdev, sector)) {
688                         mdev->ov_position = sector;
689                         goto requeue;
690                 }
691
692                 if (sector + (size>>9) > capacity)
693                         size = (capacity-sector)<<9;
694
695                 inc_rs_pending(mdev);
696                 if (!drbd_send_ov_request(mdev, sector, size)) {
697                         dec_rs_pending(mdev);
698                         return 0;
699                 }
700                 sector += BM_SECT_PER_BIT;
701         }
702         mdev->ov_position = sector;
703
704  requeue:
705         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
706         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
707         return 1;
708 }
709
710 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
711 {
712         kfree(w);
713         ov_oos_print(mdev);
714         drbd_resync_finished(mdev);
715
716         return 1;
717 }
718
719 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
720 {
721         kfree(w);
722
723         drbd_resync_finished(mdev);
724
725         return 1;
726 }
727
728 static void ping_peer(struct drbd_conf *mdev)
729 {
730         clear_bit(GOT_PING_ACK, &mdev->flags);
731         request_ping(mdev->tconn);
732         wait_event(mdev->misc_wait,
733                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
734 }
735
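/* Final bookkeeping when a resync or online verify run ends: drain the
 * resync LRU (retrying via a queued w_resync_finished if that is not yet
 * possible), report the achieved throughput, pick the new disk/pdsk states
 * depending on rs_failed and the sync direction, update the UUIDs after a
 * successful resync, and possibly invoke a userland helper
 * ("out-of-sync" or "after-resync-target"). */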
736 int drbd_resync_finished(struct drbd_conf *mdev)
737 {
738         unsigned long db, dt, dbdt;
739         unsigned long n_oos;
740         union drbd_state os, ns;
741         struct drbd_work *w;
742         char *khelper_cmd = NULL;
743         int verify_done = 0;
744
745         /* Remove all elements from the resync LRU. Since future actions
746          * might set bits in the (main) bitmap, the entries in the
747          * resync LRU would otherwise be wrong. */
748         if (drbd_rs_del_all(mdev)) {
749                 /* In case this is not possible now, most probably because
750                  * there are P_RS_DATA_REPLY packets lingering on the worker's
751                  * queue (or the read operations for those packets have not
752                  * finished yet).  Retry in 100ms. */
753
754                 schedule_timeout_interruptible(HZ / 10);
755                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
756                 if (w) {
757                         w->cb = w_resync_finished;
758                         drbd_queue_work(&mdev->tconn->data.work, w);
759                         return 1;
760                 }
761                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
762         }
763
764         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
765         if (dt <= 0)
766                 dt = 1;
767         db = mdev->rs_total;
768         dbdt = Bit2KB(db/dt);
769         mdev->rs_paused /= HZ;
770
771         if (!get_ldev(mdev))
772                 goto out;
773
774         ping_peer(mdev);
775
776         spin_lock_irq(&mdev->tconn->req_lock);
777         os = mdev->state;
778
779         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
780
781         /* This protects us against multiple calls (that can happen in the presence
782            of application IO), and against connectivity loss just before we arrive here. */
783         if (os.conn <= C_CONNECTED)
784                 goto out_unlock;
785
786         ns = os;
787         ns.conn = C_CONNECTED;
788
789         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
790              verify_done ? "Online verify " : "Resync",
791              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
792
793         n_oos = drbd_bm_total_weight(mdev);
794
795         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
796                 if (n_oos) {
797                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
798                               n_oos, Bit2KB(1));
799                         khelper_cmd = "out-of-sync";
800                 }
801         } else {
802                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
803
804                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
805                         khelper_cmd = "after-resync-target";
806
807                 if (mdev->csums_tfm && mdev->rs_total) {
808                         const unsigned long s = mdev->rs_same_csum;
809                         const unsigned long t = mdev->rs_total;
810                         const int ratio =
811                                 (t == 0)     ? 0 :
812                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
813                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
814                              "transferred %luK total %luK\n",
815                              ratio,
816                              Bit2KB(mdev->rs_same_csum),
817                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
818                              Bit2KB(mdev->rs_total));
819                 }
820         }
821
822         if (mdev->rs_failed) {
823                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
824
825                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
826                         ns.disk = D_INCONSISTENT;
827                         ns.pdsk = D_UP_TO_DATE;
828                 } else {
829                         ns.disk = D_UP_TO_DATE;
830                         ns.pdsk = D_INCONSISTENT;
831                 }
832         } else {
833                 ns.disk = D_UP_TO_DATE;
834                 ns.pdsk = D_UP_TO_DATE;
835
836                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
837                         if (mdev->p_uuid) {
838                                 int i;
839                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
840                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
841                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
842                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
843                         } else {
844                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
845                         }
846                 }
847
848                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
849                         /* for verify runs, we don't update uuids here,
850                          * so there would be nothing to report. */
851                         drbd_uuid_set_bm(mdev, 0UL);
852                         drbd_print_uuids(mdev, "updated UUIDs");
853                         if (mdev->p_uuid) {
854                                 /* Now the two UUID sets are equal, update what we
855                                  * know of the peer. */
856                                 int i;
857                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
858                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
859                         }
860                 }
861         }
862
863         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
864 out_unlock:
865         spin_unlock_irq(&mdev->tconn->req_lock);
866         put_ldev(mdev);
867 out:
868         mdev->rs_total  = 0;
869         mdev->rs_failed = 0;
870         mdev->rs_paused = 0;
871         if (verify_done)
872                 mdev->ov_start_sector = 0;
873
874         drbd_md_sync(mdev);
875
876         if (khelper_cmd)
877                 drbd_khelper(mdev, khelper_cmd);
878
879         return 1;
880 }
881
882 /* helper */
883 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
884 {
885         if (drbd_ee_has_active_page(peer_req)) {
886                 /* This might happen if sendpage() has not finished */
887                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
888                 atomic_add(i, &mdev->pp_in_use_by_net);
889                 atomic_sub(i, &mdev->pp_in_use);
890                 spin_lock_irq(&mdev->tconn->req_lock);
891                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
892                 spin_unlock_irq(&mdev->tconn->req_lock);
893                 wake_up(&drbd_pp_wait);
894         } else
895                 drbd_free_ee(mdev, peer_req);
896 }
897
898 /**
899  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
900  * @mdev:       DRBD device.
901  * @w:          work object.
902  * @cancel:     The connection will be closed anyways
903  */
904 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
905 {
906         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
907         int ok;
908
909         if (unlikely(cancel)) {
910                 drbd_free_ee(mdev, peer_req);
911                 dec_unacked(mdev);
912                 return 1;
913         }
914
915         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
916                 ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
917         } else {
918                 if (__ratelimit(&drbd_ratelimit_state))
919                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
920                             (unsigned long long)peer_req->i.sector);
921
922                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
923         }
924
925         dec_unacked(mdev);
926
927         move_to_net_ee_or_free(mdev, peer_req);
928
929         if (unlikely(!ok))
930                 dev_err(DEV, "drbd_send_block() failed\n");
931         return ok;
932 }
933
934 /**
935  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
936  * @mdev:       DRBD device.
937  * @w:          work object.
938  * @cancel:     The connection will be closed anyways
939  */
940 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
941 {
942         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
943         int ok;
944
945         if (unlikely(cancel)) {
946                 drbd_free_ee(mdev, peer_req);
947                 dec_unacked(mdev);
948                 return 1;
949         }
950
951         if (get_ldev_if_state(mdev, D_FAILED)) {
952                 drbd_rs_complete_io(mdev, peer_req->i.sector);
953                 put_ldev(mdev);
954         }
955
956         if (mdev->state.conn == C_AHEAD) {
957                 ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
958         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
959                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
960                         inc_rs_pending(mdev);
961                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
962                 } else {
963                         if (__ratelimit(&drbd_ratelimit_state))
964                                 dev_err(DEV, "Not sending RSDataReply, "
965                                     "partner DISKLESS!\n");
966                         ok = 1;
967                 }
968         } else {
969                 if (__ratelimit(&drbd_ratelimit_state))
970                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
971                             (unsigned long long)peer_req->i.sector);
972
973                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
974
975                 /* update resync data with failure */
976                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
977         }
978
979         dec_unacked(mdev);
980
981         move_to_net_ee_or_free(mdev, peer_req);
982
983         if (unlikely(!ok))
984                 dev_err(DEV, "drbd_send_block() failed\n");
985         return ok;
986 }
987
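/* Checksum based resync, source side: compare the digest sent by the peer
 * (P_CSUM_RS_REQUEST) with a locally computed one.  If they match, the block
 * is already in sync and only a P_RS_IS_IN_SYNC ack goes out; otherwise the
 * full block is sent as P_RS_DATA_REPLY. */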
988 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
989 {
990         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
991         struct digest_info *di;
992         int digest_size;
993         void *digest = NULL;
994         int ok, eq = 0;
995
996         if (unlikely(cancel)) {
997                 drbd_free_ee(mdev, peer_req);
998                 dec_unacked(mdev);
999                 return 1;
1000         }
1001
1002         if (get_ldev(mdev)) {
1003                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1004                 put_ldev(mdev);
1005         }
1006
1007         di = peer_req->digest;
1008
1009         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1010                 /* quick hack to try to avoid a race against reconfiguration.
1011                  * a real fix would be much more involved,
1012                  * introducing more locking mechanisms */
1013                 if (mdev->csums_tfm) {
1014                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1015                         D_ASSERT(digest_size == di->digest_size);
1016                         digest = kmalloc(digest_size, GFP_NOIO);
1017                 }
1018                 if (digest) {
1019                         drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
1020                         eq = !memcmp(digest, di->digest, digest_size);
1021                         kfree(digest);
1022                 }
1023
1024                 if (eq) {
1025                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1026                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1027                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1028                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1029                 } else {
1030                         inc_rs_pending(mdev);
1031                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1032                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1033                         kfree(di);
1034                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1035                 }
1036         } else {
1037                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1038                 if (__ratelimit(&drbd_ratelimit_state))
1039                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1040         }
1041
1042         dec_unacked(mdev);
1043         move_to_net_ee_or_free(mdev, peer_req);
1044
1045         if (unlikely(!ok))
1046                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1047         return ok;
1048 }
1049
1050 /* TODO merge common code with w_e_send_csum */
1051 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1052 {
1053         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1054         sector_t sector = peer_req->i.sector;
1055         unsigned int size = peer_req->i.size;
1056         int digest_size;
1057         void *digest;
1058         int ok = 1;
1059
1060         if (unlikely(cancel))
1061                 goto out;
1062
1063         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1064         digest = kmalloc(digest_size, GFP_NOIO);
1065         if (!digest) {
1066                 ok = 0; /* terminate the connection in case the allocation failed */
1067                 goto out;
1068         }
1069
1070         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1071                 drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1072         else
1073                 memset(digest, 0, digest_size);
1074
1075         /* Free e and pages before send.
1076          * In case we block on congestion, we could otherwise run into
1077          * some distributed deadlock, if the other side blocks on
1078          * congestion as well, because our receiver blocks in
1079          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1080         drbd_free_ee(mdev, peer_req);
1081         peer_req = NULL;
1082         inc_rs_pending(mdev);
1083         ok = drbd_send_drequest_csum(mdev, sector, size,
1084                                      digest, digest_size,
1085                                      P_OV_REPLY);
1086         if (!ok)
1087                 dec_rs_pending(mdev);
1088         kfree(digest);
1089
1090 out:
1091         if (peer_req)
1092                 drbd_free_ee(mdev, peer_req);
1093         dec_unacked(mdev);
1094         return ok;
1095 }
1096
1097 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1098 {
1099         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1100                 mdev->ov_last_oos_size += size>>9;
1101         } else {
1102                 mdev->ov_last_oos_start = sector;
1103                 mdev->ov_last_oos_size = size>>9;
1104         }
1105         drbd_set_out_of_sync(mdev, sector, size);
1106 }
1107
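/* Online verify, reply side: recompute the digest over the local data and
 * compare it with the one received in P_OV_REPLY.  Mismatches are recorded
 * via drbd_ov_oos_found() and reported back with P_OV_RESULT; when ov_left
 * reaches zero the verify run is finished. */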
1108 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1109 {
1110         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1111         struct digest_info *di;
1112         void *digest;
1113         sector_t sector = peer_req->i.sector;
1114         unsigned int size = peer_req->i.size;
1115         int digest_size;
1116         int ok, eq = 0;
1117
1118         if (unlikely(cancel)) {
1119                 drbd_free_ee(mdev, peer_req);
1120                 dec_unacked(mdev);
1121                 return 1;
1122         }
1123
1124         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1125          * the resync lru has been cleaned up already */
1126         if (get_ldev(mdev)) {
1127                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1128                 put_ldev(mdev);
1129         }
1130
1131         di = peer_req->digest;
1132
1133         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1134                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1135                 digest = kmalloc(digest_size, GFP_NOIO);
1136                 if (digest) {
1137                         drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1138
1139                         D_ASSERT(digest_size == di->digest_size);
1140                         eq = !memcmp(digest, di->digest, digest_size);
1141                         kfree(digest);
1142                 }
1143         }
1144
1145         /* Free e and pages before send.
1146          * In case we block on congestion, we could otherwise run into
1147          * some distributed deadlock, if the other side blocks on
1148          * congestion as well, because our receiver blocks in
1149          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1150         drbd_free_ee(mdev, peer_req);
1151         if (!eq)
1152                 drbd_ov_oos_found(mdev, sector, size);
1153         else
1154                 ov_oos_print(mdev);
1155
1156         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1157                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1158
1159         dec_unacked(mdev);
1160
1161         --mdev->ov_left;
1162
1163         /* let's advance progress step marks only for every other megabyte */
1164         if ((mdev->ov_left & 0x200) == 0x200)
1165                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1166
1167         if (mdev->ov_left == 0) {
1168                 ov_oos_print(mdev);
1169                 drbd_resync_finished(mdev);
1170         }
1171
1172         return ok;
1173 }
1174
1175 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1176 {
1177         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1178         complete(&b->done);
1179         return 1;
1180 }
1181
1182 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1183 {
1184         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1185         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1186         int ok = 1;
1187
1188         /* really avoid racing with tl_clear.  w.cb may have been referenced
1189          * just before it was reassigned and re-queued, so double check that.
1190          * actually, this race was harmless, since we only try to send the
1191          * barrier packet here, and otherwise do nothing with the object.
1192          * but compare with the head of w_clear_epoch */
1193         spin_lock_irq(&mdev->tconn->req_lock);
1194         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1195                 cancel = 1;
1196         spin_unlock_irq(&mdev->tconn->req_lock);
1197         if (cancel)
1198                 return 1;
1199
1200         if (!drbd_get_data_sock(mdev->tconn))
1201                 return 0;
1202         p->barrier = b->br_number;
1203         /* inc_ap_pending was done where this was queued.
1204          * dec_ap_pending will be done in got_BarrierAck
1205          * or (on connection loss) in w_clear_epoch.  */
1206         ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1207                             &p->head, sizeof(*p), 0);
1208         drbd_put_data_sock(mdev->tconn);
1209
1210         return ok;
1211 }
1212
1213 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1214 {
1215         if (cancel)
1216                 return 1;
1217         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1218 }
1219
1220 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1221 {
1222         struct drbd_request *req = container_of(w, struct drbd_request, w);
1223         int ok;
1224
1225         if (unlikely(cancel)) {
1226                 req_mod(req, SEND_CANCELED);
1227                 return 1;
1228         }
1229
1230         ok = drbd_send_oos(mdev, req);
1231         req_mod(req, OOS_HANDED_TO_NETWORK);
1232
1233         return ok;
1234 }
1235
1236 /**
1237  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1238  * @mdev:       DRBD device.
1239  * @w:          work object.
1240  * @cancel:     The connection will be closed anyways
1241  */
1242 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1243 {
1244         struct drbd_request *req = container_of(w, struct drbd_request, w);
1245         int ok;
1246
1247         if (unlikely(cancel)) {
1248                 req_mod(req, SEND_CANCELED);
1249                 return 1;
1250         }
1251
1252         ok = drbd_send_dblock(mdev, req);
1253         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1254
1255         return ok;
1256 }
1257
1258 /**
1259  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1260  * @mdev:       DRBD device.
1261  * @w:          work object.
1262  * @cancel:     The connection will be closed anyways
1263  */
1264 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1265 {
1266         struct drbd_request *req = container_of(w, struct drbd_request, w);
1267         int ok;
1268
1269         if (unlikely(cancel)) {
1270                 req_mod(req, SEND_CANCELED);
1271                 return 1;
1272         }
1273
1274         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1275                                 (unsigned long)req);
1276
1277         if (!ok) {
1278                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1279                  * so this is probably redundant */
1280                 if (mdev->state.conn >= C_CONNECTED)
1281                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1282         }
1283         req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1284
1285         return ok;
1286 }
1287
1288 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1289 {
1290         struct drbd_request *req = container_of(w, struct drbd_request, w);
1291
1292         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1293                 drbd_al_begin_io(mdev, req->i.sector);
1294         /* Calling drbd_al_begin_io() out of the worker might deadlock
1295            theoretically. Practically it cannot deadlock, since this is
1296            only used when unfreezing IOs. All the extents of the requests
1297            that made it into the TL are already active */
1298
1299         drbd_req_make_private_bio(req, req->master_bio);
1300         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1301         generic_make_request(req->private_bio);
1302
1303         return 1;
1304 }
1305
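/* Resync-after dependencies: a device may only resync when every device it
 * is configured to sync after (sync_conf.after, a minor number, -1 meaning
 * "no dependency") is neither resyncing nor has a sync-pause flag set.  The
 * helpers below walk that dependency chain, pause/resume dependent devices
 * under the global_state_lock, and sync_after_error() rejects cycles. */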
1306 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1307 {
1308         struct drbd_conf *odev = mdev;
1309
1310         while (1) {
1311                 if (odev->sync_conf.after == -1)
1312                         return 1;
1313                 odev = minor_to_mdev(odev->sync_conf.after);
1314                 if (!expect(odev))
1315                         return 1;
1316                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1317                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1318                     odev->state.aftr_isp || odev->state.peer_isp ||
1319                     odev->state.user_isp)
1320                         return 0;
1321         }
1322 }
1323
1324 /**
1325  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1326  * @mdev:       DRBD device.
1327  *
1328  * Called from process context only (admin command and after_state_ch).
1329  */
1330 static int _drbd_pause_after(struct drbd_conf *mdev)
1331 {
1332         struct drbd_conf *odev;
1333         int i, rv = 0;
1334
1335         for (i = 0; i < minor_count; i++) {
1336                 odev = minor_to_mdev(i);
1337                 if (!odev)
1338                         continue;
1339                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1340                         continue;
1341                 if (!_drbd_may_sync_now(odev))
1342                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1343                                != SS_NOTHING_TO_DO);
1344         }
1345
1346         return rv;
1347 }
1348
1349 /**
1350  * _drbd_resume_next() - Resume resync on all devices that may resync now
1351  * @mdev:       DRBD device.
1352  *
1353  * Called from process context only (admin command and worker).
1354  */
1355 static int _drbd_resume_next(struct drbd_conf *mdev)
1356 {
1357         struct drbd_conf *odev;
1358         int i, rv = 0;
1359
1360         for (i = 0; i < minor_count; i++) {
1361                 odev = minor_to_mdev(i);
1362                 if (!odev)
1363                         continue;
1364                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1365                         continue;
1366                 if (odev->state.aftr_isp) {
1367                         if (_drbd_may_sync_now(odev))
1368                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1369                                                         CS_HARD, NULL)
1370                                        != SS_NOTHING_TO_DO) ;
1371                 }
1372         }
1373         return rv;
1374 }
1375
1376 void resume_next_sg(struct drbd_conf *mdev)
1377 {
1378         write_lock_irq(&global_state_lock);
1379         _drbd_resume_next(mdev);
1380         write_unlock_irq(&global_state_lock);
1381 }
1382
1383 void suspend_other_sg(struct drbd_conf *mdev)
1384 {
1385         write_lock_irq(&global_state_lock);
1386         _drbd_pause_after(mdev);
1387         write_unlock_irq(&global_state_lock);
1388 }
1389
1390 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1391 {
1392         struct drbd_conf *odev;
1393
1394         if (o_minor == -1)
1395                 return NO_ERROR;
1396         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1397                 return ERR_SYNC_AFTER;
1398
1399         /* check for loops */
1400         odev = minor_to_mdev(o_minor);
1401         while (1) {
1402                 if (odev == mdev)
1403                         return ERR_SYNC_AFTER_CYCLE;
1404
1405                 /* dependency chain ends here, no cycles. */
1406                 if (odev->sync_conf.after == -1)
1407                         return NO_ERROR;
1408
1409                 /* follow the dependency chain */
1410                 odev = minor_to_mdev(odev->sync_conf.after);
1411         }
1412 }
1413
1414 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1415 {
1416         int changes;
1417         int retcode;
1418
1419         write_lock_irq(&global_state_lock);
1420         retcode = sync_after_error(mdev, na);
1421         if (retcode == NO_ERROR) {
1422                 mdev->sync_conf.after = na;
1423                 do {
1424                         changes  = _drbd_pause_after(mdev);
1425                         changes |= _drbd_resume_next(mdev);
1426                 } while (changes);
1427         }
1428         write_unlock_irq(&global_state_lock);
1429         return retcode;
1430 }
1431
1432 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1433 {
1434         atomic_set(&mdev->rs_sect_in, 0);
1435         atomic_set(&mdev->rs_sect_ev, 0);
1436         mdev->rs_in_flight = 0;
1437         mdev->rs_planed = 0;
1438         spin_lock(&mdev->peer_seq_lock);
1439         fifo_set(&mdev->rs_plan_s, 0);
1440         spin_unlock(&mdev->peer_seq_lock);
1441 }
1442
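/* Timer callback armed when a resync could not be started immediately
 * (see w_start_resync() and drbd_start_resync()); it simply hands
 * start_resync_work back to the worker. */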
1443 void start_resync_timer_fn(unsigned long data)
1444 {
1445         struct drbd_conf *mdev = (struct drbd_conf *) data;
1446
1447         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1448 }
1449
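/* Worker callback: while unacknowledged requests or pending resync replies
 * are still outstanding, retry later via the start_resync timer; otherwise
 * start the resync with this node as sync source.  This apparently is the
 * Ahead -> SyncSource path, hence the flag cleared below. */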
1450 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1451 {
1452         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1453                 dev_warn(DEV, "w_start_resync later...\n");
1454                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1455                 add_timer(&mdev->start_resync_timer);
1456                 return 1;
1457         }
1458
1459         drbd_start_resync(mdev, C_SYNC_SOURCE);
1460         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1461         return 1;
1462 }
1463
1464 /**
1465  * drbd_start_resync() - Start the resync process
1466  * @mdev:       DRBD device.
1467  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1468  *
1469  * This function might bring you directly into one of the
1470  * C_PAUSED_SYNC_* states.
1471  */
1472 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1473 {
1474         union drbd_state ns;
1475         int r;
1476
1477         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1478                 dev_err(DEV, "Resync already running!\n");
1479                 return;
1480         }
1481
1482         if (mdev->state.conn < C_AHEAD) {
1483                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1484                 drbd_rs_cancel_all(mdev);
1485                 /* This should be done when we abort the resync. We definitely do not
1486                    want to have this for connections going back and forth between
1487                    Ahead/Behind and SyncSource/SyncTarget */
1488         }
1489
1490         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1491                 if (side == C_SYNC_TARGET) {
1492                         /* Application IO was locked out during C_WF_BITMAP_T and
1493                            C_WF_SYNC_UUID, so the local data is still unmodified.  Before going
1494                            to C_SYNC_TARGET, give the before-resync-target handler a chance to veto. */
1495                         r = drbd_khelper(mdev, "before-resync-target");
1496                         r = (r >> 8) & 0xff;
1497                         if (r > 0) {
1498                                 dev_info(DEV, "before-resync-target handler returned %d, "
1499                                          "dropping connection.\n", r);
1500                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1501                                 return;
1502                         }
1503                 } else /* C_SYNC_SOURCE */ {
1504                         r = drbd_khelper(mdev, "before-resync-source");
1505                         r = (r >> 8) & 0xff;
1506                         if (r > 0) {
1507                                 if (r == 3) {
1508                                         dev_info(DEV, "before-resync-source handler returned %d, "
1509                                                  "ignoring. Old userland tools?\n", r);
1510                                 } else {
1511                                         dev_info(DEV, "before-resync-source handler returned %d, "
1512                                                  "dropping connection.\n", r);
1513                                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1514                                         return;
1515                                 }
1516                         }
1517                 }
1518         }
1519
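        /* When the test_and_set_bit() below (effectively a trylock of
         * drbd_state_lock()) fails, B_RS_H_DONE records that the
         * before-resync handler has already run, so the retry through the
         * start_resync timer skips it. */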
1520         if (current == mdev->tconn->worker.task) {
1521                 /* The worker should not sleep waiting for drbd_state_lock();
1522                    that can take a long time. */
1523                 if (test_and_set_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
1524                         set_bit(B_RS_H_DONE, &mdev->flags);
1525                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1526                         add_timer(&mdev->start_resync_timer);
1527                         return;
1528                 }
1529         } else {
1530                 drbd_state_lock(mdev);
1531         }
1532         clear_bit(B_RS_H_DONE, &mdev->flags);
1533
1534         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1535                 drbd_state_unlock(mdev);
1536                 return;
1537         }
1538
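        /* Under global_state_lock: switch to the requested resync state, mark
         * the disk of the prospective sync target Inconsistent, and set
         * aftr_isp if a sync-after dependency forces us to start paused. */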
1539         write_lock_irq(&global_state_lock);
1540         ns = mdev->state;
1541
1542         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1543
1544         ns.conn = side;
1545
1546         if (side == C_SYNC_TARGET)
1547                 ns.disk = D_INCONSISTENT;
1548         else /* side == C_SYNC_SOURCE */
1549                 ns.pdsk = D_INCONSISTENT;
1550
1551         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1552         ns = mdev->state;
1553
1554         if (ns.conn < C_CONNECTED)
1555                 r = SS_UNKNOWN_ERROR;
1556
1557         if (r == SS_SUCCESS) {
1558                 unsigned long tw = drbd_bm_total_weight(mdev);
1559                 unsigned long now = jiffies;
1560                 int i;
1561
1562                 mdev->rs_failed    = 0;
1563                 mdev->rs_paused    = 0;
1564                 mdev->rs_same_csum = 0;
1565                 mdev->rs_last_events = 0;
1566                 mdev->rs_last_sect_ev = 0;
1567                 mdev->rs_total     = tw;
1568                 mdev->rs_start     = now;
1569                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1570                         mdev->rs_mark_left[i] = tw;
1571                         mdev->rs_mark_time[i] = now;
1572                 }
1573                 _drbd_pause_after(mdev);
1574         }
1575         write_unlock_irq(&global_state_lock);
1576
1577         if (r == SS_SUCCESS) {
1578                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1579                      drbd_conn_str(ns.conn),
1580                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1581                      (unsigned long) mdev->rs_total);
1582                 if (side == C_SYNC_TARGET)
1583                         mdev->bm_resync_fo = 0;
1584
1585                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1586                  * with w_send_oos, or the sync target will get confused as to
1587                  * how many bits to resync.  We cannot always do that, because for an
1588                  * empty resync and protocol < 95, we need to do it here, as we call
1589                  * drbd_resync_finished from here in that case.
1590                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1591                  * and from after_state_ch otherwise. */
1592                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1593                         drbd_gen_and_send_sync_uuid(mdev);
1594
1595                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1596                         /* This still has a race (about when exactly the peers
1597                          * detect connection loss) that can lead to a full sync
1598                          * on next handshake. In 8.3.9 we fixed this with explicit
1599                          * resync-finished notifications, but the fix
1600                          * introduces a protocol change.  Sleeping for some
1601                          * time longer than the ping interval + timeout on the
1602                          * SyncSource, to give the SyncTarget the chance to
1603                          * detect connection loss, then waiting for a ping
1604                          * response (implicit in drbd_resync_finished) reduces
1605                          * the race considerably, but does not solve it. */
1606                         if (side == C_SYNC_SOURCE)
1607                                 schedule_timeout_interruptible(
1608                                         mdev->tconn->net_conf->ping_int * HZ +
1609                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1610                         drbd_resync_finished(mdev);
1611                 }
1612
1613                 drbd_rs_controller_reset(mdev);
1614                 /* ns.conn may already be != mdev->state.conn,
1615                  * we may have been paused in between, or become paused until
1616                  * the timer triggers.
1617                  * No matter, that is handled in resync_timer_fn() */
1618                 if (ns.conn == C_SYNC_TARGET)
1619                         mod_timer(&mdev->resync_timer, jiffies);
1620
1621                 drbd_md_sync(mdev);
1622         }
1623         put_ldev(mdev);
1624         drbd_state_unlock(mdev);
1625 }
1626
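/* Main loop of the per-device worker thread: while the queue is empty,
 * uncork the data socket, sleep on the work semaphore and cork it again;
 * then dequeue one drbd_work entry and run its callback (with the cancel
 * flag set once the connection is gone).  A failed callback while
 * connected forces the connection into C_NETWORK_FAILURE.  On thread
 * stop, the remaining queue is drained and the device cleaned up. */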
1627 int drbd_worker(struct drbd_thread *thi)
1628 {
1629         struct drbd_conf *mdev = thi->mdev;
1630         struct drbd_work *w = NULL;
1631         LIST_HEAD(work_list);
1632         int intr = 0, i;
1633
1634         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1635
1636         while (get_t_state(thi) == RUNNING) {
1637                 drbd_thread_current_set_cpu(thi);
1638
1639                 if (down_trylock(&mdev->tconn->data.work.s)) {
1640                         mutex_lock(&mdev->tconn->data.mutex);
1641                         if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
1642                                 drbd_tcp_uncork(mdev->tconn->data.socket);
1643                         mutex_unlock(&mdev->tconn->data.mutex);
1644
1645                         intr = down_interruptible(&mdev->tconn->data.work.s);
1646
1647                         mutex_lock(&mdev->tconn->data.mutex);
1648                         if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
1649                                 drbd_tcp_cork(mdev->tconn->data.socket);
1650                         mutex_unlock(&mdev->tconn->data.mutex);
1651                 }
1652
1653                 if (intr) {
1654                         D_ASSERT(intr == -EINTR);
1655                         flush_signals(current);
1656                         if (!expect(get_t_state(thi) != RUNNING))
1657                                 continue;
1658                         break;
1659                 }
1660
1661                 if (get_t_state(thi) != RUNNING)
1662                         break;
1663                 /* With this break, we have done a down() but not consumed
1664                    the entry from the list. The cleanup code takes care of
1665                    this...   */
1666
1667                 w = NULL;
1668                 spin_lock_irq(&mdev->tconn->data.work.q_lock);
1669                 if (!expect(!list_empty(&mdev->tconn->data.work.q))) {
1670                         /* something terribly wrong in our logic.
1671                          * we were able to down() the semaphore,
1672                          * but the list is empty... doh.
1673                          *
1674                          * what is the best thing to do now?
1675                          * try again from scratch, restarting the receiver,
1676                          * asender, whatnot? could break even more ugly,
1677                          * e.g. when we are primary, but no good local data.
1678                          *
1679                          * I'll try to get away just starting over this loop.
1680                          */
1681                         spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1682                         continue;
1683                 }
1684                 w = list_entry(mdev->tconn->data.work.q.next, struct drbd_work, list);
1685                 list_del_init(&w->list);
1686                 spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1687
1688                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1689                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1690                         if (mdev->state.conn >= C_CONNECTED)
1691                                 drbd_force_state(mdev,
1692                                                 NS(conn, C_NETWORK_FAILURE));
1693                 }
1694         }
1695         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1696         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1697
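        /* Shutdown path: drain whatever is still queued, calling each
         * callback with the cancel flag set. */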
1698         spin_lock_irq(&mdev->tconn->data.work.q_lock);
1699         i = 0;
1700         while (!list_empty(&mdev->tconn->data.work.q)) {
1701                 list_splice_init(&mdev->tconn->data.work.q, &work_list);
1702                 spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1703
1704                 while (!list_empty(&work_list)) {
1705                         w = list_entry(work_list.next, struct drbd_work, list);
1706                         list_del_init(&w->list);
1707                         w->cb(mdev, w, 1);
1708                         i++; /* dead debugging code */
1709                 }
1710
1711                 spin_lock_irq(&mdev->tconn->data.work.q_lock);
1712         }
1713         sema_init(&mdev->tconn->data.work.s, 0);
1714         /* DANGEROUS race: if someone did queue his work within the spinlock,
1715          * but up()ed outside the spinlock, we could get an up() on the
1716          * semaphore without corresponding list entry.
1717          * So don't do that.
1718          */
1719         spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1720
1721         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1722         /* _drbd_set_state only uses stop_nowait,
1723          * so wait for the exiting receiver here. */
1724         drbd_thread_stop(&mdev->tconn->receiver);
1725         drbd_mdev_cleanup(mdev);
1726
1727         dev_info(DEV, "worker terminated\n");
1728
1729         clear_bit(DEVICE_DYING, &mdev->flags);
1730         clear_bit(CONFIG_PENDING, &mdev->flags);
1731         wake_up(&mdev->state_wait);
1732
1733         return 0;
1734 }