drbd: Make all worker callbacks return 0 upon success and an error code otherwise
[cascardo/linux.git] drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the sync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
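
/*
 * Illustrative sketch of the locking pattern described above (not part of
 * the driver; the function names drbd_state_transition_example() and
 * reevaluate_sync_after_example() are hypothetical placeholders).  Readers
 * are individual state transitions, the writer is whoever needs a stable
 * view of all devices:
 *
 *	static void drbd_state_transition_example(void)
 *	{
 *		read_lock_irq(&global_state_lock);
 *		// change the state of one device; other transitions may run
 *		// concurrently, but no sync-after re-evaluation can.
 *		read_unlock_irq(&global_state_lock);
 *	}
 *
 *	static void reevaluate_sync_after_example(struct drbd_conf *mdev)
 *	{
 *		write_lock_irq(&global_state_lock);
 *		// all per-device states are stable here; compare
 *		// resume_next_sg()/suspend_other_sg() below.
 *		_drbd_pause_after(mdev);
 *		_drbd_resume_next(mdev);
 *		write_unlock_irq(&global_state_lock);
 *	}
 */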
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70
71         md_io = (struct drbd_md_io *)bio->bi_private;
72         md_io->error = error;
73
74         complete(&md_io->event);
75 }
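
/*
 * Usage sketch (illustration only; the real code lives in
 * drbd_md_sync_page_io()): the submitter pairs this completion handler with
 * a wait_for_completion(), roughly along these lines:
 *
 *	struct drbd_md_io md_io;
 *
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	// md_io.error now holds the bio's completion status
 */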
76
77 /* reads on behalf of the partner,
78  * "submitted" by the receiver
79  */
80 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81 {
82         unsigned long flags = 0;
83         struct drbd_conf *mdev = peer_req->w.mdev;
84
85         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86         mdev->read_cnt += peer_req->i.size >> 9;
87         list_del(&peer_req->w.list);
88         if (list_empty(&mdev->read_ee))
89                 wake_up(&mdev->ee_wait);
90         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91                 __drbd_chk_io_error(mdev, false);
92         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95         put_ldev(mdev);
96 }
97
98 /* writes on behalf of the partner, or resync writes,
99  * "submitted" by the receiver, final stage.  */
100 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102         unsigned long flags = 0;
103         struct drbd_conf *mdev = peer_req->w.mdev;
104         sector_t e_sector;
105         int do_wake;
106         u64 block_id;
107         int do_al_complete_io;
108
109         /* after we moved peer_req to done_ee,
110          * we may no longer access it,
111          * it may be freed/reused already!
112          * (as soon as we release the req_lock) */
113         e_sector = peer_req->i.sector;
114         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115         block_id = peer_req->block_id;
116
117         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118         mdev->writ_cnt += peer_req->i.size >> 9;
119         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120         list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122         /*
123          * Do not remove from the write_requests tree here: we did not send the
124          * Ack yet and did not wake possibly waiting conflicting requests.
125          * Removal from the tree happens in "drbd_process_done_ee" within the
126          * appropriate w.cb (e_end_block/e_end_resync_block) or in
127          * _drbd_clear_done_ee.
128          */
129
130         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133                 __drbd_chk_io_error(mdev, false);
134         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136         if (block_id == ID_SYNCER)
137                 drbd_rs_complete_io(mdev, e_sector);
138
139         if (do_wake)
140                 wake_up(&mdev->ee_wait);
141
142         if (do_al_complete_io)
143                 drbd_al_complete_io(mdev, e_sector);
144
145         wake_asender(mdev->tconn);
146         put_ldev(mdev);
147 }
148
149 /* writes on behalf of the partner, or resync writes,
150  * "submitted" by the receiver.
151  */
152 void drbd_peer_request_endio(struct bio *bio, int error)
153 {
154         struct drbd_peer_request *peer_req = bio->bi_private;
155         struct drbd_conf *mdev = peer_req->w.mdev;
156         int uptodate = bio_flagged(bio, BIO_UPTODATE);
157         int is_write = bio_data_dir(bio) == WRITE;
158
159         if (error && __ratelimit(&drbd_ratelimit_state))
160                 dev_warn(DEV, "%s: error=%d s=%llus\n",
161                                 is_write ? "write" : "read", error,
162                                 (unsigned long long)peer_req->i.sector);
163         if (!error && !uptodate) {
164                 if (__ratelimit(&drbd_ratelimit_state))
165                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166                                         is_write ? "write" : "read",
167                                         (unsigned long long)peer_req->i.sector);
168                 /* strange behavior of some lower level drivers...
169                  * fail the request by clearing the uptodate flag,
170                  * but do not return any error?! */
171                 error = -EIO;
172         }
173
174         if (error)
175                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177         bio_put(bio); /* no need for the bio anymore */
178         if (atomic_dec_and_test(&peer_req->pending_bios)) {
179                 if (is_write)
180                         drbd_endio_write_sec_final(peer_req);
181                 else
182                         drbd_endio_read_sec_final(peer_req);
183         }
184 }
185
186 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
187  */
188 void drbd_request_endio(struct bio *bio, int error)
189 {
190         unsigned long flags;
191         struct drbd_request *req = bio->bi_private;
192         struct drbd_conf *mdev = req->w.mdev;
193         struct bio_and_error m;
194         enum drbd_req_event what;
195         int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197         if (!error && !uptodate) {
198                 dev_warn(DEV, "p %s: setting error to -EIO\n",
199                          bio_data_dir(bio) == WRITE ? "write" : "read");
200                 /* strange behavior of some lower level drivers...
201                  * fail the request by clearing the uptodate flag,
202                  * but do not return any error?! */
203                 error = -EIO;
204         }
205
206         /* to avoid recursion in __req_mod */
207         if (unlikely(error)) {
208                 what = (bio_data_dir(bio) == WRITE)
209                         ? WRITE_COMPLETED_WITH_ERROR
210                         : (bio_rw(bio) == READ)
211                           ? READ_COMPLETED_WITH_ERROR
212                           : READ_AHEAD_COMPLETED_WITH_ERROR;
213         } else
214                 what = COMPLETED_OK;
215
216         bio_put(req->private_bio);
217         req->private_bio = ERR_PTR(error);
218
219         /* not req_mod(), we need irqsave here! */
220         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221         __req_mod(req, what, &m);
222         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224         if (m.bio)
225                 complete_master_bio(mdev, &m);
226 }
227
228 int w_read_retry_remote(struct drbd_work *w, int cancel)
229 {
230         struct drbd_request *req = container_of(w, struct drbd_request, w);
231         struct drbd_conf *mdev = w->mdev;
232
233         /* We should not detach for read io-error,
234          * but try to WRITE the P_DATA_REPLY to the failed location,
235          * to give the disk the chance to relocate that block */
236
237         spin_lock_irq(&mdev->tconn->req_lock);
238         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
240                 spin_unlock_irq(&mdev->tconn->req_lock);
241                 return 0;
242         }
243         spin_unlock_irq(&mdev->tconn->req_lock);
244
245         return w_send_read_req(w, 0);
246 }
247
248 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249                   struct drbd_peer_request *peer_req, void *digest)
250 {
251         struct hash_desc desc;
252         struct scatterlist sg;
253         struct page *page = peer_req->pages;
254         struct page *tmp;
255         unsigned len;
256
257         desc.tfm = tfm;
258         desc.flags = 0;
259
260         sg_init_table(&sg, 1);
261         crypto_hash_init(&desc);
262
263         while ((tmp = page_chain_next(page))) {
264                 /* all but the last page will be fully used */
265                 sg_set_page(&sg, page, PAGE_SIZE, 0);
266                 crypto_hash_update(&desc, &sg, sg.length);
267                 page = tmp;
268         }
269         /* and now the last, possibly only partially used page */
270         len = peer_req->i.size & (PAGE_SIZE - 1);
271         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272         crypto_hash_update(&desc, &sg, sg.length);
273         crypto_hash_final(&desc, digest);
274 }
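
/*
 * Usage sketch (illustration only; the real callers are w_e_send_csum() and
 * w_e_end_csum_rs_req() below): the caller provides a buffer of
 * crypto_hash_digestsize() bytes for the digest, e.g.
 *
 *	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest)
 *		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
 */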
275
276 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277 {
278         struct hash_desc desc;
279         struct scatterlist sg;
280         struct bio_vec *bvec;
281         int i;
282
283         desc.tfm = tfm;
284         desc.flags = 0;
285
286         sg_init_table(&sg, 1);
287         crypto_hash_init(&desc);
288
289         __bio_for_each_segment(bvec, bio, i, 0) {
290                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291                 crypto_hash_update(&desc, &sg, sg.length);
292         }
293         crypto_hash_final(&desc, digest);
294 }
295
296 /* MAYBE merge common code with w_e_end_ov_req */
297 static int w_e_send_csum(struct drbd_work *w, int cancel)
298 {
299         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300         struct drbd_conf *mdev = w->mdev;
301         int digest_size;
302         void *digest;
303         int err = 0;
304
305         if (unlikely(cancel))
306                 goto out;
307
308         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309                 goto out;
310
311         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312         digest = kmalloc(digest_size, GFP_NOIO);
313         if (digest) {
314                 sector_t sector = peer_req->i.sector;
315                 unsigned int size = peer_req->i.size;
316                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317                 /* Free peer_req and pages before send.
318                  * In case we block on congestion, we could otherwise run into
319                  * some distributed deadlock, if the other side blocks on
320                  * congestion as well, because our receiver blocks in
321                  * drbd_pp_alloc due to pp_in_use > max_buffers. */
322                 drbd_free_ee(mdev, peer_req);
323                 peer_req = NULL;
324                 inc_rs_pending(mdev);
325                 err = drbd_send_drequest_csum(mdev, sector, size,
326                                               digest, digest_size,
327                                               P_CSUM_RS_REQUEST);
328                 kfree(digest);
329         } else {
330                 dev_err(DEV, "kmalloc() of digest failed.\n");
331                 err = -ENOMEM;
332         }
333
334 out:
335         if (peer_req)
336                 drbd_free_ee(mdev, peer_req);
337
338         if (unlikely(err))
339                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340         return err;
341 }
342
343 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
344
345 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346 {
347         struct drbd_peer_request *peer_req;
348
349         if (!get_ldev(mdev))
350                 return -EIO;
351
352         if (drbd_rs_should_slow_down(mdev, sector))
353                 goto defer;
354
355         /* GFP_TRY, because if there is no memory available right now, this may
356          * be rescheduled for later. It is "only" background resync, after all. */
357         peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
358         if (!peer_req)
359                 goto defer;
360
361         peer_req->w.cb = w_e_send_csum;
362         spin_lock_irq(&mdev->tconn->req_lock);
363         list_add(&peer_req->w.list, &mdev->read_ee);
364         spin_unlock_irq(&mdev->tconn->req_lock);
365
366         atomic_add(size >> 9, &mdev->rs_sect_ev);
367         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
368                 return 0;
369
370         /* If it failed because of ENOMEM, retry should help.  If it failed
371          * because bio_add_page failed (probably broken lower level driver),
372          * retry may or may not help.
373          * If it does not, you may need to force disconnect. */
374         spin_lock_irq(&mdev->tconn->req_lock);
375         list_del(&peer_req->w.list);
376         spin_unlock_irq(&mdev->tconn->req_lock);
377
378         drbd_free_ee(mdev, peer_req);
379 defer:
380         put_ldev(mdev);
381         return -EAGAIN;
382 }
383
384 int w_resync_timer(struct drbd_work *w, int cancel)
385 {
386         struct drbd_conf *mdev = w->mdev;
387         switch (mdev->state.conn) {
388         case C_VERIFY_S:
389                 w_make_ov_request(w, cancel);
390                 break;
391         case C_SYNC_TARGET:
392                 w_make_resync_request(w, cancel);
393                 break;
394         }
395
396         return 0;
397 }
398
399 void resync_timer_fn(unsigned long data)
400 {
401         struct drbd_conf *mdev = (struct drbd_conf *) data;
402
403         if (list_empty(&mdev->resync_work.list))
404                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
405 }
406
407 static void fifo_set(struct fifo_buffer *fb, int value)
408 {
409         int i;
410
411         for (i = 0; i < fb->size; i++)
412                 fb->values[i] = value;
413 }
414
415 static int fifo_push(struct fifo_buffer *fb, int value)
416 {
417         int ov;
418
419         ov = fb->values[fb->head_index];
420         fb->values[fb->head_index++] = value;
421
422         if (fb->head_index >= fb->size)
423                 fb->head_index = 0;
424
425         return ov;
426 }
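
/*
 * Worked example for the plan fifo (illustration only): with fb->size == 3,
 * values == {4, 5, 6} and head_index == 0, fifo_push(fb, 0) returns the
 * oldest slot's value 4, stores 0 in its place and advances head_index to 1,
 * leaving values == {0, 5, 6}.  drbd_rs_controller() below uses exactly this:
 * it pushes a 0 for the new step and consumes, as the return value, the
 * correction that was planned for the current step.
 */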
427
428 static void fifo_add_val(struct fifo_buffer *fb, int value)
429 {
430         int i;
431
432         for (i = 0; i < fb->size; i++)
433                 fb->values[i] += value;
434 }
435
436 static int drbd_rs_controller(struct drbd_conf *mdev)
437 {
438         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
439         unsigned int want;     /* The number of sectors we want in the proxy */
440         int req_sect; /* Number of sectors to request in this turn */
441         int correction; /* Number of sectors more we need in the proxy*/
442         int cps; /* correction per invocation of drbd_rs_controller() */
443         int steps; /* Number of time steps to plan ahead */
444         int curr_corr;
445         int max_sect;
446
447         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
448         mdev->rs_in_flight -= sect_in;
449
450         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
451
452         steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
453
454         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
455                 want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
456         } else { /* normal path */
457                 want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
458                         sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
459         }
460
461         correction = want - mdev->rs_in_flight - mdev->rs_planed;
462
463         /* Plan ahead */
464         cps = correction / steps;
465         fifo_add_val(&mdev->rs_plan_s, cps);
466         mdev->rs_planed += cps * steps;
467
468         /* What we do in this step */
469         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
470         spin_unlock(&mdev->peer_seq_lock);
471         mdev->rs_planed -= curr_corr;
472
473         req_sect = sect_in + curr_corr;
474         if (req_sect < 0)
475                 req_sect = 0;
476
477         max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
478         if (req_sect > max_sect)
479                 req_sect = max_sect;
480
481         /*
482         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
483                  sect_in, mdev->rs_in_flight, want, correction,
484                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
485         */
486
487         return req_sect;
488 }
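
/*
 * Worked example (numbers made up for illustration): assume the
 * c_fill_target branch with want == 1000 sectors, steps == 10, sect_in == 300
 * and, after the subtraction at the top, rs_in_flight == 600,
 * rs_planed == 200, with an all-zero plan fifo.  Then
 *
 *	correction = 1000 - 600 - 200 = 200 sectors
 *	cps        = 200 / 10        = 20 sectors per step
 *
 * fifo_add_val() spreads those 20 sectors over each of the 10 planned steps
 * (rs_planed becomes 400), fifo_push() hands back the 20 sectors planned for
 * the current step (rs_planed drops to 380), and this turn requests
 * req_sect = 300 + 20 = 320 sectors, subject to the c_max_rate clamp.
 */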
489
490 static int drbd_rs_number_requests(struct drbd_conf *mdev)
491 {
492         int number;
493         if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
494                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
495                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
496         } else {
497                 mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
498                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
499         }
500
501         /* ignore the number of pending requests; the resync controller should
502          * throttle down to the incoming reply rate soon enough anyway. */
503         return number;
504 }
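
/*
 * Back-of-the-envelope example for the fixed-rate branch (illustration only,
 * assuming the usual SLEEP_TIME of HZ/10, i.e. 100ms, 4KiB resync blocks and
 * resync_rate given in KiB/s): with resync_rate == 400,
 *
 *	number = (HZ/10) * 400 / ((4096/1024) * HZ) = 40 / 4 = 10
 *
 * i.e. about ten 4KiB requests are generated per 100ms tick, which indeed
 * works out to roughly 400 KiB/s.
 */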
505
506 int w_make_resync_request(struct drbd_work *w, int cancel)
507 {
508         struct drbd_conf *mdev = w->mdev;
509         unsigned long bit;
510         sector_t sector;
511         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
512         int max_bio_size;
513         int number, rollback_i, size;
514         int align, queued, sndbuf;
515         int i = 0;
516
517         if (unlikely(cancel))
518                 return 0;
519
520         if (mdev->rs_total == 0) {
521                 /* empty resync? */
522                 drbd_resync_finished(mdev);
523                 return 0;
524         }
525
526         if (!get_ldev(mdev)) {
527                 /* Since we only need to access mdev->rsync, a
528                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
529                    continuing resync with a broken disk makes no sense at
530                    all */
531                 dev_err(DEV, "Disk broke down during resync!\n");
532                 return 0;
533         }
534
535         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
536         number = drbd_rs_number_requests(mdev);
537         if (number == 0)
538                 goto requeue;
539
540         for (i = 0; i < number; i++) {
541                 /* Stop generating RS requests when half of the send buffer is filled */
542                 mutex_lock(&mdev->tconn->data.mutex);
543                 if (mdev->tconn->data.socket) {
544                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
545                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
546                 } else {
547                         queued = 1;
548                         sndbuf = 0;
549                 }
550                 mutex_unlock(&mdev->tconn->data.mutex);
551                 if (queued > sndbuf / 2)
552                         goto requeue;
553
554 next_sector:
555                 size = BM_BLOCK_SIZE;
556                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
557
558                 if (bit == DRBD_END_OF_BITMAP) {
559                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
560                         put_ldev(mdev);
561                         return 0;
562                 }
563
564                 sector = BM_BIT_TO_SECT(bit);
565
566                 if (drbd_rs_should_slow_down(mdev, sector) ||
567                     drbd_try_rs_begin_io(mdev, sector)) {
568                         mdev->bm_resync_fo = bit;
569                         goto requeue;
570                 }
571                 mdev->bm_resync_fo = bit + 1;
572
573                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
574                         drbd_rs_complete_io(mdev, sector);
575                         goto next_sector;
576                 }
577
578 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
579                 /* try to find some adjacent bits.
580                  * we stop if we already have the maximum req size.
581                  *
582                  * Additionally always align bigger requests, in order to
583                  * be prepared for all stripe sizes of software RAIDs.
584                  */
585                 align = 1;
586                 rollback_i = i;
587                 for (;;) {
588                         if (size + BM_BLOCK_SIZE > max_bio_size)
589                                 break;
590
591                         /* Be always aligned */
592                         if (sector & ((1<<(align+3))-1))
593                                 break;
594
595                         /* do not cross extent boundaries */
596                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
597                                 break;
598                         /* now, is it actually dirty, after all?
599                          * caution, drbd_bm_test_bit is tri-state for some
600                          * obscure reason; ( b == 0 ) would get the out-of-band
601                          * only accidentally right because of the "oddly sized"
602                          * adjustment below */
603                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
604                                 break;
605                         bit++;
606                         size += BM_BLOCK_SIZE;
607                         if ((BM_BLOCK_SIZE << align) <= size)
608                                 align++;
609                         i++;
610                 }
611                 /* if we merged some,
612                  * reset the offset to start the next drbd_bm_find_next from */
613                 if (size > BM_BLOCK_SIZE)
614                         mdev->bm_resync_fo = bit + 1;
615 #endif
616
617                 /* adjust very last sectors, in case we are oddly sized */
618                 if (sector + (size>>9) > capacity)
619                         size = (capacity-sector)<<9;
620                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
621                         switch (read_for_csum(mdev, sector, size)) {
622                         case -EIO: /* Disk failure */
623                                 put_ldev(mdev);
624                                 return -EIO;
625                         case -EAGAIN: /* allocation failed, or ldev busy */
626                                 drbd_rs_complete_io(mdev, sector);
627                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
628                                 i = rollback_i;
629                                 goto requeue;
630                         case 0:
631                                 /* everything ok */
632                                 break;
633                         default:
634                                 BUG();
635                         }
636                 } else {
637                         int err;
638
639                         inc_rs_pending(mdev);
640                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
641                                                  sector, size, ID_SYNCER);
642                         if (err) {
643                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
644                                 dec_rs_pending(mdev);
645                                 put_ldev(mdev);
646                                 return err;
647                         }
648                 }
649         }
650
651         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
652                 /* last syncer _request_ was sent,
653                  * but the P_RS_DATA_REPLY has not yet been received.  Sync will end (and
654                  * the next sync group will resume) as soon as we receive the last
655                  * resync data block and the last bit is cleared.
656                  * Until then, resync "work" is "inactive" ...
657                  */
658                 put_ldev(mdev);
659                 return 0;
660         }
661
662  requeue:
663         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
664         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
665         put_ldev(mdev);
666         return 0;
667 }
668
669 static int w_make_ov_request(struct drbd_work *w, int cancel)
670 {
671         struct drbd_conf *mdev = w->mdev;
672         int number, i, size;
673         sector_t sector;
674         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
675
676         if (unlikely(cancel))
677                 return 1;
678
679         number = drbd_rs_number_requests(mdev);
680
681         sector = mdev->ov_position;
682         for (i = 0; i < number; i++) {
683                 if (sector >= capacity) {
684                         return 1;
685                 }
686
687                 size = BM_BLOCK_SIZE;
688
689                 if (drbd_rs_should_slow_down(mdev, sector) ||
690                     drbd_try_rs_begin_io(mdev, sector)) {
691                         mdev->ov_position = sector;
692                         goto requeue;
693                 }
694
695                 if (sector + (size>>9) > capacity)
696                         size = (capacity-sector)<<9;
697
698                 inc_rs_pending(mdev);
699                 if (drbd_send_ov_request(mdev, sector, size)) {
700                         dec_rs_pending(mdev);
701                         return 0;
702                 }
703                 sector += BM_SECT_PER_BIT;
704         }
705         mdev->ov_position = sector;
706
707  requeue:
708         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710         return 1;
711 }
712
713 int w_ov_finished(struct drbd_work *w, int cancel)
714 {
715         struct drbd_conf *mdev = w->mdev;
716         kfree(w);
717         ov_oos_print(mdev);
718         drbd_resync_finished(mdev);
719
720         return 0;
721 }
722
723 static int w_resync_finished(struct drbd_work *w, int cancel)
724 {
725         struct drbd_conf *mdev = w->mdev;
726         kfree(w);
727
728         drbd_resync_finished(mdev);
729
730         return 0;
731 }
732
733 static void ping_peer(struct drbd_conf *mdev)
734 {
735         struct drbd_tconn *tconn = mdev->tconn;
736
737         clear_bit(GOT_PING_ACK, &tconn->flags);
738         request_ping(tconn);
739         wait_event(tconn->ping_wait,
740                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
741 }
742
743 int drbd_resync_finished(struct drbd_conf *mdev)
744 {
745         unsigned long db, dt, dbdt;
746         unsigned long n_oos;
747         union drbd_state os, ns;
748         struct drbd_work *w;
749         char *khelper_cmd = NULL;
750         int verify_done = 0;
751
752         /* Remove all elements from the resync LRU. Since future actions
753          * might set bits in the (main) bitmap, then the entries in the
754          * resync LRU would be wrong. */
755         if (drbd_rs_del_all(mdev)) {
756                 /* In case this is not possible now, most probably because
757                  * there are P_RS_DATA_REPLY packets lingering on the worker's
758                  * queue (or even the read operations for those packets
759                  * are not finished by now).  Retry in 100ms. */
760
761                 schedule_timeout_interruptible(HZ / 10);
762                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
763                 if (w) {
764                         w->cb = w_resync_finished;
765                         drbd_queue_work(&mdev->tconn->data.work, w);
766                         return 1;
767                 }
768                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
769         }
770
771         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
772         if (dt <= 0)
773                 dt = 1;
774         db = mdev->rs_total;
775         dbdt = Bit2KB(db/dt);
776         mdev->rs_paused /= HZ;
777
778         if (!get_ldev(mdev))
779                 goto out;
780
781         ping_peer(mdev);
782
783         spin_lock_irq(&mdev->tconn->req_lock);
784         os = mdev->state;
785
786         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
787
788         /* This protects us against multiple calls (that can happen in the presence
789            of application IO), and against connectivity loss just before we arrive here. */
790         if (os.conn <= C_CONNECTED)
791                 goto out_unlock;
792
793         ns = os;
794         ns.conn = C_CONNECTED;
795
796         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
797              verify_done ? "Online verify " : "Resync",
798              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
799
800         n_oos = drbd_bm_total_weight(mdev);
801
802         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
803                 if (n_oos) {
804                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
805                               n_oos, Bit2KB(1));
806                         khelper_cmd = "out-of-sync";
807                 }
808         } else {
809                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
810
811                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
812                         khelper_cmd = "after-resync-target";
813
814                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
815                         const unsigned long s = mdev->rs_same_csum;
816                         const unsigned long t = mdev->rs_total;
817                         const int ratio =
818                                 (t == 0)     ? 0 :
819                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
820                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
821                              "transferred %luK total %luK\n",
822                              ratio,
823                              Bit2KB(mdev->rs_same_csum),
824                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
825                              Bit2KB(mdev->rs_total));
826                 }
827         }
828
829         if (mdev->rs_failed) {
830                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
831
832                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
833                         ns.disk = D_INCONSISTENT;
834                         ns.pdsk = D_UP_TO_DATE;
835                 } else {
836                         ns.disk = D_UP_TO_DATE;
837                         ns.pdsk = D_INCONSISTENT;
838                 }
839         } else {
840                 ns.disk = D_UP_TO_DATE;
841                 ns.pdsk = D_UP_TO_DATE;
842
843                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
844                         if (mdev->p_uuid) {
845                                 int i;
846                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
847                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
848                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
849                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
850                         } else {
851                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
852                         }
853                 }
854
855                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
856                         /* for verify runs, we don't update uuids here,
857                          * so there would be nothing to report. */
858                         drbd_uuid_set_bm(mdev, 0UL);
859                         drbd_print_uuids(mdev, "updated UUIDs");
860                         if (mdev->p_uuid) {
861                                 /* Now the two UUID sets are equal, update what we
862                                  * know of the peer. */
863                                 int i;
864                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
865                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
866                         }
867                 }
868         }
869
870         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
871 out_unlock:
872         spin_unlock_irq(&mdev->tconn->req_lock);
873         put_ldev(mdev);
874 out:
875         mdev->rs_total  = 0;
876         mdev->rs_failed = 0;
877         mdev->rs_paused = 0;
878         if (verify_done)
879                 mdev->ov_start_sector = 0;
880
881         drbd_md_sync(mdev);
882
883         if (khelper_cmd)
884                 drbd_khelper(mdev, khelper_cmd);
885
886         return 1;
887 }
888
889 /* helper */
890 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
891 {
892         if (drbd_ee_has_active_page(peer_req)) {
893                 /* This might happen if sendpage() has not finished */
894                 int i = (peer_req->i.size + PAGE_SIZE - 1) >> PAGE_SHIFT;
895                 atomic_add(i, &mdev->pp_in_use_by_net);
896                 atomic_sub(i, &mdev->pp_in_use);
897                 spin_lock_irq(&mdev->tconn->req_lock);
898                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
899                 spin_unlock_irq(&mdev->tconn->req_lock);
900                 wake_up(&drbd_pp_wait);
901         } else
902                 drbd_free_ee(mdev, peer_req);
903 }
904
905 /**
906  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
907  * @mdev:       DRBD device.
908  * @w:          work object.
909  * @cancel:     The connection will be closed anyways
910  */
911 int w_e_end_data_req(struct drbd_work *w, int cancel)
912 {
913         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
914         struct drbd_conf *mdev = w->mdev;
915         int err;
916
917         if (unlikely(cancel)) {
918                 drbd_free_ee(mdev, peer_req);
919                 dec_unacked(mdev);
920                 return 0;
921         }
922
923         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
924                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
925         } else {
926                 if (__ratelimit(&drbd_ratelimit_state))
927                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
928                             (unsigned long long)peer_req->i.sector);
929
930                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
931         }
932
933         dec_unacked(mdev);
934
935         move_to_net_ee_or_free(mdev, peer_req);
936
937         if (unlikely(err))
938                 dev_err(DEV, "drbd_send_block() failed\n");
939         return err;
940 }
941
942 /**
943  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
944  * @mdev:       DRBD device.
945  * @w:          work object.
946  * @cancel:     The connection will be closed anyways
947  */
948 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
949 {
950         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
951         struct drbd_conf *mdev = w->mdev;
952         int err;
953
954         if (unlikely(cancel)) {
955                 drbd_free_ee(mdev, peer_req);
956                 dec_unacked(mdev);
957                 return 0;
958         }
959
960         if (get_ldev_if_state(mdev, D_FAILED)) {
961                 drbd_rs_complete_io(mdev, peer_req->i.sector);
962                 put_ldev(mdev);
963         }
964
965         if (mdev->state.conn == C_AHEAD) {
966                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
967         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
968                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
969                         inc_rs_pending(mdev);
970                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
971                 } else {
972                         if (__ratelimit(&drbd_ratelimit_state))
973                                 dev_err(DEV, "Not sending RSDataReply, "
974                                     "partner DISKLESS!\n");
975                         err = 0;
976                 }
977         } else {
978                 if (__ratelimit(&drbd_ratelimit_state))
979                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
980                             (unsigned long long)peer_req->i.sector);
981
982                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
983
984                 /* update resync data with failure */
985                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
986         }
987
988         dec_unacked(mdev);
989
990         move_to_net_ee_or_free(mdev, peer_req);
991
992         if (unlikely(err))
993                 dev_err(DEV, "drbd_send_block() failed\n");
994         return err;
995 }
996
997 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
998 {
999         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1000         struct drbd_conf *mdev = w->mdev;
1001         struct digest_info *di;
1002         int digest_size;
1003         void *digest = NULL;
1004         int err, eq = 0;
1005
1006         if (unlikely(cancel)) {
1007                 drbd_free_ee(mdev, peer_req);
1008                 dec_unacked(mdev);
1009                 return 0;
1010         }
1011
1012         if (get_ldev(mdev)) {
1013                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1014                 put_ldev(mdev);
1015         }
1016
1017         di = peer_req->digest;
1018
1019         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1020                 /* quick hack to try to avoid a race against reconfiguration.
1021                  * a real fix would be much more involved,
1022                  * introducing more locking mechanisms */
1023                 if (mdev->tconn->csums_tfm) {
1024                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1025                         D_ASSERT(digest_size == di->digest_size);
1026                         digest = kmalloc(digest_size, GFP_NOIO);
1027                 }
1028                 if (digest) {
1029                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1030                         eq = !memcmp(digest, di->digest, digest_size);
1031                         kfree(digest);
1032                 }
1033
1034                 if (eq) {
1035                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1036                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1037                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1038                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1039                 } else {
1040                         inc_rs_pending(mdev);
1041                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1042                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1043                         kfree(di);
1044                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1045                 }
1046         } else {
1047                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1048                 if (__ratelimit(&drbd_ratelimit_state))
1049                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1050         }
1051
1052         dec_unacked(mdev);
1053         move_to_net_ee_or_free(mdev, peer_req);
1054
1055         if (unlikely(err))
1056                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1057         return err;
1058 }
1059
1060 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1061 {
1062         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1063         struct drbd_conf *mdev = w->mdev;
1064         sector_t sector = peer_req->i.sector;
1065         unsigned int size = peer_req->i.size;
1066         int digest_size;
1067         void *digest;
1068         int err = 0;
1069
1070         if (unlikely(cancel))
1071                 goto out;
1072
1073         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1074         digest = kmalloc(digest_size, GFP_NOIO);
1075         if (!digest) {
1076                 err = -ENOMEM;  /* terminate the connection in case the allocation failed */
1077                 goto out;
1078         }
1079
1080         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1081                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1082         else
1083                 memset(digest, 0, digest_size);
1084
1085         /* Free peer_req and pages before send.
1086          * In case we block on congestion, we could otherwise run into
1087          * some distributed deadlock, if the other side blocks on
1088          * congestion as well, because our receiver blocks in
1089          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1090         drbd_free_ee(mdev, peer_req);
1091         peer_req = NULL;
1092         inc_rs_pending(mdev);
1093         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1094         if (err)
1095                 dec_rs_pending(mdev);
1096         kfree(digest);
1097
1098 out:
1099         if (peer_req)
1100                 drbd_free_ee(mdev, peer_req);
1101         dec_unacked(mdev);
1102         return err;
1103 }
1104
1105 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1106 {
1107         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1108                 mdev->ov_last_oos_size += size>>9;
1109         } else {
1110                 mdev->ov_last_oos_start = sector;
1111                 mdev->ov_last_oos_size = size>>9;
1112         }
1113         drbd_set_out_of_sync(mdev, sector, size);
1114 }
1115
1116 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1117 {
1118         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1119         struct drbd_conf *mdev = w->mdev;
1120         struct digest_info *di;
1121         void *digest;
1122         sector_t sector = peer_req->i.sector;
1123         unsigned int size = peer_req->i.size;
1124         int digest_size;
1125         int err, eq = 0;
1126
1127         if (unlikely(cancel)) {
1128                 drbd_free_ee(mdev, peer_req);
1129                 dec_unacked(mdev);
1130                 return 0;
1131         }
1132
1133         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1134          * the resync lru has been cleaned up already */
1135         if (get_ldev(mdev)) {
1136                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1137                 put_ldev(mdev);
1138         }
1139
1140         di = peer_req->digest;
1141
1142         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1143                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1144                 digest = kmalloc(digest_size, GFP_NOIO);
1145                 if (digest) {
1146                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1147
1148                         D_ASSERT(digest_size == di->digest_size);
1149                         eq = !memcmp(digest, di->digest, digest_size);
1150                         kfree(digest);
1151                 }
1152         }
1153
1154         /* Free peer_req and pages before send.
1155          * In case we block on congestion, we could otherwise run into
1156          * some distributed deadlock, if the other side blocks on
1157          * congestion as well, because our receiver blocks in
1158          * drbd_pp_alloc due to pp_in_use > max_buffers. */
1159         drbd_free_ee(mdev, peer_req);
1160         if (!eq)
1161                 drbd_ov_oos_found(mdev, sector, size);
1162         else
1163                 ov_oos_print(mdev);
1164
1165         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1166                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1167
1168         dec_unacked(mdev);
1169
1170         --mdev->ov_left;
1171
1172         /* let's advance progress step marks only for every other megabyte */
1173         if ((mdev->ov_left & 0x200) == 0x200)
1174                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1175
1176         if (mdev->ov_left == 0) {
1177                 ov_oos_print(mdev);
1178                 drbd_resync_finished(mdev);
1179         }
1180
1181         return err;
1182 }
1183
1184 int w_prev_work_done(struct drbd_work *w, int cancel)
1185 {
1186         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1187
1188         complete(&b->done);
1189         return 0;
1190 }
1191
1192 int w_send_barrier(struct drbd_work *w, int cancel)
1193 {
1194         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1195         struct drbd_conf *mdev = w->mdev;
1196         struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1197         int err = 0;
1198
1199         /* really avoid racing with tl_clear.  w.cb may have been referenced
1200          * just before it was reassigned and re-queued, so double check that.
1201          * actually, this race was harmless, since we only try to send the
1202          * barrier packet here, and otherwise do nothing with the object.
1203          * but compare with the head of w_clear_epoch */
1204         spin_lock_irq(&mdev->tconn->req_lock);
1205         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1206                 cancel = 1;
1207         spin_unlock_irq(&mdev->tconn->req_lock);
1208         if (cancel)
1209                 return 0;
1210
1211         err = drbd_get_data_sock(mdev->tconn);
1212         if (err)
1213                 return err;
1214         p->barrier = b->br_number;
1215         /* inc_ap_pending was done where this was queued.
1216          * dec_ap_pending will be done in got_BarrierAck
1217          * or (on connection loss) in w_clear_epoch.  */
1218         err = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1219                              &p->head, sizeof(*p), 0);
1220         drbd_put_data_sock(mdev->tconn);
1221
1222         return err;
1223 }
1224
1225 int w_send_write_hint(struct drbd_work *w, int cancel)
1226 {
1227         struct drbd_conf *mdev = w->mdev;
1228         if (cancel)
1229                 return 0;
1230         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1231 }
1232
1233 int w_send_oos(struct drbd_work *w, int cancel)
1234 {
1235         struct drbd_request *req = container_of(w, struct drbd_request, w);
1236         struct drbd_conf *mdev = w->mdev;
1237         int err;
1238
1239         if (unlikely(cancel)) {
1240                 req_mod(req, SEND_CANCELED);
1241                 return 0;
1242         }
1243
1244         err = drbd_send_oos(mdev, req);
1245         req_mod(req, OOS_HANDED_TO_NETWORK);
1246
1247         return err;
1248 }
1249
1250 /**
1251  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1252  * @mdev:       DRBD device.
1253  * @w:          work object.
1254  * @cancel:     The connection will be closed anyways
1255  */
1256 int w_send_dblock(struct drbd_work *w, int cancel)
1257 {
1258         struct drbd_request *req = container_of(w, struct drbd_request, w);
1259         struct drbd_conf *mdev = w->mdev;
1260         int err;
1261
1262         if (unlikely(cancel)) {
1263                 req_mod(req, SEND_CANCELED);
1264                 return 0;
1265         }
1266
1267         err = drbd_send_dblock(mdev, req);
1268         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1269
1270         return err;
1271 }
1272
1273 /**
1274  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275  * @mdev:       DRBD device.
1276  * @w:          work object.
1277  * @cancel:     The connection will be closed anyways
1278  */
1279 int w_send_read_req(struct drbd_work *w, int cancel)
1280 {
1281         struct drbd_request *req = container_of(w, struct drbd_request, w);
1282         struct drbd_conf *mdev = w->mdev;
1283         int err;
1284
1285         if (unlikely(cancel)) {
1286                 req_mod(req, SEND_CANCELED);
1287                 return 0;
1288         }
1289
1290         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1291                                  (unsigned long)req);
1292
1293         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1294
1295         return err;
1296 }
1297
1298 int w_restart_disk_io(struct drbd_work *w, int cancel)
1299 {
1300         struct drbd_request *req = container_of(w, struct drbd_request, w);
1301         struct drbd_conf *mdev = w->mdev;
1302
1303         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1304                 drbd_al_begin_io(mdev, req->i.sector);
1305         /* Calling drbd_al_begin_io() out of the worker might deadlock
1306            theoretically. In practice it cannot deadlock, since this is
1307            only used when unfreezing IOs. All the extents of the requests
1308            that made it into the TL are already active */
1309
1310         drbd_req_make_private_bio(req, req->master_bio);
1311         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1312         generic_make_request(req->private_bio);
1313
1314         return 0;
1315 }
1316
1317 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1318 {
1319         struct drbd_conf *odev = mdev;
1320
1321         while (1) {
1322                 if (odev->ldev->dc.resync_after == -1)
1323                         return 1;
1324                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1325                 if (!expect(odev))
1326                         return 1;
1327                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1328                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1329                     odev->state.aftr_isp || odev->state.peer_isp ||
1330                     odev->state.user_isp)
1331                         return 0;
1332         }
1333 }
1334
1335 /**
1336  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1337  * @mdev:       DRBD device.
1338  *
1339  * Called from process context only (admin command and after_state_ch).
1340  */
1341 static int _drbd_pause_after(struct drbd_conf *mdev)
1342 {
1343         struct drbd_conf *odev;
1344         int i, rv = 0;
1345
1346         idr_for_each_entry(&minors, odev, i) {
1347                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1348                         continue;
1349                 if (!_drbd_may_sync_now(odev))
1350                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1351                                != SS_NOTHING_TO_DO);
1352         }
1353
1354         return rv;
1355 }
1356
1357 /**
1358  * _drbd_resume_next() - Resume resync on all devices that may resync now
1359  * @mdev:       DRBD device.
1360  *
1361  * Called from process context only (admin command and worker).
1362  */
1363 static int _drbd_resume_next(struct drbd_conf *mdev)
1364 {
1365         struct drbd_conf *odev;
1366         int i, rv = 0;
1367
1368         idr_for_each_entry(&minors, odev, i) {
1369                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1370                         continue;
1371                 if (odev->state.aftr_isp) {
1372                         if (_drbd_may_sync_now(odev))
1373                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1374                                                         CS_HARD, NULL)
1375                                        != SS_NOTHING_TO_DO) ;
1376                 }
1377         }
1378         return rv;
1379 }
1380
1381 void resume_next_sg(struct drbd_conf *mdev)
1382 {
1383         write_lock_irq(&global_state_lock);
1384         _drbd_resume_next(mdev);
1385         write_unlock_irq(&global_state_lock);
1386 }
1387
1388 void suspend_other_sg(struct drbd_conf *mdev)
1389 {
1390         write_lock_irq(&global_state_lock);
1391         _drbd_pause_after(mdev);
1392         write_unlock_irq(&global_state_lock);
1393 }
1394
1395 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1396 {
1397         struct drbd_conf *odev;
1398
1399         if (o_minor == -1)
1400                 return NO_ERROR;
1401         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1402                 return ERR_SYNC_AFTER;
1403
1404         /* check for loops */
1405         odev = minor_to_mdev(o_minor);
1406         while (1) {
1407                 if (odev == mdev)
1408                         return ERR_SYNC_AFTER_CYCLE;
1409
1410                 /* dependency chain ends here, no cycles. */
1411                 if (odev->ldev->dc.resync_after == -1)
1412                         return NO_ERROR;
1413
1414                 /* follow the dependency chain */
1415                 odev = minor_to_mdev(odev->ldev->dc.resync_after);
1416         }
1417 }
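
/*
 * Example of the dependency walk above (minor numbers made up): if minor 2
 * is configured with resync_after == 1 and minor 1 with resync_after == -1,
 * a request to let minor 0 sync after minor 2 walks 2 -> 1 -> "-1" and
 * returns NO_ERROR.  If minor 1 were instead configured to sync after
 * minor 0, the walk starting at minor 2 would reach mdev itself (minor 0)
 * and return ERR_SYNC_AFTER_CYCLE.
 */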
1418
1419 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1420 {
1421         int changes;
1422         int retcode;
1423
1424         write_lock_irq(&global_state_lock);
1425         retcode = sync_after_error(mdev, na);
1426         if (retcode == NO_ERROR) {
1427                 mdev->ldev->dc.resync_after = na;
1428                 do {
1429                         changes  = _drbd_pause_after(mdev);
1430                         changes |= _drbd_resume_next(mdev);
1431                 } while (changes);
1432         }
1433         write_unlock_irq(&global_state_lock);
1434         return retcode;
1435 }
1436
1437 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1438 {
1439         atomic_set(&mdev->rs_sect_in, 0);
1440         atomic_set(&mdev->rs_sect_ev, 0);
1441         mdev->rs_in_flight = 0;
1442         mdev->rs_planed = 0;
1443         spin_lock(&mdev->peer_seq_lock);
1444         fifo_set(&mdev->rs_plan_s, 0);
1445         spin_unlock(&mdev->peer_seq_lock);
1446 }
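
/* The reset above clears everything the dynamic resync-rate controller bases
 * its decisions on: the sector counters it samples (rs_sect_in, rs_sect_ev),
 * the in-flight and planned amounts, and the plan FIFO itself, which is
 * protected by peer_seq_lock. */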
1447
1448 void start_resync_timer_fn(unsigned long data)
1449 {
1450         struct drbd_conf *mdev = (struct drbd_conf *) data;
1451
1452         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1453 }
1454
1455 int w_start_resync(struct drbd_work *w, int cancel)
1456 {
1457         struct drbd_conf *mdev = w->mdev;
1458
1459         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1460                 dev_warn(DEV, "w_start_resync later...\n");
1461                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1462                 add_timer(&mdev->start_resync_timer);
1463                 return 0;
1464         }
1465
1466         drbd_start_resync(mdev, C_SYNC_SOURCE);
1467         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1468         return 0;
1469 }
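
/* If acknowledgements or resync requests are still outstanding, w_start_resync
 * does not block the worker; it re-arms start_resync_timer to try again about
 * 100 ms (HZ/10) later.  Only once both counters have drained does it start
 * the resync as sync source and clear AHEAD_TO_SYNC_SOURCE on the current
 * epoch. */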
1470
1471 /**
1472  * drbd_start_resync() - Start the resync process
1473  * @mdev:       DRBD device.
1474  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1475  *
1476  * This function might bring you directly into one of the
1477  * C_PAUSED_SYNC_* states.
1478  */
1479 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1480 {
1481         union drbd_state ns;
1482         int r;
1483
1484         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1485                 dev_err(DEV, "Resync already running!\n");
1486                 return;
1487         }
1488
1489         if (mdev->state.conn < C_AHEAD) {
1490                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1491                 drbd_rs_cancel_all(mdev);
1492                 /* This should be done when we abort the resync. We definitely do not
1493                    want to have this for connections going back and forth between
1494                    Ahead/Behind and SyncSource/SyncTarget */
1495         }
1496
1497         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1498                 if (side == C_SYNC_TARGET) {
1499                         /* Since application IO was locked out during C_WF_BITMAP_T and
1500                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1501                            we ask the handler whether we may make the data inconsistent. */
1502                         r = drbd_khelper(mdev, "before-resync-target");
1503                         r = (r >> 8) & 0xff;
1504                         if (r > 0) {
1505                                 dev_info(DEV, "before-resync-target handler returned %d, "
1506                                          "dropping connection.\n", r);
1507                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1508                                 return;
1509                         }
1510                 } else /* C_SYNC_SOURCE */ {
1511                         r = drbd_khelper(mdev, "before-resync-source");
1512                         r = (r >> 8) & 0xff;
1513                         if (r > 0) {
1514                                 if (r == 3) {
1515                                         dev_info(DEV, "before-resync-source handler returned %d, "
1516                                                  "ignoring. Old userland tools?\n", r);
1517                                 } else {
1518                                         dev_info(DEV, "before-resync-source handler returned %d, "
1519                                                  "dropping connection.\n", r);
1520                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1521                                         return;
1522                                 }
1523                         }
1524                 }
1525         }
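        /* In both branches above, drbd_khelper() hands back a wait()-style
         * status; the (r >> 8) & 0xff shift-and-mask picks out the helper's
         * exit code, the value user space would see via WEXITSTATUS().
         * Exit code 3 from before-resync-source is tolerated for the sake of
         * old userland tools; any other non-zero code drops the connection. */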
1526
1527         if (current == mdev->tconn->worker.task) {
1528                 /* The worker should not sleep waiting for state_mutex,
1529                    as that can take a long time */
1530                 if (!mutex_trylock(mdev->state_mutex)) {
1531                         set_bit(B_RS_H_DONE, &mdev->flags);
1532                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1533                         add_timer(&mdev->start_resync_timer);
1534                         return;
1535                 }
1536         } else {
1537                 mutex_lock(mdev->state_mutex);
1538         }
1539         clear_bit(B_RS_H_DONE, &mdev->flags);
1540
1541         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1542                 mutex_unlock(mdev->state_mutex);
1543                 return;
1544         }
1545
1546         write_lock_irq(&global_state_lock);
1547         ns = mdev->state;
1548
1549         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1550
1551         ns.conn = side;
1552
1553         if (side == C_SYNC_TARGET)
1554                 ns.disk = D_INCONSISTENT;
1555         else /* side == C_SYNC_SOURCE */
1556                 ns.pdsk = D_INCONSISTENT;
1557
1558         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1559         ns = mdev->state;
1560
1561         if (ns.conn < C_CONNECTED)
1562                 r = SS_UNKNOWN_ERROR;
1563
1564         if (r == SS_SUCCESS) {
1565                 unsigned long tw = drbd_bm_total_weight(mdev);
1566                 unsigned long now = jiffies;
1567                 int i;
1568
1569                 mdev->rs_failed    = 0;
1570                 mdev->rs_paused    = 0;
1571                 mdev->rs_same_csum = 0;
1572                 mdev->rs_last_events = 0;
1573                 mdev->rs_last_sect_ev = 0;
1574                 mdev->rs_total     = tw;
1575                 mdev->rs_start     = now;
1576                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1577                         mdev->rs_mark_left[i] = tw;
1578                         mdev->rs_mark_time[i] = now;
1579                 }
1580                 _drbd_pause_after(mdev);
1581         }
1582         write_unlock_irq(&global_state_lock);
1583
1584         if (r == SS_SUCCESS) {
1585                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1586                      drbd_conn_str(ns.conn),
1587                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1588                      (unsigned long) mdev->rs_total);
1589                 if (side == C_SYNC_TARGET)
1590                         mdev->bm_resync_fo = 0;
1591
1592                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1593                  * with w_send_oos, or the sync target will get confused as to
1594                  * how many bits to resync.  We cannot always do that, because for an
1595                  * empty resync and protocol < 95, we need to do it here, as we call
1596                  * drbd_resync_finished from here in that case.
1597                  * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1598                  * and from after_state_ch otherwise. */
1599                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1600                         drbd_gen_and_send_sync_uuid(mdev);
1601
1602                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1603                         /* This still has a race (about when exactly the peers
1604                          * detect connection loss) that can lead to a full sync
1605                          * on next handshake. In 8.3.9 we fixed this with explicit
1606                          * resync-finished notifications, but the fix
1607                          * introduces a protocol change.  Sleeping for some
1608                          * time longer than the ping interval + timeout on the
1609                          * SyncSource, to give the SyncTarget the chance to
1610                          * detect connection loss, then waiting for a ping
1611                          * response (implicit in drbd_resync_finished) reduces
1612                          * the race considerably, but does not solve it. */
1613                         if (side == C_SYNC_SOURCE)
1614                                 schedule_timeout_interruptible(
1615                                         mdev->tconn->net_conf->ping_int * HZ +
1616                                         mdev->tconn->net_conf->ping_timeo*HZ/9);
1617                         drbd_resync_finished(mdev);
1618                 }
1619
1620                 drbd_rs_controller_reset(mdev);
1621                 /* ns.conn may already be != mdev->state.conn,
1622                  * we may have been paused in between, or become paused until
1623                  * the timer triggers.
1624                  * No matter, that is handled in resync_timer_fn() */
1625                 if (ns.conn == C_SYNC_TARGET)
1626                         mod_timer(&mdev->resync_timer, jiffies);
1627
1628                 drbd_md_sync(mdev);
1629         }
1630         put_ldev(mdev);
1631         mutex_unlock(mdev->state_mutex);
1632 }
1633
1634 int drbd_worker(struct drbd_thread *thi)
1635 {
1636         struct drbd_tconn *tconn = thi->tconn;
1637         struct drbd_work *w = NULL;
1638         struct drbd_conf *mdev;
1639         LIST_HEAD(work_list);
1640         int vnr, intr = 0;
1641
1642         while (get_t_state(thi) == RUNNING) {
1643                 drbd_thread_current_set_cpu(thi);
1644
1645                 if (down_trylock(&tconn->data.work.s)) {
1646                         mutex_lock(&tconn->data.mutex);
1647                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1648                                 drbd_tcp_uncork(tconn->data.socket);
1649                         mutex_unlock(&tconn->data.mutex);
1650
1651                         intr = down_interruptible(&tconn->data.work.s);
1652
1653                         mutex_lock(&tconn->data.mutex);
1654                         if (tconn->data.socket && !tconn->net_conf->no_cork)
1655                                 drbd_tcp_cork(tconn->data.socket);
1656                         mutex_unlock(&tconn->data.mutex);
1657                 }
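
                /* Corking: while work keeps arriving, the data socket stays
                 * corked so that small packets get batched.  Only when the
                 * queue runs dry (down_trylock() fails) do we uncork to flush
                 * out whatever is still buffered, sleep in down_interruptible()
                 * until new work is queued, and cork again before sending. */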
1658
1659                 if (intr) {
1660                         flush_signals(current);
1661                         if (get_t_state(thi) == RUNNING) {
1662                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1663                                 continue;
1664                         }
1665                         break;
1666                 }
1667
1668                 if (get_t_state(thi) != RUNNING)
1669                         break;
1670                 /* With this break, we have done a down() but not consumed
1671                    the entry from the list. The cleanup code takes care of
1672                    this...   */
1673
1674                 w = NULL;
1675                 spin_lock_irq(&tconn->data.work.q_lock);
1676                 if (list_empty(&tconn->data.work.q)) {
1677                         /* something terribly wrong in our logic.
1678                          * we were able to down() the semaphore,
1679                          * but the list is empty... doh.
1680                          *
1681                          * what is the best thing to do now?
1682                          * try again from scratch, restarting the receiver,
1683                          * asender, whatnot? could break even more badly,
1684                          * e.g. when we are primary, but have no good local data.
1685                          *
1686                          * I'll try to get away just starting over this loop.
1687                          */
1688                         conn_warn(tconn, "Work list unexpectedly empty\n");
1689                         spin_unlock_irq(&tconn->data.work.q_lock);
1690                         continue;
1691                 }
1692                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1693                 list_del_init(&w->list);
1694                 spin_unlock_irq(&tconn->data.work.q_lock);
1695
1696                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1697                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1698                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1699                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1700                 }
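                /* A zero return from the callback means success.  A non-zero
                 * return while the connection is still established (cstate at
                 * least C_WF_REPORT_PARAMS) escalates to C_NETWORK_FAILURE;
                 * below that, the callback already ran with cancel set and the
                 * failure is simply ignored. */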
1701         }
1702
1703         spin_lock_irq(&tconn->data.work.q_lock);
1704         while (!list_empty(&tconn->data.work.q)) {
1705                 list_splice_init(&tconn->data.work.q, &work_list);
1706                 spin_unlock_irq(&tconn->data.work.q_lock);
1707
1708                 while (!list_empty(&work_list)) {
1709                         w = list_entry(work_list.next, struct drbd_work, list);
1710                         list_del_init(&w->list);
1711                         w->cb(w, 1);
1712                 }
1713
1714                 spin_lock_irq(&tconn->data.work.q_lock);
1715         }
1716         sema_init(&tconn->data.work.s, 0);
1717         /* DANGEROUS race: if someone queued work while holding the spinlock,
1718          * but called up() outside of it, we could get an up() on the
1719          * semaphore without a corresponding list entry.
1720          * So don't do that.
1721          */
1722         spin_unlock_irq(&tconn->data.work.q_lock);
1723
1724         drbd_thread_stop(&tconn->receiver);
1725         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1726                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1727                 /* _drbd_set_state only uses stop_nowait.
1728                  * wait here for the exiting receiver. */
1729                 drbd_mdev_cleanup(mdev);
1730         }
1731         clear_bit(OBJECT_DYING, &tconn->flags);
1732         clear_bit(CONFIG_PENDING, &tconn->flags);
1733         wake_up(&tconn->ping_wait);
1734
1735         return 0;
1736 }