gfs2: Initialize atime of I_NEW inodes
[cascardo/linux.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = bio->bi_error;
67
68         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69          * to timeout on the lower level device, and eventually detach from it.
70          * If this io completion runs after that timeout expired, this
71          * drbd_md_put_buffer() may allow us to finally try and re-attach.
72          * During normal operation, this only puts that extra reference
73          * down to 1 again.
74          * Make sure we first drop the reference, and only then signal
75          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76          * next drbd_md_sync_page_io(), that we trigger the
77          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78          */
79         drbd_md_put_buffer(device);
80         device->md_io.done = 1;
81         wake_up(&device->misc_wait);
82         bio_put(bio);
83         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84                 put_ldev(device);
85 }
86
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92         unsigned long flags = 0;
93         struct drbd_peer_device *peer_device = peer_req->peer_device;
94         struct drbd_device *device = peer_device->device;
95
96         spin_lock_irqsave(&device->resource->req_lock, flags);
97         device->read_cnt += peer_req->i.size >> 9;
98         list_del(&peer_req->w.list);
99         if (list_empty(&device->read_ee))
100                 wake_up(&device->ee_wait);
101         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103         spin_unlock_irqrestore(&device->resource->req_lock, flags);
104
105         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106         put_ldev(device);
107 }
108
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113         unsigned long flags = 0;
114         struct drbd_peer_device *peer_device = peer_req->peer_device;
115         struct drbd_device *device = peer_device->device;
116         struct drbd_connection *connection = peer_device->connection;
117         struct drbd_interval i;
118         int do_wake;
119         u64 block_id;
120         int do_al_complete_io;
121
122         /* after we moved peer_req to done_ee,
123          * we may no longer access it,
124          * it may be freed/reused already!
125          * (as soon as we release the req_lock) */
126         i = peer_req->i;
127         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128         block_id = peer_req->block_id;
129         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130
131         spin_lock_irqsave(&device->resource->req_lock, flags);
132         device->writ_cnt += peer_req->i.size >> 9;
133         list_move_tail(&peer_req->w.list, &device->done_ee);
134
135         /*
136          * Do not remove from the write_requests tree here: we did not send the
137          * Ack yet and did not wake possibly waiting conflicting requests.
138          * Removed from the tree from "drbd_process_done_ee" within the
139          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
140          * _drbd_clear_done_ee.
141          */
142
143         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144
145         /* FIXME do we want to detach for failed REQ_DISCARD?
146          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147         if (peer_req->flags & EE_WAS_ERROR)
148                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149
150         if (connection->cstate >= C_WF_REPORT_PARAMS) {
151                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153                         kref_put(&device->kref, drbd_destroy_device);
154         }
155         spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157         if (block_id == ID_SYNCER)
158                 drbd_rs_complete_io(device, i.sector);
159
160         if (do_wake)
161                 wake_up(&device->ee_wait);
162
163         if (do_al_complete_io)
164                 drbd_al_complete_io(device, &i);
165
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->peer_device->device;
176         bool is_write = bio_data_dir(bio) == WRITE;
177         bool is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
178
179         if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_error,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_error)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198 {
199         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200                 device->minor, device->resource->name, device->vnr);
201 }
202
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio)
206 {
207         unsigned long flags;
208         struct drbd_request *req = bio->bi_private;
209         struct drbd_device *device = req->device;
210         struct bio_and_error m;
211         enum drbd_req_event what;
212
213         /* If this request was aborted locally before,
214          * but now was completed "successfully",
215          * chances are that this caused arbitrary data corruption.
216          *
217          * "aborting" requests, or force-detaching the disk, is intended for
218          * completely blocked/hung local backing devices which do no longer
219          * complete requests at all, not even do error completions.  In this
220          * situation, usually a hard-reset and failover is the only way out.
221          *
222          * By "aborting", basically faking a local error-completion,
223          * we allow for a more graceful swichover by cleanly migrating services.
224          * Still the affected node has to be rebooted "soon".
225          *
226          * By completing these requests, we allow the upper layers to re-use
227          * the associated data pages.
228          *
229          * If later the local backing device "recovers", and now DMAs some data
230          * from disk into the original request pages, in the best case it will
231          * just put random data into unused pages; but typically it will corrupt
232          * meanwhile completely unrelated data, causing all sorts of damage.
233          *
234          * Which means delayed successful completion,
235          * especially for READ requests,
236          * is a reason to panic().
237          *
238          * We assume that a delayed *error* completion is OK,
239          * though we still will complain noisily about it.
240          */
241         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242                 if (__ratelimit(&drbd_ratelimit_state))
243                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
244
245                 if (!bio->bi_error)
246                         drbd_panic_after_delayed_completion_of_aborted_request(device);
247         }
248
249         /* to avoid recursion in __req_mod */
250         if (unlikely(bio->bi_error)) {
251                 switch (bio_op(bio)) {
252                 case REQ_OP_DISCARD:
253                         if (bio->bi_error == -EOPNOTSUPP)
254                                 what = DISCARD_COMPLETED_NOTSUPP;
255                         else
256                                 what = DISCARD_COMPLETED_WITH_ERROR;
257                         break;
258                 case REQ_OP_READ:
259                         if (bio->bi_rw & REQ_RAHEAD)
260                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
261                         else
262                                 what = READ_COMPLETED_WITH_ERROR;
263                         break;
264                 default:
265                         what = WRITE_COMPLETED_WITH_ERROR;
266                         break;
267                 }
268         } else {
269                 what = COMPLETED_OK;
270         }
271
272         bio_put(req->private_bio);
273         req->private_bio = ERR_PTR(bio->bi_error);
274
275         /* not req_mod(), we need irqsave here! */
276         spin_lock_irqsave(&device->resource->req_lock, flags);
277         __req_mod(req, what, &m);
278         spin_unlock_irqrestore(&device->resource->req_lock, flags);
279         put_ldev(device);
280
281         if (m.bio)
282                 complete_master_bio(device, &m);
283 }
284
285 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
286 {
287         AHASH_REQUEST_ON_STACK(req, tfm);
288         struct scatterlist sg;
289         struct page *page = peer_req->pages;
290         struct page *tmp;
291         unsigned len;
292
293         ahash_request_set_tfm(req, tfm);
294         ahash_request_set_callback(req, 0, NULL, NULL);
295
296         sg_init_table(&sg, 1);
297         crypto_ahash_init(req);
298
299         while ((tmp = page_chain_next(page))) {
300                 /* all but the last page will be fully used */
301                 sg_set_page(&sg, page, PAGE_SIZE, 0);
302                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
303                 crypto_ahash_update(req);
304                 page = tmp;
305         }
306         /* and now the last, possibly only partially used page */
307         len = peer_req->i.size & (PAGE_SIZE - 1);
308         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
309         ahash_request_set_crypt(req, &sg, digest, sg.length);
310         crypto_ahash_finup(req);
311         ahash_request_zero(req);
312 }
313
314 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
315 {
316         AHASH_REQUEST_ON_STACK(req, tfm);
317         struct scatterlist sg;
318         struct bio_vec bvec;
319         struct bvec_iter iter;
320
321         ahash_request_set_tfm(req, tfm);
322         ahash_request_set_callback(req, 0, NULL, NULL);
323
324         sg_init_table(&sg, 1);
325         crypto_ahash_init(req);
326
327         bio_for_each_segment(bvec, bio, iter) {
328                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
329                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
330                 crypto_ahash_update(req);
331                 /* REQ_OP_WRITE_SAME has only one segment,
332                  * checksum the payload only once. */
333                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
334                         break;
335         }
336         ahash_request_set_crypt(req, NULL, digest, 0);
337         crypto_ahash_final(req);
338         ahash_request_zero(req);
339 }
340
341 /* MAYBE merge common code with w_e_end_ov_req */
342 static int w_e_send_csum(struct drbd_work *w, int cancel)
343 {
344         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
345         struct drbd_peer_device *peer_device = peer_req->peer_device;
346         struct drbd_device *device = peer_device->device;
347         int digest_size;
348         void *digest;
349         int err = 0;
350
351         if (unlikely(cancel))
352                 goto out;
353
354         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
355                 goto out;
356
357         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
358         digest = kmalloc(digest_size, GFP_NOIO);
359         if (digest) {
360                 sector_t sector = peer_req->i.sector;
361                 unsigned int size = peer_req->i.size;
362                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
363                 /* Free peer_req and pages before send.
364                  * In case we block on congestion, we could otherwise run into
365                  * some distributed deadlock, if the other side blocks on
366                  * congestion as well, because our receiver blocks in
367                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
368                 drbd_free_peer_req(device, peer_req);
369                 peer_req = NULL;
370                 inc_rs_pending(device);
371                 err = drbd_send_drequest_csum(peer_device, sector, size,
372                                               digest, digest_size,
373                                               P_CSUM_RS_REQUEST);
374                 kfree(digest);
375         } else {
376                 drbd_err(device, "kmalloc() of digest failed.\n");
377                 err = -ENOMEM;
378         }
379
380 out:
381         if (peer_req)
382                 drbd_free_peer_req(device, peer_req);
383
384         if (unlikely(err))
385                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
386         return err;
387 }
388
389 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
390
391 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
392 {
393         struct drbd_device *device = peer_device->device;
394         struct drbd_peer_request *peer_req;
395
396         if (!get_ldev(device))
397                 return -EIO;
398
399         /* GFP_TRY, because if there is no memory available right now, this may
400          * be rescheduled for later. It is "only" background resync, after all. */
401         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
402                                        size, size, GFP_TRY);
403         if (!peer_req)
404                 goto defer;
405
406         peer_req->w.cb = w_e_send_csum;
407         spin_lock_irq(&device->resource->req_lock);
408         list_add_tail(&peer_req->w.list, &device->read_ee);
409         spin_unlock_irq(&device->resource->req_lock);
410
411         atomic_add(size >> 9, &device->rs_sect_ev);
412         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
413                                      DRBD_FAULT_RS_RD) == 0)
414                 return 0;
415
416         /* If it failed because of ENOMEM, retry should help.  If it failed
417          * because bio_add_page failed (probably broken lower level driver),
418          * retry may or may not help.
419          * If it does not, you may need to force disconnect. */
420         spin_lock_irq(&device->resource->req_lock);
421         list_del(&peer_req->w.list);
422         spin_unlock_irq(&device->resource->req_lock);
423
424         drbd_free_peer_req(device, peer_req);
425 defer:
426         put_ldev(device);
427         return -EAGAIN;
428 }
429
430 int w_resync_timer(struct drbd_work *w, int cancel)
431 {
432         struct drbd_device *device =
433                 container_of(w, struct drbd_device, resync_work);
434
435         switch (device->state.conn) {
436         case C_VERIFY_S:
437                 make_ov_request(device, cancel);
438                 break;
439         case C_SYNC_TARGET:
440                 make_resync_request(device, cancel);
441                 break;
442         }
443
444         return 0;
445 }
446
447 void resync_timer_fn(unsigned long data)
448 {
449         struct drbd_device *device = (struct drbd_device *) data;
450
451         drbd_queue_work_if_unqueued(
452                 &first_peer_device(device)->connection->sender_work,
453                 &device->resync_work);
454 }
455
456 static void fifo_set(struct fifo_buffer *fb, int value)
457 {
458         int i;
459
460         for (i = 0; i < fb->size; i++)
461                 fb->values[i] = value;
462 }
463
464 static int fifo_push(struct fifo_buffer *fb, int value)
465 {
466         int ov;
467
468         ov = fb->values[fb->head_index];
469         fb->values[fb->head_index++] = value;
470
471         if (fb->head_index >= fb->size)
472                 fb->head_index = 0;
473
474         return ov;
475 }
476
477 static void fifo_add_val(struct fifo_buffer *fb, int value)
478 {
479         int i;
480
481         for (i = 0; i < fb->size; i++)
482                 fb->values[i] += value;
483 }
484
485 struct fifo_buffer *fifo_alloc(int fifo_size)
486 {
487         struct fifo_buffer *fb;
488
489         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
490         if (!fb)
491                 return NULL;
492
493         fb->head_index = 0;
494         fb->size = fifo_size;
495         fb->total = 0;
496
497         return fb;
498 }
499
500 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
501 {
502         struct disk_conf *dc;
503         unsigned int want;     /* The number of sectors we want in-flight */
504         int req_sect; /* Number of sectors to request in this turn */
505         int correction; /* Number of sectors more we need in-flight */
506         int cps; /* correction per invocation of drbd_rs_controller() */
507         int steps; /* Number of time steps to plan ahead */
508         int curr_corr;
509         int max_sect;
510         struct fifo_buffer *plan;
511
512         dc = rcu_dereference(device->ldev->disk_conf);
513         plan = rcu_dereference(device->rs_plan_s);
514
515         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
516
517         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
518                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
519         } else { /* normal path */
520                 want = dc->c_fill_target ? dc->c_fill_target :
521                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
522         }
523
524         correction = want - device->rs_in_flight - plan->total;
525
526         /* Plan ahead */
527         cps = correction / steps;
528         fifo_add_val(plan, cps);
529         plan->total += cps * steps;
530
531         /* What we do in this step */
532         curr_corr = fifo_push(plan, 0);
533         plan->total -= curr_corr;
534
535         req_sect = sect_in + curr_corr;
536         if (req_sect < 0)
537                 req_sect = 0;
538
539         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
540         if (req_sect > max_sect)
541                 req_sect = max_sect;
542
543         /*
544         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
545                  sect_in, device->rs_in_flight, want, correction,
546                  steps, cps, device->rs_planed, curr_corr, req_sect);
547         */
548
549         return req_sect;
550 }
551
552 static int drbd_rs_number_requests(struct drbd_device *device)
553 {
554         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
555         int number, mxb;
556
557         sect_in = atomic_xchg(&device->rs_sect_in, 0);
558         device->rs_in_flight -= sect_in;
559
560         rcu_read_lock();
561         mxb = drbd_get_max_buffers(device) / 2;
562         if (rcu_dereference(device->rs_plan_s)->size) {
563                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
564                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
565         } else {
566                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
567                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
568         }
569         rcu_read_unlock();
570
571         /* Don't have more than "max-buffers"/2 in-flight.
572          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
573          * potentially causing a distributed deadlock on congestion during
574          * online-verify or (checksum-based) resync, if max-buffers,
575          * socket buffer sizes and resync rate settings are mis-configured. */
576
577         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
578          * mxb (as used here, and in drbd_alloc_pages on the peer) is
579          * "number of pages" (typically also 4k),
580          * but "rs_in_flight" is in "sectors" (512 Byte). */
581         if (mxb - device->rs_in_flight/8 < number)
582                 number = mxb - device->rs_in_flight/8;
583
584         return number;
585 }
586
587 static int make_resync_request(struct drbd_device *const device, int cancel)
588 {
589         struct drbd_peer_device *const peer_device = first_peer_device(device);
590         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
591         unsigned long bit;
592         sector_t sector;
593         const sector_t capacity = drbd_get_capacity(device->this_bdev);
594         int max_bio_size;
595         int number, rollback_i, size;
596         int align, requeue = 0;
597         int i = 0;
598         int discard_granularity = 0;
599
600         if (unlikely(cancel))
601                 return 0;
602
603         if (device->rs_total == 0) {
604                 /* empty resync? */
605                 drbd_resync_finished(device);
606                 return 0;
607         }
608
609         if (!get_ldev(device)) {
610                 /* Since we only need to access device->rsync a
611                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
612                    to continue resync with a broken disk makes no sense at
613                    all */
614                 drbd_err(device, "Disk broke down during resync!\n");
615                 return 0;
616         }
617
618         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
619                 rcu_read_lock();
620                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
621                 rcu_read_unlock();
622         }
623
624         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
625         number = drbd_rs_number_requests(device);
626         if (number <= 0)
627                 goto requeue;
628
629         for (i = 0; i < number; i++) {
630                 /* Stop generating RS requests when half of the send buffer is filled,
631                  * but notify TCP that we'd like to have more space. */
632                 mutex_lock(&connection->data.mutex);
633                 if (connection->data.socket) {
634                         struct sock *sk = connection->data.socket->sk;
635                         int queued = sk->sk_wmem_queued;
636                         int sndbuf = sk->sk_sndbuf;
637                         if (queued > sndbuf / 2) {
638                                 requeue = 1;
639                                 if (sk->sk_socket)
640                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
641                         }
642                 } else
643                         requeue = 1;
644                 mutex_unlock(&connection->data.mutex);
645                 if (requeue)
646                         goto requeue;
647
648 next_sector:
649                 size = BM_BLOCK_SIZE;
650                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
651
652                 if (bit == DRBD_END_OF_BITMAP) {
653                         device->bm_resync_fo = drbd_bm_bits(device);
654                         put_ldev(device);
655                         return 0;
656                 }
657
658                 sector = BM_BIT_TO_SECT(bit);
659
660                 if (drbd_try_rs_begin_io(device, sector)) {
661                         device->bm_resync_fo = bit;
662                         goto requeue;
663                 }
664                 device->bm_resync_fo = bit + 1;
665
666                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
667                         drbd_rs_complete_io(device, sector);
668                         goto next_sector;
669                 }
670
671 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
672                 /* try to find some adjacent bits.
673                  * we stop if we have already the maximum req size.
674                  *
675                  * Additionally always align bigger requests, in order to
676                  * be prepared for all stripe sizes of software RAIDs.
677                  */
678                 align = 1;
679                 rollback_i = i;
680                 while (i < number) {
681                         if (size + BM_BLOCK_SIZE > max_bio_size)
682                                 break;
683
684                         /* Be always aligned */
685                         if (sector & ((1<<(align+3))-1))
686                                 break;
687
688                         if (discard_granularity && size == discard_granularity)
689                                 break;
690
691                         /* do not cross extent boundaries */
692                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
693                                 break;
694                         /* now, is it actually dirty, after all?
695                          * caution, drbd_bm_test_bit is tri-state for some
696                          * obscure reason; ( b == 0 ) would get the out-of-band
697                          * only accidentally right because of the "oddly sized"
698                          * adjustment below */
699                         if (drbd_bm_test_bit(device, bit+1) != 1)
700                                 break;
701                         bit++;
702                         size += BM_BLOCK_SIZE;
703                         if ((BM_BLOCK_SIZE << align) <= size)
704                                 align++;
705                         i++;
706                 }
707                 /* if we merged some,
708                  * reset the offset to start the next drbd_bm_find_next from */
709                 if (size > BM_BLOCK_SIZE)
710                         device->bm_resync_fo = bit + 1;
711 #endif
712
713                 /* adjust very last sectors, in case we are oddly sized */
714                 if (sector + (size>>9) > capacity)
715                         size = (capacity-sector)<<9;
716
717                 if (device->use_csums) {
718                         switch (read_for_csum(peer_device, sector, size)) {
719                         case -EIO: /* Disk failure */
720                                 put_ldev(device);
721                                 return -EIO;
722                         case -EAGAIN: /* allocation failed, or ldev busy */
723                                 drbd_rs_complete_io(device, sector);
724                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
725                                 i = rollback_i;
726                                 goto requeue;
727                         case 0:
728                                 /* everything ok */
729                                 break;
730                         default:
731                                 BUG();
732                         }
733                 } else {
734                         int err;
735
736                         inc_rs_pending(device);
737                         err = drbd_send_drequest(peer_device,
738                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
739                                                  sector, size, ID_SYNCER);
740                         if (err) {
741                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
742                                 dec_rs_pending(device);
743                                 put_ldev(device);
744                                 return err;
745                         }
746                 }
747         }
748
749         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
750                 /* last syncer _request_ was sent,
751                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
752                  * next sync group will resume), as soon as we receive the last
753                  * resync data block, and the last bit is cleared.
754                  * until then resync "work" is "inactive" ...
755                  */
756                 put_ldev(device);
757                 return 0;
758         }
759
760  requeue:
761         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
762         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
763         put_ldev(device);
764         return 0;
765 }
766
767 static int make_ov_request(struct drbd_device *device, int cancel)
768 {
769         int number, i, size;
770         sector_t sector;
771         const sector_t capacity = drbd_get_capacity(device->this_bdev);
772         bool stop_sector_reached = false;
773
774         if (unlikely(cancel))
775                 return 1;
776
777         number = drbd_rs_number_requests(device);
778
779         sector = device->ov_position;
780         for (i = 0; i < number; i++) {
781                 if (sector >= capacity)
782                         return 1;
783
784                 /* We check for "finished" only in the reply path:
785                  * w_e_end_ov_reply().
786                  * We need to send at least one request out. */
787                 stop_sector_reached = i > 0
788                         && verify_can_do_stop_sector(device)
789                         && sector >= device->ov_stop_sector;
790                 if (stop_sector_reached)
791                         break;
792
793                 size = BM_BLOCK_SIZE;
794
795                 if (drbd_try_rs_begin_io(device, sector)) {
796                         device->ov_position = sector;
797                         goto requeue;
798                 }
799
800                 if (sector + (size>>9) > capacity)
801                         size = (capacity-sector)<<9;
802
803                 inc_rs_pending(device);
804                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
805                         dec_rs_pending(device);
806                         return 0;
807                 }
808                 sector += BM_SECT_PER_BIT;
809         }
810         device->ov_position = sector;
811
812  requeue:
813         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
814         if (i == 0 || !stop_sector_reached)
815                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
816         return 1;
817 }
818
819 int w_ov_finished(struct drbd_work *w, int cancel)
820 {
821         struct drbd_device_work *dw =
822                 container_of(w, struct drbd_device_work, w);
823         struct drbd_device *device = dw->device;
824         kfree(dw);
825         ov_out_of_sync_print(device);
826         drbd_resync_finished(device);
827
828         return 0;
829 }
830
831 static int w_resync_finished(struct drbd_work *w, int cancel)
832 {
833         struct drbd_device_work *dw =
834                 container_of(w, struct drbd_device_work, w);
835         struct drbd_device *device = dw->device;
836         kfree(dw);
837
838         drbd_resync_finished(device);
839
840         return 0;
841 }
842
843 static void ping_peer(struct drbd_device *device)
844 {
845         struct drbd_connection *connection = first_peer_device(device)->connection;
846
847         clear_bit(GOT_PING_ACK, &connection->flags);
848         request_ping(connection);
849         wait_event(connection->ping_wait,
850                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
851 }
852
853 int drbd_resync_finished(struct drbd_device *device)
854 {
855         struct drbd_connection *connection = first_peer_device(device)->connection;
856         unsigned long db, dt, dbdt;
857         unsigned long n_oos;
858         union drbd_state os, ns;
859         struct drbd_device_work *dw;
860         char *khelper_cmd = NULL;
861         int verify_done = 0;
862
863         /* Remove all elements from the resync LRU. Since future actions
864          * might set bits in the (main) bitmap, then the entries in the
865          * resync LRU would be wrong. */
866         if (drbd_rs_del_all(device)) {
867                 /* In case this is not possible now, most probably because
868                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
869                  * queue (or even the read operations for those packets
870                  * is not finished by now).   Retry in 100ms. */
871
872                 schedule_timeout_interruptible(HZ / 10);
873                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
874                 if (dw) {
875                         dw->w.cb = w_resync_finished;
876                         dw->device = device;
877                         drbd_queue_work(&connection->sender_work, &dw->w);
878                         return 1;
879                 }
880                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
881         }
882
883         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
884         if (dt <= 0)
885                 dt = 1;
886
887         db = device->rs_total;
888         /* adjust for verify start and stop sectors, respective reached position */
889         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
890                 db -= device->ov_left;
891
892         dbdt = Bit2KB(db/dt);
893         device->rs_paused /= HZ;
894
895         if (!get_ldev(device))
896                 goto out;
897
898         ping_peer(device);
899
900         spin_lock_irq(&device->resource->req_lock);
901         os = drbd_read_state(device);
902
903         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
904
905         /* This protects us against multiple calls (that can happen in the presence
906            of application IO), and against connectivity loss just before we arrive here. */
907         if (os.conn <= C_CONNECTED)
908                 goto out_unlock;
909
910         ns = os;
911         ns.conn = C_CONNECTED;
912
913         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
914              verify_done ? "Online verify" : "Resync",
915              dt + device->rs_paused, device->rs_paused, dbdt);
916
917         n_oos = drbd_bm_total_weight(device);
918
919         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
920                 if (n_oos) {
921                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
922                               n_oos, Bit2KB(1));
923                         khelper_cmd = "out-of-sync";
924                 }
925         } else {
926                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
927
928                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
929                         khelper_cmd = "after-resync-target";
930
931                 if (device->use_csums && device->rs_total) {
932                         const unsigned long s = device->rs_same_csum;
933                         const unsigned long t = device->rs_total;
934                         const int ratio =
935                                 (t == 0)     ? 0 :
936                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
937                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
938                              "transferred %luK total %luK\n",
939                              ratio,
940                              Bit2KB(device->rs_same_csum),
941                              Bit2KB(device->rs_total - device->rs_same_csum),
942                              Bit2KB(device->rs_total));
943                 }
944         }
945
946         if (device->rs_failed) {
947                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
948
949                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
950                         ns.disk = D_INCONSISTENT;
951                         ns.pdsk = D_UP_TO_DATE;
952                 } else {
953                         ns.disk = D_UP_TO_DATE;
954                         ns.pdsk = D_INCONSISTENT;
955                 }
956         } else {
957                 ns.disk = D_UP_TO_DATE;
958                 ns.pdsk = D_UP_TO_DATE;
959
960                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
961                         if (device->p_uuid) {
962                                 int i;
963                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
964                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
965                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
966                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
967                         } else {
968                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
969                         }
970                 }
971
972                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
973                         /* for verify runs, we don't update uuids here,
974                          * so there would be nothing to report. */
975                         drbd_uuid_set_bm(device, 0UL);
976                         drbd_print_uuids(device, "updated UUIDs");
977                         if (device->p_uuid) {
978                                 /* Now the two UUID sets are equal, update what we
979                                  * know of the peer. */
980                                 int i;
981                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
982                                         device->p_uuid[i] = device->ldev->md.uuid[i];
983                         }
984                 }
985         }
986
987         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
988 out_unlock:
989         spin_unlock_irq(&device->resource->req_lock);
990
991         /* If we have been sync source, and have an effective fencing-policy,
992          * once *all* volumes are back in sync, call "unfence". */
993         if (os.conn == C_SYNC_SOURCE) {
994                 enum drbd_disk_state disk_state = D_MASK;
995                 enum drbd_disk_state pdsk_state = D_MASK;
996                 enum drbd_fencing_p fp = FP_DONT_CARE;
997
998                 rcu_read_lock();
999                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1000                 if (fp != FP_DONT_CARE) {
1001                         struct drbd_peer_device *peer_device;
1002                         int vnr;
1003                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1004                                 struct drbd_device *device = peer_device->device;
1005                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1006                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1007                         }
1008                 }
1009                 rcu_read_unlock();
1010                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1011                         conn_khelper(connection, "unfence-peer");
1012         }
1013
1014         put_ldev(device);
1015 out:
1016         device->rs_total  = 0;
1017         device->rs_failed = 0;
1018         device->rs_paused = 0;
1019
1020         /* reset start sector, if we reached end of device */
1021         if (verify_done && device->ov_left == 0)
1022                 device->ov_start_sector = 0;
1023
1024         drbd_md_sync(device);
1025
1026         if (khelper_cmd)
1027                 drbd_khelper(device, khelper_cmd);
1028
1029         return 1;
1030 }
1031
1032 /* helper */
1033 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1034 {
1035         if (drbd_peer_req_has_active_page(peer_req)) {
1036                 /* This might happen if sendpage() has not finished */
1037                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1038                 atomic_add(i, &device->pp_in_use_by_net);
1039                 atomic_sub(i, &device->pp_in_use);
1040                 spin_lock_irq(&device->resource->req_lock);
1041                 list_add_tail(&peer_req->w.list, &device->net_ee);
1042                 spin_unlock_irq(&device->resource->req_lock);
1043                 wake_up(&drbd_pp_wait);
1044         } else
1045                 drbd_free_peer_req(device, peer_req);
1046 }
1047
1048 /**
1049  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1050  * @w:          work object.
1051  * @cancel:     The connection will be closed anyways
1052  */
1053 int w_e_end_data_req(struct drbd_work *w, int cancel)
1054 {
1055         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1056         struct drbd_peer_device *peer_device = peer_req->peer_device;
1057         struct drbd_device *device = peer_device->device;
1058         int err;
1059
1060         if (unlikely(cancel)) {
1061                 drbd_free_peer_req(device, peer_req);
1062                 dec_unacked(device);
1063                 return 0;
1064         }
1065
1066         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1067                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1068         } else {
1069                 if (__ratelimit(&drbd_ratelimit_state))
1070                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1071                             (unsigned long long)peer_req->i.sector);
1072
1073                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1074         }
1075
1076         dec_unacked(device);
1077
1078         move_to_net_ee_or_free(device, peer_req);
1079
1080         if (unlikely(err))
1081                 drbd_err(device, "drbd_send_block() failed\n");
1082         return err;
1083 }
1084
1085 static bool all_zero(struct drbd_peer_request *peer_req)
1086 {
1087         struct page *page = peer_req->pages;
1088         unsigned int len = peer_req->i.size;
1089
1090         page_chain_for_each(page) {
1091                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1092                 unsigned int i, words = l / sizeof(long);
1093                 unsigned long *d;
1094
1095                 d = kmap_atomic(page);
1096                 for (i = 0; i < words; i++) {
1097                         if (d[i]) {
1098                                 kunmap_atomic(d);
1099                                 return false;
1100                         }
1101                 }
1102                 kunmap_atomic(d);
1103                 len -= l;
1104         }
1105
1106         return true;
1107 }
1108
1109 /**
1110  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1111  * @w:          work object.
1112  * @cancel:     The connection will be closed anyways
1113  */
1114 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1115 {
1116         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1117         struct drbd_peer_device *peer_device = peer_req->peer_device;
1118         struct drbd_device *device = peer_device->device;
1119         int err;
1120
1121         if (unlikely(cancel)) {
1122                 drbd_free_peer_req(device, peer_req);
1123                 dec_unacked(device);
1124                 return 0;
1125         }
1126
1127         if (get_ldev_if_state(device, D_FAILED)) {
1128                 drbd_rs_complete_io(device, peer_req->i.sector);
1129                 put_ldev(device);
1130         }
1131
1132         if (device->state.conn == C_AHEAD) {
1133                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1134         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1135                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1136                         inc_rs_pending(device);
1137                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1138                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1139                         else
1140                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1141                 } else {
1142                         if (__ratelimit(&drbd_ratelimit_state))
1143                                 drbd_err(device, "Not sending RSDataReply, "
1144                                     "partner DISKLESS!\n");
1145                         err = 0;
1146                 }
1147         } else {
1148                 if (__ratelimit(&drbd_ratelimit_state))
1149                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1150                             (unsigned long long)peer_req->i.sector);
1151
1152                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1153
1154                 /* update resync data with failure */
1155                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1156         }
1157
1158         dec_unacked(device);
1159
1160         move_to_net_ee_or_free(device, peer_req);
1161
1162         if (unlikely(err))
1163                 drbd_err(device, "drbd_send_block() failed\n");
1164         return err;
1165 }
1166
1167 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1168 {
1169         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1170         struct drbd_peer_device *peer_device = peer_req->peer_device;
1171         struct drbd_device *device = peer_device->device;
1172         struct digest_info *di;
1173         int digest_size;
1174         void *digest = NULL;
1175         int err, eq = 0;
1176
1177         if (unlikely(cancel)) {
1178                 drbd_free_peer_req(device, peer_req);
1179                 dec_unacked(device);
1180                 return 0;
1181         }
1182
1183         if (get_ldev(device)) {
1184                 drbd_rs_complete_io(device, peer_req->i.sector);
1185                 put_ldev(device);
1186         }
1187
1188         di = peer_req->digest;
1189
1190         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1191                 /* quick hack to try to avoid a race against reconfiguration.
1192                  * a real fix would be much more involved,
1193                  * introducing more locking mechanisms */
1194                 if (peer_device->connection->csums_tfm) {
1195                         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1196                         D_ASSERT(device, digest_size == di->digest_size);
1197                         digest = kmalloc(digest_size, GFP_NOIO);
1198                 }
1199                 if (digest) {
1200                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1201                         eq = !memcmp(digest, di->digest, digest_size);
1202                         kfree(digest);
1203                 }
1204
1205                 if (eq) {
1206                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1207                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1208                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1209                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1210                 } else {
1211                         inc_rs_pending(device);
1212                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1213                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1214                         kfree(di);
1215                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1216                 }
1217         } else {
1218                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1219                 if (__ratelimit(&drbd_ratelimit_state))
1220                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1221         }
1222
1223         dec_unacked(device);
1224         move_to_net_ee_or_free(device, peer_req);
1225
1226         if (unlikely(err))
1227                 drbd_err(device, "drbd_send_block/ack() failed\n");
1228         return err;
1229 }
1230
1231 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1232 {
1233         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1234         struct drbd_peer_device *peer_device = peer_req->peer_device;
1235         struct drbd_device *device = peer_device->device;
1236         sector_t sector = peer_req->i.sector;
1237         unsigned int size = peer_req->i.size;
1238         int digest_size;
1239         void *digest;
1240         int err = 0;
1241
1242         if (unlikely(cancel))
1243                 goto out;
1244
1245         digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1246         digest = kmalloc(digest_size, GFP_NOIO);
1247         if (!digest) {
1248                 err = 1;        /* terminate the connection in case the allocation failed */
1249                 goto out;
1250         }
1251
1252         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1253                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1254         else
1255                 memset(digest, 0, digest_size);
1256
1257         /* Free e and pages before send.
1258          * In case we block on congestion, we could otherwise run into
1259          * some distributed deadlock, if the other side blocks on
1260          * congestion as well, because our receiver blocks in
1261          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1262         drbd_free_peer_req(device, peer_req);
1263         peer_req = NULL;
1264         inc_rs_pending(device);
1265         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1266         if (err)
1267                 dec_rs_pending(device);
1268         kfree(digest);
1269
1270 out:
1271         if (peer_req)
1272                 drbd_free_peer_req(device, peer_req);
1273         dec_unacked(device);
1274         return err;
1275 }
1276
1277 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1278 {
1279         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1280                 device->ov_last_oos_size += size>>9;
1281         } else {
1282                 device->ov_last_oos_start = sector;
1283                 device->ov_last_oos_size = size>>9;
1284         }
1285         drbd_set_out_of_sync(device, sector, size);
1286 }
1287
1288 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1289 {
1290         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1291         struct drbd_peer_device *peer_device = peer_req->peer_device;
1292         struct drbd_device *device = peer_device->device;
1293         struct digest_info *di;
1294         void *digest;
1295         sector_t sector = peer_req->i.sector;
1296         unsigned int size = peer_req->i.size;
1297         int digest_size;
1298         int err, eq = 0;
1299         bool stop_sector_reached = false;
1300
1301         if (unlikely(cancel)) {
1302                 drbd_free_peer_req(device, peer_req);
1303                 dec_unacked(device);
1304                 return 0;
1305         }
1306
1307         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1308          * the resync lru has been cleaned up already */
1309         if (get_ldev(device)) {
1310                 drbd_rs_complete_io(device, peer_req->i.sector);
1311                 put_ldev(device);
1312         }
1313
1314         di = peer_req->digest;
1315
1316         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1317                 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1318                 digest = kmalloc(digest_size, GFP_NOIO);
1319                 if (digest) {
1320                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1321
1322                         D_ASSERT(device, digest_size == di->digest_size);
1323                         eq = !memcmp(digest, di->digest, digest_size);
1324                         kfree(digest);
1325                 }
1326         }
1327
1328         /* Free peer_req and pages before send.
1329          * In case we block on congestion, we could otherwise run into
1330          * some distributed deadlock, if the other side blocks on
1331          * congestion as well, because our receiver blocks in
1332          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1333         drbd_free_peer_req(device, peer_req);
1334         if (!eq)
1335                 drbd_ov_out_of_sync_found(device, sector, size);
1336         else
1337                 ov_out_of_sync_print(device);
1338
1339         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1340                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1341
1342         dec_unacked(device);
1343
1344         --device->ov_left;
1345
1346         /* let's advance progress step marks only for every other megabyte */
1347         if ((device->ov_left & 0x200) == 0x200)
1348                 drbd_advance_rs_marks(device, device->ov_left);
1349
1350         stop_sector_reached = verify_can_do_stop_sector(device) &&
1351                 (sector + (size>>9)) >= device->ov_stop_sector;
1352
1353         if (device->ov_left == 0 || stop_sector_reached) {
1354                 ov_out_of_sync_print(device);
1355                 drbd_resync_finished(device);
1356         }
1357
1358         return err;
1359 }
1360
1361 /* FIXME
1362  * We need to track the number of pending barrier acks,
1363  * and to be able to wait for them.
1364  * See also comment in drbd_adm_attach before drbd_suspend_io.
1365  */
1366 static int drbd_send_barrier(struct drbd_connection *connection)
1367 {
1368         struct p_barrier *p;
1369         struct drbd_socket *sock;
1370
1371         sock = &connection->data;
1372         p = conn_prepare_command(connection, sock);
1373         if (!p)
1374                 return -EIO;
1375         p->barrier = connection->send.current_epoch_nr;
1376         p->pad = 0;
1377         connection->send.current_epoch_writes = 0;
1378         connection->send.last_sent_barrier_jif = jiffies;
1379
1380         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1381 }
1382
1383 int w_send_write_hint(struct drbd_work *w, int cancel)
1384 {
1385         struct drbd_device *device =
1386                 container_of(w, struct drbd_device, unplug_work);
1387         struct drbd_socket *sock;
1388
1389         if (cancel)
1390                 return 0;
1391         sock = &first_peer_device(device)->connection->data;
1392         if (!drbd_prepare_command(first_peer_device(device), sock))
1393                 return -EIO;
1394         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1395 }
1396
1397 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1398 {
1399         if (!connection->send.seen_any_write_yet) {
1400                 connection->send.seen_any_write_yet = true;
1401                 connection->send.current_epoch_nr = epoch;
1402                 connection->send.current_epoch_writes = 0;
1403                 connection->send.last_sent_barrier_jif = jiffies;
1404         }
1405 }
1406
1407 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1408 {
1409         /* re-init if first write on this connection */
1410         if (!connection->send.seen_any_write_yet)
1411                 return;
1412         if (connection->send.current_epoch_nr != epoch) {
1413                 if (connection->send.current_epoch_writes)
1414                         drbd_send_barrier(connection);
1415                 connection->send.current_epoch_nr = epoch;
1416         }
1417 }
1418
1419 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1420 {
1421         struct drbd_request *req = container_of(w, struct drbd_request, w);
1422         struct drbd_device *device = req->device;
1423         struct drbd_peer_device *const peer_device = first_peer_device(device);
1424         struct drbd_connection *const connection = peer_device->connection;
1425         int err;
1426
1427         if (unlikely(cancel)) {
1428                 req_mod(req, SEND_CANCELED);
1429                 return 0;
1430         }
1431         req->pre_send_jif = jiffies;
1432
1433         /* this time, no connection->send.current_epoch_writes++;
1434          * If it was sent, it was the closing barrier for the last
1435          * replicated epoch, before we went into AHEAD mode.
1436          * No more barriers will be sent, until we leave AHEAD mode again. */
1437         maybe_send_barrier(connection, req->epoch);
1438
1439         err = drbd_send_out_of_sync(peer_device, req);
1440         req_mod(req, OOS_HANDED_TO_NETWORK);
1441
1442         return err;
1443 }
1444
1445 /**
1446  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1447  * @w:          work object.
1448  * @cancel:     The connection will be closed anyways
1449  */
1450 int w_send_dblock(struct drbd_work *w, int cancel)
1451 {
1452         struct drbd_request *req = container_of(w, struct drbd_request, w);
1453         struct drbd_device *device = req->device;
1454         struct drbd_peer_device *const peer_device = first_peer_device(device);
1455         struct drbd_connection *connection = peer_device->connection;
1456         int err;
1457
1458         if (unlikely(cancel)) {
1459                 req_mod(req, SEND_CANCELED);
1460                 return 0;
1461         }
1462         req->pre_send_jif = jiffies;
1463
1464         re_init_if_first_write(connection, req->epoch);
1465         maybe_send_barrier(connection, req->epoch);
1466         connection->send.current_epoch_writes++;
1467
1468         err = drbd_send_dblock(peer_device, req);
1469         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1470
1471         return err;
1472 }
1473
1474 /**
1475  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1476  * @w:          work object.
1477  * @cancel:     The connection will be closed anyways
1478  */
1479 int w_send_read_req(struct drbd_work *w, int cancel)
1480 {
1481         struct drbd_request *req = container_of(w, struct drbd_request, w);
1482         struct drbd_device *device = req->device;
1483         struct drbd_peer_device *const peer_device = first_peer_device(device);
1484         struct drbd_connection *connection = peer_device->connection;
1485         int err;
1486
1487         if (unlikely(cancel)) {
1488                 req_mod(req, SEND_CANCELED);
1489                 return 0;
1490         }
1491         req->pre_send_jif = jiffies;
1492
1493         /* Even read requests may close a write epoch,
1494          * if there was any yet. */
1495         maybe_send_barrier(connection, req->epoch);
1496
1497         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1498                                  (unsigned long)req);
1499
1500         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1501
1502         return err;
1503 }
1504
1505 int w_restart_disk_io(struct drbd_work *w, int cancel)
1506 {
1507         struct drbd_request *req = container_of(w, struct drbd_request, w);
1508         struct drbd_device *device = req->device;
1509
1510         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1511                 drbd_al_begin_io(device, &req->i);
1512
1513         drbd_req_make_private_bio(req, req->master_bio);
1514         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1515         generic_make_request(req->private_bio);
1516
1517         return 0;
1518 }
1519
1520 static int _drbd_may_sync_now(struct drbd_device *device)
1521 {
1522         struct drbd_device *odev = device;
1523         int resync_after;
1524
1525         while (1) {
1526                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1527                         return 1;
1528                 rcu_read_lock();
1529                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1530                 rcu_read_unlock();
1531                 if (resync_after == -1)
1532                         return 1;
1533                 odev = minor_to_device(resync_after);
1534                 if (!odev)
1535                         return 1;
1536                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1537                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1538                     odev->state.aftr_isp || odev->state.peer_isp ||
1539                     odev->state.user_isp)
1540                         return 0;
1541         }
1542 }
1543
1544 /**
1545  * drbd_pause_after() - Pause resync on all devices that may not resync now
1546  * @device:     DRBD device.
1547  *
1548  * Called from process context only (admin command and after_state_ch).
1549  */
1550 static bool drbd_pause_after(struct drbd_device *device)
1551 {
1552         bool changed = false;
1553         struct drbd_device *odev;
1554         int i;
1555
1556         rcu_read_lock();
1557         idr_for_each_entry(&drbd_devices, odev, i) {
1558                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1559                         continue;
1560                 if (!_drbd_may_sync_now(odev) &&
1561                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1562                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1563                         changed = true;
1564         }
1565         rcu_read_unlock();
1566
1567         return changed;
1568 }
1569
1570 /**
1571  * drbd_resume_next() - Resume resync on all devices that may resync now
1572  * @device:     DRBD device.
1573  *
1574  * Called from process context only (admin command and worker).
1575  */
1576 static bool drbd_resume_next(struct drbd_device *device)
1577 {
1578         bool changed = false;
1579         struct drbd_device *odev;
1580         int i;
1581
1582         rcu_read_lock();
1583         idr_for_each_entry(&drbd_devices, odev, i) {
1584                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1585                         continue;
1586                 if (odev->state.aftr_isp) {
1587                         if (_drbd_may_sync_now(odev) &&
1588                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1589                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1590                                 changed = true;
1591                 }
1592         }
1593         rcu_read_unlock();
1594         return changed;
1595 }
1596
1597 void resume_next_sg(struct drbd_device *device)
1598 {
1599         lock_all_resources();
1600         drbd_resume_next(device);
1601         unlock_all_resources();
1602 }
1603
1604 void suspend_other_sg(struct drbd_device *device)
1605 {
1606         lock_all_resources();
1607         drbd_pause_after(device);
1608         unlock_all_resources();
1609 }
1610
1611 /* caller must lock_all_resources() */
1612 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1613 {
1614         struct drbd_device *odev;
1615         int resync_after;
1616
1617         if (o_minor == -1)
1618                 return NO_ERROR;
1619         if (o_minor < -1 || o_minor > MINORMASK)
1620                 return ERR_RESYNC_AFTER;
1621
1622         /* check for loops */
1623         odev = minor_to_device(o_minor);
1624         while (1) {
1625                 if (odev == device)
1626                         return ERR_RESYNC_AFTER_CYCLE;
1627
1628                 /* You are free to depend on diskless, non-existing,
1629                  * or not yet/no longer existing minors.
1630                  * We only reject dependency loops.
1631                  * We cannot follow the dependency chain beyond a detached or
1632                  * missing minor.
1633                  */
1634                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1635                         return NO_ERROR;
1636
1637                 rcu_read_lock();
1638                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1639                 rcu_read_unlock();
1640                 /* dependency chain ends here, no cycles. */
1641                 if (resync_after == -1)
1642                         return NO_ERROR;
1643
1644                 /* follow the dependency chain */
1645                 odev = minor_to_device(resync_after);
1646         }
1647 }
1648
1649 /* caller must lock_all_resources() */
1650 void drbd_resync_after_changed(struct drbd_device *device)
1651 {
1652         int changed;
1653
1654         do {
1655                 changed  = drbd_pause_after(device);
1656                 changed |= drbd_resume_next(device);
1657         } while (changed);
1658 }
1659
1660 void drbd_rs_controller_reset(struct drbd_device *device)
1661 {
1662         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1663         struct fifo_buffer *plan;
1664
1665         atomic_set(&device->rs_sect_in, 0);
1666         atomic_set(&device->rs_sect_ev, 0);
1667         device->rs_in_flight = 0;
1668         device->rs_last_events =
1669                 (int)part_stat_read(&disk->part0, sectors[0]) +
1670                 (int)part_stat_read(&disk->part0, sectors[1]);
1671
1672         /* Updating the RCU protected object in place is necessary since
1673            this function gets called from atomic context.
1674            It is valid since all other updates also lead to an completely
1675            empty fifo */
1676         rcu_read_lock();
1677         plan = rcu_dereference(device->rs_plan_s);
1678         plan->total = 0;
1679         fifo_set(plan, 0);
1680         rcu_read_unlock();
1681 }
1682
1683 void start_resync_timer_fn(unsigned long data)
1684 {
1685         struct drbd_device *device = (struct drbd_device *) data;
1686         drbd_device_post_work(device, RS_START);
1687 }
1688
1689 static void do_start_resync(struct drbd_device *device)
1690 {
1691         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1692                 drbd_warn(device, "postponing start_resync ...\n");
1693                 device->start_resync_timer.expires = jiffies + HZ/10;
1694                 add_timer(&device->start_resync_timer);
1695                 return;
1696         }
1697
1698         drbd_start_resync(device, C_SYNC_SOURCE);
1699         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1700 }
1701
1702 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1703 {
1704         bool csums_after_crash_only;
1705         rcu_read_lock();
1706         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1707         rcu_read_unlock();
1708         return connection->agreed_pro_version >= 89 &&          /* supported? */
1709                 connection->csums_tfm &&                        /* configured? */
1710                 (csums_after_crash_only == false                /* use for each resync? */
1711                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1712 }
1713
1714 /**
1715  * drbd_start_resync() - Start the resync process
1716  * @device:     DRBD device.
1717  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1718  *
1719  * This function might bring you directly into one of the
1720  * C_PAUSED_SYNC_* states.
1721  */
1722 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1723 {
1724         struct drbd_peer_device *peer_device = first_peer_device(device);
1725         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1726         union drbd_state ns;
1727         int r;
1728
1729         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1730                 drbd_err(device, "Resync already running!\n");
1731                 return;
1732         }
1733
1734         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1735                 if (side == C_SYNC_TARGET) {
1736                         /* Since application IO was locked out during C_WF_BITMAP_T and
1737                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1738                            we check that we might make the data inconsistent. */
1739                         r = drbd_khelper(device, "before-resync-target");
1740                         r = (r >> 8) & 0xff;
1741                         if (r > 0) {
1742                                 drbd_info(device, "before-resync-target handler returned %d, "
1743                                          "dropping connection.\n", r);
1744                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1745                                 return;
1746                         }
1747                 } else /* C_SYNC_SOURCE */ {
1748                         r = drbd_khelper(device, "before-resync-source");
1749                         r = (r >> 8) & 0xff;
1750                         if (r > 0) {
1751                                 if (r == 3) {
1752                                         drbd_info(device, "before-resync-source handler returned %d, "
1753                                                  "ignoring. Old userland tools?", r);
1754                                 } else {
1755                                         drbd_info(device, "before-resync-source handler returned %d, "
1756                                                  "dropping connection.\n", r);
1757                                         conn_request_state(connection,
1758                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1759                                         return;
1760                                 }
1761                         }
1762                 }
1763         }
1764
1765         if (current == connection->worker.task) {
1766                 /* The worker should not sleep waiting for state_mutex,
1767                    that can take long */
1768                 if (!mutex_trylock(device->state_mutex)) {
1769                         set_bit(B_RS_H_DONE, &device->flags);
1770                         device->start_resync_timer.expires = jiffies + HZ/5;
1771                         add_timer(&device->start_resync_timer);
1772                         return;
1773                 }
1774         } else {
1775                 mutex_lock(device->state_mutex);
1776         }
1777
1778         lock_all_resources();
1779         clear_bit(B_RS_H_DONE, &device->flags);
1780         /* Did some connection breakage or IO error race with us? */
1781         if (device->state.conn < C_CONNECTED
1782         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1783                 unlock_all_resources();
1784                 goto out;
1785         }
1786
1787         ns = drbd_read_state(device);
1788
1789         ns.aftr_isp = !_drbd_may_sync_now(device);
1790
1791         ns.conn = side;
1792
1793         if (side == C_SYNC_TARGET)
1794                 ns.disk = D_INCONSISTENT;
1795         else /* side == C_SYNC_SOURCE */
1796                 ns.pdsk = D_INCONSISTENT;
1797
1798         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1799         ns = drbd_read_state(device);
1800
1801         if (ns.conn < C_CONNECTED)
1802                 r = SS_UNKNOWN_ERROR;
1803
1804         if (r == SS_SUCCESS) {
1805                 unsigned long tw = drbd_bm_total_weight(device);
1806                 unsigned long now = jiffies;
1807                 int i;
1808
1809                 device->rs_failed    = 0;
1810                 device->rs_paused    = 0;
1811                 device->rs_same_csum = 0;
1812                 device->rs_last_sect_ev = 0;
1813                 device->rs_total     = tw;
1814                 device->rs_start     = now;
1815                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1816                         device->rs_mark_left[i] = tw;
1817                         device->rs_mark_time[i] = now;
1818                 }
1819                 drbd_pause_after(device);
1820                 /* Forget potentially stale cached per resync extent bit-counts.
1821                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1822                  * disabled, and know the disk state is ok. */
1823                 spin_lock(&device->al_lock);
1824                 lc_reset(device->resync);
1825                 device->resync_locked = 0;
1826                 device->resync_wenr = LC_FREE;
1827                 spin_unlock(&device->al_lock);
1828         }
1829         unlock_all_resources();
1830
1831         if (r == SS_SUCCESS) {
1832                 wake_up(&device->al_wait); /* for lc_reset() above */
1833                 /* reset rs_last_bcast when a resync or verify is started,
1834                  * to deal with potential jiffies wrap. */
1835                 device->rs_last_bcast = jiffies - HZ;
1836
1837                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1838                      drbd_conn_str(ns.conn),
1839                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1840                      (unsigned long) device->rs_total);
1841                 if (side == C_SYNC_TARGET) {
1842                         device->bm_resync_fo = 0;
1843                         device->use_csums = use_checksum_based_resync(connection, device);
1844                 } else {
1845                         device->use_csums = false;
1846                 }
1847
1848                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1849                  * with w_send_oos, or the sync target will get confused as to
1850                  * how much bits to resync.  We cannot do that always, because for an
1851                  * empty resync and protocol < 95, we need to do it here, as we call
1852                  * drbd_resync_finished from here in that case.
1853                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1854                  * and from after_state_ch otherwise. */
1855                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1856                         drbd_gen_and_send_sync_uuid(peer_device);
1857
1858                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1859                         /* This still has a race (about when exactly the peers
1860                          * detect connection loss) that can lead to a full sync
1861                          * on next handshake. In 8.3.9 we fixed this with explicit
1862                          * resync-finished notifications, but the fix
1863                          * introduces a protocol change.  Sleeping for some
1864                          * time longer than the ping interval + timeout on the
1865                          * SyncSource, to give the SyncTarget the chance to
1866                          * detect connection loss, then waiting for a ping
1867                          * response (implicit in drbd_resync_finished) reduces
1868                          * the race considerably, but does not solve it. */
1869                         if (side == C_SYNC_SOURCE) {
1870                                 struct net_conf *nc;
1871                                 int timeo;
1872
1873                                 rcu_read_lock();
1874                                 nc = rcu_dereference(connection->net_conf);
1875                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1876                                 rcu_read_unlock();
1877                                 schedule_timeout_interruptible(timeo);
1878                         }
1879                         drbd_resync_finished(device);
1880                 }
1881
1882                 drbd_rs_controller_reset(device);
1883                 /* ns.conn may already be != device->state.conn,
1884                  * we may have been paused in between, or become paused until
1885                  * the timer triggers.
1886                  * No matter, that is handled in resync_timer_fn() */
1887                 if (ns.conn == C_SYNC_TARGET)
1888                         mod_timer(&device->resync_timer, jiffies);
1889
1890                 drbd_md_sync(device);
1891         }
1892         put_ldev(device);
1893 out:
1894         mutex_unlock(device->state_mutex);
1895 }
1896
1897 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1898 {
1899         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1900         device->rs_last_bcast = jiffies;
1901
1902         if (!get_ldev(device))
1903                 return;
1904
1905         drbd_bm_write_lazy(device, 0);
1906         if (resync_done && is_sync_state(device->state.conn))
1907                 drbd_resync_finished(device);
1908
1909         drbd_bcast_event(device, &sib);
1910         /* update timestamp, in case it took a while to write out stuff */
1911         device->rs_last_bcast = jiffies;
1912         put_ldev(device);
1913 }
1914
1915 static void drbd_ldev_destroy(struct drbd_device *device)
1916 {
1917         lc_destroy(device->resync);
1918         device->resync = NULL;
1919         lc_destroy(device->act_log);
1920         device->act_log = NULL;
1921
1922         __acquire(local);
1923         drbd_backing_dev_free(device, device->ldev);
1924         device->ldev = NULL;
1925         __release(local);
1926
1927         clear_bit(GOING_DISKLESS, &device->flags);
1928         wake_up(&device->misc_wait);
1929 }
1930
1931 static void go_diskless(struct drbd_device *device)
1932 {
1933         D_ASSERT(device, device->state.disk == D_FAILED);
1934         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1935          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1936          * the protected members anymore, though, so once put_ldev reaches zero
1937          * again, it will be safe to free them. */
1938
1939         /* Try to write changed bitmap pages, read errors may have just
1940          * set some bits outside the area covered by the activity log.
1941          *
1942          * If we have an IO error during the bitmap writeout,
1943          * we will want a full sync next time, just in case.
1944          * (Do we want a specific meta data flag for this?)
1945          *
1946          * If that does not make it to stable storage either,
1947          * we cannot do anything about that anymore.
1948          *
1949          * We still need to check if both bitmap and ldev are present, we may
1950          * end up here after a failed attach, before ldev was even assigned.
1951          */
1952         if (device->bitmap && device->ldev) {
1953                 /* An interrupted resync or similar is allowed to recounts bits
1954                  * while we detach.
1955                  * Any modifications would not be expected anymore, though.
1956                  */
1957                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1958                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1959                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1960                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1961                                 drbd_md_sync(device);
1962                         }
1963                 }
1964         }
1965
1966         drbd_force_state(device, NS(disk, D_DISKLESS));
1967 }
1968
1969 static int do_md_sync(struct drbd_device *device)
1970 {
1971         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1972         drbd_md_sync(device);
1973         return 0;
1974 }
1975
1976 /* only called from drbd_worker thread, no locking */
1977 void __update_timing_details(
1978                 struct drbd_thread_timing_details *tdp,
1979                 unsigned int *cb_nr,
1980                 void *cb,
1981                 const char *fn, const unsigned int line)
1982 {
1983         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1984         struct drbd_thread_timing_details *td = tdp + i;
1985
1986         td->start_jif = jiffies;
1987         td->cb_addr = cb;
1988         td->caller_fn = fn;
1989         td->line = line;
1990         td->cb_nr = *cb_nr;
1991
1992         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1993         td = tdp + i;
1994         memset(td, 0, sizeof(*td));
1995
1996         ++(*cb_nr);
1997 }
1998
1999 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2000 {
2001         if (test_bit(MD_SYNC, &todo))
2002                 do_md_sync(device);
2003         if (test_bit(RS_DONE, &todo) ||
2004             test_bit(RS_PROGRESS, &todo))
2005                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2006         if (test_bit(GO_DISKLESS, &todo))
2007                 go_diskless(device);
2008         if (test_bit(DESTROY_DISK, &todo))
2009                 drbd_ldev_destroy(device);
2010         if (test_bit(RS_START, &todo))
2011                 do_start_resync(device);
2012 }
2013
2014 #define DRBD_DEVICE_WORK_MASK   \
2015         ((1UL << GO_DISKLESS)   \
2016         |(1UL << DESTROY_DISK)  \
2017         |(1UL << MD_SYNC)       \
2018         |(1UL << RS_START)      \
2019         |(1UL << RS_PROGRESS)   \
2020         |(1UL << RS_DONE)       \
2021         )
2022
2023 static unsigned long get_work_bits(unsigned long *flags)
2024 {
2025         unsigned long old, new;
2026         do {
2027                 old = *flags;
2028                 new = old & ~DRBD_DEVICE_WORK_MASK;
2029         } while (cmpxchg(flags, old, new) != old);
2030         return old & DRBD_DEVICE_WORK_MASK;
2031 }
2032
2033 static void do_unqueued_work(struct drbd_connection *connection)
2034 {
2035         struct drbd_peer_device *peer_device;
2036         int vnr;
2037
2038         rcu_read_lock();
2039         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2040                 struct drbd_device *device = peer_device->device;
2041                 unsigned long todo = get_work_bits(&device->flags);
2042                 if (!todo)
2043                         continue;
2044
2045                 kref_get(&device->kref);
2046                 rcu_read_unlock();
2047                 do_device_work(device, todo);
2048                 kref_put(&device->kref, drbd_destroy_device);
2049                 rcu_read_lock();
2050         }
2051         rcu_read_unlock();
2052 }
2053
2054 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2055 {
2056         spin_lock_irq(&queue->q_lock);
2057         list_splice_tail_init(&queue->q, work_list);
2058         spin_unlock_irq(&queue->q_lock);
2059         return !list_empty(work_list);
2060 }
2061
2062 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2063 {
2064         DEFINE_WAIT(wait);
2065         struct net_conf *nc;
2066         int uncork, cork;
2067
2068         dequeue_work_batch(&connection->sender_work, work_list);
2069         if (!list_empty(work_list))
2070                 return;
2071
2072         /* Still nothing to do?
2073          * Maybe we still need to close the current epoch,
2074          * even if no new requests are queued yet.
2075          *
2076          * Also, poke TCP, just in case.
2077          * Then wait for new work (or signal). */
2078         rcu_read_lock();
2079         nc = rcu_dereference(connection->net_conf);
2080         uncork = nc ? nc->tcp_cork : 0;
2081         rcu_read_unlock();
2082         if (uncork) {
2083                 mutex_lock(&connection->data.mutex);
2084                 if (connection->data.socket)
2085                         drbd_tcp_uncork(connection->data.socket);
2086                 mutex_unlock(&connection->data.mutex);
2087         }
2088
2089         for (;;) {
2090                 int send_barrier;
2091                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2092                 spin_lock_irq(&connection->resource->req_lock);
2093                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2094                 if (!list_empty(&connection->sender_work.q))
2095                         list_splice_tail_init(&connection->sender_work.q, work_list);
2096                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2097                 if (!list_empty(work_list) || signal_pending(current)) {
2098                         spin_unlock_irq(&connection->resource->req_lock);
2099                         break;
2100                 }
2101
2102                 /* We found nothing new to do, no to-be-communicated request,
2103                  * no other work item.  We may still need to close the last
2104                  * epoch.  Next incoming request epoch will be connection ->
2105                  * current transfer log epoch number.  If that is different
2106                  * from the epoch of the last request we communicated, it is
2107                  * safe to send the epoch separating barrier now.
2108                  */
2109                 send_barrier =
2110                         atomic_read(&connection->current_tle_nr) !=
2111                         connection->send.current_epoch_nr;
2112                 spin_unlock_irq(&connection->resource->req_lock);
2113
2114                 if (send_barrier)
2115                         maybe_send_barrier(connection,
2116                                         connection->send.current_epoch_nr + 1);
2117
2118                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2119                         break;
2120
2121                 /* drbd_send() may have called flush_signals() */
2122                 if (get_t_state(&connection->worker) != RUNNING)
2123                         break;
2124
2125                 schedule();
2126                 /* may be woken up for other things but new work, too,
2127                  * e.g. if the current epoch got closed.
2128                  * In which case we send the barrier above. */
2129         }
2130         finish_wait(&connection->sender_work.q_wait, &wait);
2131
2132         /* someone may have changed the config while we have been waiting above. */
2133         rcu_read_lock();
2134         nc = rcu_dereference(connection->net_conf);
2135         cork = nc ? nc->tcp_cork : 0;
2136         rcu_read_unlock();
2137         mutex_lock(&connection->data.mutex);
2138         if (connection->data.socket) {
2139                 if (cork)
2140                         drbd_tcp_cork(connection->data.socket);
2141                 else if (!uncork)
2142                         drbd_tcp_uncork(connection->data.socket);
2143         }
2144         mutex_unlock(&connection->data.mutex);
2145 }
2146
2147 int drbd_worker(struct drbd_thread *thi)
2148 {
2149         struct drbd_connection *connection = thi->connection;
2150         struct drbd_work *w = NULL;
2151         struct drbd_peer_device *peer_device;
2152         LIST_HEAD(work_list);
2153         int vnr;
2154
2155         while (get_t_state(thi) == RUNNING) {
2156                 drbd_thread_current_set_cpu(thi);
2157
2158                 if (list_empty(&work_list)) {
2159                         update_worker_timing_details(connection, wait_for_work);
2160                         wait_for_work(connection, &work_list);
2161                 }
2162
2163                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2164                         update_worker_timing_details(connection, do_unqueued_work);
2165                         do_unqueued_work(connection);
2166                 }
2167
2168                 if (signal_pending(current)) {
2169                         flush_signals(current);
2170                         if (get_t_state(thi) == RUNNING) {
2171                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2172                                 continue;
2173                         }
2174                         break;
2175                 }
2176
2177                 if (get_t_state(thi) != RUNNING)
2178                         break;
2179
2180                 if (!list_empty(&work_list)) {
2181                         w = list_first_entry(&work_list, struct drbd_work, list);
2182                         list_del_init(&w->list);
2183                         update_worker_timing_details(connection, w->cb);
2184                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2185                                 continue;
2186                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2187                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2188                 }
2189         }
2190
2191         do {
2192                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2193                         update_worker_timing_details(connection, do_unqueued_work);
2194                         do_unqueued_work(connection);
2195                 }
2196                 if (!list_empty(&work_list)) {
2197                         w = list_first_entry(&work_list, struct drbd_work, list);
2198                         list_del_init(&w->list);
2199                         update_worker_timing_details(connection, w->cb);
2200                         w->cb(w, 1);
2201                 } else
2202                         dequeue_work_batch(&connection->sender_work, &work_list);
2203         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2204
2205         rcu_read_lock();
2206         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2207                 struct drbd_device *device = peer_device->device;
2208                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2209                 kref_get(&device->kref);
2210                 rcu_read_unlock();
2211                 drbd_device_cleanup(device);
2212                 kref_put(&device->kref, drbd_destroy_device);
2213                 rcu_read_lock();
2214         }
2215         rcu_read_unlock();
2216
2217         return 0;
2218 }