/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}
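
/*
 * For reference: the page chain primitives used above and below are not
 * defined in this file.  In drbd_int.h they look roughly like the sketch
 * below (a paraphrase, not a verbatim copy -- see drbd_int.h for the
 * authoritative definitions):
 *
 *      #define page_chain_next(page) \
 *              ((struct page *)page_private(page))
 *      #define page_chain_for_each(page) \
 *              for (; page; page = page_chain_next(page))
 *      #define page_chain_for_each_safe(page, n) \
 *              for (; page && ({ n = page_chain_next(page); 1; }); page = n)
 *
 * I.e. the chain is a singly linked list threaded through page->private,
 * terminated by a 0 entry (see set_page_private(page, 0) above).
 */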

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_device *device,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req, *tmp;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first one that has not finished,
           we can stop examining the list... */

        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(&peer_req->w.list, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
                              bool retry)
{
        struct drbd_device *device = peer_device->device;
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        unsigned int mxb;

        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&device->pp_in_use) < mxb)
                page = __drbd_alloc_pages(device, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(device);

                if (atomic_read(&device->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(device, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                if (schedule_timeout(HZ/10) == 0)
                        mxb = UINT_MAX;
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &device->pp_in_use);
        return page;
}
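
/*
 * Illustrative sketch of the allocate/iterate/free life cycle of such a
 * page chain (hypothetical caller, not code from this driver):
 *
 *      struct page *page = drbd_alloc_pages(peer_device, nr_pages, true);
 *      struct page *p = page;
 *
 *      if (page) {
 *              page_chain_for_each(p) {
 *                      ... fill or read PAGE_SIZE bytes via kmap(p) ...
 *              }
 *              drbd_free_pages(peer_device->device, page, 0);
 *      }
 *
 * In this file the chain is normally owned by a struct drbd_peer_request
 * (peer_req->pages) and is freed through __drbd_free_peer_req() below.
 */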

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        drbd_err(device, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (has_payload && data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &device->net_ee;

        spin_lock_irq(&device->resource->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(device, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        list_splice_init(&device->done_ee, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(device, peer_req);
        }
        wake_up(&device->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&device->resource->req_lock);
                io_schedule();
                finish_wait(&device->ee_wait, &wait);
                spin_lock_irq(&device->resource->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
                                    struct list_head *head)
{
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, head);
        spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(connection->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        drbd_info(connection, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                drbd_info(connection, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv(connection, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(connection, buf, size);
        if (err && !signal_pending(current))
                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}
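
/*
 * For comparison, the rough user space equivalent would be the classic
 * setsockopt(2) calls (sketch only, error handling omitted; the kernel
 * path is not exactly identical, e.g. sock_setsockopt() doubles the
 * requested value):
 *
 *      int snd = ..., rcv = ...;
 *      setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *      setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, sizeof(rcv));
 *
 * Setting SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK above mirrors what
 * sock_setsockopt() does: it keeps the kernel from auto-tuning the
 * buffer sizes afterwards.
 */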

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so Linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &connection->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        /* 28.5% random jitter */
        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "accept failed, err = %d\n", err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(connection, sock))
                return -EIO;
        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(connection, connection->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}
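
/*
 * The probe above is the standard "is the peer still there?" trick:
 * MSG_PEEK looks at pending data without consuming it, MSG_DONTWAIT
 * makes the call non-blocking.  A user space sketch of the same idea
 * (hypothetical, not part of this driver):
 *
 *      char tb[4];
 *      ssize_t rr = recv(fd, tb, sizeof(tb), MSG_DONTWAIT | MSG_PEEK);
 *      if (rr > 0 || (rr < 0 && errno == EAGAIN))
 *              ;  // socket still usable: data pending, or simply idle
 *      else
 *              close(fd);  // rr == 0 (peer closed) or a real error
 */
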
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        int err;

        atomic_set(&device->packet_seq, 0);
        device->peer_seq = 0;

        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
                &peer_device->connection->cstate_mutex :
                &device->own_state_mutex;

        err = drbd_send_sync_param(peer_device);
        if (!err)
                err = drbd_send_sizes(peer_device, 0, 0);
        if (!err)
                err = drbd_send_uuids(peer_device);
        if (!err)
                err = drbd_send_current_state(peer_device);
        clear_bit(USE_DEGR_WFC_T, &device->flags);
        clear_bit(RESIZE_PENDING, &device->flags);
        atomic_set(&device->ap_in_flight, 0);
        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(connection->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        int fp = receive_first_packet(connection, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        /* Prevent a race between resync-handshake and
         * being promoted to Primary.
         *
         * Grab and release the state mutex, so we know that any current
         * drbd_set_role() is finished, and any incoming drbd_set_role
         * will see the STATE_SENT flag, and wait for it to be cleared.
         */
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_lock(peer_device->device->state_mutex);

        set_bit(STATE_SENT, &connection->flags);

        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_unlock(peer_device->device->state_mutex);

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->asender);

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(connection);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        drbd_err(connection, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         connection->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}
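
/*
 * For orientation, the three on-the-wire header layouts dispatched on
 * above are declared in drbd_protocol.h; roughly as follows (all fields
 * big endian; this is a paraphrase, see that header for the authoritative
 * definitions):
 *
 *      struct p_header80 {             // agreed protocol < 95
 *              u32 magic;              // DRBD_MAGIC
 *              u16 command;
 *              u16 length;             // bytes of data after this header
 *      };
 *      struct p_header95 {             // agreed protocol 95..99
 *              u16 magic;              // DRBD_MAGIC_BIG
 *              u16 command;
 *              u32 length;
 *      };
 *      struct p_header100 {            // agreed protocol >= 100
 *              u32 magic;              // DRBD_MAGIC_100
 *              u16 volume;
 *              u16 command;
 *              u32 length;
 *              u32 pad;                // must be zero
 *      };
 */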

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
        void *buffer = connection->data.rbuf;
        int err;

        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
        if (err)
                return err;

        err = decode_header(connection, buffer, pi);
        connection->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
        int rv;
        struct drbd_peer_device *peer_device;
        int vnr;

        if (connection->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;

                        if (!get_ldev(device))
                                continue;
                        kref_get(&device->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(connection, WO_drain_io);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection: DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_peer_device *peer_device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = connection->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                if (!get_ldev_if_state(device, D_ATTACHING))
                        continue;
                dc = rcu_dereference(device->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(device);
        }
        rcu_read_unlock();
        connection->write_ordering = wo;
        if (pwo != connection->write_ordering || wo == WO_bdev_flush)
                drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit one or more bios for a peer request
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 * @fault_type: fault injection type, see drbd_insert_fault()
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
                /* wait for all pending IO completions, before we start
                 * zeroing things out. */
                conn_wait_active_ee_empty(first_peer_device(device)->connection);
                if (blkdev_issue_zeroout(device->ldev->backing_bdev,
                        sector, ds >> 9, GFP_NOIO))
                        peer_req->flags |= EE_WAS_ERROR;
                drbd_endio_write_sec_final(peer_req);
                return 0;
        }

        if (peer_req->flags & EE_IS_TRIM)
                nr_pages = 0; /* discards don't have any payload. */

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_iter.bi_sector = sector;
        bio->bi_bdev = device->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        if (rw & REQ_DISCARD) {
                bio->bi_iter.bi_size = ds;
                goto submit;
        }

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyway,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                drbd_err(device,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (uint64_t)bio->bi_iter.bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(device, ds == 0);
submit:
        D_ASSERT(device, page == NULL);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(device, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}

static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&device->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_wait_ee_list_empty(device, &device->active_ee);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
        return idr_find(&connection->peer_devices, volume_number);
}
1452
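/* P_BARRIER closes the current write epoch.  Depending on the configured
 * write ordering we either switch to a fresh epoch right away (WO_none)
 * or first drain and flush all pending writes, so that the barrier ack is
 * only sent once the closed epoch is stable on disk. */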
1453 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1454 {
1455         int rv;
1456         struct p_barrier *p = pi->data;
1457         struct drbd_epoch *epoch;
1458
1459         /* FIXME these are unacked on connection,
1460          * not a specific (peer)device.
1461          */
1462         connection->current_epoch->barrier_nr = p->barrier;
1463         connection->current_epoch->connection = connection;
1464         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1465
1466         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1467          * the activity log, which means it would not be resynced in case the
1468          * R_PRIMARY crashes now.
1469          * Therefore we must send the barrier_ack after the barrier request was
1470          * completed. */
1471         switch (connection->write_ordering) {
1472         case WO_none:
1473                 if (rv == FE_RECYCLED)
1474                         return 0;
1475
1476                 /* receiver context, in the writeout path of the other node.
1477                  * avoid potential distributed deadlock */
1478                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1479                 if (epoch)
1480                         break;
1481                 else
1482                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1483                         /* Fall through */
1484
1485         case WO_bdev_flush:
1486         case WO_drain_io:
1487                 conn_wait_active_ee_empty(connection);
1488                 drbd_flush(connection);
1489
1490                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1491                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1492                         if (epoch)
1493                                 break;
1494                 }
1495
1496                 return 0;
1497         default:
1498                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1499                 return -EIO;
1500         }
1501
1502         epoch->flags = 0;
1503         atomic_set(&epoch->epoch_size, 0);
1504         atomic_set(&epoch->active, 0);
1505
1506         spin_lock(&connection->epoch_lock);
1507         if (atomic_read(&connection->current_epoch->epoch_size)) {
1508                 list_add(&epoch->list, &connection->current_epoch->list);
1509                 connection->current_epoch = epoch;
1510                 connection->epochs++;
1511         } else {
1512                 /* The current_epoch got recycled while we allocated this one... */
1513                 kfree(epoch);
1514         }
1515         spin_unlock(&connection->epoch_lock);
1516
1517         return 0;
1518 }
1519
1520 /* used from receive_RSDataReply (recv_resync_read)
1521  * and from receive_Data */
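/* Read one data block (or a trim request) off the data socket into a
 * freshly allocated peer request, receiving and verifying the integrity
 * digest if one is configured.  Returns NULL on any error; the callers
 * then tear down the connection. */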
1522 static struct drbd_peer_request *
1523 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1524               struct packet_info *pi) __must_hold(local)
1525 {
1526         struct drbd_device *device = peer_device->device;
1527         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1528         struct drbd_peer_request *peer_req;
1529         struct page *page;
1530         int dgs, ds, err;
1531         int data_size = pi->size;
1532         void *dig_in = peer_device->connection->int_dig_in;
1533         void *dig_vv = peer_device->connection->int_dig_vv;
1534         unsigned long *data;
1535         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1536
1537         dgs = 0;
1538         if (!trim && peer_device->connection->peer_integrity_tfm) {
1539                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1540                 /*
1541                  * FIXME: Receive the incoming digest into the receive buffer
1542                  *        here, together with its struct p_data?
1543                  */
1544                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1545                 if (err)
1546                         return NULL;
1547                 data_size -= dgs;
1548         }
1549
1550         if (trim) {
1551                 D_ASSERT(peer_device, data_size == 0);
1552                 data_size = be32_to_cpu(trim->size);
1553         }
1554
1555         if (!expect(IS_ALIGNED(data_size, 512)))
1556                 return NULL;
1557         /* prepare for larger trim requests. */
1558         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1559                 return NULL;
1560
1561         /* even though we trust our peer,
1562          * we sometimes have to double check. */
1563         if (sector + (data_size>>9) > capacity) {
1564                 drbd_err(device, "request from peer beyond end of local disk: "
1565                         "capacity: %llus < sector: %llus + size: %u\n",
1566                         (unsigned long long)capacity,
1567                         (unsigned long long)sector, data_size);
1568                 return NULL;
1569         }
1570
1571         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1572          * "criss-cross" setup, that might cause write-out on some other DRBD,
1573          * which in turn might block on the other node at this very place.  */
1574         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1575         if (!peer_req)
1576                 return NULL;
1577
1578         if (trim)
1579                 return peer_req;
1580
1581         ds = data_size;
1582         page = peer_req->pages;
1583         page_chain_for_each(page) {
1584                 unsigned len = min_t(int, ds, PAGE_SIZE);
1585                 data = kmap(page);
1586                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1587                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1588                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1589                         data[0] = data[0] ^ (unsigned long)-1;
1590                 }
1591                 kunmap(page);
1592                 if (err) {
1593                         drbd_free_peer_req(device, peer_req);
1594                         return NULL;
1595                 }
1596                 ds -= len;
1597         }
1598
1599         if (dgs) {
1600                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1601                 if (memcmp(dig_in, dig_vv, dgs)) {
1602                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1603                                 (unsigned long long)sector, data_size);
1604                         drbd_free_peer_req(device, peer_req);
1605                         return NULL;
1606                 }
1607         }
1608         device->recv_cnt += data_size>>9;
1609         return peer_req;
1610 }
1611
1612 /* drbd_drain_block() just takes a data block
1613  * out of the socket input buffer, and discards it.
1614  */
1615 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1616 {
1617         struct page *page;
1618         int err = 0;
1619         void *data;
1620
1621         if (!data_size)
1622                 return 0;
1623
1624         page = drbd_alloc_pages(peer_device, 1, 1);
        /* with "retry" set, drbd_alloc_pages() returns NULL only when we were
         * interrupted by a signal; bail out instead of oopsing in kmap() below */
        if (!page)
                return -EIO;
1625
1626         data = kmap(page);
1627         while (data_size) {
1628                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1629
1630                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1631                 if (err)
1632                         break;
1633                 data_size -= len;
1634         }
1635         kunmap(page);
1636         drbd_free_pages(peer_device->device, page, 0);
1637         return err;
1638 }
1639
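/* "Disk-less" read reply: copy the payload directly into the pages of
 * the original application bio (req->master_bio), then verify it against
 * the peer's integrity digest, if configured. */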
1640 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1641                            sector_t sector, int data_size)
1642 {
1643         struct bio_vec bvec;
1644         struct bvec_iter iter;
1645         struct bio *bio;
1646         int dgs, err, expect;
1647         void *dig_in = peer_device->connection->int_dig_in;
1648         void *dig_vv = peer_device->connection->int_dig_vv;
1649
1650         dgs = 0;
1651         if (peer_device->connection->peer_integrity_tfm) {
1652                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1653                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1654                 if (err)
1655                         return err;
1656                 data_size -= dgs;
1657         }
1658
1659         /* optimistically update recv_cnt.  if receiving fails below,
1660          * we disconnect anyways, and counters will be reset. */
1661         peer_device->device->recv_cnt += data_size>>9;
1662
1663         bio = req->master_bio;
1664         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1665
1666         bio_for_each_segment(bvec, bio, iter) {
1667                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1668                 expect = min_t(int, data_size, bvec.bv_len);
1669                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1670                 kunmap(bvec.bv_page);
1671                 if (err)
1672                         return err;
1673                 data_size -= expect;
1674         }
1675
1676         if (dgs) {
1677                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1678                 if (memcmp(dig_in, dig_vv, dgs)) {
1679                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1680                         return -EINVAL;
1681                 }
1682         }
1683
1684         D_ASSERT(peer_device->device, data_size == 0);
1685         return 0;
1686 }
1687
1688 /*
1689  * e_end_resync_block() is called in asender context via
1690  * drbd_finish_peer_reqs().
1691  */
1692 static int e_end_resync_block(struct drbd_work *w, int unused)
1693 {
1694         struct drbd_peer_request *peer_req =
1695                 container_of(w, struct drbd_peer_request, w);
1696         struct drbd_peer_device *peer_device = peer_req->peer_device;
1697         struct drbd_device *device = peer_device->device;
1698         sector_t sector = peer_req->i.sector;
1699         int err;
1700
1701         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1702
1703         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1704                 drbd_set_in_sync(device, sector, peer_req->i.size);
1705                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1706         } else {
1707                 /* Record failure to sync */
1708                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1709
1710                 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1711         }
1712         dec_unacked(device);
1713
1714         return err;
1715 }
1716
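/* Read a resync data block off the wire, queue it on sync_ee and submit
 * it as a local write; e_end_resync_block() then sends P_RS_WRITE_ACK or
 * P_NEG_ACK once that write has completed. */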
1717 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1718                             struct packet_info *pi) __releases(local)
1719 {
1720         struct drbd_device *device = peer_device->device;
1721         struct drbd_peer_request *peer_req;
1722
1723         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1724         if (!peer_req)
1725                 goto fail;
1726
1727         dec_rs_pending(device);
1728
1729         inc_unacked(device);
1730         /* corresponding dec_unacked() in e_end_resync_block()
1731          * respective _drbd_clear_done_ee */
1732
1733         peer_req->w.cb = e_end_resync_block;
1734
1735         spin_lock_irq(&device->resource->req_lock);
1736         list_add(&peer_req->w.list, &device->sync_ee);
1737         spin_unlock_irq(&device->resource->req_lock);
1738
1739         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1740         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1741                 return 0;
1742
1743         /* don't care for the reason here */
1744         drbd_err(device, "submit failed, triggering re-connect\n");
1745         spin_lock_irq(&device->resource->req_lock);
1746         list_del(&peer_req->w.list);
1747         spin_unlock_irq(&device->resource->req_lock);
1748
1749         drbd_free_peer_req(device, peer_req);
1750 fail:
1751         put_ldev(device);
1752         return -EIO;
1753 }
1754
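/* The block_id we send out on the wire is the kernel address of our own
 * request object, so the peer simply echoes a pointer back to us.  Before
 * trusting it, drbd_contains_interval() confirms that exactly this
 * interval is registered in the tree at the given sector. */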
1755 static struct drbd_request *
1756 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1757              sector_t sector, bool missing_ok, const char *func)
1758 {
1759         struct drbd_request *req;
1760
1761         /* Request object according to our peer */
1762         req = (struct drbd_request *)(unsigned long)id;
1763         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1764                 return req;
1765         if (!missing_ok) {
1766                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1767                         (unsigned long)id, (unsigned long long)sector);
1768         }
1769         return NULL;
1770 }
1771
1772 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1773 {
1774         struct drbd_peer_device *peer_device;
1775         struct drbd_device *device;
1776         struct drbd_request *req;
1777         sector_t sector;
1778         int err;
1779         struct p_data *p = pi->data;
1780
1781         peer_device = conn_peer_device(connection, pi->vnr);
1782         if (!peer_device)
1783                 return -EIO;
1784         device = peer_device->device;
1785
1786         sector = be64_to_cpu(p->sector);
1787
1788         spin_lock_irq(&device->resource->req_lock);
1789         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1790         spin_unlock_irq(&device->resource->req_lock);
1791         if (unlikely(!req))
1792                 return -EIO;
1793
1794         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1795          * special casing it there for the various failure cases.
1796          * still no race with drbd_fail_pending_reads */
1797         err = recv_dless_read(peer_device, req, sector, pi->size);
1798         if (!err)
1799                 req_mod(req, DATA_RECEIVED);
1800         /* else: nothing. handled from drbd_disconnect...
1801          * I don't think we may complete this just yet
1802          * in case we are "on-disconnect: freeze" */
1803
1804         return err;
1805 }
1806
1807 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1808 {
1809         struct drbd_peer_device *peer_device;
1810         struct drbd_device *device;
1811         sector_t sector;
1812         int err;
1813         struct p_data *p = pi->data;
1814
1815         peer_device = conn_peer_device(connection, pi->vnr);
1816         if (!peer_device)
1817                 return -EIO;
1818         device = peer_device->device;
1819
1820         sector = be64_to_cpu(p->sector);
1821         D_ASSERT(device, p->block_id == ID_SYNCER);
1822
1823         if (get_ldev(device)) {
1824                 /* data is submitted to disk within recv_resync_read.
1825                  * corresponding put_ldev done below on error,
1826                  * or in drbd_peer_request_endio. */
1827                 err = recv_resync_read(peer_device, sector, pi);
1828         } else {
1829                 if (__ratelimit(&drbd_ratelimit_state))
1830                         drbd_err(device, "Can not write resync data to local disk.\n");
1831
1832                 err = drbd_drain_block(peer_device, pi->size);
1833
1834                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1835         }
1836
1837         atomic_add(pi->size >> 9, &device->rs_sect_in);
1838
1839         return err;
1840 }
1841
1842 static void restart_conflicting_writes(struct drbd_device *device,
1843                                        sector_t sector, int size)
1844 {
1845         struct drbd_interval *i;
1846         struct drbd_request *req;
1847
1848         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1849                 if (!i->local)
1850                         continue;
1851                 req = container_of(i, struct drbd_request, i);
1852                 if (req->rq_state & RQ_LOCAL_PENDING ||
1853                     !(req->rq_state & RQ_POSTPONED))
1854                         continue;
1855                 /* as it is RQ_POSTPONED, this will cause it to
1856                  * be queued on the retry workqueue. */
1857                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1858         }
1859 }
1860
1861 /*
1862  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1863  */
1864 static int e_end_block(struct drbd_work *w, int cancel)
1865 {
1866         struct drbd_peer_request *peer_req =
1867                 container_of(w, struct drbd_peer_request, w);
1868         struct drbd_peer_device *peer_device = peer_req->peer_device;
1869         struct drbd_device *device = peer_device->device;
1870         sector_t sector = peer_req->i.sector;
1871         int err = 0, pcmd;
1872
1873         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1874                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1875                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1876                                 device->state.conn <= C_PAUSED_SYNC_T &&
1877                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1878                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1879                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1880                         if (pcmd == P_RS_WRITE_ACK)
1881                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1882                 } else {
1883                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1884                         /* we expect it to be marked out of sync anyways...
1885                          * maybe assert this?  */
1886                 }
1887                 dec_unacked(device);
1888         }
1889         /* we delete from the conflict detection hash _after_ we sent out the
1890          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1891         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1892                 spin_lock_irq(&device->resource->req_lock);
1893                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1894                 drbd_remove_epoch_entry_interval(device, peer_req);
1895                 if (peer_req->flags & EE_RESTART_REQUESTS)
1896                         restart_conflicting_writes(device, sector, peer_req->i.size);
1897                 spin_unlock_irq(&device->resource->req_lock);
1898         } else
1899                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1900
1901         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1902
1903         return err;
1904 }
1905
1906 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1907 {
1908         struct drbd_peer_request *peer_req =
1909                 container_of(w, struct drbd_peer_request, w);
1910         struct drbd_peer_device *peer_device = peer_req->peer_device;
1911         int err;
1912
1913         err = drbd_send_ack(peer_device, ack, peer_req);
1914         dec_unacked(peer_device->device);
1915
1916         return err;
1917 }
1918
1919 static int e_send_superseded(struct drbd_work *w, int unused)
1920 {
1921         return e_send_ack(w, P_SUPERSEDED);
1922 }
1923
1924 static int e_send_retry_write(struct drbd_work *w, int unused)
1925 {
1926         struct drbd_peer_request *peer_req =
1927                 container_of(w, struct drbd_peer_request, w);
1928         struct drbd_connection *connection = peer_req->peer_device->connection;
1929
1930         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1931                              P_RETRY_WRITE : P_SUPERSEDED);
1932 }
1933
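/* Serial number arithmetic: compare sequence numbers modulo 2^32.
 * For example, seq_greater(1, 0xffffffff) is true, since
 * (s32)(1 - 0xffffffff) == 2 > 0: 1 is "after" 0xffffffff across the wrap. */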
1934 static bool seq_greater(u32 a, u32 b)
1935 {
1936         /*
1937          * We assume 32-bit wrap-around here.
1938          * For 24-bit wrap-around, we would have to shift:
1939          *  a <<= 8; b <<= 8;
1940          */
1941         return (s32)a - (s32)b > 0;
1942 }
1943
1944 static u32 seq_max(u32 a, u32 b)
1945 {
1946         return seq_greater(a, b) ? a : b;
1947 }
1948
1949 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1950 {
1951         struct drbd_device *device = peer_device->device;
1952         unsigned int newest_peer_seq;
1953
1954         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1955                 spin_lock(&device->peer_seq_lock);
1956                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1957                 device->peer_seq = newest_peer_seq;
1958                 spin_unlock(&device->peer_seq_lock);
1959                 /* wake up only if we actually changed device->peer_seq */
1960                 if (peer_seq == newest_peer_seq)
1961                         wake_up(&device->seq_wait);
1962         }
1963 }
1964
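/* s1/s2 are sector numbers, l1/l2 are lengths in bytes (hence the >>9);
 * two intervals overlap unless one ends before the other starts. */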
1965 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1966 {
1967         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1968 }
1969
1970 /* maybe change sync_ee into interval trees as well? */
1971 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1972 {
1973         struct drbd_peer_request *rs_req;
1974         bool rv = false;
1975
1976         spin_lock_irq(&device->resource->req_lock);
1977         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1978                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1979                              rs_req->i.sector, rs_req->i.size)) {
1980                         rv = true;
1981                         break;
1982                 }
1983         }
1984         spin_unlock_irq(&device->resource->req_lock);
1985
1986         return rv;
1987 }
1988
1989 /* Called from receive_Data.
1990  * Synchronize packets on sock with packets on msock.
1991  *
1992  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1993  * packet traveling on msock, they are still processed in the order they have
1994  * been sent.
1995  *
1996  * Note: we don't care for Ack packets overtaking P_DATA packets.
1997  *
1998  * In case packet_seq is larger than device->peer_seq number, there are
1999  * outstanding packets on the msock. We wait for them to arrive.
2000  * In case we are the logically next packet, we update device->peer_seq
2001  * ourselves. Correctly handles 32bit wrap around.
2002  *
2003  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
2004  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2005  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2006  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2007  *
2008  * returns 0 if we may process the packet,
2009  * -ETIMEDOUT if we timed out waiting for the missing ack packets,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2010 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2011 {
2012         struct drbd_device *device = peer_device->device;
2013         DEFINE_WAIT(wait);
2014         long timeout;
2015         int ret = 0, tp;
2016
2017         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2018                 return 0;
2019
2020         spin_lock(&device->peer_seq_lock);
2021         for (;;) {
2022                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2023                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2024                         break;
2025                 }
2026
2027                 if (signal_pending(current)) {
2028                         ret = -ERESTARTSYS;
2029                         break;
2030                 }
2031
2032                 rcu_read_lock();
2033                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2034                 rcu_read_unlock();
2035
2036                 if (!tp)
2037                         break;
2038
2039                 /* Only need to wait if two_primaries is enabled */
2040                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2041                 spin_unlock(&device->peer_seq_lock);
2042                 rcu_read_lock();
2043                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2044                 rcu_read_unlock();
2045                 timeout = schedule_timeout(timeout);
2046                 spin_lock(&device->peer_seq_lock);
2047                 if (!timeout) {
2048                         ret = -ETIMEDOUT;
2049                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2050                         break;
2051                 }
2052         }
2053         spin_unlock(&device->peer_seq_lock);
2054         finish_wait(&device->seq_wait, &wait);
2055         return ret;
2056 }
2057
2058 /* see also bio_flags_to_wire()
2059  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2060  * flags and back. We may replicate to other kernel versions. */
2061 static unsigned long wire_flags_to_bio(u32 dpf)
2062 {
2063         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2064                 (dpf & DP_FUA ? REQ_FUA : 0) |
2065                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2066                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2067 }
2068
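/* NEG_ACK every postponed request overlapping [sector, sector + size).
 * complete_master_bio() must run without req_lock held, so the lock is
 * dropped around each completion and the overlap walk restarted. */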
2069 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2070                                     unsigned int size)
2071 {
2072         struct drbd_interval *i;
2073
2074     repeat:
2075         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2076                 struct drbd_request *req;
2077                 struct bio_and_error m;
2078
2079                 if (!i->local)
2080                         continue;
2081                 req = container_of(i, struct drbd_request, i);
2082                 if (!(req->rq_state & RQ_POSTPONED))
2083                         continue;
2084                 req->rq_state &= ~RQ_POSTPONED;
2085                 __req_mod(req, NEG_ACKED, &m);
2086                 spin_unlock_irq(&device->resource->req_lock);
2087                 if (m.bio)
2088                         complete_master_bio(device, &m);
2089                 spin_lock_irq(&device->resource->req_lock);
2090                 goto repeat;
2091         }
2092 }
2093
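/* Decide the fate of a peer write that conflicts with local application
 * writes.  On the node that resolves conflicts, a peer request fully
 * contained in a local one is superseded and anything else is retried;
 * on the other node we wait for that verdict, and for the conflicting
 * local request to finish, before submitting. */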
2094 static int handle_write_conflicts(struct drbd_device *device,
2095                                   struct drbd_peer_request *peer_req)
2096 {
2097         struct drbd_connection *connection = peer_req->peer_device->connection;
2098         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2099         sector_t sector = peer_req->i.sector;
2100         const unsigned int size = peer_req->i.size;
2101         struct drbd_interval *i;
2102         bool equal;
2103         int err;
2104
2105         /*
2106          * Inserting the peer request into the write_requests tree will prevent
2107          * new conflicting local requests from being added.
2108          */
2109         drbd_insert_interval(&device->write_requests, &peer_req->i);
2110
2111     repeat:
2112         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2113                 if (i == &peer_req->i)
2114                         continue;
2115
2116                 if (!i->local) {
2117                         /*
2118                          * Our peer has sent a conflicting remote request; this
2119                          * should not happen in a two-node setup.  Wait for the
2120                          * earlier peer request to complete.
2121                          */
2122                         err = drbd_wait_misc(device, i);
2123                         if (err)
2124                                 goto out;
2125                         goto repeat;
2126                 }
2127
2128                 equal = i->sector == sector && i->size == size;
2129                 if (resolve_conflicts) {
2130                         /*
2131                          * If the peer request is fully contained within the
2132                          * overlapping request, it can be considered overwritten
2133                          * and thus superseded; otherwise, it will be retried
2134                          * once all overlapping requests have completed.
2135                          */
2136                         bool superseded = i->sector <= sector && i->sector +
2137                                        (i->size >> 9) >= sector + (size >> 9);
2138
2139                         if (!equal)
2140                                 drbd_alert(device, "Concurrent writes detected: "
2141                                                "local=%llus +%u, remote=%llus +%u, "
2142                                                "assuming %s came first\n",
2143                                           (unsigned long long)i->sector, i->size,
2144                                           (unsigned long long)sector, size,
2145                                           superseded ? "local" : "remote");
2146
2147                         inc_unacked(device);
2148                         peer_req->w.cb = superseded ? e_send_superseded :
2149                                                    e_send_retry_write;
2150                         list_add_tail(&peer_req->w.list, &device->done_ee);
2151                         wake_asender(connection);
2152
2153                         err = -ENOENT;
2154                         goto out;
2155                 } else {
2156                         struct drbd_request *req =
2157                                 container_of(i, struct drbd_request, i);
2158
2159                         if (!equal)
2160                                 drbd_alert(device, "Concurrent writes detected: "
2161                                                "local=%llus +%u, remote=%llus +%u\n",
2162                                           (unsigned long long)i->sector, i->size,
2163                                           (unsigned long long)sector, size);
2164
2165                         if (req->rq_state & RQ_LOCAL_PENDING ||
2166                             !(req->rq_state & RQ_POSTPONED)) {
2167                                 /*
2168                                  * Wait for the node with the discard flag to
2169                                  * decide if this request has been superseded
2170                                  * or needs to be retried.
2171                                  * Requests that have been superseded will
2172                                  * disappear from the write_requests tree.
2173                                  *
2174                                  * In addition, wait for the conflicting
2175                                  * request to finish locally before submitting
2176                                  * the conflicting peer request.
2177                                  */
2178                                 err = drbd_wait_misc(device, &req->i);
2179                                 if (err) {
2180                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2181                                         fail_postponed_requests(device, sector, size);
2182                                         goto out;
2183                                 }
2184                                 goto repeat;
2185                         }
2186                         /*
2187                          * Remember to restart the conflicting requests after
2188                          * the new peer request has completed.
2189                          */
2190                         peer_req->flags |= EE_RESTART_REQUESTS;
2191                 }
2192         }
2193         err = 0;
2194
2195     out:
2196         if (err)
2197                 drbd_remove_epoch_entry_interval(device, peer_req);
2198         return err;
2199 }
2200
2201 /* mirrored write */
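/* Receive a P_DATA packet: read the payload into a peer request, account
 * it to the current epoch, resolve conflicts with local writes if both
 * nodes may be primary, send whichever ack the protocol level requires,
 * and submit the write to the local disk. */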
2202 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2203 {
2204         struct drbd_peer_device *peer_device;
2205         struct drbd_device *device;
2206         sector_t sector;
2207         struct drbd_peer_request *peer_req;
2208         struct p_data *p = pi->data;
2209         u32 peer_seq = be32_to_cpu(p->seq_num);
2210         int rw = WRITE;
2211         u32 dp_flags;
2212         int err, tp;
2213
2214         peer_device = conn_peer_device(connection, pi->vnr);
2215         if (!peer_device)
2216                 return -EIO;
2217         device = peer_device->device;
2218
2219         if (!get_ldev(device)) {
2220                 int err2;
2221
2222                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2223                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2224                 atomic_inc(&connection->current_epoch->epoch_size);
2225                 err2 = drbd_drain_block(peer_device, pi->size);
2226                 if (!err)
2227                         err = err2;
2228                 return err;
2229         }
2230
2231         /*
2232          * Corresponding put_ldev done either below (on various errors), or in
2233          * drbd_peer_request_endio, if we successfully submit the data at the
2234          * end of this function.
2235          */
2236
2237         sector = be64_to_cpu(p->sector);
2238         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2239         if (!peer_req) {
2240                 put_ldev(device);
2241                 return -EIO;
2242         }
2243
2244         peer_req->w.cb = e_end_block;
2245
2246         dp_flags = be32_to_cpu(p->dp_flags);
2247         rw |= wire_flags_to_bio(dp_flags);
2248         if (pi->cmd == P_TRIM) {
2249                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2250                 peer_req->flags |= EE_IS_TRIM;
2251                 if (!blk_queue_discard(q))
2252                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2253                 D_ASSERT(peer_device, peer_req->i.size > 0);
2254                 D_ASSERT(peer_device, rw & REQ_DISCARD);
2255                 D_ASSERT(peer_device, peer_req->pages == NULL);
2256         } else if (peer_req->pages == NULL) {
2257                 D_ASSERT(device, peer_req->i.size == 0);
2258                 D_ASSERT(device, dp_flags & DP_FLUSH);
2259         }
2260
2261         if (dp_flags & DP_MAY_SET_IN_SYNC)
2262                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2263
2264         spin_lock(&connection->epoch_lock);
2265         peer_req->epoch = connection->current_epoch;
2266         atomic_inc(&peer_req->epoch->epoch_size);
2267         atomic_inc(&peer_req->epoch->active);
2268         spin_unlock(&connection->epoch_lock);
2269
2270         rcu_read_lock();
2271         tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2272         rcu_read_unlock();
2273         if (tp) {
2274                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2275                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2276                 if (err)
2277                         goto out_interrupted;
2278                 spin_lock_irq(&device->resource->req_lock);
2279                 err = handle_write_conflicts(device, peer_req);
2280                 if (err) {
2281                         spin_unlock_irq(&device->resource->req_lock);
2282                         if (err == -ENOENT) {
2283                                 put_ldev(device);
2284                                 return 0;
2285                         }
2286                         goto out_interrupted;
2287                 }
2288         } else {
2289                 update_peer_seq(peer_device, peer_seq);
2290                 spin_lock_irq(&device->resource->req_lock);
2291         }
2292         /* if we use the zeroout fallback code, we process synchronously,
2293          * and drbd_submit_peer_request() waits for all pending requests
2294          * (i.e. for active_ee to become empty) before proceeding;
2295          * better not add ourselves here. */
2296         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2297                 list_add(&peer_req->w.list, &device->active_ee);
2298         spin_unlock_irq(&device->resource->req_lock);
2299
2300         if (device->state.conn == C_SYNC_TARGET)
2301                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2302
2303         if (peer_device->connection->agreed_pro_version < 100) {
2304                 rcu_read_lock();
2305                 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2306                 case DRBD_PROT_C:
2307                         dp_flags |= DP_SEND_WRITE_ACK;
2308                         break;
2309                 case DRBD_PROT_B:
2310                         dp_flags |= DP_SEND_RECEIVE_ACK;
2311                         break;
2312                 }
2313                 rcu_read_unlock();
2314         }
2315
2316         if (dp_flags & DP_SEND_WRITE_ACK) {
2317                 peer_req->flags |= EE_SEND_WRITE_ACK;
2318                 inc_unacked(device);
2319                 /* corresponding dec_unacked() in e_end_block()
2320                  * respective _drbd_clear_done_ee */
2321         }
2322
2323         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2324                 /* I really don't like it that the receiver thread
2325                  * sends on the msock, but anyways */
2326                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2327         }
2328
2329         if (device->state.pdsk < D_INCONSISTENT) {
2330                 /* In case ours is the only disk in the cluster: mark the
                  * block out of sync and cover the write with the activity log. */
2331                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2332                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2333                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2334                 drbd_al_begin_io(device, &peer_req->i, true);
2335         }
2336
2337         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2338         if (!err)
2339                 return 0;
2340
2341         /* don't care for the reason here */
2342         drbd_err(device, "submit failed, triggering re-connect\n");
2343         spin_lock_irq(&device->resource->req_lock);
2344         list_del(&peer_req->w.list);
2345         drbd_remove_epoch_entry_interval(device, peer_req);
2346         spin_unlock_irq(&device->resource->req_lock);
2347         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2348                 drbd_al_complete_io(device, &peer_req->i);
2349
2350 out_interrupted:
2351         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2352         put_ldev(device);
2353         drbd_free_peer_req(device, peer_req);
2354         return err;
2355 }
2356
2357 /* We may throttle resync, if the lower device seems to be busy,
2358  * and current sync rate is above c_min_rate.
2359  *
2360  * To decide whether or not the lower device is busy, we use a scheme similar
2361  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2362  * (more than 64 sectors) of activity we cannot account for with our own resync
2363  * activity, it obviously is "busy".
2364  *
2365  * The current sync rate used here uses only the most recent two step marks,
2366  * to have a short time average so we can react faster.
2367  */
2368 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2369 {
2370         struct lc_element *tmp;
2371         bool throttle = true;
2372
2373         if (!drbd_rs_c_min_rate_throttle(device))
2374                 return false;
2375
2376         spin_lock_irq(&device->al_lock);
2377         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2378         if (tmp) {
2379                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2380                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2381                         throttle = false;
2382                 /* Do not slow down if app IO is already waiting for this extent */
2383         }
2384         spin_unlock_irq(&device->al_lock);
2385
2386         return throttle;
2387 }
2388
2389 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2390 {
2391         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2392         unsigned long db, dt, dbdt;
2393         unsigned int c_min_rate;
2394         int curr_events;
2395
2396         rcu_read_lock();
2397         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2398         rcu_read_unlock();
2399
2400         /* feature disabled? */
2401         if (c_min_rate == 0)
2402                 return false;
2403
2404         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2405                       (int)part_stat_read(&disk->part0, sectors[1]) -
2406                         atomic_read(&device->rs_sect_ev);
2407         if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2408                 unsigned long rs_left;
2409                 int i;
2410
2411                 device->rs_last_events = curr_events;
2412
2413                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2414                  * approx. */
2415                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2416
2417                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2418                         rs_left = device->ov_left;
2419                 else
2420                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2421
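                /* db is in bitmap bits (4 KiB each), so Bit2KB(db/dt)
                 * yields KiB/s, the same unit as the configured c_min_rate. */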
2422                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2423                 if (!dt)
2424                         dt++;
2425                 db = device->rs_mark_left[i] - rs_left;
2426                 dbdt = Bit2KB(db/dt);
2427
2428                 if (dbdt > c_min_rate)
2429                         return true;
2430         }
2431         return false;
2432 }
2433
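/* The peer wants to read from us: P_DATA_REQUEST for application reads
 * (disk-less peer), P_RS_DATA_REQUEST and P_CSUM_RS_REQUEST during resync,
 * P_OV_REQUEST/P_OV_REPLY for online verify.  Answer from the local disk
 * if it is D_UP_TO_DATE, otherwise send the matching neg-ack and drain
 * the payload. */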
2434 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2435 {
2436         struct drbd_peer_device *peer_device;
2437         struct drbd_device *device;
2438         sector_t sector;
2439         sector_t capacity;
2440         struct drbd_peer_request *peer_req;
2441         struct digest_info *di = NULL;
2442         int size, verb;
2443         unsigned int fault_type;
2444         struct p_block_req *p = pi->data;
2445
2446         peer_device = conn_peer_device(connection, pi->vnr);
2447         if (!peer_device)
2448                 return -EIO;
2449         device = peer_device->device;
2450         capacity = drbd_get_capacity(device->this_bdev);
2451
2452         sector = be64_to_cpu(p->sector);
2453         size   = be32_to_cpu(p->blksize);
2454
2455         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2456                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2457                                 (unsigned long long)sector, size);
2458                 return -EINVAL;
2459         }
2460         if (sector + (size>>9) > capacity) {
2461                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2462                                 (unsigned long long)sector, size);
2463                 return -EINVAL;
2464         }
2465
2466         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2467                 verb = 1;
2468                 switch (pi->cmd) {
2469                 case P_DATA_REQUEST:
2470                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2471                         break;
2472                 case P_RS_DATA_REQUEST:
2473                 case P_CSUM_RS_REQUEST:
2474                 case P_OV_REQUEST:
2475                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2476                         break;
2477                 case P_OV_REPLY:
2478                         verb = 0;
2479                         dec_rs_pending(device);
2480                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2481                         break;
2482                 default:
2483                         BUG();
2484                 }
2485                 if (verb && __ratelimit(&drbd_ratelimit_state))
2486                         drbd_err(device, "Can not satisfy peer's read request, "
2487                             "no local data.\n");
2488
2489         /* drain the payload, if any */
2490                 return drbd_drain_block(peer_device, pi->size);
2491         }
2492
2493         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2494          * "criss-cross" setup, that might cause write-out on some other DRBD,
2495          * which in turn might block on the other node at this very place.  */
2496         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2497                         true /* has real payload */, GFP_NOIO);
2498         if (!peer_req) {
2499                 put_ldev(device);
2500                 return -ENOMEM;
2501         }
2502
2503         switch (pi->cmd) {
2504         case P_DATA_REQUEST:
2505                 peer_req->w.cb = w_e_end_data_req;
2506                 fault_type = DRBD_FAULT_DT_RD;
2507                 /* application IO, don't drbd_rs_begin_io */
2508                 goto submit;
2509
2510         case P_RS_DATA_REQUEST:
2511                 peer_req->w.cb = w_e_end_rsdata_req;
2512                 fault_type = DRBD_FAULT_RS_RD;
2513                 /* used in the sector offset progress display */
2514                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2515                 break;
2516
2517         case P_OV_REPLY:
2518         case P_CSUM_RS_REQUEST:
2519                 fault_type = DRBD_FAULT_RS_RD;
2520                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2521                 if (!di)
2522                         goto out_free_e;
2523
2524                 di->digest_size = pi->size;
2525                 di->digest = (((char *)di)+sizeof(struct digest_info));
2526
2527                 peer_req->digest = di;
2528                 peer_req->flags |= EE_HAS_DIGEST;
2529
2530                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2531                         goto out_free_e;
2532
2533                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2534                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2535                         peer_req->w.cb = w_e_end_csum_rs_req;
2536                         /* used in the sector offset progress display */
2537                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2538                 } else if (pi->cmd == P_OV_REPLY) {
2539                         /* track progress, we may need to throttle */
2540                         atomic_add(size >> 9, &device->rs_sect_in);
2541                         peer_req->w.cb = w_e_end_ov_reply;
2542                         dec_rs_pending(device);
2543                         /* drbd_rs_begin_io done when we sent this request,
2544                          * but accounting still needs to be done. */
2545                         goto submit_for_resync;
2546                 }
2547                 break;
2548
2549         case P_OV_REQUEST:
2550                 if (device->ov_start_sector == ~(sector_t)0 &&
2551                     peer_device->connection->agreed_pro_version >= 90) {
2552                         unsigned long now = jiffies;
2553                         int i;
2554                         device->ov_start_sector = sector;
2555                         device->ov_position = sector;
2556                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2557                         device->rs_total = device->ov_left;
2558                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2559                                 device->rs_mark_left[i] = device->ov_left;
2560                                 device->rs_mark_time[i] = now;
2561                         }
2562                         drbd_info(device, "Online Verify start sector: %llu\n",
2563                                         (unsigned long long)sector);
2564                 }
2565                 peer_req->w.cb = w_e_end_ov_req;
2566                 fault_type = DRBD_FAULT_RS_RD;
2567                 break;
2568
2569         default:
2570                 BUG();
2571         }
2572
2573         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2574          * wrt the receiver, but it is not as straightforward as it may seem.
2575          * Various places in the resync start and stop logic assume resync
2576          * requests are processed in order, requeuing this on the worker thread
2577          * introduces a bunch of new code for synchronization between threads.
2578          *
2579          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2580          * "forever", throttling after drbd_rs_begin_io will lock that extent
2581          * for application writes for the same time.  For now, just throttle
2582          * here, where the rest of the code expects the receiver to sleep for
2583          * a while, anyways.
2584          */
2585
2586         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2587          * this defers syncer requests for some time, before letting at least
2588          * one request through.  The resync controller on the receiving side
2589          * will adapt to the incoming rate accordingly.
2590          *
2591          * We cannot throttle here if remote is Primary/SyncTarget:
2592          * we would also throttle its application reads.
2593          * In that case, throttling is done on the SyncTarget only.
2594          */
2595         if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2596                 schedule_timeout_uninterruptible(HZ/10);
2597         if (drbd_rs_begin_io(device, sector))
2598                 goto out_free_e;
2599
2600 submit_for_resync:
2601         atomic_add(size >> 9, &device->rs_sect_ev);
2602
2603 submit:
2604         inc_unacked(device);
2605         spin_lock_irq(&device->resource->req_lock);
2606         list_add_tail(&peer_req->w.list, &device->read_ee);
2607         spin_unlock_irq(&device->resource->req_lock);
2608
2609         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2610                 return 0;
2611
2612         /* don't care for the reason here */
2613         drbd_err(device, "submit failed, triggering re-connect\n");
2614         spin_lock_irq(&device->resource->req_lock);
2615         list_del(&peer_req->w.list);
2616         spin_unlock_irq(&device->resource->req_lock);
2617         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2618
2619 out_free_e:
2620         put_ldev(device);
2621         drbd_free_peer_req(device, peer_req);
2622         return -EIO;
2623 }
2624
2625 /**
2626  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2627  */
2628 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2629 {
2630         struct drbd_device *device = peer_device->device;
2631         int self, peer, rv = -100;
2632         unsigned long ch_self, ch_peer;
2633         enum drbd_after_sb_p after_sb_0p;
2634
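        /* self/peer are the low "role" bits of the two bitmap UUIDs, used by
         * the younger/older-primary strategies below; ch_self/ch_peer count
         * the blocks each side changed while the nodes were apart. */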
2635         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2636         peer = device->p_uuid[UI_BITMAP] & 1;
2637
2638         ch_peer = device->p_uuid[UI_SIZE];
2639         ch_self = device->comm_bm_set;
2640
2641         rcu_read_lock();
2642         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2643         rcu_read_unlock();
2644         switch (after_sb_0p) {
2645         case ASB_CONSENSUS:
2646         case ASB_DISCARD_SECONDARY:
2647         case ASB_CALL_HELPER:
2648         case ASB_VIOLENTLY:
2649                 drbd_err(device, "Configuration error.\n");
2650                 break;
2651         case ASB_DISCONNECT:
2652                 break;
2653         case ASB_DISCARD_YOUNGER_PRI:
2654                 if (self == 0 && peer == 1) {
2655                         rv = -1;
2656                         break;
2657                 }
2658                 if (self == 1 && peer == 0) {
2659                         rv =  1;
2660                         break;
2661                 }
2662                 /* Else fall through to one of the other strategies... */
2663         case ASB_DISCARD_OLDER_PRI:
2664                 if (self == 0 && peer == 1) {
2665                         rv = 1;
2666                         break;
2667                 }
2668                 if (self == 1 && peer == 0) {
2669                         rv = -1;
2670                         break;
2671                 }
2672                 /* Else fall through to one of the other strategies... */
2673                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2674                      "Using discard-least-changes instead\n");
2675         case ASB_DISCARD_ZERO_CHG:
2676                 if (ch_peer == 0 && ch_self == 0) {
2677                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2678                                 ? -1 : 1;
2679                         break;
2680                 } else {
2681                         if (ch_peer == 0) { rv =  1; break; }
2682                         if (ch_self == 0) { rv = -1; break; }
2683                 }
2684                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2685                         break;
2686         case ASB_DISCARD_LEAST_CHG:
2687                 if      (ch_self < ch_peer)
2688                         rv = -1;
2689                 else if (ch_self > ch_peer)
2690                         rv =  1;
2691                 else /* ( ch_self == ch_peer ) */
2692                      /* Well, then use something else. */
2693                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2694                                 ? -1 : 1;
2695                 break;
2696         case ASB_DISCARD_LOCAL:
2697                 rv = -1;
2698                 break;
2699         case ASB_DISCARD_REMOTE:
2700                 rv =  1;
2701         }
2702
2703         return rv;
2704 }
2705
2706 /**
2707  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2708  */
2709 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2710 {
2711         struct drbd_device *device = peer_device->device;
2712         int hg, rv = -100;
2713         enum drbd_after_sb_p after_sb_1p;
2714
2715         rcu_read_lock();
2716         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2717         rcu_read_unlock();
2718         switch (after_sb_1p) {
2719         case ASB_DISCARD_YOUNGER_PRI:
2720         case ASB_DISCARD_OLDER_PRI:
2721         case ASB_DISCARD_LEAST_CHG:
2722         case ASB_DISCARD_LOCAL:
2723         case ASB_DISCARD_REMOTE:
2724         case ASB_DISCARD_ZERO_CHG:
2725                 drbd_err(device, "Configuration error.\n");
2726                 break;
2727         case ASB_DISCONNECT:
2728                 break;
2729         case ASB_CONSENSUS:
2730                 hg = drbd_asb_recover_0p(peer_device);
2731                 if (hg == -1 && device->state.role == R_SECONDARY)
2732                         rv = hg;
2733                 if (hg == 1  && device->state.role == R_PRIMARY)
2734                         rv = hg;
2735                 break;
2736         case ASB_VIOLENTLY:
2737                 rv = drbd_asb_recover_0p(peer_device);
2738                 break;
2739         case ASB_DISCARD_SECONDARY:
2740                 return device->state.role == R_PRIMARY ? 1 : -1;
2741         case ASB_CALL_HELPER:
2742                 hg = drbd_asb_recover_0p(peer_device);
2743                 if (hg == -1 && device->state.role == R_PRIMARY) {
2744                         enum drbd_state_rv rv2;
2745
2746                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2747                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2748                           * we do not need to wait for the after state change work either. */
2749                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2750                         if (rv2 != SS_SUCCESS) {
2751                                 drbd_khelper(device, "pri-lost-after-sb");
2752                         } else {
2753                                 drbd_warn(device, "Successfully gave up primary role.\n");
2754                                 rv = hg;
2755                         }
2756                 } else
2757                         rv = hg;
2758         }
2759
2760         return rv;
2761 }
2762
2763 /**
2764  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2765  */
2766 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2767 {
2768         struct drbd_device *device = peer_device->device;
2769         int hg, rv = -100;
2770         enum drbd_after_sb_p after_sb_2p;
2771
2772         rcu_read_lock();
2773         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2774         rcu_read_unlock();
2775         switch (after_sb_2p) {
2776         case ASB_DISCARD_YOUNGER_PRI:
2777         case ASB_DISCARD_OLDER_PRI:
2778         case ASB_DISCARD_LEAST_CHG:
2779         case ASB_DISCARD_LOCAL:
2780         case ASB_DISCARD_REMOTE:
2781         case ASB_CONSENSUS:
2782         case ASB_DISCARD_SECONDARY:
2783         case ASB_DISCARD_ZERO_CHG:
2784                 drbd_err(device, "Configuration error.\n");
2785                 break;
2786         case ASB_VIOLENTLY:
2787                 rv = drbd_asb_recover_0p(peer_device);
2788                 break;
2789         case ASB_DISCONNECT:
2790                 break;
2791         case ASB_CALL_HELPER:
2792                 hg = drbd_asb_recover_0p(peer_device);
2793                 if (hg == -1) {
2794                         enum drbd_state_rv rv2;
2795
2796                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2797                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2798                           * we do not need to wait for the after state change work either. */
2799                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2800                         if (rv2 != SS_SUCCESS) {
2801                                 drbd_khelper(device, "pri-lost-after-sb");
2802                         } else {
2803                                 drbd_warn(device, "Successfully gave up primary role.\n");
2804                                 rv = hg;
2805                         }
2806                 } else
2807                         rv = hg;
2808         }
2809
2810         return rv;
2811 }
2812
2813 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2814                            u64 bits, u64 flags)
2815 {
2816         if (!uuid) {
2817                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2818                 return;
2819         }
2820         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2821              text,
2822              (unsigned long long)uuid[UI_CURRENT],
2823              (unsigned long long)uuid[UI_BITMAP],
2824              (unsigned long long)uuid[UI_HISTORY_START],
2825              (unsigned long long)uuid[UI_HISTORY_END],
2826              (unsigned long long)bits,
2827              (unsigned long long)flags);
2828 }
2829
2830 /*
2831   100   after split brain try auto recover
2832     2   C_SYNC_SOURCE set BitMap
2833     1   C_SYNC_SOURCE use BitMap
2834     0   no Sync
2835    -1   C_SYNC_TARGET use BitMap
2836    -2   C_SYNC_TARGET set BitMap
2837  -100   after split brain, disconnect
2838 -1000   unrelated data
2839 -1091   requires proto 91
2840 -1096   requires proto 96
2841  */
2842 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2843 {
2844         u64 self, peer;
2845         int i, j;
2846
2847         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2848         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2849
2850         *rule_nr = 10;
2851         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2852                 return 0;
2853
2854         *rule_nr = 20;
2855         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2856              peer != UUID_JUST_CREATED)
2857                 return -2;
2858
2859         *rule_nr = 30;
2860         if (self != UUID_JUST_CREATED &&
2861             (peer == UUID_JUST_CREATED || peer == (u64)0))
2862                 return 2;
2863
2864         if (self == peer) {
2865                 int rct, dc; /* roles at crash time */
2866
2867                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2868
2869                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2870                                 return -1091;
2871
2872                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2873                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2874                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2875                                 drbd_uuid_move_history(device);
2876                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2877                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2878
2879                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2880                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2881                                 *rule_nr = 34;
2882                         } else {
2883                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2884                                 *rule_nr = 36;
2885                         }
2886
2887                         return 1;
2888                 }
2889
2890                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2891
2892                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2893                                 return -1091;
2894
2895                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2896                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2897                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2898
2899                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2900                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2901                                 device->p_uuid[UI_BITMAP] = 0UL;
2902
2903                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2904                                 *rule_nr = 35;
2905                         } else {
2906                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2907                                 *rule_nr = 37;
2908                         }
2909
2910                         return -1;
2911                 }
2912
2913                 /* Common power [off|failure] */
2914                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2915                         (device->p_uuid[UI_FLAGS] & 2);
2916                 /* lowest bit is set when we were primary,
2917                  * next bit (weight 2) is set when peer was primary */
2918                 *rule_nr = 40;
2919
2920                 switch (rct) {
2921                 case 0: /* !self_pri && !peer_pri */ return 0;
2922                 case 1: /*  self_pri && !peer_pri */ return 1;
2923                 case 2: /* !self_pri &&  peer_pri */ return -1;
2924                 case 3: /*  self_pri &&  peer_pri */
2925                         dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2926                         return dc ? -1 : 1;
2927                 }
2928         }
2929
2930         *rule_nr = 50;
2931         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2932         if (self == peer)
2933                 return -1;
2934
2935         *rule_nr = 51;
2936         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2937         if (self == peer) {
2938                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2939                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2940                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2941                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2942                         /* The last P_SYNC_UUID did not get through. Undo the sync-source
2943                            modifications of the peer's UUIDs made when the last resync started. */
2944
2945                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2946                                 return -1091;
2947
2948                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2949                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2950
2951                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2952                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2953
2954                         return -1;
2955                 }
2956         }
2957
2958         *rule_nr = 60;
2959         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2960         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2961                 peer = device->p_uuid[i] & ~((u64)1);
2962                 if (self == peer)
2963                         return -2;
2964         }
2965
2966         *rule_nr = 70;
2967         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2968         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2969         if (self == peer)
2970                 return 1;
2971
2972         *rule_nr = 71;
2973         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2974         if (self == peer) {
2975                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2976                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2977                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2978                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2979                         /* The last P_SYNC_UUID did not get through. Undo the sync-source
2980                            modifications of our UUIDs made when the last resync started. */
2981
2982                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2983                                 return -1091;
2984
2985                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2986                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2987
2988                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2989                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2990                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2991
2992                         return 1;
2993                 }
2994         }
2995
2996
2997         *rule_nr = 80;
2998         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2999         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3000                 self = device->ldev->md.uuid[i] & ~((u64)1);
3001                 if (self == peer)
3002                         return 2;
3003         }
3004
3005         *rule_nr = 90;
3006         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3007         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3008         if (self == peer && self != ((u64)0))
3009                 return 100;
3010
3011         *rule_nr = 100;
3012         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3013                 self = device->ldev->md.uuid[i] & ~((u64)1);
3014                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3015                         peer = device->p_uuid[j] & ~((u64)1);
3016                         if (self == peer)
3017                                 return -100;
3018                 }
3019         }
3020
3021         return -1000;
3022 }
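
/*
 * Illustrative sketch only, not used by the driver: every comparison in
 * drbd_uuid_compare() masks out bit 0 of the UUIDs (x & ~((u64)1))
 * before comparing, because that bit carries node-local role
 * information and must not make otherwise identical data generations
 * look different.  A hypothetical helper expressing that convention:
 */
static inline bool uuid_same_generation(u64 a, u64 b)
{
        return (a & ~((u64)1)) == (b & ~((u64)1));
}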
3023
3024 /* drbd_sync_handshake() returns the new conn state on success, or
3025    C_MASK on failure.
3026  */
3027 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3028                                            enum drbd_role peer_role,
3029                                            enum drbd_disk_state peer_disk) __must_hold(local)
3030 {
3031         struct drbd_device *device = peer_device->device;
3032         enum drbd_conns rv = C_MASK;
3033         enum drbd_disk_state mydisk;
3034         struct net_conf *nc;
3035         int hg, rule_nr, rr_conflict, tentative;
3036
3037         mydisk = device->state.disk;
3038         if (mydisk == D_NEGOTIATING)
3039                 mydisk = device->new_state_tmp.disk;
3040
3041         drbd_info(device, "drbd_sync_handshake:\n");
3042
3043         spin_lock_irq(&device->ldev->md.uuid_lock);
3044         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3045         drbd_uuid_dump(device, "peer", device->p_uuid,
3046                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3047
3048         hg = drbd_uuid_compare(device, &rule_nr);
3049         spin_unlock_irq(&device->ldev->md.uuid_lock);
3050
3051         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3052
3053         if (hg == -1000) {
3054                 drbd_alert(device, "Unrelated data, aborting!\n");
3055                 return C_MASK;
3056         }
3057         if (hg < -1000) {
3058                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3059                 return C_MASK;
3060         }
3061
3062         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3063             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3064                 int f = (hg == -100) || abs(hg) == 2;
3065                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3066                 if (f)
3067                         hg = hg*2;
3068                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3069                      hg > 0 ? "source" : "target");
3070         }
3071
3072         if (abs(hg) == 100)
3073                 drbd_khelper(device, "initial-split-brain");
3074
3075         rcu_read_lock();
3076         nc = rcu_dereference(peer_device->connection->net_conf);
3077
3078         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3079                 int pcount = (device->state.role == R_PRIMARY)
3080                            + (peer_role == R_PRIMARY);
3081                 int forced = (hg == -100);
3082
3083                 switch (pcount) {
3084                 case 0:
3085                         hg = drbd_asb_recover_0p(peer_device);
3086                         break;
3087                 case 1:
3088                         hg = drbd_asb_recover_1p(peer_device);
3089                         break;
3090                 case 2:
3091                         hg = drbd_asb_recover_2p(peer_device);
3092                         break;
3093                 }
3094                 if (abs(hg) < 100) {
3095                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3096                              "automatically solved. Sync from %s node\n",
3097                              pcount, (hg < 0) ? "peer" : "this");
3098                         if (forced) {
3099                                 drbd_warn(device, "Doing a full sync, since"
3100                                      " UUIDs were ambiguous.\n");
3101                                 hg = hg*2;
3102                         }
3103                 }
3104         }
3105
3106         if (hg == -100) {
3107                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3108                         hg = -1;
3109                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3110                         hg = 1;
3111
3112                 if (abs(hg) < 100)
3113                         drbd_warn(device, "Split-Brain detected, manually solved. "
3114                              "Sync from %s node\n",
3115                              (hg < 0) ? "peer" : "this");
3116         }
3117         rr_conflict = nc->rr_conflict;
3118         tentative = nc->tentative;
3119         rcu_read_unlock();
3120
3121         if (hg == -100) {
3122                 /* FIXME this log message is not correct if we end up here
3123                  * after an attempted attach on a diskless node.
3124                  * We just refuse to attach -- well, we drop the "connection"
3125                  * to that disk, in a way... */
3126                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3127                 drbd_khelper(device, "split-brain");
3128                 return C_MASK;
3129         }
3130
3131         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3132                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3133                 return C_MASK;
3134         }
3135
3136         if (hg < 0 && /* by intention we do not use mydisk here. */
3137             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3138                 switch (rr_conflict) {
3139                 case ASB_CALL_HELPER:
3140                         drbd_khelper(device, "pri-lost");
3141                         /* fall through */
3142                 case ASB_DISCONNECT:
3143                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3144                         return C_MASK;
3145                 case ASB_VIOLENTLY:
3146                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3147                              " assumption\n");
3148                 }
3149         }
3150
3151         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3152                 if (hg == 0)
3153                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3154                 else
3155                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3156                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3157                                  abs(hg) >= 2 ? "full" : "bit-map based");
3158                 return C_MASK;
3159         }
3160
3161         if (abs(hg) >= 2) {
3162                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3163                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3164                                         BM_LOCKED_SET_ALLOWED))
3165                         return C_MASK;
3166         }
3167
3168         if (hg > 0) { /* become sync source. */
3169                 rv = C_WF_BITMAP_S;
3170         } else if (hg < 0) { /* become sync target */
3171                 rv = C_WF_BITMAP_T;
3172         } else {
3173                 rv = C_CONNECTED;
3174                 if (drbd_bm_total_weight(device)) {
3175                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3176                              drbd_bm_total_weight(device));
3177                 }
3178         }
3179
3180         return rv;
3181 }
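
/*
 * Illustrative sketch only, not used by the driver: once the
 * split-brain handling above has reduced hg to a small value, its sign
 * selects the direction (positive: we become sync source, negative:
 * sync target, zero: no resync) and a magnitude >= 2 requests a full
 * sync instead of a bitmap-based one.  A hypothetical decoder,
 * mirroring the final branches above:
 */
static inline const char *hg_to_str(int hg)
{
        if (hg == 0)
                return "connected, no resync";
        if (hg > 0)
                return hg >= 2 ? "sync source, full sync"
                               : "sync source, bitmap-based resync";
        return hg <= -2 ? "sync target, full sync"
                        : "sync target, bitmap-based resync";
}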
3182
3183 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3184 {
3185         /* the pairing ASB_DISCARD_REMOTE (here) / ASB_DISCARD_LOCAL (peer) is valid */
3186         if (peer == ASB_DISCARD_REMOTE)
3187                 return ASB_DISCARD_LOCAL;
3188
3189         /* any other combination with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3190         if (peer == ASB_DISCARD_LOCAL)
3191                 return ASB_DISCARD_REMOTE;
3192
3193         /* everything else is valid if they are equal on both sides. */
3194         return peer;
3195 }
3196
3197 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3198 {
3199         struct p_protocol *p = pi->data;
3200         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3201         int p_proto, p_discard_my_data, p_two_primaries, cf;
3202         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3203         char integrity_alg[SHARED_SECRET_MAX] = "";
3204         struct crypto_hash *peer_integrity_tfm = NULL;
3205         void *int_dig_in = NULL, *int_dig_vv = NULL;
3206
3207         p_proto         = be32_to_cpu(p->protocol);
3208         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3209         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3210         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3211         p_two_primaries = be32_to_cpu(p->two_primaries);
3212         cf              = be32_to_cpu(p->conn_flags);
3213         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3214
3215         if (connection->agreed_pro_version >= 87) {
3216                 int err;
3217
3218                 if (pi->size > sizeof(integrity_alg))
3219                         return -EIO;
3220                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3221                 if (err)
3222                         return err;
3223                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3224         }
3225
3226         if (pi->cmd != P_PROTOCOL_UPDATE) {
3227                 clear_bit(CONN_DRY_RUN, &connection->flags);
3228
3229                 if (cf & CF_DRY_RUN)
3230                         set_bit(CONN_DRY_RUN, &connection->flags);
3231
3232                 rcu_read_lock();
3233                 nc = rcu_dereference(connection->net_conf);
3234
3235                 if (p_proto != nc->wire_protocol) {
3236                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3237                         goto disconnect_rcu_unlock;
3238                 }
3239
3240                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3241                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3242                         goto disconnect_rcu_unlock;
3243                 }
3244
3245                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3246                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3247                         goto disconnect_rcu_unlock;
3248                 }
3249
3250                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3251                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3252                         goto disconnect_rcu_unlock;
3253                 }
3254
3255                 if (p_discard_my_data && nc->discard_my_data) {
3256                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3257                         goto disconnect_rcu_unlock;
3258                 }
3259
3260                 if (p_two_primaries != nc->two_primaries) {
3261                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3262                         goto disconnect_rcu_unlock;
3263                 }
3264
3265                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3266                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3267                         goto disconnect_rcu_unlock;
3268                 }
3269
3270                 rcu_read_unlock();
3271         }
3272
3273         if (integrity_alg[0]) {
3274                 int hash_size;
3275
3276                 /*
3277                  * We can only change the peer data integrity algorithm
3278                  * here.  Changing our own data integrity algorithm
3279                  * requires that we send a P_PROTOCOL_UPDATE packet at
3280                  * the same time; otherwise, the peer has no way to
3281                  * tell between which packets the algorithm should
3282                  * change.
3283                  */
3284
3285                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3286                 if (!peer_integrity_tfm) {
3287                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3288                                  integrity_alg);
3289                         goto disconnect;
3290                 }
3291
3292                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3293                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3294                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3295                 if (!(int_dig_in && int_dig_vv)) {
3296                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3297                         goto disconnect;
3298                 }
3299         }
3300
3301         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3302         if (!new_net_conf) {
3303                 drbd_err(connection, "Allocation of new net_conf failed\n");
3304                 goto disconnect;
3305         }
3306
3307         mutex_lock(&connection->data.mutex);
3308         mutex_lock(&connection->resource->conf_update);
3309         old_net_conf = connection->net_conf;
3310         *new_net_conf = *old_net_conf;
3311
3312         new_net_conf->wire_protocol = p_proto;
3313         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3314         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3315         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3316         new_net_conf->two_primaries = p_two_primaries;
3317
3318         rcu_assign_pointer(connection->net_conf, new_net_conf);
3319         mutex_unlock(&connection->resource->conf_update);
3320         mutex_unlock(&connection->data.mutex);
3321
3322         crypto_free_hash(connection->peer_integrity_tfm);
3323         kfree(connection->int_dig_in);
3324         kfree(connection->int_dig_vv);
3325         connection->peer_integrity_tfm = peer_integrity_tfm;
3326         connection->int_dig_in = int_dig_in;
3327         connection->int_dig_vv = int_dig_vv;
3328
3329         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3330                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3331                           integrity_alg[0] ? integrity_alg : "(none)");
3332
3333         synchronize_rcu();
3334         kfree(old_net_conf);
3335         return 0;
3336
3337 disconnect_rcu_unlock:
3338         rcu_read_unlock();
3339 disconnect:
3340         crypto_free_hash(peer_integrity_tfm);
3341         kfree(int_dig_in);
3342         kfree(int_dig_vv);
3343         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3344         return -EIO;
3345 }
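
/*
 * Illustrative sketch only, not used by the driver: receive_protocol()
 * above follows the classic RCU update pattern -- allocate a new
 * object, copy the live one, modify the copy under the update-side
 * lock, publish it with rcu_assign_pointer(), wait for pre-existing
 * readers with synchronize_rcu(), then free the old object.  A minimal
 * version of that pattern (`struct cfg`, `live_cfg` and `cfg_mutex`
 * are hypothetical, not real symbols of this file):
 */
#if 0
static int update_cfg_example(int new_value)
{
        struct cfg *new_cfg, *old_cfg;

        new_cfg = kmalloc(sizeof(*new_cfg), GFP_KERNEL);
        if (!new_cfg)
                return -ENOMEM;

        mutex_lock(&cfg_mutex);
        old_cfg = rcu_dereference_protected(live_cfg,
                                            lockdep_is_held(&cfg_mutex));
        *new_cfg = *old_cfg;                    /* copy current settings */
        new_cfg->value = new_value;             /* modify the private copy */
        rcu_assign_pointer(live_cfg, new_cfg);  /* publish the new version */
        mutex_unlock(&cfg_mutex);

        synchronize_rcu();                      /* let old readers drain */
        kfree(old_cfg);
        return 0;
}
#endif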
3346
3347 /* helper function
3348  * input: alg name, feature name
3349  * return: NULL (alg name was "")
3350  *         ERR_PTR(error) if something goes wrong
3351  *         or the crypto hash ptr, if it worked out ok. */
3352 static
3353 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3354                 const char *alg, const char *name)
3355 {
3356         struct crypto_hash *tfm;
3357
3358         if (!alg[0])
3359                 return NULL;
3360
3361         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3362         if (IS_ERR(tfm)) {
3363                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3364                         alg, name, PTR_ERR(tfm));
3365                 return tfm;
3366         }
3367         return tfm;
3368 }
3369
3370 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3371 {
3372         void *buffer = connection->data.rbuf;
3373         int size = pi->size;
3374
3375         while (size) {
3376                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3377                 s = drbd_recv(connection, buffer, s);
3378                 if (s <= 0) {
3379                         if (s < 0)
3380                                 return s;
3381                         break;
3382                 }
3383                 size -= s;
3384         }
3385         if (size)
3386                 return -EIO;
3387         return 0;
3388 }
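
/*
 * Illustrative sketch only: ignore_remaining_packet() above is a
 * bounded drain loop -- keep receiving into one fixed scratch buffer
 * until the advertised payload size has been consumed, so unwanted
 * payloads never require a matching allocation.  The same pattern
 * against a plain POSIX file descriptor (hypothetical userspace
 * helper):
 *
 *      static int drain_fd(int fd, size_t size)
 *      {
 *              char scratch[4096];
 *
 *              while (size) {
 *                      size_t want = size < sizeof(scratch)
 *                                  ? size : sizeof(scratch);
 *                      ssize_t s = read(fd, scratch, want);
 *
 *                      if (s <= 0)
 *                              return -1;      // error or premature EOF
 *                      size -= s;
 *              }
 *              return 0;
 *      }
 */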
3389
3390 /*
3391  * config_unknown_volume  -  device configuration command for unknown volume
3392  *
3393  * When a device is added to an existing connection, the node on which the
3394  * device is added first will send configuration commands to its peer but the
3395  * peer will not know about the device yet.  It will warn and ignore these
3396  * commands.  Once the device is added on the second node, the second node will
3397  * send the same device configuration commands, but in the other direction.
3398  *
3399  * (We can also end up here if drbd is misconfigured.)
3400  */
3401 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3402 {
3403         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3404                   cmdname(pi->cmd), pi->vnr);
3405         return ignore_remaining_packet(connection, pi);
3406 }
3407
3408 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3409 {
3410         struct drbd_peer_device *peer_device;
3411         struct drbd_device *device;
3412         struct p_rs_param_95 *p;
3413         unsigned int header_size, data_size, exp_max_sz;
3414         struct crypto_hash *verify_tfm = NULL;
3415         struct crypto_hash *csums_tfm = NULL;
3416         struct net_conf *old_net_conf, *new_net_conf = NULL;
3417         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3418         const int apv = connection->agreed_pro_version;
3419         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3420         int fifo_size = 0;
3421         int err;
3422
3423         peer_device = conn_peer_device(connection, pi->vnr);
3424         if (!peer_device)
3425                 return config_unknown_volume(connection, pi);
3426         device = peer_device->device;
3427
3428         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3429                     : apv == 88 ? sizeof(struct p_rs_param)
3430                                         + SHARED_SECRET_MAX
3431                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3432                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3433
3434         if (pi->size > exp_max_sz) {
3435                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3436                     pi->size, exp_max_sz);
3437                 return -EIO;
3438         }
3439
3440         if (apv <= 88) {
3441                 header_size = sizeof(struct p_rs_param);
3442                 data_size = pi->size - header_size;
3443         } else if (apv <= 94) {
3444                 header_size = sizeof(struct p_rs_param_89);
3445                 data_size = pi->size - header_size;
3446                 D_ASSERT(device, data_size == 0);
3447         } else {
3448                 header_size = sizeof(struct p_rs_param_95);
3449                 data_size = pi->size - header_size;
3450                 D_ASSERT(device, data_size == 0);
3451         }
3452
3453         /* initialize verify_alg and csums_alg */
3454         p = pi->data;
3455         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3456
3457         err = drbd_recv_all(peer_device->connection, p, header_size);
3458         if (err)
3459                 return err;
3460
3461         mutex_lock(&connection->resource->conf_update);
3462         old_net_conf = peer_device->connection->net_conf;
3463         if (get_ldev(device)) {
3464                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3465                 if (!new_disk_conf) {
3466                         put_ldev(device);
3467                         mutex_unlock(&connection->resource->conf_update);
3468                         drbd_err(device, "Allocation of new disk_conf failed\n");
3469                         return -ENOMEM;
3470                 }
3471
3472                 old_disk_conf = device->ldev->disk_conf;
3473                 *new_disk_conf = *old_disk_conf;
3474
3475                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3476         }
3477
3478         if (apv >= 88) {
3479                 if (apv == 88) {
3480                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3481                                 drbd_err(device, "verify-alg of wrong size, "
3482                                         "peer wants %u, accepting only up to %u bytes\n",
3483                                         data_size, SHARED_SECRET_MAX);
3484                                 err = -EIO;
3485                                 goto reconnect;
3486                         }
3487
3488                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3489                         if (err)
3490                                 goto reconnect;
3491                         /* we expect NUL terminated string */
3492                         /* but just in case someone tries to be evil */
3493                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3494                         p->verify_alg[data_size-1] = 0;
3495
3496                 } else /* apv >= 89 */ {
3497                         /* we still expect NUL terminated strings */
3498                         /* but just in case someone tries to be evil */
3499                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3500                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3501                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3502                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3503                 }
3504
3505                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3506                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3507                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3508                                     old_net_conf->verify_alg, p->verify_alg);
3509                                 goto disconnect;
3510                         }
3511                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3512                                         p->verify_alg, "verify-alg");
3513                         if (IS_ERR(verify_tfm)) {
3514                                 verify_tfm = NULL;
3515                                 goto disconnect;
3516                         }
3517                 }
3518
3519                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3520                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3521                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3522                                     old_net_conf->csums_alg, p->csums_alg);
3523                                 goto disconnect;
3524                         }
3525                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3526                                         p->csums_alg, "csums-alg");
3527                         if (IS_ERR(csums_tfm)) {
3528                                 csums_tfm = NULL;
3529                                 goto disconnect;
3530                         }
3531                 }
3532
3533                 if (apv > 94 && new_disk_conf) {
3534                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3535                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3536                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3537                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3538
3539                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3540                         if (fifo_size != device->rs_plan_s->size) {
3541                                 new_plan = fifo_alloc(fifo_size);
3542                                 if (!new_plan) {
3543                                         drbd_err(device, "Allocation of fifo_buffer failed\n");
3544                                         put_ldev(device);
3545                                         goto disconnect;
3546                                 }
3547                         }
3548                 }
3549
3550                 if (verify_tfm || csums_tfm) {
3551                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3552                         if (!new_net_conf) {
3553                                 drbd_err(device, "Allocation of new net_conf failed\n");
3554                                 goto disconnect;
3555                         }
3556
3557                         *new_net_conf = *old_net_conf;
3558
3559                         if (verify_tfm) {
3560                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3561                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3562                                 crypto_free_hash(peer_device->connection->verify_tfm);
3563                                 peer_device->connection->verify_tfm = verify_tfm;
3564                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3565                         }
3566                         if (csums_tfm) {
3567                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3568                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3569                                 crypto_free_hash(peer_device->connection->csums_tfm);
3570                                 peer_device->connection->csums_tfm = csums_tfm;
3571                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3572                         }
3573                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3574                 }
3575         }
3576
3577         if (new_disk_conf) {
3578                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3579                 put_ldev(device);
3580         }
3581
3582         if (new_plan) {
3583                 old_plan = device->rs_plan_s;
3584                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3585         }
3586
3587         mutex_unlock(&connection->resource->conf_update);
3588         synchronize_rcu();
3589         if (new_net_conf)
3590                 kfree(old_net_conf);
3591         kfree(old_disk_conf);
3592         kfree(old_plan);
3593
3594         return 0;
3595
3596 reconnect:
3597         if (new_disk_conf) {
3598                 put_ldev(device);
3599                 kfree(new_disk_conf);
3600         }
3601         mutex_unlock(&connection->resource->conf_update);
3602         return -EIO;
3603
3604 disconnect:
3605         kfree(new_plan);
3606         if (new_disk_conf) {
3607                 put_ldev(device);
3608                 kfree(new_disk_conf);
3609         }
3610         mutex_unlock(&connection->resource->conf_update);
3611         /* just for completeness: actually not needed,
3612          * as this is not reached if csums_tfm was ok. */
3613         crypto_free_hash(csums_tfm);
3614         /* but free the verify_tfm again, if csums_tfm did not work out */
3615         crypto_free_hash(verify_tfm);
3616         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3617         return -EIO;
3618 }
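
/*
 * Illustrative note: assuming SLEEP_TIME is HZ/10 (one resync planning
 * step per 100ms, as defined in drbd_int.h), the fifo_size computed in
 * receive_SyncParam() simplifies to
 *   c_plan_ahead * 10 * (HZ/10) / HZ == c_plan_ahead,
 * i.e. one fifo slot per configured tenth-of-a-second of planning
 * horizon.
 */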
3619
3620 /* warn if the arguments differ by more than 12.5% */
3621 static void warn_if_differ_considerably(struct drbd_device *device,
3622         const char *s, sector_t a, sector_t b)
3623 {
3624         sector_t d;
3625         if (a == 0 || b == 0)
3626                 return;
3627         d = (a > b) ? (a - b) : (b - a);
3628         if (d > (a>>3) || d > (b>>3))
3629                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3630                      (unsigned long long)a, (unsigned long long)b);
3631 }
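
/*
 * Illustrative worked example: d > (a>>3) means "difference greater
 * than 1/8th (12.5%) of a".  With a = 1000 and b = 850 sectors,
 * d = 150 exceeds a>>3 = 125, so the warning fires; with b = 900,
 * d = 100 exceeds neither a>>3 = 125 nor b>>3 = 112, and the sizes
 * are considered close enough.
 */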
3632
3633 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3634 {
3635         struct drbd_peer_device *peer_device;
3636         struct drbd_device *device;
3637         struct p_sizes *p = pi->data;
3638         enum determine_dev_size dd = DS_UNCHANGED;
3639         sector_t p_size, p_usize, my_usize;
3640         int ldsc = 0; /* local disk size changed */
3641         enum dds_flags ddsf;
3642
3643         peer_device = conn_peer_device(connection, pi->vnr);
3644         if (!peer_device)
3645                 return config_unknown_volume(connection, pi);
3646         device = peer_device->device;
3647
3648         p_size = be64_to_cpu(p->d_size);
3649         p_usize = be64_to_cpu(p->u_size);
3650
3651         /* just store the peer's disk size for now.
3652          * we still need to figure out whether we accept that. */
3653         device->p_size = p_size;
3654
3655         if (get_ldev(device)) {
3656                 rcu_read_lock();
3657                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3658                 rcu_read_unlock();
3659
3660                 warn_if_differ_considerably(device, "lower level device sizes",
3661                            p_size, drbd_get_max_capacity(device->ldev));
3662                 warn_if_differ_considerably(device, "user requested size",
3663                                             p_usize, my_usize);
3664
3665                 /* if this is the first connect, or an otherwise expected
3666                  * param exchange, choose the minimum */
3667                 if (device->state.conn == C_WF_REPORT_PARAMS)
3668                         p_usize = min_not_zero(my_usize, p_usize);
3669
3670                 /* Never shrink a device with usable data during connect.
3671                    But allow online shrinking if we are connected. */
3672                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3673                     drbd_get_capacity(device->this_bdev) &&
3674                     device->state.disk >= D_OUTDATED &&
3675                     device->state.conn < C_CONNECTED) {
3676                         drbd_err(device, "The peer's disk size is too small!\n");
3677                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3678                         put_ldev(device);
3679                         return -EIO;
3680                 }
3681
3682                 if (my_usize != p_usize) {
3683                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3684
3685                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3686                         if (!new_disk_conf) {
3687                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3688                                 put_ldev(device);
3689                                 return -ENOMEM;
3690                         }
3691
3692                         mutex_lock(&connection->resource->conf_update);
3693                         old_disk_conf = device->ldev->disk_conf;
3694                         *new_disk_conf = *old_disk_conf;
3695                         new_disk_conf->disk_size = p_usize;
3696
3697                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698                         mutex_unlock(&connection->resource->conf_update);
3699                         synchronize_rcu();
3700                         kfree(old_disk_conf);
3701
3702                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3703                                  (unsigned long)p_usize);
3704                 }
3705
3706                 put_ldev(device);
3707         }
3708
3709         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3710         drbd_reconsider_max_bio_size(device);
3711         /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3712            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3713            drbd_reconsider_max_bio_size(), we can be sure that after
3714            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3715
3716         ddsf = be16_to_cpu(p->dds_flags);
3717         if (get_ldev(device)) {
3718                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3719                 put_ldev(device);
3720                 if (dd == DS_ERROR)
3721                         return -EIO;
3722                 drbd_md_sync(device);
3723         } else {
3724                 /* I am diskless, need to accept the peer's size. */
3725                 drbd_set_my_capacity(device, p_size);
3726         }
3727
3728         if (get_ldev(device)) {
3729                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3730                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3731                         ldsc = 1;
3732                 }
3733
3734                 put_ldev(device);
3735         }
3736
3737         if (device->state.conn > C_WF_REPORT_PARAMS) {
3738                 if (be64_to_cpu(p->c_size) !=
3739                     drbd_get_capacity(device->this_bdev) || ldsc) {
3740                         /* we have different sizes, probably peer
3741                          * needs to know my new size... */
3742                         drbd_send_sizes(peer_device, 0, ddsf);
3743                 }
3744                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3745                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3746                         if (device->state.pdsk >= D_INCONSISTENT &&
3747                             device->state.disk >= D_INCONSISTENT) {
3748                                 if (ddsf & DDSF_NO_RESYNC)
3749                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3750                                 else
3751                                         resync_after_online_grow(device);
3752                         } else
3753                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3754                 }
3755         }
3756
3757         return 0;
3758 }
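
/*
 * Illustrative note: min_not_zero() above treats 0 as "no limit
 * configured", so min_not_zero(0, 1000) == 1000 while
 * min_not_zero(500, 1000) == 500 -- on the initial parameter exchange
 * both nodes thereby converge on the smaller of the two configured
 * user size limits, if any.
 */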
3759
3760 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3761 {
3762         struct drbd_peer_device *peer_device;
3763         struct drbd_device *device;
3764         struct p_uuids *p = pi->data;
3765         u64 *p_uuid;
3766         int i, updated_uuids = 0;
3767
3768         peer_device = conn_peer_device(connection, pi->vnr);
3769         if (!peer_device)
3770                 return config_unknown_volume(connection, pi);
3771         device = peer_device->device;
3772
3773         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3774         if (!p_uuid) {
3775                 drbd_err(device, "kmalloc of p_uuid failed\n");
3776                 return -ENOMEM;
3777         }
3778
3779         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3780                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3781
3782         kfree(device->p_uuid);
3783         device->p_uuid = p_uuid;
3784
3785         if (device->state.conn < C_CONNECTED &&
3786             device->state.disk < D_INCONSISTENT &&
3787             device->state.role == R_PRIMARY &&
3788             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3789                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3790                     (unsigned long long)device->ed_uuid);
3791                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3792                 return -EIO;
3793         }
3794
3795         if (get_ldev(device)) {
3796                 int skip_initial_sync =
3797                         device->state.conn == C_CONNECTED &&
3798                         peer_device->connection->agreed_pro_version >= 90 &&
3799                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3800                         (p_uuid[UI_FLAGS] & 8);
3801                 if (skip_initial_sync) {
3802                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3803                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3804                                         "clear_n_write from receive_uuids",
3805                                         BM_LOCKED_TEST_ALLOWED);
3806                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3807                         _drbd_uuid_set(device, UI_BITMAP, 0);
3808                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3809                                         CS_VERBOSE, NULL);
3810                         drbd_md_sync(device);
3811                         updated_uuids = 1;
3812                 }
3813                 put_ldev(device);
3814         } else if (device->state.disk < D_INCONSISTENT &&
3815                    device->state.role == R_PRIMARY) {
3816                 /* I am a diskless primary, the peer just created a new current UUID
3817                    for me. */
3818                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3819         }
3820
3821         /* Before we test the disk state, wait until a possibly ongoing
3822            cluster-wide state change has finished. That is important if
3823            we are primary and are detaching from our disk. We need to see the
3824            new disk state... */
3825         mutex_lock(device->state_mutex);
3826         mutex_unlock(device->state_mutex);
3827         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3828                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3829
3830         if (updated_uuids)
3831                 drbd_print_uuids(device, "receiver updated UUIDs to");
3832
3833         return 0;
3834 }
3835
3836 /**
3837  * convert_state() - Converts the peer's view of the cluster state to our point of view
3838  * @ps:         The state as seen by the peer.
3839  */
3840 static union drbd_state convert_state(union drbd_state ps)
3841 {
3842         union drbd_state ms;
3843
3844         static enum drbd_conns c_tab[] = {
3845                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3846                 [C_CONNECTED] = C_CONNECTED,
3847
3848                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3849                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3850                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3851                 [C_VERIFY_S]       = C_VERIFY_T,
3852                 [C_MASK]   = C_MASK,
3853         };
3854
3855         ms.i = ps.i;
3856
3857         ms.conn = c_tab[ps.conn];
3858         ms.peer = ps.role;
3859         ms.role = ps.peer;
3860         ms.pdsk = ps.disk;
3861         ms.disk = ps.pdsk;
3862         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3863
3864         return ms;
3865 }
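
/*
 * Illustrative worked example: convert_state() mirrors a state between
 * viewpoints.  If the peer reports { role=Primary, peer=Secondary,
 * disk=UpToDate, pdsk=Inconsistent, conn=StartingSyncS }, the same
 * cluster seen from this node is { role=Secondary, peer=Primary,
 * disk=Inconsistent, pdsk=UpToDate, conn=StartingSyncT }.
 */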
3866
3867 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3868 {
3869         struct drbd_peer_device *peer_device;
3870         struct drbd_device *device;
3871         struct p_req_state *p = pi->data;
3872         union drbd_state mask, val;
3873         enum drbd_state_rv rv;
3874
3875         peer_device = conn_peer_device(connection, pi->vnr);
3876         if (!peer_device)
3877                 return -EIO;
3878         device = peer_device->device;
3879
3880         mask.i = be32_to_cpu(p->mask);
3881         val.i = be32_to_cpu(p->val);
3882
3883         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3884             mutex_is_locked(device->state_mutex)) {
3885                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3886                 return 0;
3887         }
3888
3889         mask = convert_state(mask);
3890         val = convert_state(val);
3891
3892         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3893         drbd_send_sr_reply(peer_device, rv);
3894
3895         drbd_md_sync(device);
3896
3897         return 0;
3898 }
3899
3900 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3901 {
3902         struct p_req_state *p = pi->data;
3903         union drbd_state mask, val;
3904         enum drbd_state_rv rv;
3905
3906         mask.i = be32_to_cpu(p->mask);
3907         val.i = be32_to_cpu(p->val);
3908
3909         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3910             mutex_is_locked(&connection->cstate_mutex)) {
3911                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3912                 return 0;
3913         }
3914
3915         mask = convert_state(mask);
3916         val = convert_state(val);
3917
3918         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3919         conn_send_sr_reply(connection, rv);
3920
3921         return 0;
3922 }
3923
3924 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3925 {
3926         struct drbd_peer_device *peer_device;
3927         struct drbd_device *device;
3928         struct p_state *p = pi->data;
3929         union drbd_state os, ns, peer_state;
3930         enum drbd_disk_state real_peer_disk;
3931         enum chg_state_flags cs_flags;
3932         int rv;
3933
3934         peer_device = conn_peer_device(connection, pi->vnr);
3935         if (!peer_device)
3936                 return config_unknown_volume(connection, pi);
3937         device = peer_device->device;
3938
3939         peer_state.i = be32_to_cpu(p->state);
3940
3941         real_peer_disk = peer_state.disk;
3942         if (peer_state.disk == D_NEGOTIATING) {
3943                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3944                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3945         }
3946
3947         spin_lock_irq(&device->resource->req_lock);
3948  retry:
3949         os = ns = drbd_read_state(device);
3950         spin_unlock_irq(&device->resource->req_lock);
3951
3952         /* If some other part of the code (asender thread, timeout)
3953          * already decided to close the connection again,
3954          * we must not "re-establish" it here. */
3955         if (os.conn <= C_TEAR_DOWN)
3956                 return -ECONNRESET;
3957
3958         /* If this is the "end of sync" confirmation, usually the peer disk
3959          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3960          * set) resync started in PausedSyncT, or if the timing of pause-/
3961          * unpause-sync events has been "just right", the peer disk may
3962          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3963          */
3964         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3965             real_peer_disk == D_UP_TO_DATE &&
3966             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3967                 /* If we are (becoming) SyncSource, but peer is still in sync
3968                  * preparation, ignore its uptodate-ness to avoid flapping, it
3969                  * will change to inconsistent once the peer reaches active
3970                  * syncing states.
3971                  * It may have changed syncer-paused flags, however, so we
3972                  * cannot ignore this completely. */
3973                 if (peer_state.conn > C_CONNECTED &&
3974                     peer_state.conn < C_SYNC_SOURCE)
3975                         real_peer_disk = D_INCONSISTENT;
3976
3977                 /* if peer_state changes to connected at the same time,
3978                  * it explicitly notifies us that it finished resync.
3979                  * Maybe we should finish it up, too? */
3980                 else if (os.conn >= C_SYNC_SOURCE &&
3981                          peer_state.conn == C_CONNECTED) {
3982                         if (drbd_bm_total_weight(device) <= device->rs_failed)
3983                                 drbd_resync_finished(device);
3984                         return 0;
3985                 }
3986         }
3987
3988         /* explicit verify finished notification, stop sector reached. */
3989         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3990             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3991                 ov_out_of_sync_print(device);
3992                 drbd_resync_finished(device);
3993                 return 0;
3994         }
3995
3996         /* peer says his disk is inconsistent, while we think it is uptodate,
3997          * and this happens while the peer still thinks we have a sync going on,
3998          * but we think we are already done with the sync.
3999          * We ignore this to avoid flapping pdsk.
4000          * This should not happen, if the peer is a recent version of drbd. */
4001         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4002             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4003                 real_peer_disk = D_UP_TO_DATE;
4004
4005         if (ns.conn == C_WF_REPORT_PARAMS)
4006                 ns.conn = C_CONNECTED;
4007
4008         if (peer_state.conn == C_AHEAD)
4009                 ns.conn = C_BEHIND;
4010
4011         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4012             get_ldev_if_state(device, D_NEGOTIATING)) {
4013                 int cr; /* consider resync */
4014
4015                 /* if we established a new connection */
4016                 cr  = (os.conn < C_CONNECTED);
4017                 /* if we had an established connection
4018                  * and one of the nodes newly attaches a disk */
4019                 cr |= (os.conn == C_CONNECTED &&
4020                        (peer_state.disk == D_NEGOTIATING ||
4021                         os.disk == D_NEGOTIATING));
4022                 /* if we have both been inconsistent, and the peer has been
4023                  * forced to be UpToDate with --overwrite-data */
4024                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4025                 /* if we had been plain connected, and the admin requested to
4026                  * start a sync by "invalidate" or "invalidate-remote" */
4027                 cr |= (os.conn == C_CONNECTED &&
4028                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4029                                  peer_state.conn <= C_WF_BITMAP_T));
4030
4031                 if (cr)
4032                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4033
4034                 put_ldev(device);
4035                 if (ns.conn == C_MASK) {
4036                         ns.conn = C_CONNECTED;
4037                         if (device->state.disk == D_NEGOTIATING) {
4038                                 drbd_force_state(device, NS(disk, D_FAILED));
4039                         } else if (peer_state.disk == D_NEGOTIATING) {
4040                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4041                                 peer_state.disk = D_DISKLESS;
4042                                 real_peer_disk = D_DISKLESS;
4043                         } else {
4044                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4045                                         return -EIO;
4046                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4047                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4048                                 return -EIO;
4049                         }
4050                 }
4051         }
4052
4053         spin_lock_irq(&device->resource->req_lock);
4054         if (os.i != drbd_read_state(device).i)
4055                 goto retry;
4056         clear_bit(CONSIDER_RESYNC, &device->flags);
4057         ns.peer = peer_state.role;
4058         ns.pdsk = real_peer_disk;
4059         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4060         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4061                 ns.disk = device->new_state_tmp.disk;
4062         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4063         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4064             test_bit(NEW_CUR_UUID, &device->flags)) {
4065                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4066                    for temporary network outages! */
4067                 spin_unlock_irq(&device->resource->req_lock);
4068                 drbd_err(device, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
4069                 tl_clear(peer_device->connection);
4070                 drbd_uuid_new_current(device);
4071                 clear_bit(NEW_CUR_UUID, &device->flags);
4072                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4073                 return -EIO;
4074         }
4075         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4076         ns = drbd_read_state(device);
4077         spin_unlock_irq(&device->resource->req_lock);
4078
4079         if (rv < SS_SUCCESS) {
4080                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4081                 return -EIO;
4082         }
4083
4084         if (os.conn > C_WF_REPORT_PARAMS) {
4085                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4086                     peer_state.disk != D_NEGOTIATING) {
4087                         /* we want resync, peer has not yet decided to sync... */
4088                         /* Nowadays only used when forcing a node into primary role and
4089                            setting its disk to UpToDate with that */
4090                         drbd_send_uuids(peer_device);
4091                         drbd_send_current_state(peer_device);
4092                 }
4093         }
4094
4095         clear_bit(DISCARD_MY_DATA, &device->flags);
4096
4097         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4098
4099         return 0;
4100 }
4101
4102 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4103 {
4104         struct drbd_peer_device *peer_device;
4105         struct drbd_device *device;
4106         struct p_rs_uuid *p = pi->data;
4107
4108         peer_device = conn_peer_device(connection, pi->vnr);
4109         if (!peer_device)
4110                 return -EIO;
4111         device = peer_device->device;
4112
4113         wait_event(device->misc_wait,
4114                    device->state.conn == C_WF_SYNC_UUID ||
4115                    device->state.conn == C_BEHIND ||
4116                    device->state.conn < C_CONNECTED ||
4117                    device->state.disk < D_NEGOTIATING);
4118
4119         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4120
4121         /* Here the _drbd_uuid_ functions are right, current should
4122            _not_ be rotated into the history */
4123         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4124                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4125                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4126
4127                 drbd_print_uuids(device, "updated sync uuid");
4128                 drbd_start_resync(device, C_SYNC_TARGET);
4129
4130                 put_ldev(device);
4131         } else
4132                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4133
4134         return 0;
4135 }
4136
4137 /**
4138  * receive_bitmap_plain
4139  *
4140  * Return 0 when done, 1 when another iteration is needed, and a negative error
4141  * code upon failure.
4142  */
4143 static int
4144 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4145                      unsigned long *p, struct bm_xfer_ctx *c)
4146 {
4147         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4148                                  drbd_header_size(peer_device->connection);
4149         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4150                                        c->bm_words - c->word_offset);
4151         unsigned int want = num_words * sizeof(*p);
4152         int err;
4153
4154         if (want != size) {
4155                 drbd_err(peer_device, "%s: want (%u) != size (%u)\n", __func__, want, size);
4156                 return -EIO;
4157         }
4158         if (want == 0)
4159                 return 0;
4160         err = drbd_recv_all(peer_device->connection, p, want);
4161         if (err)
4162                 return err;
4163
4164         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4165
4166         c->word_offset += num_words;
4167         c->bit_offset = c->word_offset * BITS_PER_LONG;
4168         if (c->bit_offset > c->bm_bits)
4169                 c->bit_offset = c->bm_bits;
4170
4171         return 1;
4172 }
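/*
 * Sizing example (illustrative; assumes DRBD_SOCKET_BUFFER_SIZE == 4096
 * and the 16-byte header of protocol 100+): a plain bitmap packet then
 * carries (4096 - 16) / sizeof(long) == 510 words on a 64-bit host, and
 * the want != size check above rejects any packet whose payload length
 * disagrees with the number of words we still expect.
 */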
4173
4174 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4175 {
4176         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4177 }
4178
4179 static int dcbp_get_start(struct p_compressed_bm *p)
4180 {
4181         return (p->encoding & 0x80) != 0;
4182 }
4183
4184 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4185 {
4186         return (p->encoding >> 4) & 0x7;
4187 }
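/*
 * Layout of the "encoding" byte, as implied by the three accessors above:
 *
 *   bit  7   : start toggle  (dcbp_get_start)
 *   bits 6..4: pad bits      (dcbp_get_pad_bits)
 *   bits 3..0: bitmap code   (dcbp_get_code)
 *
 * e.g. encoding == 0xA2 decodes as start = 1, pad_bits = 2, code = 2.
 */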
4188
4189 /**
4190  * recv_bm_rle_bits
4191  *
4192  * Return 0 when done, 1 when another iteration is needed, and a negative error
4193  * code upon failure.
4194  */
4195 static int
4196 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4197                  struct p_compressed_bm *p,
4198                  struct bm_xfer_ctx *c,
4199                  unsigned int len)
4200 {
4201         struct bitstream bs;
4202         u64 look_ahead;
4203         u64 rl;
4204         u64 tmp;
4205         unsigned long s = c->bit_offset;
4206         unsigned long e;
4207         int toggle = dcbp_get_start(p);
4208         int have;
4209         int bits;
4210
4211         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4212
4213         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4214         if (bits < 0)
4215                 return -EIO;
4216
4217         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4218                 bits = vli_decode_bits(&rl, look_ahead);
4219                 if (bits <= 0)
4220                         return -EIO;
4221
4222                 if (toggle) {
4223                         e = s + rl - 1;
4224                         if (e >= c->bm_bits) {
4225                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4226                                 return -EIO;
4227                         }
4228                         _drbd_bm_set_bits(peer_device->device, s, e);
4229                 }
4230
4231                 if (have < bits) {
4232                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4233                                 have, bits, look_ahead,
4234                                 (unsigned int)(bs.cur.b - p->code),
4235                                 (unsigned int)bs.buf_len);
4236                         return -EIO;
4237                 }
4238                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4239                 if (likely(bits < 64))
4240                         look_ahead >>= bits;
4241                 else
4242                         look_ahead = 0;
4243                 have -= bits;
4244
4245                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4246                 if (bits < 0)
4247                         return -EIO;
4248                 look_ahead |= tmp << have;
4249                 have += bits;
4250         }
4251
4252         c->bit_offset = s;
4253         bm_xfer_ctx_bit_to_word_offset(c);
4254
4255         return (s != c->bm_bits);
4256 }
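/*
 * Decoding walkthrough (illustrative, not from the original source): the
 * payload is a sequence of VLI-encoded run lengths describing alternating
 * runs of clear and set bits, the first run's polarity given by
 * dcbp_get_start().  With start == 0 and decoded runs 10, 4 and 6, the
 * loop above skips bits 0..9, sets bits 10..13 via _drbd_bm_set_bits(),
 * skips bits 14..19, and leaves c->bit_offset at 20.
 */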
4257
4258 /**
4259  * decode_bitmap_c
4260  *
4261  * Return 0 when done, 1 when another iteration is needed, and a negative error
4262  * code upon failure.
4263  */
4264 static int
4265 decode_bitmap_c(struct drbd_peer_device *peer_device,
4266                 struct p_compressed_bm *p,
4267                 struct bm_xfer_ctx *c,
4268                 unsigned int len)
4269 {
4270         if (dcbp_get_code(p) == RLE_VLI_Bits)
4271                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4272
4273         /* other variants had been implemented for evaluation,
4274          * but have been dropped as this one turned out to be "best"
4275          * during all our tests. */
4276
4277         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4278         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4279         return -EIO;
4280 }
4281
4282 void INFO_bm_xfer_stats(struct drbd_device *device,
4283                 const char *direction, struct bm_xfer_ctx *c)
4284 {
4285         /* what would it take to transfer it "plaintext" */
4286         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4287         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4288         unsigned int plain =
4289                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4290                 c->bm_words * sizeof(unsigned long);
4291         unsigned int total = c->bytes[0] + c->bytes[1];
4292         unsigned int r;
4293
4294         /* total cannot be zero, but just in case: */
4295         if (total == 0)
4296                 return;
4297
4298         /* don't report if not compressed */
4299         if (total >= plain)
4300                 return;
4301
4302         /* total < plain. check for overflow, still */
4303         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4304                                     : (1000 * total / plain);
4305
4306         if (r > 1000)
4307                 r = 1000;
4308
4309         r = 1000 - r;
4310         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4311              "total %u; compression: %u.%u%%\n",
4312                         direction,
4313                         c->bytes[1], c->packets[1],
4314                         c->bytes[0], c->packets[0],
4315                         total, r/10, r % 10);
4316 }
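/*
 * Arithmetic example (illustrative): with plain == 1000000 bytes of
 * would-be plaintext transfer and total == 150000 bytes actually sent,
 * the code above computes r = 1000 - (1000 * 150000 / 1000000) = 850 and
 * logs "compression: 85.0%".
 */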
4317
4318 /* Since we are processing the bitfield from lower addresses to higher,
4319    it does not matter if we process it in 32 bit chunks or 64 bit
4320    chunks, as long as it is little endian. (Understand it as a byte
4321    stream, beginning with the lowest byte...) If we used big endian,
4322    we would need to process it from the highest address to the lowest,
4323    in order to be agnostic to the 32 vs 64 bit issue.
4324
4325    Returns 0 on success, a negative error code otherwise. */
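/* Example of the endianness argument (illustrative): with bits 0..15 of
   the bitmap set, the little endian byte stream begins 0xff 0xff 0x00 ...
   no matter whether we merge it in 32 bit or 64 bit words; a big endian
   layout would order those bytes differently depending on the word size. */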
4326 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4327 {
4328         struct drbd_peer_device *peer_device;
4329         struct drbd_device *device;
4330         struct bm_xfer_ctx c;
4331         int err;
4332
4333         peer_device = conn_peer_device(connection, pi->vnr);
4334         if (!peer_device)
4335                 return -EIO;
4336         device = peer_device->device;
4337
4338         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4339         /* you are supposed to send additional out-of-sync information
4340          * if you actually set bits during this phase */
4341
4342         c = (struct bm_xfer_ctx) {
4343                 .bm_bits = drbd_bm_bits(device),
4344                 .bm_words = drbd_bm_words(device),
4345         };
4346
4347         for (;;) {
4348                 if (pi->cmd == P_BITMAP)
4349                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4350                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4351                         /* MAYBE: sanity check that we speak proto >= 90,
4352                          * and the feature is enabled! */
4353                         struct p_compressed_bm *p = pi->data;
4354
4355                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4356                                 drbd_err(device, "ReportCBitmap packet too large\n");
4357                                 err = -EIO;
4358                                 goto out;
4359                         }
4360                         if (pi->size <= sizeof(*p)) {
4361                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4362                                 err = -EIO;
4363                                 goto out;
4364                         }
4365                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4366                         if (err)
4367                                 goto out;
4368                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4369                 } else {
4370                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4371                         err = -EIO;
4372                         goto out;
4373                 }
4374
4375                 c.packets[pi->cmd == P_BITMAP]++;
4376                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4377
4378                 if (err <= 0) {
4379                         if (err < 0)
4380                                 goto out;
4381                         break;
4382                 }
4383                 err = drbd_recv_header(peer_device->connection, pi);
4384                 if (err)
4385                         goto out;
4386         }
4387
4388         INFO_bm_xfer_stats(device, "receive", &c);
4389
4390         if (device->state.conn == C_WF_BITMAP_T) {
4391                 enum drbd_state_rv rv;
4392
4393                 err = drbd_send_bitmap(device);
4394                 if (err)
4395                         goto out;
4396                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4397                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4398                 D_ASSERT(device, rv == SS_SUCCESS);
4399         } else if (device->state.conn != C_WF_BITMAP_S) {
4400                 /* admin may have requested C_DISCONNECTING,
4401                  * other threads may have noticed network errors */
4402                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4403                     drbd_conn_str(device->state.conn));
4404         }
4405         err = 0;
4406
4407  out:
4408         drbd_bm_unlock(device);
4409         if (!err && device->state.conn == C_WF_BITMAP_S)
4410                 drbd_start_resync(device, C_SYNC_SOURCE);
4411         return err;
4412 }
4413
4414 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4415 {
4416         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4417                  pi->cmd, pi->size);
4418
4419         return ignore_remaining_packet(connection, pi);
4420 }
4421
4422 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4423 {
4424         /* Make sure we've acked all the TCP data associated
4425          * with the data requests being unplugged */
4426         drbd_tcp_quickack(connection->data.socket);
4427
4428         return 0;
4429 }
4430
4431 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4432 {
4433         struct drbd_peer_device *peer_device;
4434         struct drbd_device *device;
4435         struct p_block_desc *p = pi->data;
4436
4437         peer_device = conn_peer_device(connection, pi->vnr);
4438         if (!peer_device)
4439                 return -EIO;
4440         device = peer_device->device;
4441
4442         switch (device->state.conn) {
4443         case C_WF_SYNC_UUID:
4444         case C_WF_BITMAP_T:
4445         case C_BEHIND:
4446                 break;
4447         default:
4448                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4449                                 drbd_conn_str(device->state.conn));
4450         }
4451
4452         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4453
4454         return 0;
4455 }
4456
4457 struct data_cmd {
4458         int expect_payload;
4459         size_t pkt_size;
4460         int (*fn)(struct drbd_connection *, struct packet_info *);
4461 };
4462
4463 static struct data_cmd drbd_cmd_handler[] = {
4464         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4465         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4466         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4467         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4468         [P_BITMAP]          = { 1, 0, receive_bitmap },
4469         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4470         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4471         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4472         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4473         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4474         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4475         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4476         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4477         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4478         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4479         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4480         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4481         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4482         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4483         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4484         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4485         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4486         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4487         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4488         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4489 };
4490
4491 static void drbdd(struct drbd_connection *connection)
4492 {
4493         struct packet_info pi;
4494         size_t shs; /* sub header size */
4495         int err;
4496
4497         while (get_t_state(&connection->receiver) == RUNNING) {
4498                 struct data_cmd *cmd;
4499
4500                 drbd_thread_current_set_cpu(&connection->receiver);
4501                 if (drbd_recv_header(connection, &pi))
4502                         goto err_out;
4503
4504                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
4505                              !drbd_cmd_handler[pi.cmd].fn)) {
4506                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4507                                  cmdname(pi.cmd), pi.cmd);
4508                         goto err_out;
4509                 }
4510                 cmd = &drbd_cmd_handler[pi.cmd];
4511                 shs = cmd->pkt_size;
4512                 if (pi.size > shs && !cmd->expect_payload) {
4513                         drbd_err(connection, "No payload expected %s l:%d\n",
4514                                  cmdname(pi.cmd), pi.size);
4515                         goto err_out;
4516                 }
4517
4518                 if (shs) {
4519                         err = drbd_recv_all_warn(connection, pi.data, shs);
4520                         if (err)
4521                                 goto err_out;
4522                         pi.size -= shs;
4523                 }
4524
4525                 err = cmd->fn(connection, &pi);
4526                 if (err) {
4527                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4528                                  cmdname(pi.cmd), err, pi.size);
4529                         goto err_out;
4530                 }
4531         }
4532         return;
4533
4534     err_out:
4535         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4536 }
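/*
 * Dispatch walkthrough (illustrative): for an incoming P_STATE packet,
 * drbd_recv_header() fills pi.cmd and pi.size, the table lookup yields
 * pkt_size == sizeof(struct p_state) with no payload expected beyond it,
 * the fixed-size part is read into pi.data, and receive_state() consumes
 * it.  Any mismatch with the table's expectations takes the err_out path
 * and tears down the connection with C_PROTOCOL_ERROR.
 */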
4537
4538 static void conn_disconnect(struct drbd_connection *connection)
4539 {
4540         struct drbd_peer_device *peer_device;
4541         enum drbd_conns oc;
4542         int vnr;
4543
4544         if (connection->cstate == C_STANDALONE)
4545                 return;
4546
4547         /* We are about to start the cleanup after connection loss.
4548          * Make sure drbd_make_request knows about that.
4549          * Usually we should be in some network failure state already,
4550          * but just in case we are not, we fix it up here.
4551          */
4552         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4553
4554         /* asender does not clean up anything. It must not interfere, either */
4555         drbd_thread_stop(&connection->asender);
4556         drbd_free_sock(connection);
4557
4558         rcu_read_lock();
4559         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4560                 struct drbd_device *device = peer_device->device;
4561                 kref_get(&device->kref);
4562                 rcu_read_unlock();
4563                 drbd_disconnected(peer_device);
4564                 kref_put(&device->kref, drbd_destroy_device);
4565                 rcu_read_lock();
4566         }
4567         rcu_read_unlock();
4568
4569         if (!list_empty(&connection->current_epoch->list))
4570                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4571         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4572         atomic_set(&connection->current_epoch->epoch_size, 0);
4573         connection->send.seen_any_write_yet = false;
4574
4575         drbd_info(connection, "Connection closed\n");
4576
4577         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4578                 conn_try_outdate_peer_async(connection);
4579
4580         spin_lock_irq(&connection->resource->req_lock);
4581         oc = connection->cstate;
4582         if (oc >= C_UNCONNECTED)
4583                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4584
4585         spin_unlock_irq(&connection->resource->req_lock);
4586
4587         if (oc == C_DISCONNECTING)
4588                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4589 }
4590
4591 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4592 {
4593         struct drbd_device *device = peer_device->device;
4594         unsigned int i;
4595
4596         /* wait for current activity to cease. */
4597         spin_lock_irq(&device->resource->req_lock);
4598         _drbd_wait_ee_list_empty(device, &device->active_ee);
4599         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4600         _drbd_wait_ee_list_empty(device, &device->read_ee);
4601         spin_unlock_irq(&device->resource->req_lock);
4602
4603         /* We do not have data structures that would allow us to
4604          * get the rs_pending_cnt down to 0 again.
4605          *  * On C_SYNC_TARGET we do not have any data structures describing
4606          *    the pending RSDataRequest's we have sent.
4607          *  * On C_SYNC_SOURCE there is no data structure that tracks
4608          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4609          *  And no, it is not the sum of the reference counts in the
4610          *  resync_LRU. The resync_LRU tracks the whole operation including
4611          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4612          *  on the fly. */
4613         drbd_rs_cancel_all(device);
4614         device->rs_total = 0;
4615         device->rs_failed = 0;
4616         atomic_set(&device->rs_pending_cnt, 0);
4617         wake_up(&device->misc_wait);
4618
4619         del_timer_sync(&device->resync_timer);
4620         resync_timer_fn((unsigned long)device);
4621
4622         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4623          * w_make_resync_request etc. which may still be on the worker queue
4624          * to be "canceled" */
4625         drbd_flush_workqueue(&peer_device->connection->sender_work);
4626
4627         drbd_finish_peer_reqs(device);
4628
4629         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4630            might have queued work again. The flush before drbd_finish_peer_reqs() is
4631            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4632         drbd_flush_workqueue(&peer_device->connection->sender_work);
4633
4634         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4635          * again via drbd_try_clear_on_disk_bm(). */
4636         drbd_rs_cancel_all(device);
4637
4638         kfree(device->p_uuid);
4639         device->p_uuid = NULL;
4640
4641         if (!drbd_suspended(device))
4642                 tl_clear(peer_device->connection);
4643
4644         drbd_md_sync(device);
4645
4646         /* serialize with bitmap writeout triggered by the state change,
4647          * if any. */
4648         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4649
4650         /* tcp_close and release of sendpage pages can be deferred.  I don't
4651          * want to use SO_LINGER, because apparently it can be deferred for
4652          * more than 20 seconds (longest time I checked).
4653          *
4654          * Actually we don't care for exactly when the network stack does its
4655          * put_page(), but release our reference on these pages right here.
4656          */
4657         i = drbd_free_peer_reqs(device, &device->net_ee);
4658         if (i)
4659                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4660         i = atomic_read(&device->pp_in_use_by_net);
4661         if (i)
4662                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4663         i = atomic_read(&device->pp_in_use);
4664         if (i)
4665                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4666
4667         D_ASSERT(device, list_empty(&device->read_ee));
4668         D_ASSERT(device, list_empty(&device->active_ee));
4669         D_ASSERT(device, list_empty(&device->sync_ee));
4670         D_ASSERT(device, list_empty(&device->done_ee));
4671
4672         return 0;
4673 }
4674
4675 /*
4676  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4677  * we can agree on is stored in agreed_pro_version.
4678  *
4679  * feature flags and the reserved array should be enough room for future
4680  * enhancements of the handshake protocol, and possible plugins...
4681  *
4682  * for now, they are expected to be zero, but ignored.
4683  */
4684 static int drbd_send_features(struct drbd_connection *connection)
4685 {
4686         struct drbd_socket *sock;
4687         struct p_connection_features *p;
4688
4689         sock = &connection->data;
4690         p = conn_prepare_command(connection, sock);
4691         if (!p)
4692                 return -EIO;
4693         memset(p, 0, sizeof(*p));
4694         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4695         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4696         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4697         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4698 }
4699
4700 /*
4701  * return values:
4702  *   1 yes, we have a valid connection
4703  *   0 oops, did not work out, please try again
4704  *  -1 peer talks different language,
4705  *     no point in trying again, please go standalone.
4706  */
4707 static int drbd_do_features(struct drbd_connection *connection)
4708 {
4709         /* ASSERT current == connection->receiver ... */
4710         struct p_connection_features *p;
4711         const int expect = sizeof(struct p_connection_features);
4712         struct packet_info pi;
4713         int err;
4714
4715         err = drbd_send_features(connection);
4716         if (err)
4717                 return 0;
4718
4719         err = drbd_recv_header(connection, &pi);
4720         if (err)
4721                 return 0;
4722
4723         if (pi.cmd != P_CONNECTION_FEATURES) {
4724                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4725                          cmdname(pi.cmd), pi.cmd);
4726                 return -1;
4727         }
4728
4729         if (pi.size != expect) {
4730                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4731                      expect, pi.size);
4732                 return -1;
4733         }
4734
4735         p = pi.data;
4736         err = drbd_recv_all_warn(connection, p, expect);
4737         if (err)
4738                 return 0;
4739
4740         p->protocol_min = be32_to_cpu(p->protocol_min);
4741         p->protocol_max = be32_to_cpu(p->protocol_max);
4742         if (p->protocol_max == 0)
4743                 p->protocol_max = p->protocol_min;
4744
4745         if (PRO_VERSION_MAX < p->protocol_min ||
4746             PRO_VERSION_MIN > p->protocol_max)
4747                 goto incompat;
4748
4749         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4750         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4751
4752         drbd_info(connection, "Handshake successful: "
4753              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4754
4755         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4756                   connection->agreed_features & FF_TRIM ? " " : " not ");
4757
4758         return 1;
4759
4760  incompat:
4761         drbd_err(connection, "incompatible DRBD dialects: "
4762             "I support %d-%d, peer supports %d-%d\n",
4763             PRO_VERSION_MIN, PRO_VERSION_MAX,
4764             p->protocol_min, p->protocol_max);
4765         return -1;
4766 }
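/*
 * Negotiation example (illustrative; the concrete PRO_VERSION_MIN/MAX
 * values depend on the build): if we support protocol versions 86..101
 * and the peer announces 86..96, the ranges overlap and the handshake
 * succeeds with agreed_pro_version = min(101, 96) = 96; the agreed
 * feature set is the intersection of both sides' advertised flag bits.
 */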
4767
4768 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4769 static int drbd_do_auth(struct drbd_connection *connection)
4770 {
4771         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4772         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4773         return -1;
4774 }
4775 #else
4776 #define CHALLENGE_LEN 64
4777
4778 /* Return value:
4779         1 - auth succeeded,
4780         0 - failed, try again (network error),
4781         -1 - auth failed, don't try again.
4782 */
4783
4784 static int drbd_do_auth(struct drbd_connection *connection)
4785 {
4786         struct drbd_socket *sock;
4787         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4788         struct scatterlist sg;
4789         char *response = NULL;
4790         char *right_response = NULL;
4791         char *peers_ch = NULL;
4792         unsigned int key_len;
4793         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4794         unsigned int resp_size;
4795         struct hash_desc desc;
4796         struct packet_info pi;
4797         struct net_conf *nc;
4798         int err, rv;
4799
4800         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4801
4802         rcu_read_lock();
4803         nc = rcu_dereference(connection->net_conf);
4804         key_len = strlen(nc->shared_secret);
4805         memcpy(secret, nc->shared_secret, key_len);
4806         rcu_read_unlock();
4807
4808         desc.tfm = connection->cram_hmac_tfm;
4809         desc.flags = 0;
4810
4811         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4812         if (rv) {
4813                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4814                 rv = -1;
4815                 goto fail;
4816         }
4817
4818         get_random_bytes(my_challenge, CHALLENGE_LEN);
4819
4820         sock = &connection->data;
4821         if (!conn_prepare_command(connection, sock)) {
4822                 rv = 0;
4823                 goto fail;
4824         }
4825         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4826                                 my_challenge, CHALLENGE_LEN);
4827         if (!rv)
4828                 goto fail;
4829
4830         err = drbd_recv_header(connection, &pi);
4831         if (err) {
4832                 rv = 0;
4833                 goto fail;
4834         }
4835
4836         if (pi.cmd != P_AUTH_CHALLENGE) {
4837                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4838                          cmdname(pi.cmd), pi.cmd);
4839                 rv = 0;
4840                 goto fail;
4841         }
4842
4843         if (pi.size > CHALLENGE_LEN * 2) {
4844                 drbd_err(connection, "AuthChallenge payload too big.\n");
4845                 rv = -1;
4846                 goto fail;
4847         }
4848
4849         if (pi.size < CHALLENGE_LEN) {
4850                 drbd_err(connection, "AuthChallenge payload too small.\n");
4851                 rv = -1;
4852                 goto fail;
4853         }
4854
4855         peers_ch = kmalloc(pi.size, GFP_NOIO);
4856         if (peers_ch == NULL) {
4857                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4858                 rv = -1;
4859                 goto fail;
4860         }
4861
4862         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4863         if (err) {
4864                 rv = 0;
4865                 goto fail;
4866         }
4867
4868         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4869                 drbd_err(connection, "Peer presented the same challenge!\n");
4870                 rv = -1;
4871                 goto fail;
4872         }
4873
4874         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4875         response = kmalloc(resp_size, GFP_NOIO);
4876         if (response == NULL) {
4877                 drbd_err(connection, "kmalloc of response failed\n");
4878                 rv = -1;
4879                 goto fail;
4880         }
4881
4882         sg_init_table(&sg, 1);
4883         sg_set_buf(&sg, peers_ch, pi.size);
4884
4885         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4886         if (rv) {
4887                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4888                 rv = -1;
4889                 goto fail;
4890         }
4891
4892         if (!conn_prepare_command(connection, sock)) {
4893                 rv = 0;
4894                 goto fail;
4895         }
4896         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4897                                 response, resp_size);
4898         if (!rv)
4899                 goto fail;
4900
4901         err = drbd_recv_header(connection, &pi);
4902         if (err) {
4903                 rv = 0;
4904                 goto fail;
4905         }
4906
4907         if (pi.cmd != P_AUTH_RESPONSE) {
4908                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4909                          cmdname(pi.cmd), pi.cmd);
4910                 rv = 0;
4911                 goto fail;
4912         }
4913
4914         if (pi.size != resp_size) {
4915                 drbd_err(connection, "AuthResponse payload of wrong size\n");
4916                 rv = 0;
4917                 goto fail;
4918         }
4919
4920         err = drbd_recv_all_warn(connection, response, resp_size);
4921         if (err) {
4922                 rv = 0;
4923                 goto fail;
4924         }
4925
4926         right_response = kmalloc(resp_size, GFP_NOIO);
4927         if (right_response == NULL) {
4928                 drbd_err(connection, "kmalloc of right_response failed\n");
4929                 rv = -1;
4930                 goto fail;
4931         }
4932
4933         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4934
4935         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4936         if (rv) {
4937                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4938                 rv = -1;
4939                 goto fail;
4940         }
4941
4942         rv = !memcmp(response, right_response, resp_size);
4943
4944         if (rv)
4945                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4946                      resp_size);
4947         else
4948                 rv = -1;
4949
4950  fail:
4951         kfree(peers_ch);
4952         kfree(response);
4953         kfree(right_response);
4954
4955         return rv;
4956 }
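/*
 * Exchange summary (explanatory comment derived from the function above):
 *
 *   A -> B: P_AUTH_CHALLENGE, CHALLENGE_LEN random bytes
 *   B -> A: P_AUTH_CHALLENGE, B's own random challenge
 *   A -> B: P_AUTH_RESPONSE,  HMAC(shared_secret, B's challenge)
 *   B -> A: P_AUTH_RESPONSE,  HMAC(shared_secret, A's challenge)
 *
 * Each side recomputes the HMAC over the challenge it generated and
 * compares it with what the peer sent; rejecting a peer that echoes our
 * own challenge back prevents a trivial reflection attack.
 */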
4957 #endif
4958
4959 int drbd_receiver(struct drbd_thread *thi)
4960 {
4961         struct drbd_connection *connection = thi->connection;
4962         int h;
4963
4964         drbd_info(connection, "receiver (re)started\n");
4965
4966         do {
4967                 h = conn_connect(connection);
4968                 if (h == 0) {
4969                         conn_disconnect(connection);
4970                         schedule_timeout_interruptible(HZ);
4971                 }
4972                 if (h == -1) {
4973                         drbd_warn(connection, "Discarding network configuration.\n");
4974                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4975                 }
4976         } while (h == 0);
4977
4978         if (h > 0)
4979                 drbdd(connection);
4980
4981         conn_disconnect(connection);
4982
4983         drbd_info(connection, "receiver terminated\n");
4984         return 0;
4985 }
4986
4987 /* ********* acknowledge sender ******** */
4988
4989 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4990 {
4991         struct p_req_state_reply *p = pi->data;
4992         int retcode = be32_to_cpu(p->retcode);
4993
4994         if (retcode >= SS_SUCCESS) {
4995                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4996         } else {
4997                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4998                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4999                          drbd_set_st_err_str(retcode), retcode);
5000         }
5001         wake_up(&connection->ping_wait);
5002
5003         return 0;
5004 }
5005
5006 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5007 {
5008         struct drbd_peer_device *peer_device;
5009         struct drbd_device *device;
5010         struct p_req_state_reply *p = pi->data;
5011         int retcode = be32_to_cpu(p->retcode);
5012
5013         peer_device = conn_peer_device(connection, pi->vnr);
5014         if (!peer_device)
5015                 return -EIO;
5016         device = peer_device->device;
5017
5018         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5019                 D_ASSERT(device, connection->agreed_pro_version < 100);
5020                 return got_conn_RqSReply(connection, pi);
5021         }
5022
5023         if (retcode >= SS_SUCCESS) {
5024                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5025         } else {
5026                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5027                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5028                         drbd_set_st_err_str(retcode), retcode);
5029         }
5030         wake_up(&device->state_wait);
5031
5032         return 0;
5033 }
5034
5035 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5036 {
5037         return drbd_send_ping_ack(connection);
5038
5039 }
5040
5041 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5042 {
5043         /* restore idle timeout */
5044         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int * HZ;
5045         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5046                 wake_up(&connection->ping_wait);
5047
5048         return 0;
5049 }
5050
5051 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5052 {
5053         struct drbd_peer_device *peer_device;
5054         struct drbd_device *device;
5055         struct p_block_ack *p = pi->data;
5056         sector_t sector = be64_to_cpu(p->sector);
5057         int blksize = be32_to_cpu(p->blksize);
5058
5059         peer_device = conn_peer_device(connection, pi->vnr);
5060         if (!peer_device)
5061                 return -EIO;
5062         device = peer_device->device;
5063
5064         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5065
5066         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5067
5068         if (get_ldev(device)) {
5069                 drbd_rs_complete_io(device, sector);
5070                 drbd_set_in_sync(device, sector, blksize);
5071                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5072                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5073                 put_ldev(device);
5074         }
5075         dec_rs_pending(device);
5076         atomic_add(blksize >> 9, &device->rs_sect_in);
5077
5078         return 0;
5079 }
5080
5081 static int
5082 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5083                               struct rb_root *root, const char *func,
5084                               enum drbd_req_event what, bool missing_ok)
5085 {
5086         struct drbd_request *req;
5087         struct bio_and_error m;
5088
5089         spin_lock_irq(&device->resource->req_lock);
5090         req = find_request(device, root, id, sector, missing_ok, func);
5091         if (unlikely(!req)) {
5092                 spin_unlock_irq(&device->resource->req_lock);
5093                 return -EIO;
5094         }
5095         __req_mod(req, what, &m);
5096         spin_unlock_irq(&device->resource->req_lock);
5097
5098         if (m.bio)
5099                 complete_master_bio(device, &m);
5100         return 0;
5101 }
5102
5103 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5104 {
5105         struct drbd_peer_device *peer_device;
5106         struct drbd_device *device;
5107         struct p_block_ack *p = pi->data;
5108         sector_t sector = be64_to_cpu(p->sector);
5109         int blksize = be32_to_cpu(p->blksize);
5110         enum drbd_req_event what;
5111
5112         peer_device = conn_peer_device(connection, pi->vnr);
5113         if (!peer_device)
5114                 return -EIO;
5115         device = peer_device->device;
5116
5117         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5118
5119         if (p->block_id == ID_SYNCER) {
5120                 drbd_set_in_sync(device, sector, blksize);
5121                 dec_rs_pending(device);
5122                 return 0;
5123         }
5124         switch (pi->cmd) {
5125         case P_RS_WRITE_ACK:
5126                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5127                 break;
5128         case P_WRITE_ACK:
5129                 what = WRITE_ACKED_BY_PEER;
5130                 break;
5131         case P_RECV_ACK:
5132                 what = RECV_ACKED_BY_PEER;
5133                 break;
5134         case P_SUPERSEDED:
5135                 what = CONFLICT_RESOLVED;
5136                 break;
5137         case P_RETRY_WRITE:
5138                 what = POSTPONE_WRITE;
5139                 break;
5140         default:
5141                 BUG();
5142         }
5143
5144         return validate_req_change_req_state(device, p->block_id, sector,
5145                                              &device->write_requests, __func__,
5146                                              what, false);
5147 }
5148
5149 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5150 {
5151         struct drbd_peer_device *peer_device;
5152         struct drbd_device *device;
5153         struct p_block_ack *p = pi->data;
5154         sector_t sector = be64_to_cpu(p->sector);
5155         int size = be32_to_cpu(p->blksize);
5156         int err;
5157
5158         peer_device = conn_peer_device(connection, pi->vnr);
5159         if (!peer_device)
5160                 return -EIO;
5161         device = peer_device->device;
5162
5163         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5164
5165         if (p->block_id == ID_SYNCER) {
5166                 dec_rs_pending(device);
5167                 drbd_rs_failed_io(device, sector, size);
5168                 return 0;
5169         }
5170
5171         err = validate_req_change_req_state(device, p->block_id, sector,
5172                                             &device->write_requests, __func__,
5173                                             NEG_ACKED, true);
5174         if (err) {
5175                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5176                    The master bio might already be completed, therefore the
5177                    request is no longer in the collision hash. */
5178                 /* In Protocol B we might already have got a P_RECV_ACK
5179                    but then get a P_NEG_ACK afterwards. */
5180                 drbd_set_out_of_sync(device, sector, size);
5181         }
5182         return 0;
5183 }
5184
5185 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5186 {
5187         struct drbd_peer_device *peer_device;
5188         struct drbd_device *device;
5189         struct p_block_ack *p = pi->data;
5190         sector_t sector = be64_to_cpu(p->sector);
5191
5192         peer_device = conn_peer_device(connection, pi->vnr);
5193         if (!peer_device)
5194                 return -EIO;
5195         device = peer_device->device;
5196
5197         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5198
5199         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5200             (unsigned long long)sector, be32_to_cpu(p->blksize));
5201
5202         return validate_req_change_req_state(device, p->block_id, sector,
5203                                              &device->read_requests, __func__,
5204                                              NEG_ACKED, false);
5205 }
5206
5207 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5208 {
5209         struct drbd_peer_device *peer_device;
5210         struct drbd_device *device;
5211         sector_t sector;
5212         int size;
5213         struct p_block_ack *p = pi->data;
5214
5215         peer_device = conn_peer_device(connection, pi->vnr);
5216         if (!peer_device)
5217                 return -EIO;
5218         device = peer_device->device;
5219
5220         sector = be64_to_cpu(p->sector);
5221         size = be32_to_cpu(p->blksize);
5222
5223         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5224
5225         dec_rs_pending(device);
5226
5227         if (get_ldev_if_state(device, D_FAILED)) {
5228                 drbd_rs_complete_io(device, sector);
5229                 switch (pi->cmd) {
5230                 case P_NEG_RS_DREPLY:
5231                         drbd_rs_failed_io(device, sector, size);
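                        /* fall through */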
5232                 case P_RS_CANCEL:
5233                         break;
5234                 default:
5235                         BUG();
5236                 }
5237                 put_ldev(device);
5238         }
5239
5240         return 0;
5241 }
5242
5243 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5244 {
5245         struct p_barrier_ack *p = pi->data;
5246         struct drbd_peer_device *peer_device;
5247         int vnr;
5248
5249         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5250
5251         rcu_read_lock();
5252         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5253                 struct drbd_device *device = peer_device->device;
5254
5255                 if (device->state.conn == C_AHEAD &&
5256                     atomic_read(&device->ap_in_flight) == 0 &&
5257                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5258                         device->start_resync_timer.expires = jiffies + HZ;
5259                         add_timer(&device->start_resync_timer);
5260                 }
5261         }
5262         rcu_read_unlock();
5263
5264         return 0;
5265 }
5266
5267 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5268 {
5269         struct drbd_peer_device *peer_device;
5270         struct drbd_device *device;
5271         struct p_block_ack *p = pi->data;
5272         struct drbd_device_work *dw;
5273         sector_t sector;
5274         int size;
5275
5276         peer_device = conn_peer_device(connection, pi->vnr);
5277         if (!peer_device)
5278                 return -EIO;
5279         device = peer_device->device;
5280
5281         sector = be64_to_cpu(p->sector);
5282         size = be32_to_cpu(p->blksize);
5283
5284         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5285
5286         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5287                 drbd_ov_out_of_sync_found(device, sector, size);
5288         else
5289                 ov_out_of_sync_print(device);
5290
5291         if (!get_ldev(device))
5292                 return 0;
5293
5294         drbd_rs_complete_io(device, sector);
5295         dec_rs_pending(device);
5296
5297         --device->ov_left;
5298
5299         /* let's advance progress step marks only for every other megabyte */
5300         if ((device->ov_left & 0x200) == 0x200)
5301                 drbd_advance_rs_marks(device, device->ov_left);
5302
5303         if (device->ov_left == 0) {
5304                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5305                 if (dw) {
5306                         dw->w.cb = w_ov_finished;
5307                         dw->device = device;
5308                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5309                 } else {
5310                         drbd_err(device, "kmalloc(dw) failed.\n");
5311                         ov_out_of_sync_print(device);
5312                         drbd_resync_finished(device);
5313                 }
5314         }
5315         put_ldev(device);
5316         return 0;
5317 }
5318
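     /* Nothing to do beyond consuming the packet: the payload (if any) was
      * already read together with the header. */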
5319 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5320 {
5321         return 0;
5322 }
5323
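     /* Drain the done_ee lists of all volumes on this connection, i.e. send
      * the pending acks and release the peer requests.  New completions may
      * race in at any time, so re-check under req_lock and loop until every
      * done_ee list really is empty. */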
5324 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5325 {
5326         struct drbd_peer_device *peer_device;
5327         int vnr, not_empty = 0;
5328
5329         do {
5330                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5331                 flush_signals(current);
5332
5333                 rcu_read_lock();
5334                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5335                         struct drbd_device *device = peer_device->device;
5336                         kref_get(&device->kref);
5337                         rcu_read_unlock();
5338                         if (drbd_finish_peer_reqs(device)) {
5339                                 kref_put(&device->kref, drbd_destroy_device);
5340                                 return 1;
5341                         }
5342                         kref_put(&device->kref, drbd_destroy_device);
5343                         rcu_read_lock();
5344                 }
5345                 set_bit(SIGNAL_ASENDER, &connection->flags);
5346
5347                 spin_lock_irq(&connection->resource->req_lock);
5348                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5349                         struct drbd_device *device = peer_device->device;
5350                         not_empty = !list_empty(&device->done_ee);
5351                         if (not_empty)
5352                                 break;
5353                 }
5354                 spin_unlock_irq(&connection->resource->req_lock);
5355                 rcu_read_unlock();
5356         } while (not_empty);
5357
5358         return 0;
5359 }
5360
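     /* Dispatch table for packets on the meta-data socket: each command has
      * a fixed payload size (0 for the bare ping packets) and a handler
      * that returns 0 on success. */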
5361 struct asender_cmd {
5362         size_t pkt_size;
5363         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5364 };
5365
5366 static struct asender_cmd asender_tbl[] = {
5367         [P_PING]            = { 0, got_Ping },
5368         [P_PING_ACK]        = { 0, got_PingAck },
5369         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5370         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5371         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5372         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5373         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5374         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5375         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5376         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5377         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5378         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5379         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5380         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5381         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5382         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5383         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5384 };
5385
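     /* The asender thread owns the meta-data socket.  Running at realtime
      * priority, it sends pings and the acks queued up by the receiver, and
      * reads incoming packets in two steps: first a complete header, then
      * the fixed-size payload announced in asender_tbl. */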
5386 int drbd_asender(struct drbd_thread *thi)
5387 {
5388         struct drbd_connection *connection = thi->connection;
5389         struct asender_cmd *cmd = NULL;
5390         struct packet_info pi;
5391         int rv;
5392         void *buf    = connection->meta.rbuf;
5393         int received = 0;
5394         unsigned int header_size = drbd_header_size(connection);
5395         int expect   = header_size;
5396         bool ping_timeout_active = false;
5397         struct net_conf *nc;
5398         int ping_timeo, tcp_cork, ping_int;
5399         struct sched_param param = { .sched_priority = 2 };
5400
5401         rv = sched_setscheduler(current, SCHED_RR, &param);
5402         if (rv < 0)
5403                 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5404
5405         while (get_t_state(thi) == RUNNING) {
5406                 drbd_thread_current_set_cpu(thi);
5407
5408                 rcu_read_lock();
5409                 nc = rcu_dereference(connection->net_conf);
5410                 ping_timeo = nc->ping_timeo;
5411                 tcp_cork = nc->tcp_cork;
5412                 ping_int = nc->ping_int;
5413                 rcu_read_unlock();
5414
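                     /* A ping was requested: send it, and shorten the receive
                      * timeout to the ping timeout (given in deciseconds). */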
5415                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5416                         if (drbd_send_ping(connection)) {
5417                                 drbd_err(connection, "drbd_send_ping has failed\n");
5418                                 goto reconnect;
5419                         }
5420                         connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5421                         ping_timeout_active = true;
5422                 }
5423
5424                 /* TODO: conditionally cork; it may hurt latency if we cork without
5425                    much to send */
5426                 if (tcp_cork)
5427                         drbd_tcp_cork(connection->meta.socket);
5428                 if (connection_finish_peer_reqs(connection)) {
5429                         drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5430                         goto reconnect;
5431                 }
5432                 /* but unconditionally uncork unless disabled */
5433                 if (tcp_cork)
5434                         drbd_tcp_uncork(connection->meta.socket);
5435
5436                 /* short circuit, recv_msg would return EINTR anyways. */
5437                 if (signal_pending(current))
5438                         continue;
5439
5440                 rv = drbd_recv_short(connection->meta.socket, buf, expect - received, 0);
5441                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5442
5443                 flush_signals(current);
5444
5445                 /* Note:
5446                  * -EINTR        (on meta) we got a signal
5447                  * -EAGAIN       (on meta) rcvtimeo expired
5448                  * -ECONNRESET   other side closed the connection
5449                  * -ERESTARTSYS  (on data) we got a signal
5450                  * rv <  0       other than above: unexpected error!
5451                  * rv == expected: full header or command
5452                  * rv <  expected: "woken" by signal during receive
5453                  * rv == 0       : "connection shut down by peer"
5454                  */
5455                 if (likely(rv > 0)) {
5456                         received += rv;
5457                         buf      += rv;
5458                 } else if (rv == 0) {
5459                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
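                                     /* We initiated the disconnect: give the
                                      * state machine up to one ping timeout to
                                      * wind down before treating this as a
                                      * peer-initiated shutdown. */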
5460                                 long t;
5461                                 rcu_read_lock();
5462                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5463                                 rcu_read_unlock();
5464
5465                                 t = wait_event_timeout(connection->ping_wait,
5466                                                        connection->cstate < C_WF_REPORT_PARAMS,
5467                                                        t);
5468                                 if (t)
5469                                         break;
5470                         }
5471                         drbd_err(connection, "meta connection shut down by peer.\n");
5472                         goto reconnect;
5473                 } else if (rv == -EAGAIN) {
5474                         /* If the data socket received something meanwhile,
5475                          * that is good enough: peer is still alive. */
5476                         if (time_after(connection->last_received,
5477                                 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5478                                 continue;
5479                         if (ping_timeout_active) {
5480                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5481                                 goto reconnect;
5482                         }
5483                         set_bit(SEND_PING, &connection->flags);
5484                         continue;
5485                 } else if (rv == -EINTR) {
5486                         continue;
5487                 } else {
5488                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5489                         goto reconnect;
5490                 }
5491
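                     /* A complete header has arrived: decode it, validate the
                      * command, and learn how much payload to expect. */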
5492                 if (received == expect && cmd == NULL) {
5493                         if (decode_header(connection, connection->meta.rbuf, &pi))
5494                                 goto reconnect;
5495                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
5496                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5497                                          cmdname(pi.cmd), pi.cmd);
5498                                 goto disconnect;
5499                         }
5500                         cmd = &asender_tbl[pi.cmd];
5501                         expect = header_size + cmd->pkt_size;
5502                         if (pi.size != expect - header_size) {
5503                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5504                                         pi.cmd, pi.size);
5505                                 goto reconnect;
5506                         }
5507                 }
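                     /* Header and payload are complete: dispatch the packet. */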
5508                 if (received == expect) {
5509                         bool err;
5510
5511                         err = cmd->fn(connection, &pi);
5512                         if (err) {
5513                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5514                                 goto reconnect;
5515                         }
5516
5517                         connection->last_received = jiffies;
5518
5519                         if (cmd == &asender_tbl[P_PING_ACK]) {
5520                                 /* restore idle timeout */
5521                                 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5522                                 ping_timeout_active = false;
5523                         }
5524
5525                         buf      = connection->meta.rbuf;
5526                         received = 0;
5527                         expect   = header_size;
5528                         cmd      = NULL;
5529                 }
5530         }
5531
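             /* Only reachable via goto: the if (0) wrappers keep these labels
              * out of the normal fall-through path. */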
5532         if (0) {
5533 reconnect:
5534                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5535                 conn_md_sync(connection);
5536         }
5537         if (0) {
5538 disconnect:
5539                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5540         }
5541         clear_bit(SIGNAL_ASENDER, &connection->flags);
5542
5543         drbd_info(connection, "asender terminated\n");
5544
5545         return 0;
5546 }