5d9e5cc3feeb5732f3805127237bdbc8a667b1de
[cascardo/linux.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49
50 #include "drbd_vli.h"
51
52 struct packet_info {
53         enum drbd_packet cmd;
54         unsigned int size;
55         unsigned int vnr;
56         void *data;
57 };
58
59 enum finish_epoch {
60         FE_STILL_LIVE,
61         FE_DESTROYED,
62         FE_RECYCLED,
63 };
64
65 static int drbd_do_features(struct drbd_connection *connection);
66 static int drbd_do_auth(struct drbd_connection *connection);
67 static int drbd_disconnected(struct drbd_device *device);
68
69 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
70 static int e_end_block(struct drbd_work *, int);
71
72
73 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
74
75 /*
76  * some helper functions to deal with single linked page lists,
77  * page->private being our "next" pointer.
78  */
79
80 /* If at least n pages are linked at head, get n pages off.
81  * Otherwise, don't modify head, and return NULL.
82  * Locking is the responsibility of the caller.
83  */
84 static struct page *page_chain_del(struct page **head, int n)
85 {
86         struct page *page;
87         struct page *tmp;
88
89         BUG_ON(!n);
90         BUG_ON(!head);
91
92         page = *head;
93
94         if (!page)
95                 return NULL;
96
97         while (page) {
98                 tmp = page_chain_next(page);
99                 if (--n == 0)
100                         break; /* found sufficient pages */
101                 if (tmp == NULL)
102                         /* insufficient pages, don't use any of them. */
103                         return NULL;
104                 page = tmp;
105         }
106
107         /* add end of list marker for the returned list */
108         set_page_private(page, 0);
109         /* actual return value, and adjustment of head */
110         page = *head;
111         *head = tmp;
112         return page;
113 }
114
115 /* may be used outside of locks to find the tail of a (usually short)
116  * "private" page chain, before adding it back to a global chain head
117  * with page_chain_add() under a spinlock. */
118 static struct page *page_chain_tail(struct page *page, int *len)
119 {
120         struct page *tmp;
121         int i = 1;
122         while ((tmp = page_chain_next(page)))
123                 ++i, page = tmp;
124         if (len)
125                 *len = i;
126         return page;
127 }
128
129 static int page_chain_free(struct page *page)
130 {
131         struct page *tmp;
132         int i = 0;
133         page_chain_for_each_safe(page, tmp) {
134                 put_page(page);
135                 ++i;
136         }
137         return i;
138 }
139
140 static void page_chain_add(struct page **head,
141                 struct page *chain_first, struct page *chain_last)
142 {
143 #if 1
144         struct page *tmp;
145         tmp = page_chain_tail(chain_first, NULL);
146         BUG_ON(tmp != chain_last);
147 #endif
148
149         /* add chain to head */
150         set_page_private(chain_last, (unsigned long)*head);
151         *head = chain_first;
152 }
153
154 static struct page *__drbd_alloc_pages(struct drbd_device *device,
155                                        unsigned int number)
156 {
157         struct page *page = NULL;
158         struct page *tmp = NULL;
159         unsigned int i = 0;
160
161         /* Yes, testing drbd_pp_vacant outside the lock is racy.
162          * So what. It saves a spin_lock. */
163         if (drbd_pp_vacant >= number) {
164                 spin_lock(&drbd_pp_lock);
165                 page = page_chain_del(&drbd_pp_pool, number);
166                 if (page)
167                         drbd_pp_vacant -= number;
168                 spin_unlock(&drbd_pp_lock);
169                 if (page)
170                         return page;
171         }
172
173         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
174          * "criss-cross" setup, that might cause write-out on some other DRBD,
175          * which in turn might block on the other node at this very place.  */
176         for (i = 0; i < number; i++) {
177                 tmp = alloc_page(GFP_TRY);
178                 if (!tmp)
179                         break;
180                 set_page_private(tmp, (unsigned long)page);
181                 page = tmp;
182         }
183
184         if (i == number)
185                 return page;
186
187         /* Not enough pages immediately available this time.
188          * No need to jump around here, drbd_alloc_pages will retry this
189          * function "soon". */
190         if (page) {
191                 tmp = page_chain_tail(page, NULL);
192                 spin_lock(&drbd_pp_lock);
193                 page_chain_add(&drbd_pp_pool, page, tmp);
194                 drbd_pp_vacant += i;
195                 spin_unlock(&drbd_pp_lock);
196         }
197         return NULL;
198 }
199
200 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
201                                            struct list_head *to_be_freed)
202 {
203         struct drbd_peer_request *peer_req;
204         struct list_head *le, *tle;
205
206         /* The EEs are always appended to the end of the list. Since
207            they are sent in order over the wire, they have to finish
208            in order. As soon as we see the first not finished we can
209            stop to examine the list... */
210
211         list_for_each_safe(le, tle, &device->net_ee) {
212                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
213                 if (drbd_peer_req_has_active_page(peer_req))
214                         break;
215                 list_move(le, to_be_freed);
216         }
217 }
218
219 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
220 {
221         LIST_HEAD(reclaimed);
222         struct drbd_peer_request *peer_req, *t;
223
224         spin_lock_irq(&device->resource->req_lock);
225         reclaim_finished_net_peer_reqs(device, &reclaimed);
226         spin_unlock_irq(&device->resource->req_lock);
227
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
232 /**
233  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
234  * @device:     DRBD device.
235  * @number:     number of pages requested
236  * @retry:      whether to retry, if not enough pages are available right now
237  *
238  * Tries to allocate number pages, first from our own page pool, then from
239  * the kernel, unless this allocation would exceed the max_buffers setting.
240  * Possibly retry until DRBD frees sufficient pages somewhere else.
241  *
242  * Returns a page chain linked via page->private.
243  */
244 struct page *drbd_alloc_pages(struct drbd_device *device, unsigned int number,
245                               bool retry)
246 {
247         struct page *page = NULL;
248         struct net_conf *nc;
249         DEFINE_WAIT(wait);
250         int mxb;
251
252         /* Yes, we may run up to @number over max_buffers. If we
253          * follow it strictly, the admin will get it wrong anyways. */
254         rcu_read_lock();
255         nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
256         mxb = nc ? nc->max_buffers : 1000000;
257         rcu_read_unlock();
258
259         if (atomic_read(&device->pp_in_use) < mxb)
260                 page = __drbd_alloc_pages(device, number);
261
262         while (page == NULL) {
263                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
264
265                 drbd_kick_lo_and_reclaim_net(device);
266
267                 if (atomic_read(&device->pp_in_use) < mxb) {
268                         page = __drbd_alloc_pages(device, number);
269                         if (page)
270                                 break;
271                 }
272
273                 if (!retry)
274                         break;
275
276                 if (signal_pending(current)) {
277                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
278                         break;
279                 }
280
281                 schedule();
282         }
283         finish_wait(&drbd_pp_wait, &wait);
284
285         if (page)
286                 atomic_add(number, &device->pp_in_use);
287         return page;
288 }
289
290 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
291  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
292  * Either links the page chain back to the global pool,
293  * or returns all pages to the system. */
294 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
295 {
296         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
297         int i;
298
299         if (page == NULL)
300                 return;
301
302         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
303                 i = page_chain_free(page);
304         else {
305                 struct page *tmp;
306                 tmp = page_chain_tail(page, &i);
307                 spin_lock(&drbd_pp_lock);
308                 page_chain_add(&drbd_pp_pool, page, tmp);
309                 drbd_pp_vacant += i;
310                 spin_unlock(&drbd_pp_lock);
311         }
312         i = atomic_sub_return(i, a);
313         if (i < 0)
314                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
315                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
316         wake_up(&drbd_pp_wait);
317 }
318
319 /*
320 You need to hold the req_lock:
321  _drbd_wait_ee_list_empty()
322
323 You must not have the req_lock:
324  drbd_free_peer_req()
325  drbd_alloc_peer_req()
326  drbd_free_peer_reqs()
327  drbd_ee_fix_bhs()
328  drbd_finish_peer_reqs()
329  drbd_clear_done_ee()
330  drbd_wait_ee_list_empty()
331 */
332
333 struct drbd_peer_request *
334 drbd_alloc_peer_req(struct drbd_device *device, u64 id, sector_t sector,
335                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
336 {
337         struct drbd_peer_request *peer_req;
338         struct page *page = NULL;
339         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
340
341         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
342                 return NULL;
343
344         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
345         if (!peer_req) {
346                 if (!(gfp_mask & __GFP_NOWARN))
347                         drbd_err(device, "%s: allocation failed\n", __func__);
348                 return NULL;
349         }
350
351         if (data_size) {
352                 page = drbd_alloc_pages(device, nr_pages, (gfp_mask & __GFP_WAIT));
353                 if (!page)
354                         goto fail;
355         }
356
357         drbd_clear_interval(&peer_req->i);
358         peer_req->i.size = data_size;
359         peer_req->i.sector = sector;
360         peer_req->i.local = false;
361         peer_req->i.waiting = false;
362
363         peer_req->epoch = NULL;
364         peer_req->w.device = device;
365         peer_req->pages = page;
366         atomic_set(&peer_req->pending_bios, 0);
367         peer_req->flags = 0;
368         /*
369          * The block_id is opaque to the receiver.  It is not endianness
370          * converted, and sent back to the sender unchanged.
371          */
372         peer_req->block_id = id;
373
374         return peer_req;
375
376  fail:
377         mempool_free(peer_req, drbd_ee_mempool);
378         return NULL;
379 }
380
381 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
382                        int is_net)
383 {
384         if (peer_req->flags & EE_HAS_DIGEST)
385                 kfree(peer_req->digest);
386         drbd_free_pages(device, peer_req->pages, is_net);
387         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
388         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
389         mempool_free(peer_req, drbd_ee_mempool);
390 }
391
392 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
393 {
394         LIST_HEAD(work_list);
395         struct drbd_peer_request *peer_req, *t;
396         int count = 0;
397         int is_net = list == &device->net_ee;
398
399         spin_lock_irq(&device->resource->req_lock);
400         list_splice_init(list, &work_list);
401         spin_unlock_irq(&device->resource->req_lock);
402
403         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
404                 __drbd_free_peer_req(device, peer_req, is_net);
405                 count++;
406         }
407         return count;
408 }
409
410 /*
411  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
412  */
413 static int drbd_finish_peer_reqs(struct drbd_device *device)
414 {
415         LIST_HEAD(work_list);
416         LIST_HEAD(reclaimed);
417         struct drbd_peer_request *peer_req, *t;
418         int err = 0;
419
420         spin_lock_irq(&device->resource->req_lock);
421         reclaim_finished_net_peer_reqs(device, &reclaimed);
422         list_splice_init(&device->done_ee, &work_list);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
426                 drbd_free_net_peer_req(device, peer_req);
427
428         /* possible callbacks here:
429          * e_end_block, and e_end_resync_block, e_send_superseded.
430          * all ignore the last argument.
431          */
432         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
433                 int err2;
434
435                 /* list_del not necessary, next/prev members not touched */
436                 err2 = peer_req->w.cb(&peer_req->w, !!err);
437                 if (!err)
438                         err = err2;
439                 drbd_free_peer_req(device, peer_req);
440         }
441         wake_up(&device->ee_wait);
442
443         return err;
444 }
445
446 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
447                                      struct list_head *head)
448 {
449         DEFINE_WAIT(wait);
450
451         /* avoids spin_lock/unlock
452          * and calling prepare_to_wait in the fast path */
453         while (!list_empty(head)) {
454                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
455                 spin_unlock_irq(&device->resource->req_lock);
456                 io_schedule();
457                 finish_wait(&device->ee_wait, &wait);
458                 spin_lock_irq(&device->resource->req_lock);
459         }
460 }
461
462 static void drbd_wait_ee_list_empty(struct drbd_device *device,
463                                     struct list_head *head)
464 {
465         spin_lock_irq(&device->resource->req_lock);
466         _drbd_wait_ee_list_empty(device, head);
467         spin_unlock_irq(&device->resource->req_lock);
468 }
469
470 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
471 {
472         mm_segment_t oldfs;
473         struct kvec iov = {
474                 .iov_base = buf,
475                 .iov_len = size,
476         };
477         struct msghdr msg = {
478                 .msg_iovlen = 1,
479                 .msg_iov = (struct iovec *)&iov,
480                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
481         };
482         int rv;
483
484         oldfs = get_fs();
485         set_fs(KERNEL_DS);
486         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
487         set_fs(oldfs);
488
489         return rv;
490 }
491
492 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
493 {
494         int rv;
495
496         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497
498         if (rv < 0) {
499                 if (rv == -ECONNRESET)
500                         drbd_info(connection, "sock was reset by peer\n");
501                 else if (rv != -ERESTARTSYS)
502                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
503         } else if (rv == 0) {
504                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505                         long t;
506                         rcu_read_lock();
507                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508                         rcu_read_unlock();
509
510                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
511
512                         if (t)
513                                 goto out;
514                 }
515                 drbd_info(connection, "sock was shut down by peer\n");
516         }
517
518         if (rv != size)
519                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
520
521 out:
522         return rv;
523 }
524
525 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
526 {
527         int err;
528
529         err = drbd_recv(connection, buf, size);
530         if (err != size) {
531                 if (err >= 0)
532                         err = -EIO;
533         } else
534                 err = 0;
535         return err;
536 }
537
538 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
539 {
540         int err;
541
542         err = drbd_recv_all(connection, buf, size);
543         if (err && !signal_pending(current))
544                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
545         return err;
546 }
547
548 /* quoting tcp(7):
549  *   On individual connections, the socket buffer size must be set prior to the
550  *   listen(2) or connect(2) calls in order to have it take effect.
551  * This is our wrapper to do so.
552  */
553 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554                 unsigned int rcv)
555 {
556         /* open coded SO_SNDBUF, SO_RCVBUF */
557         if (snd) {
558                 sock->sk->sk_sndbuf = snd;
559                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560         }
561         if (rcv) {
562                 sock->sk->sk_rcvbuf = rcv;
563                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
564         }
565 }
566
567 static struct socket *drbd_try_connect(struct drbd_connection *connection)
568 {
569         const char *what;
570         struct socket *sock;
571         struct sockaddr_in6 src_in6;
572         struct sockaddr_in6 peer_in6;
573         struct net_conf *nc;
574         int err, peer_addr_len, my_addr_len;
575         int sndbuf_size, rcvbuf_size, connect_int;
576         int disconnect_on_error = 1;
577
578         rcu_read_lock();
579         nc = rcu_dereference(connection->net_conf);
580         if (!nc) {
581                 rcu_read_unlock();
582                 return NULL;
583         }
584         sndbuf_size = nc->sndbuf_size;
585         rcvbuf_size = nc->rcvbuf_size;
586         connect_int = nc->connect_int;
587         rcu_read_unlock();
588
589         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
590         memcpy(&src_in6, &connection->my_addr, my_addr_len);
591
592         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
593                 src_in6.sin6_port = 0;
594         else
595                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
596
597         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
598         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
599
600         what = "sock_create_kern";
601         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
602                                SOCK_STREAM, IPPROTO_TCP, &sock);
603         if (err < 0) {
604                 sock = NULL;
605                 goto out;
606         }
607
608         sock->sk->sk_rcvtimeo =
609         sock->sk->sk_sndtimeo = connect_int * HZ;
610         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
611
612        /* explicitly bind to the configured IP as source IP
613         *  for the outgoing connections.
614         *  This is needed for multihomed hosts and to be
615         *  able to use lo: interfaces for drbd.
616         * Make sure to use 0 as port number, so linux selects
617         *  a free one dynamically.
618         */
619         what = "bind before connect";
620         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
621         if (err < 0)
622                 goto out;
623
624         /* connect may fail, peer not yet available.
625          * stay C_WF_CONNECTION, don't go Disconnecting! */
626         disconnect_on_error = 0;
627         what = "connect";
628         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
629
630 out:
631         if (err < 0) {
632                 if (sock) {
633                         sock_release(sock);
634                         sock = NULL;
635                 }
636                 switch (-err) {
637                         /* timeout, busy, signal pending */
638                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
639                 case EINTR: case ERESTARTSYS:
640                         /* peer not (yet) available, network problem */
641                 case ECONNREFUSED: case ENETUNREACH:
642                 case EHOSTDOWN:    case EHOSTUNREACH:
643                         disconnect_on_error = 0;
644                         break;
645                 default:
646                         drbd_err(connection, "%s failed, err = %d\n", what, err);
647                 }
648                 if (disconnect_on_error)
649                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
650         }
651
652         return sock;
653 }
654
655 struct accept_wait_data {
656         struct drbd_connection *connection;
657         struct socket *s_listen;
658         struct completion door_bell;
659         void (*original_sk_state_change)(struct sock *sk);
660
661 };
662
663 static void drbd_incoming_connection(struct sock *sk)
664 {
665         struct accept_wait_data *ad = sk->sk_user_data;
666         void (*state_change)(struct sock *sk);
667
668         state_change = ad->original_sk_state_change;
669         if (sk->sk_state == TCP_ESTABLISHED)
670                 complete(&ad->door_bell);
671         state_change(sk);
672 }
673
674 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
675 {
676         int err, sndbuf_size, rcvbuf_size, my_addr_len;
677         struct sockaddr_in6 my_addr;
678         struct socket *s_listen;
679         struct net_conf *nc;
680         const char *what;
681
682         rcu_read_lock();
683         nc = rcu_dereference(connection->net_conf);
684         if (!nc) {
685                 rcu_read_unlock();
686                 return -EIO;
687         }
688         sndbuf_size = nc->sndbuf_size;
689         rcvbuf_size = nc->rcvbuf_size;
690         rcu_read_unlock();
691
692         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
693         memcpy(&my_addr, &connection->my_addr, my_addr_len);
694
695         what = "sock_create_kern";
696         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
697                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
698         if (err) {
699                 s_listen = NULL;
700                 goto out;
701         }
702
703         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
704         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
705
706         what = "bind before listen";
707         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
708         if (err < 0)
709                 goto out;
710
711         ad->s_listen = s_listen;
712         write_lock_bh(&s_listen->sk->sk_callback_lock);
713         ad->original_sk_state_change = s_listen->sk->sk_state_change;
714         s_listen->sk->sk_state_change = drbd_incoming_connection;
715         s_listen->sk->sk_user_data = ad;
716         write_unlock_bh(&s_listen->sk->sk_callback_lock);
717
718         what = "listen";
719         err = s_listen->ops->listen(s_listen, 5);
720         if (err < 0)
721                 goto out;
722
723         return 0;
724 out:
725         if (s_listen)
726                 sock_release(s_listen);
727         if (err < 0) {
728                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
729                         drbd_err(connection, "%s failed, err = %d\n", what, err);
730                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
731                 }
732         }
733
734         return -EIO;
735 }
736
737 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
738 {
739         write_lock_bh(&sk->sk_callback_lock);
740         sk->sk_state_change = ad->original_sk_state_change;
741         sk->sk_user_data = NULL;
742         write_unlock_bh(&sk->sk_callback_lock);
743 }
744
745 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
746 {
747         int timeo, connect_int, err = 0;
748         struct socket *s_estab = NULL;
749         struct net_conf *nc;
750
751         rcu_read_lock();
752         nc = rcu_dereference(connection->net_conf);
753         if (!nc) {
754                 rcu_read_unlock();
755                 return NULL;
756         }
757         connect_int = nc->connect_int;
758         rcu_read_unlock();
759
760         timeo = connect_int * HZ;
761         /* 28.5% random jitter */
762         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
763
764         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
765         if (err <= 0)
766                 return NULL;
767
768         err = kernel_accept(ad->s_listen, &s_estab, 0);
769         if (err < 0) {
770                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
771                         drbd_err(connection, "accept failed, err = %d\n", err);
772                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
773                 }
774         }
775
776         if (s_estab)
777                 unregister_state_change(s_estab->sk, ad);
778
779         return s_estab;
780 }
781
782 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
783
784 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
785                              enum drbd_packet cmd)
786 {
787         if (!conn_prepare_command(connection, sock))
788                 return -EIO;
789         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
790 }
791
792 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
793 {
794         unsigned int header_size = drbd_header_size(connection);
795         struct packet_info pi;
796         int err;
797
798         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
799         if (err != header_size) {
800                 if (err >= 0)
801                         err = -EIO;
802                 return err;
803         }
804         err = decode_header(connection, connection->data.rbuf, &pi);
805         if (err)
806                 return err;
807         return pi.cmd;
808 }
809
810 /**
811  * drbd_socket_okay() - Free the socket if its connection is not okay
812  * @sock:       pointer to the pointer to the socket.
813  */
814 static int drbd_socket_okay(struct socket **sock)
815 {
816         int rr;
817         char tb[4];
818
819         if (!*sock)
820                 return false;
821
822         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
823
824         if (rr > 0 || rr == -EAGAIN) {
825                 return true;
826         } else {
827                 sock_release(*sock);
828                 *sock = NULL;
829                 return false;
830         }
831 }
832 /* Gets called if a connection is established, or if a new minor gets created
833    in a connection */
834 int drbd_connected(struct drbd_device *device)
835 {
836         int err;
837
838         atomic_set(&device->packet_seq, 0);
839         device->peer_seq = 0;
840
841         device->state_mutex = first_peer_device(device)->connection->agreed_pro_version < 100 ?
842                 &first_peer_device(device)->connection->cstate_mutex :
843                 &device->own_state_mutex;
844
845         err = drbd_send_sync_param(device);
846         if (!err)
847                 err = drbd_send_sizes(device, 0, 0);
848         if (!err)
849                 err = drbd_send_uuids(device);
850         if (!err)
851                 err = drbd_send_current_state(device);
852         clear_bit(USE_DEGR_WFC_T, &device->flags);
853         clear_bit(RESIZE_PENDING, &device->flags);
854         atomic_set(&device->ap_in_flight, 0);
855         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
856         return err;
857 }
858
859 /*
860  * return values:
861  *   1 yes, we have a valid connection
862  *   0 oops, did not work out, please try again
863  *  -1 peer talks different language,
864  *     no point in trying again, please go standalone.
865  *  -2 We do not have a network config...
866  */
867 static int conn_connect(struct drbd_connection *connection)
868 {
869         struct drbd_socket sock, msock;
870         struct drbd_peer_device *peer_device;
871         struct net_conf *nc;
872         int vnr, timeout, h, ok;
873         bool discard_my_data;
874         enum drbd_state_rv rv;
875         struct accept_wait_data ad = {
876                 .connection = connection,
877                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
878         };
879
880         clear_bit(DISCONNECT_SENT, &connection->flags);
881         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
882                 return -2;
883
884         mutex_init(&sock.mutex);
885         sock.sbuf = connection->data.sbuf;
886         sock.rbuf = connection->data.rbuf;
887         sock.socket = NULL;
888         mutex_init(&msock.mutex);
889         msock.sbuf = connection->meta.sbuf;
890         msock.rbuf = connection->meta.rbuf;
891         msock.socket = NULL;
892
893         /* Assume that the peer only understands protocol 80 until we know better.  */
894         connection->agreed_pro_version = 80;
895
896         if (prepare_listen_socket(connection, &ad))
897                 return 0;
898
899         do {
900                 struct socket *s;
901
902                 s = drbd_try_connect(connection);
903                 if (s) {
904                         if (!sock.socket) {
905                                 sock.socket = s;
906                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
907                         } else if (!msock.socket) {
908                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
909                                 msock.socket = s;
910                                 send_first_packet(connection, &msock, P_INITIAL_META);
911                         } else {
912                                 drbd_err(connection, "Logic error in conn_connect()\n");
913                                 goto out_release_sockets;
914                         }
915                 }
916
917                 if (sock.socket && msock.socket) {
918                         rcu_read_lock();
919                         nc = rcu_dereference(connection->net_conf);
920                         timeout = nc->ping_timeo * HZ / 10;
921                         rcu_read_unlock();
922                         schedule_timeout_interruptible(timeout);
923                         ok = drbd_socket_okay(&sock.socket);
924                         ok = drbd_socket_okay(&msock.socket) && ok;
925                         if (ok)
926                                 break;
927                 }
928
929 retry:
930                 s = drbd_wait_for_connect(connection, &ad);
931                 if (s) {
932                         int fp = receive_first_packet(connection, s);
933                         drbd_socket_okay(&sock.socket);
934                         drbd_socket_okay(&msock.socket);
935                         switch (fp) {
936                         case P_INITIAL_DATA:
937                                 if (sock.socket) {
938                                         drbd_warn(connection, "initial packet S crossed\n");
939                                         sock_release(sock.socket);
940                                         sock.socket = s;
941                                         goto randomize;
942                                 }
943                                 sock.socket = s;
944                                 break;
945                         case P_INITIAL_META:
946                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
947                                 if (msock.socket) {
948                                         drbd_warn(connection, "initial packet M crossed\n");
949                                         sock_release(msock.socket);
950                                         msock.socket = s;
951                                         goto randomize;
952                                 }
953                                 msock.socket = s;
954                                 break;
955                         default:
956                                 drbd_warn(connection, "Error receiving initial packet\n");
957                                 sock_release(s);
958 randomize:
959                                 if (prandom_u32() & 1)
960                                         goto retry;
961                         }
962                 }
963
964                 if (connection->cstate <= C_DISCONNECTING)
965                         goto out_release_sockets;
966                 if (signal_pending(current)) {
967                         flush_signals(current);
968                         smp_rmb();
969                         if (get_t_state(&connection->receiver) == EXITING)
970                                 goto out_release_sockets;
971                 }
972
973                 ok = drbd_socket_okay(&sock.socket);
974                 ok = drbd_socket_okay(&msock.socket) && ok;
975         } while (!ok);
976
977         if (ad.s_listen)
978                 sock_release(ad.s_listen);
979
980         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
981         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
982
983         sock.socket->sk->sk_allocation = GFP_NOIO;
984         msock.socket->sk->sk_allocation = GFP_NOIO;
985
986         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
987         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
988
989         /* NOT YET ...
990          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
991          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
992          * first set it to the P_CONNECTION_FEATURES timeout,
993          * which we set to 4x the configured ping_timeout. */
994         rcu_read_lock();
995         nc = rcu_dereference(connection->net_conf);
996
997         sock.socket->sk->sk_sndtimeo =
998         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
999
1000         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1001         timeout = nc->timeout * HZ / 10;
1002         discard_my_data = nc->discard_my_data;
1003         rcu_read_unlock();
1004
1005         msock.socket->sk->sk_sndtimeo = timeout;
1006
1007         /* we don't want delays.
1008          * we use TCP_CORK where appropriate, though */
1009         drbd_tcp_nodelay(sock.socket);
1010         drbd_tcp_nodelay(msock.socket);
1011
1012         connection->data.socket = sock.socket;
1013         connection->meta.socket = msock.socket;
1014         connection->last_received = jiffies;
1015
1016         h = drbd_do_features(connection);
1017         if (h <= 0)
1018                 return h;
1019
1020         if (connection->cram_hmac_tfm) {
1021                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1022                 switch (drbd_do_auth(connection)) {
1023                 case -1:
1024                         drbd_err(connection, "Authentication of peer failed\n");
1025                         return -1;
1026                 case 0:
1027                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1028                         return 0;
1029                 }
1030         }
1031
1032         connection->data.socket->sk->sk_sndtimeo = timeout;
1033         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1034
1035         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1036                 return -1;
1037
1038         set_bit(STATE_SENT, &connection->flags);
1039
1040         rcu_read_lock();
1041         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1042                 struct drbd_device *device = peer_device->device;
1043                 kref_get(&device->kref);
1044                 rcu_read_unlock();
1045
1046                 /* Prevent a race between resync-handshake and
1047                  * being promoted to Primary.
1048                  *
1049                  * Grab and release the state mutex, so we know that any current
1050                  * drbd_set_role() is finished, and any incoming drbd_set_role
1051                  * will see the STATE_SENT flag, and wait for it to be cleared.
1052                  */
1053                 mutex_lock(device->state_mutex);
1054                 mutex_unlock(device->state_mutex);
1055
1056                 if (discard_my_data)
1057                         set_bit(DISCARD_MY_DATA, &device->flags);
1058                 else
1059                         clear_bit(DISCARD_MY_DATA, &device->flags);
1060
1061                 drbd_connected(device);
1062                 kref_put(&device->kref, drbd_destroy_device);
1063                 rcu_read_lock();
1064         }
1065         rcu_read_unlock();
1066
1067         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1068         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1069                 clear_bit(STATE_SENT, &connection->flags);
1070                 return 0;
1071         }
1072
1073         drbd_thread_start(&connection->asender);
1074
1075         mutex_lock(&connection->resource->conf_update);
1076         /* The discard_my_data flag is a single-shot modifier to the next
1077          * connection attempt, the handshake of which is now well underway.
1078          * No need for rcu style copying of the whole struct
1079          * just to clear a single value. */
1080         connection->net_conf->discard_my_data = 0;
1081         mutex_unlock(&connection->resource->conf_update);
1082
1083         return h;
1084
1085 out_release_sockets:
1086         if (ad.s_listen)
1087                 sock_release(ad.s_listen);
1088         if (sock.socket)
1089                 sock_release(sock.socket);
1090         if (msock.socket)
1091                 sock_release(msock.socket);
1092         return -1;
1093 }
1094
1095 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1096 {
1097         unsigned int header_size = drbd_header_size(connection);
1098
1099         if (header_size == sizeof(struct p_header100) &&
1100             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1101                 struct p_header100 *h = header;
1102                 if (h->pad != 0) {
1103                         drbd_err(connection, "Header padding is not zero\n");
1104                         return -EINVAL;
1105                 }
1106                 pi->vnr = be16_to_cpu(h->volume);
1107                 pi->cmd = be16_to_cpu(h->command);
1108                 pi->size = be32_to_cpu(h->length);
1109         } else if (header_size == sizeof(struct p_header95) &&
1110                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1111                 struct p_header95 *h = header;
1112                 pi->cmd = be16_to_cpu(h->command);
1113                 pi->size = be32_to_cpu(h->length);
1114                 pi->vnr = 0;
1115         } else if (header_size == sizeof(struct p_header80) &&
1116                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1117                 struct p_header80 *h = header;
1118                 pi->cmd = be16_to_cpu(h->command);
1119                 pi->size = be16_to_cpu(h->length);
1120                 pi->vnr = 0;
1121         } else {
1122                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1123                          be32_to_cpu(*(__be32 *)header),
1124                          connection->agreed_pro_version);
1125                 return -EINVAL;
1126         }
1127         pi->data = header + header_size;
1128         return 0;
1129 }
1130
1131 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1132 {
1133         void *buffer = connection->data.rbuf;
1134         int err;
1135
1136         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1137         if (err)
1138                 return err;
1139
1140         err = decode_header(connection, buffer, pi);
1141         connection->last_received = jiffies;
1142
1143         return err;
1144 }
1145
1146 static void drbd_flush(struct drbd_connection *connection)
1147 {
1148         int rv;
1149         struct drbd_peer_device *peer_device;
1150         int vnr;
1151
1152         if (connection->write_ordering >= WO_bdev_flush) {
1153                 rcu_read_lock();
1154                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1155                         struct drbd_device *device = peer_device->device;
1156
1157                         if (!get_ldev(device))
1158                                 continue;
1159                         kref_get(&device->kref);
1160                         rcu_read_unlock();
1161
1162                         rv = blkdev_issue_flush(device->ldev->backing_bdev,
1163                                         GFP_NOIO, NULL);
1164                         if (rv) {
1165                                 drbd_info(device, "local disk flush failed with status %d\n", rv);
1166                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1167                                  * don't try again for ANY return value != 0
1168                                  * if (rv == -EOPNOTSUPP) */
1169                                 drbd_bump_write_ordering(connection, WO_drain_io);
1170                         }
1171                         put_ldev(device);
1172                         kref_put(&device->kref, drbd_destroy_device);
1173
1174                         rcu_read_lock();
1175                         if (rv)
1176                                 break;
1177                 }
1178                 rcu_read_unlock();
1179         }
1180 }
1181
1182 /**
1183  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1184  * @device:     DRBD device.
1185  * @epoch:      Epoch object.
1186  * @ev:         Epoch event.
1187  */
1188 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1189                                                struct drbd_epoch *epoch,
1190                                                enum epoch_event ev)
1191 {
1192         int epoch_size;
1193         struct drbd_epoch *next_epoch;
1194         enum finish_epoch rv = FE_STILL_LIVE;
1195
1196         spin_lock(&connection->epoch_lock);
1197         do {
1198                 next_epoch = NULL;
1199
1200                 epoch_size = atomic_read(&epoch->epoch_size);
1201
1202                 switch (ev & ~EV_CLEANUP) {
1203                 case EV_PUT:
1204                         atomic_dec(&epoch->active);
1205                         break;
1206                 case EV_GOT_BARRIER_NR:
1207                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1208                         break;
1209                 case EV_BECAME_LAST:
1210                         /* nothing to do*/
1211                         break;
1212                 }
1213
1214                 if (epoch_size != 0 &&
1215                     atomic_read(&epoch->active) == 0 &&
1216                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1217                         if (!(ev & EV_CLEANUP)) {
1218                                 spin_unlock(&connection->epoch_lock);
1219                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1220                                 spin_lock(&connection->epoch_lock);
1221                         }
1222 #if 0
1223                         /* FIXME: dec unacked on connection, once we have
1224                          * something to count pending connection packets in. */
1225                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1226                                 dec_unacked(epoch->connection);
1227 #endif
1228
1229                         if (connection->current_epoch != epoch) {
1230                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1231                                 list_del(&epoch->list);
1232                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1233                                 connection->epochs--;
1234                                 kfree(epoch);
1235
1236                                 if (rv == FE_STILL_LIVE)
1237                                         rv = FE_DESTROYED;
1238                         } else {
1239                                 epoch->flags = 0;
1240                                 atomic_set(&epoch->epoch_size, 0);
1241                                 /* atomic_set(&epoch->active, 0); is already zero */
1242                                 if (rv == FE_STILL_LIVE)
1243                                         rv = FE_RECYCLED;
1244                         }
1245                 }
1246
1247                 if (!next_epoch)
1248                         break;
1249
1250                 epoch = next_epoch;
1251         } while (1);
1252
1253         spin_unlock(&connection->epoch_lock);
1254
1255         return rv;
1256 }
1257
1258 /**
1259  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1260  * @connection: DRBD connection.
1261  * @wo:         Write ordering method to try.
1262  */
1263 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1264 {
1265         struct disk_conf *dc;
1266         struct drbd_peer_device *peer_device;
1267         enum write_ordering_e pwo;
1268         int vnr;
1269         static char *write_ordering_str[] = {
1270                 [WO_none] = "none",
1271                 [WO_drain_io] = "drain",
1272                 [WO_bdev_flush] = "flush",
1273         };
1274
1275         pwo = connection->write_ordering;
1276         wo = min(pwo, wo);
1277         rcu_read_lock();
1278         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1279                 struct drbd_device *device = peer_device->device;
1280
1281                 if (!get_ldev_if_state(device, D_ATTACHING))
1282                         continue;
1283                 dc = rcu_dereference(device->ldev->disk_conf);
1284
1285                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1286                         wo = WO_drain_io;
1287                 if (wo == WO_drain_io && !dc->disk_drain)
1288                         wo = WO_none;
1289                 put_ldev(device);
1290         }
1291         rcu_read_unlock();
1292         connection->write_ordering = wo;
1293         if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1294                 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1295 }
1296
1297 /**
1298  * drbd_submit_peer_request()
1299  * @device:     DRBD device.
1300  * @peer_req:   peer request
1301  * @rw:         flag field, see bio->bi_rw
1302  *
1303  * May spread the pages to multiple bios,
1304  * depending on bio_add_page restrictions.
1305  *
1306  * Returns 0 if all bios have been submitted,
1307  * -ENOMEM if we could not allocate enough bios,
1308  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1309  *  single page to an empty bio (which should never happen and likely indicates
1310  *  that the lower level IO stack is in some way broken). This has been observed
1311  *  on certain Xen deployments.
1312  */
1313 /* TODO allocate from our own bio_set. */
1314 int drbd_submit_peer_request(struct drbd_device *device,
1315                              struct drbd_peer_request *peer_req,
1316                              const unsigned rw, const int fault_type)
1317 {
1318         struct bio *bios = NULL;
1319         struct bio *bio;
1320         struct page *page = peer_req->pages;
1321         sector_t sector = peer_req->i.sector;
1322         unsigned ds = peer_req->i.size;
1323         unsigned n_bios = 0;
1324         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1325         int err = -ENOMEM;
1326
1327         /* In most cases, we will only need one bio.  But in case the lower
1328          * level restrictions happen to be different at this offset on this
1329          * side than those of the sending peer, we may need to submit the
1330          * request in more than one bio.
1331          *
1332          * Plain bio_alloc is good enough here, this is no DRBD internally
1333          * generated bio, but a bio allocated on behalf of the peer.
1334          */
1335 next_bio:
1336         bio = bio_alloc(GFP_NOIO, nr_pages);
1337         if (!bio) {
1338                 drbd_err(device, "submit_ee: Allocation of a bio failed\n");
1339                 goto fail;
1340         }
1341         /* > peer_req->i.sector, unless this is the first bio */
1342         bio->bi_iter.bi_sector = sector;
1343         bio->bi_bdev = device->ldev->backing_bdev;
1344         bio->bi_rw = rw;
1345         bio->bi_private = peer_req;
1346         bio->bi_end_io = drbd_peer_request_endio;
1347
1348         bio->bi_next = bios;
1349         bios = bio;
1350         ++n_bios;
1351
1352         page_chain_for_each(page) {
1353                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1354                 if (!bio_add_page(bio, page, len, 0)) {
1355                         /* A single page must always be possible!
1356                          * But in case it fails anyways,
1357                          * we deal with it, and complain (below). */
1358                         if (bio->bi_vcnt == 0) {
1359                                 drbd_err(device,
1360                                         "bio_add_page failed for len=%u, "
1361                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1362                                         len, (uint64_t)bio->bi_iter.bi_sector);
1363                                 err = -ENOSPC;
1364                                 goto fail;
1365                         }
1366                         goto next_bio;
1367                 }
1368                 ds -= len;
1369                 sector += len >> 9;
1370                 --nr_pages;
1371         }
1372         D_ASSERT(device, page == NULL);
1373         D_ASSERT(device, ds == 0);
1374
1375         atomic_set(&peer_req->pending_bios, n_bios);
1376         do {
1377                 bio = bios;
1378                 bios = bios->bi_next;
1379                 bio->bi_next = NULL;
1380
1381                 drbd_generic_make_request(device, fault_type, bio);
1382         } while (bios);
1383         return 0;
1384
1385 fail:
1386         while (bios) {
1387                 bio = bios;
1388                 bios = bios->bi_next;
1389                 bio_put(bio);
1390         }
1391         return err;
1392 }
1393
1394 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1395                                              struct drbd_peer_request *peer_req)
1396 {
1397         struct drbd_interval *i = &peer_req->i;
1398
1399         drbd_remove_interval(&device->write_requests, i);
1400         drbd_clear_interval(i);
1401
1402         /* Wake up any processes waiting for this peer request to complete.  */
1403         if (i->waiting)
1404                 wake_up(&device->misc_wait);
1405 }
1406
1407 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1408 {
1409         struct drbd_peer_device *peer_device;
1410         int vnr;
1411
1412         rcu_read_lock();
1413         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1414                 struct drbd_device *device = peer_device->device;
1415
1416                 kref_get(&device->kref);
1417                 rcu_read_unlock();
1418                 drbd_wait_ee_list_empty(device, &device->active_ee);
1419                 kref_put(&device->kref, drbd_destroy_device);
1420                 rcu_read_lock();
1421         }
1422         rcu_read_unlock();
1423 }
1424
1425 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1426 {
1427         int rv;
1428         struct p_barrier *p = pi->data;
1429         struct drbd_epoch *epoch;
1430
1431         /* FIXME these are unacked on connection,
1432          * not a specific (peer)device.
1433          */
1434         connection->current_epoch->barrier_nr = p->barrier;
1435         connection->current_epoch->connection = connection;
1436         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1437
1438         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1439          * the activity log, which means it would not be resynced in case the
1440          * R_PRIMARY crashes now.
1441          * Therefore we must send the barrier_ack after the barrier request was
1442          * completed. */
1443         switch (connection->write_ordering) {
1444         case WO_none:
1445                 if (rv == FE_RECYCLED)
1446                         return 0;
1447
1448                 /* receiver context, in the writeout path of the other node.
1449                  * avoid potential distributed deadlock */
1450                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1451                 if (epoch)
1452                         break;
1453                 else
1454                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1455                         /* Fall through */
1456
1457         case WO_bdev_flush:
1458         case WO_drain_io:
1459                 conn_wait_active_ee_empty(connection);
1460                 drbd_flush(connection);
1461
1462                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1463                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1464                         if (epoch)
1465                                 break;
1466                 }
1467
1468                 return 0;
1469         default:
1470                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1471                 return -EIO;
1472         }
1473
1474         epoch->flags = 0;
1475         atomic_set(&epoch->epoch_size, 0);
1476         atomic_set(&epoch->active, 0);
1477
1478         spin_lock(&connection->epoch_lock);
1479         if (atomic_read(&connection->current_epoch->epoch_size)) {
1480                 list_add(&epoch->list, &connection->current_epoch->list);
1481                 connection->current_epoch = epoch;
1482                 connection->epochs++;
1483         } else {
1484                 /* The current_epoch got recycled while we allocated this one... */
1485                 kfree(epoch);
1486         }
1487         spin_unlock(&connection->epoch_lock);
1488
1489         return 0;
1490 }
1491
1492 /* used from receive_RSDataReply (recv_resync_read)
1493  * and from receive_Data */
1494 static struct drbd_peer_request *
1495 read_in_block(struct drbd_device *device, u64 id, sector_t sector,
1496               int data_size) __must_hold(local)
1497 {
1498         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1499         struct drbd_peer_request *peer_req;
1500         struct page *page;
1501         int dgs, ds, err;
1502         void *dig_in = first_peer_device(device)->connection->int_dig_in;
1503         void *dig_vv = first_peer_device(device)->connection->int_dig_vv;
1504         unsigned long *data;
1505
1506         dgs = 0;
1507         if (first_peer_device(device)->connection->peer_integrity_tfm) {
1508                 dgs = crypto_hash_digestsize(first_peer_device(device)->connection->peer_integrity_tfm);
1509                 /*
1510                  * FIXME: Receive the incoming digest into the receive buffer
1511                  *        here, together with its struct p_data?
1512                  */
1513                 err = drbd_recv_all_warn(first_peer_device(device)->connection, dig_in, dgs);
1514                 if (err)
1515                         return NULL;
1516                 data_size -= dgs;
1517         }
1518
1519         if (!expect(IS_ALIGNED(data_size, 512)))
1520                 return NULL;
1521         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1522                 return NULL;
1523
1524         /* even though we trust out peer,
1525          * we sometimes have to double check. */
1526         if (sector + (data_size>>9) > capacity) {
1527                 drbd_err(device, "request from peer beyond end of local disk: "
1528                         "capacity: %llus < sector: %llus + size: %u\n",
1529                         (unsigned long long)capacity,
1530                         (unsigned long long)sector, data_size);
1531                 return NULL;
1532         }
1533
1534         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1535          * "criss-cross" setup, that might cause write-out on some other DRBD,
1536          * which in turn might block on the other node at this very place.  */
1537         peer_req = drbd_alloc_peer_req(device, id, sector, data_size, GFP_NOIO);
1538         if (!peer_req)
1539                 return NULL;
1540
1541         if (!data_size)
1542                 return peer_req;
1543
1544         ds = data_size;
1545         page = peer_req->pages;
1546         page_chain_for_each(page) {
1547                 unsigned len = min_t(int, ds, PAGE_SIZE);
1548                 data = kmap(page);
1549                 err = drbd_recv_all_warn(first_peer_device(device)->connection, data, len);
1550                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1551                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1552                         data[0] = data[0] ^ (unsigned long)-1;
1553                 }
1554                 kunmap(page);
1555                 if (err) {
1556                         drbd_free_peer_req(device, peer_req);
1557                         return NULL;
1558                 }
1559                 ds -= len;
1560         }
1561
1562         if (dgs) {
1563                 drbd_csum_ee(device, first_peer_device(device)->connection->peer_integrity_tfm, peer_req, dig_vv);
1564                 if (memcmp(dig_in, dig_vv, dgs)) {
1565                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1566                                 (unsigned long long)sector, data_size);
1567                         drbd_free_peer_req(device, peer_req);
1568                         return NULL;
1569                 }
1570         }
1571         device->recv_cnt += data_size>>9;
1572         return peer_req;
1573 }
1574
1575 /* drbd_drain_block() just takes a data block
1576  * out of the socket input buffer, and discards it.
1577  */
1578 static int drbd_drain_block(struct drbd_device *device, int data_size)
1579 {
1580         struct page *page;
1581         int err = 0;
1582         void *data;
1583
1584         if (!data_size)
1585                 return 0;
1586
1587         page = drbd_alloc_pages(device, 1, 1);
1588
1589         data = kmap(page);
1590         while (data_size) {
1591                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1592
1593                 err = drbd_recv_all_warn(first_peer_device(device)->connection, data, len);
1594                 if (err)
1595                         break;
1596                 data_size -= len;
1597         }
1598         kunmap(page);
1599         drbd_free_pages(device, page, 0);
1600         return err;
1601 }
1602
1603 static int recv_dless_read(struct drbd_device *device, struct drbd_request *req,
1604                            sector_t sector, int data_size)
1605 {
1606         struct bio_vec bvec;
1607         struct bvec_iter iter;
1608         struct bio *bio;
1609         int dgs, err, expect;
1610         void *dig_in = first_peer_device(device)->connection->int_dig_in;
1611         void *dig_vv = first_peer_device(device)->connection->int_dig_vv;
1612
1613         dgs = 0;
1614         if (first_peer_device(device)->connection->peer_integrity_tfm) {
1615                 dgs = crypto_hash_digestsize(first_peer_device(device)->connection->peer_integrity_tfm);
1616                 err = drbd_recv_all_warn(first_peer_device(device)->connection, dig_in, dgs);
1617                 if (err)
1618                         return err;
1619                 data_size -= dgs;
1620         }
1621
1622         /* optimistically update recv_cnt.  if receiving fails below,
1623          * we disconnect anyways, and counters will be reset. */
1624         device->recv_cnt += data_size>>9;
1625
1626         bio = req->master_bio;
1627         D_ASSERT(device, sector == bio->bi_iter.bi_sector);
1628
1629         bio_for_each_segment(bvec, bio, iter) {
1630                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1631                 expect = min_t(int, data_size, bvec.bv_len);
1632                 err = drbd_recv_all_warn(first_peer_device(device)->connection, mapped, expect);
1633                 kunmap(bvec.bv_page);
1634                 if (err)
1635                         return err;
1636                 data_size -= expect;
1637         }
1638
1639         if (dgs) {
1640                 drbd_csum_bio(device, first_peer_device(device)->connection->peer_integrity_tfm, bio, dig_vv);
1641                 if (memcmp(dig_in, dig_vv, dgs)) {
1642                         drbd_err(device, "Digest integrity check FAILED. Broken NICs?\n");
1643                         return -EINVAL;
1644                 }
1645         }
1646
1647         D_ASSERT(device, data_size == 0);
1648         return 0;
1649 }
1650
1651 /*
1652  * e_end_resync_block() is called in asender context via
1653  * drbd_finish_peer_reqs().
1654  */
1655 static int e_end_resync_block(struct drbd_work *w, int unused)
1656 {
1657         struct drbd_peer_request *peer_req =
1658                 container_of(w, struct drbd_peer_request, w);
1659         struct drbd_device *device = w->device;
1660         sector_t sector = peer_req->i.sector;
1661         int err;
1662
1663         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1664
1665         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1666                 drbd_set_in_sync(device, sector, peer_req->i.size);
1667                 err = drbd_send_ack(device, P_RS_WRITE_ACK, peer_req);
1668         } else {
1669                 /* Record failure to sync */
1670                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1671
1672                 err  = drbd_send_ack(device, P_NEG_ACK, peer_req);
1673         }
1674         dec_unacked(device);
1675
1676         return err;
1677 }
1678
1679 static int recv_resync_read(struct drbd_device *device, sector_t sector, int data_size) __releases(local)
1680 {
1681         struct drbd_peer_request *peer_req;
1682
1683         peer_req = read_in_block(device, ID_SYNCER, sector, data_size);
1684         if (!peer_req)
1685                 goto fail;
1686
1687         dec_rs_pending(device);
1688
1689         inc_unacked(device);
1690         /* corresponding dec_unacked() in e_end_resync_block()
1691          * respective _drbd_clear_done_ee */
1692
1693         peer_req->w.cb = e_end_resync_block;
1694
1695         spin_lock_irq(&device->resource->req_lock);
1696         list_add(&peer_req->w.list, &device->sync_ee);
1697         spin_unlock_irq(&device->resource->req_lock);
1698
1699         atomic_add(data_size >> 9, &device->rs_sect_ev);
1700         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1701                 return 0;
1702
1703         /* don't care for the reason here */
1704         drbd_err(device, "submit failed, triggering re-connect\n");
1705         spin_lock_irq(&device->resource->req_lock);
1706         list_del(&peer_req->w.list);
1707         spin_unlock_irq(&device->resource->req_lock);
1708
1709         drbd_free_peer_req(device, peer_req);
1710 fail:
1711         put_ldev(device);
1712         return -EIO;
1713 }
1714
1715 static struct drbd_request *
1716 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1717              sector_t sector, bool missing_ok, const char *func)
1718 {
1719         struct drbd_request *req;
1720
1721         /* Request object according to our peer */
1722         req = (struct drbd_request *)(unsigned long)id;
1723         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1724                 return req;
1725         if (!missing_ok) {
1726                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1727                         (unsigned long)id, (unsigned long long)sector);
1728         }
1729         return NULL;
1730 }
1731
1732 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1733 {
1734         struct drbd_device *device;
1735         struct drbd_request *req;
1736         sector_t sector;
1737         int err;
1738         struct p_data *p = pi->data;
1739
1740         device = vnr_to_device(connection, pi->vnr);
1741         if (!device)
1742                 return -EIO;
1743
1744         sector = be64_to_cpu(p->sector);
1745
1746         spin_lock_irq(&device->resource->req_lock);
1747         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1748         spin_unlock_irq(&device->resource->req_lock);
1749         if (unlikely(!req))
1750                 return -EIO;
1751
1752         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1753          * special casing it there for the various failure cases.
1754          * still no race with drbd_fail_pending_reads */
1755         err = recv_dless_read(device, req, sector, pi->size);
1756         if (!err)
1757                 req_mod(req, DATA_RECEIVED);
1758         /* else: nothing. handled from drbd_disconnect...
1759          * I don't think we may complete this just yet
1760          * in case we are "on-disconnect: freeze" */
1761
1762         return err;
1763 }
1764
1765 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1766 {
1767         struct drbd_device *device;
1768         sector_t sector;
1769         int err;
1770         struct p_data *p = pi->data;
1771
1772         device = vnr_to_device(connection, pi->vnr);
1773         if (!device)
1774                 return -EIO;
1775
1776         sector = be64_to_cpu(p->sector);
1777         D_ASSERT(device, p->block_id == ID_SYNCER);
1778
1779         if (get_ldev(device)) {
1780                 /* data is submitted to disk within recv_resync_read.
1781                  * corresponding put_ldev done below on error,
1782                  * or in drbd_peer_request_endio. */
1783                 err = recv_resync_read(device, sector, pi->size);
1784         } else {
1785                 if (__ratelimit(&drbd_ratelimit_state))
1786                         drbd_err(device, "Can not write resync data to local disk.\n");
1787
1788                 err = drbd_drain_block(device, pi->size);
1789
1790                 drbd_send_ack_dp(device, P_NEG_ACK, p, pi->size);
1791         }
1792
1793         atomic_add(pi->size >> 9, &device->rs_sect_in);
1794
1795         return err;
1796 }
1797
1798 static void restart_conflicting_writes(struct drbd_device *device,
1799                                        sector_t sector, int size)
1800 {
1801         struct drbd_interval *i;
1802         struct drbd_request *req;
1803
1804         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1805                 if (!i->local)
1806                         continue;
1807                 req = container_of(i, struct drbd_request, i);
1808                 if (req->rq_state & RQ_LOCAL_PENDING ||
1809                     !(req->rq_state & RQ_POSTPONED))
1810                         continue;
1811                 /* as it is RQ_POSTPONED, this will cause it to
1812                  * be queued on the retry workqueue. */
1813                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1814         }
1815 }
1816
1817 /*
1818  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1819  */
1820 static int e_end_block(struct drbd_work *w, int cancel)
1821 {
1822         struct drbd_peer_request *peer_req =
1823                 container_of(w, struct drbd_peer_request, w);
1824         struct drbd_device *device = w->device;
1825         sector_t sector = peer_req->i.sector;
1826         int err = 0, pcmd;
1827
1828         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1829                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1830                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1831                                 device->state.conn <= C_PAUSED_SYNC_T &&
1832                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1833                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1834                         err = drbd_send_ack(device, pcmd, peer_req);
1835                         if (pcmd == P_RS_WRITE_ACK)
1836                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1837                 } else {
1838                         err = drbd_send_ack(device, P_NEG_ACK, peer_req);
1839                         /* we expect it to be marked out of sync anyways...
1840                          * maybe assert this?  */
1841                 }
1842                 dec_unacked(device);
1843         }
1844         /* we delete from the conflict detection hash _after_ we sent out the
1845          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1846         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1847                 spin_lock_irq(&device->resource->req_lock);
1848                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1849                 drbd_remove_epoch_entry_interval(device, peer_req);
1850                 if (peer_req->flags & EE_RESTART_REQUESTS)
1851                         restart_conflicting_writes(device, sector, peer_req->i.size);
1852                 spin_unlock_irq(&device->resource->req_lock);
1853         } else
1854                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1855
1856         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1857
1858         return err;
1859 }
1860
1861 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1862 {
1863         struct drbd_device *device = w->device;
1864         struct drbd_peer_request *peer_req =
1865                 container_of(w, struct drbd_peer_request, w);
1866         int err;
1867
1868         err = drbd_send_ack(device, ack, peer_req);
1869         dec_unacked(device);
1870
1871         return err;
1872 }
1873
1874 static int e_send_superseded(struct drbd_work *w, int unused)
1875 {
1876         return e_send_ack(w, P_SUPERSEDED);
1877 }
1878
1879 static int e_send_retry_write(struct drbd_work *w, int unused)
1880 {
1881         struct drbd_connection *connection = first_peer_device(w->device)->connection;
1882
1883         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1884                              P_RETRY_WRITE : P_SUPERSEDED);
1885 }
1886
1887 static bool seq_greater(u32 a, u32 b)
1888 {
1889         /*
1890          * We assume 32-bit wrap-around here.
1891          * For 24-bit wrap-around, we would have to shift:
1892          *  a <<= 8; b <<= 8;
1893          */
1894         return (s32)a - (s32)b > 0;
1895 }
1896
1897 static u32 seq_max(u32 a, u32 b)
1898 {
1899         return seq_greater(a, b) ? a : b;
1900 }
1901
1902 static void update_peer_seq(struct drbd_device *device, unsigned int peer_seq)
1903 {
1904         unsigned int newest_peer_seq;
1905
1906         if (test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags)) {
1907                 spin_lock(&device->peer_seq_lock);
1908                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1909                 device->peer_seq = newest_peer_seq;
1910                 spin_unlock(&device->peer_seq_lock);
1911                 /* wake up only if we actually changed device->peer_seq */
1912                 if (peer_seq == newest_peer_seq)
1913                         wake_up(&device->seq_wait);
1914         }
1915 }
1916
1917 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1918 {
1919         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1920 }
1921
1922 /* maybe change sync_ee into interval trees as well? */
1923 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1924 {
1925         struct drbd_peer_request *rs_req;
1926         bool rv = 0;
1927
1928         spin_lock_irq(&device->resource->req_lock);
1929         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1930                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1931                              rs_req->i.sector, rs_req->i.size)) {
1932                         rv = 1;
1933                         break;
1934                 }
1935         }
1936         spin_unlock_irq(&device->resource->req_lock);
1937
1938         return rv;
1939 }
1940
1941 /* Called from receive_Data.
1942  * Synchronize packets on sock with packets on msock.
1943  *
1944  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1945  * packet traveling on msock, they are still processed in the order they have
1946  * been sent.
1947  *
1948  * Note: we don't care for Ack packets overtaking P_DATA packets.
1949  *
1950  * In case packet_seq is larger than device->peer_seq number, there are
1951  * outstanding packets on the msock. We wait for them to arrive.
1952  * In case we are the logically next packet, we update device->peer_seq
1953  * ourselves. Correctly handles 32bit wrap around.
1954  *
1955  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1956  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1957  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1958  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1959  *
1960  * returns 0 if we may process the packet,
1961  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1962 static int wait_for_and_update_peer_seq(struct drbd_device *device, const u32 peer_seq)
1963 {
1964         DEFINE_WAIT(wait);
1965         long timeout;
1966         int ret = 0, tp;
1967
1968         if (!test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags))
1969                 return 0;
1970
1971         spin_lock(&device->peer_seq_lock);
1972         for (;;) {
1973                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1974                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
1975                         break;
1976                 }
1977
1978                 if (signal_pending(current)) {
1979                         ret = -ERESTARTSYS;
1980                         break;
1981                 }
1982
1983                 rcu_read_lock();
1984                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
1985                 rcu_read_unlock();
1986
1987                 if (!tp)
1988                         break;
1989
1990                 /* Only need to wait if two_primaries is enabled */
1991                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
1992                 spin_unlock(&device->peer_seq_lock);
1993                 rcu_read_lock();
1994                 timeout = rcu_dereference(first_peer_device(device)->connection->net_conf)->ping_timeo*HZ/10;
1995                 rcu_read_unlock();
1996                 timeout = schedule_timeout(timeout);
1997                 spin_lock(&device->peer_seq_lock);
1998                 if (!timeout) {
1999                         ret = -ETIMEDOUT;
2000                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2001                         break;
2002                 }
2003         }
2004         spin_unlock(&device->peer_seq_lock);
2005         finish_wait(&device->seq_wait, &wait);
2006         return ret;
2007 }
2008
2009 /* see also bio_flags_to_wire()
2010  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2011  * flags and back. We may replicate to other kernel versions. */
2012 static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
2013 {
2014         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2015                 (dpf & DP_FUA ? REQ_FUA : 0) |
2016                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2017                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2018 }
2019
2020 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2021                                     unsigned int size)
2022 {
2023         struct drbd_interval *i;
2024
2025     repeat:
2026         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2027                 struct drbd_request *req;
2028                 struct bio_and_error m;
2029
2030                 if (!i->local)
2031                         continue;
2032                 req = container_of(i, struct drbd_request, i);
2033                 if (!(req->rq_state & RQ_POSTPONED))
2034                         continue;
2035                 req->rq_state &= ~RQ_POSTPONED;
2036                 __req_mod(req, NEG_ACKED, &m);
2037                 spin_unlock_irq(&device->resource->req_lock);
2038                 if (m.bio)
2039                         complete_master_bio(device, &m);
2040                 spin_lock_irq(&device->resource->req_lock);
2041                 goto repeat;
2042         }
2043 }
2044
2045 static int handle_write_conflicts(struct drbd_device *device,
2046                                   struct drbd_peer_request *peer_req)
2047 {
2048         struct drbd_connection *connection = first_peer_device(device)->connection;
2049         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2050         sector_t sector = peer_req->i.sector;
2051         const unsigned int size = peer_req->i.size;
2052         struct drbd_interval *i;
2053         bool equal;
2054         int err;
2055
2056         /*
2057          * Inserting the peer request into the write_requests tree will prevent
2058          * new conflicting local requests from being added.
2059          */
2060         drbd_insert_interval(&device->write_requests, &peer_req->i);
2061
2062     repeat:
2063         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2064                 if (i == &peer_req->i)
2065                         continue;
2066
2067                 if (!i->local) {
2068                         /*
2069                          * Our peer has sent a conflicting remote request; this
2070                          * should not happen in a two-node setup.  Wait for the
2071                          * earlier peer request to complete.
2072                          */
2073                         err = drbd_wait_misc(device, i);
2074                         if (err)
2075                                 goto out;
2076                         goto repeat;
2077                 }
2078
2079                 equal = i->sector == sector && i->size == size;
2080                 if (resolve_conflicts) {
2081                         /*
2082                          * If the peer request is fully contained within the
2083                          * overlapping request, it can be considered overwritten
2084                          * and thus superseded; otherwise, it will be retried
2085                          * once all overlapping requests have completed.
2086                          */
2087                         bool superseded = i->sector <= sector && i->sector +
2088                                        (i->size >> 9) >= sector + (size >> 9);
2089
2090                         if (!equal)
2091                                 drbd_alert(device, "Concurrent writes detected: "
2092                                                "local=%llus +%u, remote=%llus +%u, "
2093                                                "assuming %s came first\n",
2094                                           (unsigned long long)i->sector, i->size,
2095                                           (unsigned long long)sector, size,
2096                                           superseded ? "local" : "remote");
2097
2098                         inc_unacked(device);
2099                         peer_req->w.cb = superseded ? e_send_superseded :
2100                                                    e_send_retry_write;
2101                         list_add_tail(&peer_req->w.list, &device->done_ee);
2102                         wake_asender(first_peer_device(device)->connection);
2103
2104                         err = -ENOENT;
2105                         goto out;
2106                 } else {
2107                         struct drbd_request *req =
2108                                 container_of(i, struct drbd_request, i);
2109
2110                         if (!equal)
2111                                 drbd_alert(device, "Concurrent writes detected: "
2112                                                "local=%llus +%u, remote=%llus +%u\n",
2113                                           (unsigned long long)i->sector, i->size,
2114                                           (unsigned long long)sector, size);
2115
2116                         if (req->rq_state & RQ_LOCAL_PENDING ||
2117                             !(req->rq_state & RQ_POSTPONED)) {
2118                                 /*
2119                                  * Wait for the node with the discard flag to
2120                                  * decide if this request has been superseded
2121                                  * or needs to be retried.
2122                                  * Requests that have been superseded will
2123                                  * disappear from the write_requests tree.
2124                                  *
2125                                  * In addition, wait for the conflicting
2126                                  * request to finish locally before submitting
2127                                  * the conflicting peer request.
2128                                  */
2129                                 err = drbd_wait_misc(device, &req->i);
2130                                 if (err) {
2131                                         _conn_request_state(first_peer_device(device)->connection,
2132                                                             NS(conn, C_TIMEOUT),
2133                                                             CS_HARD);
2134                                         fail_postponed_requests(device, sector, size);
2135                                         goto out;
2136                                 }
2137                                 goto repeat;
2138                         }
2139                         /*
2140                          * Remember to restart the conflicting requests after
2141                          * the new peer request has completed.
2142                          */
2143                         peer_req->flags |= EE_RESTART_REQUESTS;
2144                 }
2145         }
2146         err = 0;
2147
2148     out:
2149         if (err)
2150                 drbd_remove_epoch_entry_interval(device, peer_req);
2151         return err;
2152 }
2153
2154 /* mirrored write */
2155 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2156 {
2157         struct drbd_device *device;
2158         sector_t sector;
2159         struct drbd_peer_request *peer_req;
2160         struct p_data *p = pi->data;
2161         u32 peer_seq = be32_to_cpu(p->seq_num);
2162         int rw = WRITE;
2163         u32 dp_flags;
2164         int err, tp;
2165
2166         device = vnr_to_device(connection, pi->vnr);
2167         if (!device)
2168                 return -EIO;
2169
2170         if (!get_ldev(device)) {
2171                 int err2;
2172
2173                 err = wait_for_and_update_peer_seq(device, peer_seq);
2174                 drbd_send_ack_dp(device, P_NEG_ACK, p, pi->size);
2175                 atomic_inc(&connection->current_epoch->epoch_size);
2176                 err2 = drbd_drain_block(device, pi->size);
2177                 if (!err)
2178                         err = err2;
2179                 return err;
2180         }
2181
2182         /*
2183          * Corresponding put_ldev done either below (on various errors), or in
2184          * drbd_peer_request_endio, if we successfully submit the data at the
2185          * end of this function.
2186          */
2187
2188         sector = be64_to_cpu(p->sector);
2189         peer_req = read_in_block(device, p->block_id, sector, pi->size);
2190         if (!peer_req) {
2191                 put_ldev(device);
2192                 return -EIO;
2193         }
2194
2195         peer_req->w.cb = e_end_block;
2196
2197         dp_flags = be32_to_cpu(p->dp_flags);
2198         rw |= wire_flags_to_bio(device, dp_flags);
2199         if (peer_req->pages == NULL) {
2200                 D_ASSERT(device, peer_req->i.size == 0);
2201                 D_ASSERT(device, dp_flags & DP_FLUSH);
2202         }
2203
2204         if (dp_flags & DP_MAY_SET_IN_SYNC)
2205                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2206
2207         spin_lock(&connection->epoch_lock);
2208         peer_req->epoch = connection->current_epoch;
2209         atomic_inc(&peer_req->epoch->epoch_size);
2210         atomic_inc(&peer_req->epoch->active);
2211         spin_unlock(&connection->epoch_lock);
2212
2213         rcu_read_lock();
2214         tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2215         rcu_read_unlock();
2216         if (tp) {
2217                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2218                 err = wait_for_and_update_peer_seq(device, peer_seq);
2219                 if (err)
2220                         goto out_interrupted;
2221                 spin_lock_irq(&device->resource->req_lock);
2222                 err = handle_write_conflicts(device, peer_req);
2223                 if (err) {
2224                         spin_unlock_irq(&device->resource->req_lock);
2225                         if (err == -ENOENT) {
2226                                 put_ldev(device);
2227                                 return 0;
2228                         }
2229                         goto out_interrupted;
2230                 }
2231         } else {
2232                 update_peer_seq(device, peer_seq);
2233                 spin_lock_irq(&device->resource->req_lock);
2234         }
2235         list_add(&peer_req->w.list, &device->active_ee);
2236         spin_unlock_irq(&device->resource->req_lock);
2237
2238         if (device->state.conn == C_SYNC_TARGET)
2239                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2240
2241         if (first_peer_device(device)->connection->agreed_pro_version < 100) {
2242                 rcu_read_lock();
2243                 switch (rcu_dereference(first_peer_device(device)->connection->net_conf)->wire_protocol) {
2244                 case DRBD_PROT_C:
2245                         dp_flags |= DP_SEND_WRITE_ACK;
2246                         break;
2247                 case DRBD_PROT_B:
2248                         dp_flags |= DP_SEND_RECEIVE_ACK;
2249                         break;
2250                 }
2251                 rcu_read_unlock();
2252         }
2253
2254         if (dp_flags & DP_SEND_WRITE_ACK) {
2255                 peer_req->flags |= EE_SEND_WRITE_ACK;
2256                 inc_unacked(device);
2257                 /* corresponding dec_unacked() in e_end_block()
2258                  * respective _drbd_clear_done_ee */
2259         }
2260
2261         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2262                 /* I really don't like it that the receiver thread
2263                  * sends on the msock, but anyways */
2264                 drbd_send_ack(device, P_RECV_ACK, peer_req);
2265         }
2266
2267         if (device->state.pdsk < D_INCONSISTENT) {
2268                 /* In case we have the only disk of the cluster, */
2269                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2270                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2271                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2272                 drbd_al_begin_io(device, &peer_req->i, true);
2273         }
2274
2275         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2276         if (!err)
2277                 return 0;
2278
2279         /* don't care for the reason here */
2280         drbd_err(device, "submit failed, triggering re-connect\n");
2281         spin_lock_irq(&device->resource->req_lock);
2282         list_del(&peer_req->w.list);
2283         drbd_remove_epoch_entry_interval(device, peer_req);
2284         spin_unlock_irq(&device->resource->req_lock);
2285         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2286                 drbd_al_complete_io(device, &peer_req->i);
2287
2288 out_interrupted:
2289         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2290         put_ldev(device);
2291         drbd_free_peer_req(device, peer_req);
2292         return err;
2293 }
2294
2295 /* We may throttle resync, if the lower device seems to be busy,
2296  * and current sync rate is above c_min_rate.
2297  *
2298  * To decide whether or not the lower device is busy, we use a scheme similar
2299  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2300  * (more than 64 sectors) of activity we cannot account for with our own resync
2301  * activity, it obviously is "busy".
2302  *
2303  * The current sync rate used here uses only the most recent two step marks,
2304  * to have a short time average so we can react faster.
2305  */
2306 int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2307 {
2308         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2309         unsigned long db, dt, dbdt;
2310         struct lc_element *tmp;
2311         int curr_events;
2312         int throttle = 0;
2313         unsigned int c_min_rate;
2314
2315         rcu_read_lock();
2316         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2317         rcu_read_unlock();
2318
2319         /* feature disabled? */
2320         if (c_min_rate == 0)
2321                 return 0;
2322
2323         spin_lock_irq(&device->al_lock);
2324         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2325         if (tmp) {
2326                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2327                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2328                         spin_unlock_irq(&device->al_lock);
2329                         return 0;
2330                 }
2331                 /* Do not slow down if app IO is already waiting for this extent */
2332         }
2333         spin_unlock_irq(&device->al_lock);
2334
2335         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2336                       (int)part_stat_read(&disk->part0, sectors[1]) -
2337                         atomic_read(&device->rs_sect_ev);
2338
2339         if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2340                 unsigned long rs_left;
2341                 int i;
2342
2343                 device->rs_last_events = curr_events;
2344
2345                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2346                  * approx. */
2347                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2348
2349                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2350                         rs_left = device->ov_left;
2351                 else
2352                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2353
2354                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2355                 if (!dt)
2356                         dt++;
2357                 db = device->rs_mark_left[i] - rs_left;
2358                 dbdt = Bit2KB(db/dt);
2359
2360                 if (dbdt > c_min_rate)
2361                         throttle = 1;
2362         }
2363         return throttle;
2364 }
2365
2366
2367 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2368 {
2369         struct drbd_device *device;
2370         sector_t sector;
2371         sector_t capacity;
2372         struct drbd_peer_request *peer_req;
2373         struct digest_info *di = NULL;
2374         int size, verb;
2375         unsigned int fault_type;
2376         struct p_block_req *p = pi->data;
2377
2378         device = vnr_to_device(connection, pi->vnr);
2379         if (!device)
2380                 return -EIO;
2381         capacity = drbd_get_capacity(device->this_bdev);
2382
2383         sector = be64_to_cpu(p->sector);
2384         size   = be32_to_cpu(p->blksize);
2385
2386         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2387                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2388                                 (unsigned long long)sector, size);
2389                 return -EINVAL;
2390         }
2391         if (sector + (size>>9) > capacity) {
2392                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2393                                 (unsigned long long)sector, size);
2394                 return -EINVAL;
2395         }
2396
2397         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2398                 verb = 1;
2399                 switch (pi->cmd) {
2400                 case P_DATA_REQUEST:
2401                         drbd_send_ack_rp(device, P_NEG_DREPLY, p);
2402                         break;
2403                 case P_RS_DATA_REQUEST:
2404                 case P_CSUM_RS_REQUEST:
2405                 case P_OV_REQUEST:
2406                         drbd_send_ack_rp(device, P_NEG_RS_DREPLY , p);
2407                         break;
2408                 case P_OV_REPLY:
2409                         verb = 0;
2410                         dec_rs_pending(device);
2411                         drbd_send_ack_ex(device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2412                         break;
2413                 default:
2414                         BUG();
2415                 }
2416                 if (verb && __ratelimit(&drbd_ratelimit_state))
2417                         drbd_err(device, "Can not satisfy peer's read request, "
2418                             "no local data.\n");
2419
2420                 /* drain possibly payload */
2421                 return drbd_drain_block(device, pi->size);
2422         }
2423
2424         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2425          * "criss-cross" setup, that might cause write-out on some other DRBD,
2426          * which in turn might block on the other node at this very place.  */
2427         peer_req = drbd_alloc_peer_req(device, p->block_id, sector, size, GFP_NOIO);
2428         if (!peer_req) {
2429                 put_ldev(device);
2430                 return -ENOMEM;
2431         }
2432
2433         switch (pi->cmd) {
2434         case P_DATA_REQUEST:
2435                 peer_req->w.cb = w_e_end_data_req;
2436                 fault_type = DRBD_FAULT_DT_RD;
2437                 /* application IO, don't drbd_rs_begin_io */
2438                 goto submit;
2439
2440         case P_RS_DATA_REQUEST:
2441                 peer_req->w.cb = w_e_end_rsdata_req;
2442                 fault_type = DRBD_FAULT_RS_RD;
2443                 /* used in the sector offset progress display */
2444                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2445                 break;
2446
2447         case P_OV_REPLY:
2448         case P_CSUM_RS_REQUEST:
2449                 fault_type = DRBD_FAULT_RS_RD;
2450                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2451                 if (!di)
2452                         goto out_free_e;
2453
2454                 di->digest_size = pi->size;
2455                 di->digest = (((char *)di)+sizeof(struct digest_info));
2456
2457                 peer_req->digest = di;
2458                 peer_req->flags |= EE_HAS_DIGEST;
2459
2460                 if (drbd_recv_all(first_peer_device(device)->connection, di->digest, pi->size))
2461                         goto out_free_e;
2462
2463                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2464                         D_ASSERT(device, first_peer_device(device)->connection->agreed_pro_version >= 89);
2465                         peer_req->w.cb = w_e_end_csum_rs_req;
2466                         /* used in the sector offset progress display */
2467                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2468                 } else if (pi->cmd == P_OV_REPLY) {
2469                         /* track progress, we may need to throttle */
2470                         atomic_add(size >> 9, &device->rs_sect_in);
2471                         peer_req->w.cb = w_e_end_ov_reply;
2472                         dec_rs_pending(device);
2473                         /* drbd_rs_begin_io done when we sent this request,
2474                          * but accounting still needs to be done. */
2475                         goto submit_for_resync;
2476                 }
2477                 break;
2478
2479         case P_OV_REQUEST:
2480                 if (device->ov_start_sector == ~(sector_t)0 &&
2481                     first_peer_device(device)->connection->agreed_pro_version >= 90) {
2482                         unsigned long now = jiffies;
2483                         int i;
2484                         device->ov_start_sector = sector;
2485                         device->ov_position = sector;
2486                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2487                         device->rs_total = device->ov_left;
2488                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2489                                 device->rs_mark_left[i] = device->ov_left;
2490                                 device->rs_mark_time[i] = now;
2491                         }
2492                         drbd_info(device, "Online Verify start sector: %llu\n",
2493                                         (unsigned long long)sector);
2494                 }
2495                 peer_req->w.cb = w_e_end_ov_req;
2496                 fault_type = DRBD_FAULT_RS_RD;
2497                 break;
2498
2499         default:
2500                 BUG();
2501         }
2502
2503         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2504          * wrt the receiver, but it is not as straightforward as it may seem.
2505          * Various places in the resync start and stop logic assume resync
2506          * requests are processed in order, requeuing this on the worker thread
2507          * introduces a bunch of new code for synchronization between threads.
2508          *
2509          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2510          * "forever", throttling after drbd_rs_begin_io will lock that extent
2511          * for application writes for the same time.  For now, just throttle
2512          * here, where the rest of the code expects the receiver to sleep for
2513          * a while, anyways.
2514          */
2515
2516         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2517          * this defers syncer requests for some time, before letting at least
2518          * on request through.  The resync controller on the receiving side
2519          * will adapt to the incoming rate accordingly.
2520          *
2521          * We cannot throttle here if remote is Primary/SyncTarget:
2522          * we would also throttle its application reads.
2523          * In that case, throttling is done on the SyncTarget only.
2524          */
2525         if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2526                 schedule_timeout_uninterruptible(HZ/10);
2527         if (drbd_rs_begin_io(device, sector))
2528                 goto out_free_e;
2529
2530 submit_for_resync:
2531         atomic_add(size >> 9, &device->rs_sect_ev);
2532
2533 submit:
2534         inc_unacked(device);
2535         spin_lock_irq(&device->resource->req_lock);
2536         list_add_tail(&peer_req->w.list, &device->read_ee);
2537         spin_unlock_irq(&device->resource->req_lock);
2538
2539         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2540                 return 0;
2541
2542         /* don't care for the reason here */
2543         drbd_err(device, "submit failed, triggering re-connect\n");
2544         spin_lock_irq(&device->resource->req_lock);
2545         list_del(&peer_req->w.list);
2546         spin_unlock_irq(&device->resource->req_lock);
2547         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2548
2549 out_free_e:
2550         put_ldev(device);
2551         drbd_free_peer_req(device, peer_req);
2552         return -EIO;
2553 }
2554
2555 static int drbd_asb_recover_0p(struct drbd_device *device) __must_hold(local)
2556 {
2557         int self, peer, rv = -100;
2558         unsigned long ch_self, ch_peer;
2559         enum drbd_after_sb_p after_sb_0p;
2560
2561         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2562         peer = device->p_uuid[UI_BITMAP] & 1;
2563
2564         ch_peer = device->p_uuid[UI_SIZE];
2565         ch_self = device->comm_bm_set;
2566
2567         rcu_read_lock();
2568         after_sb_0p = rcu_dereference(first_peer_device(device)->connection->net_conf)->after_sb_0p;
2569         rcu_read_unlock();
2570         switch (after_sb_0p) {
2571         case ASB_CONSENSUS:
2572         case ASB_DISCARD_SECONDARY:
2573         case ASB_CALL_HELPER:
2574         case ASB_VIOLENTLY:
2575                 drbd_err(device, "Configuration error.\n");
2576                 break;
2577         case ASB_DISCONNECT:
2578                 break;
2579         case ASB_DISCARD_YOUNGER_PRI:
2580                 if (self == 0 && peer == 1) {
2581                         rv = -1;
2582                         break;
2583                 }
2584                 if (self == 1 && peer == 0) {
2585                         rv =  1;
2586                         break;
2587                 }
2588                 /* Else fall through to one of the other strategies... */
2589         case ASB_DISCARD_OLDER_PRI:
2590                 if (self == 0 && peer == 1) {
2591                         rv = 1;
2592                         break;
2593                 }
2594                 if (self == 1 && peer == 0) {
2595                         rv = -1;
2596                         break;
2597                 }
2598                 /* Else fall through to one of the other strategies... */
2599                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2600                      "Using discard-least-changes instead\n");
2601         case ASB_DISCARD_ZERO_CHG:
2602                 if (ch_peer == 0 && ch_self == 0) {
2603                         rv = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags)
2604                                 ? -1 : 1;
2605                         break;
2606                 } else {
2607                         if (ch_peer == 0) { rv =  1; break; }
2608                         if (ch_self == 0) { rv = -1; break; }
2609                 }
2610                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2611                         break;
2612         case ASB_DISCARD_LEAST_CHG:
2613                 if      (ch_self < ch_peer)
2614                         rv = -1;
2615                 else if (ch_self > ch_peer)
2616                         rv =  1;
2617                 else /* ( ch_self == ch_peer ) */
2618                      /* Well, then use something else. */
2619                         rv = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags)
2620                                 ? -1 : 1;
2621                 break;
2622         case ASB_DISCARD_LOCAL:
2623                 rv = -1;
2624                 break;
2625         case ASB_DISCARD_REMOTE:
2626                 rv =  1;
2627         }
2628
2629         return rv;
2630 }
2631
2632 static int drbd_asb_recover_1p(struct drbd_device *device) __must_hold(local)
2633 {
2634         int hg, rv = -100;
2635         enum drbd_after_sb_p after_sb_1p;
2636
2637         rcu_read_lock();
2638         after_sb_1p = rcu_dereference(first_peer_device(device)->connection->net_conf)->after_sb_1p;
2639         rcu_read_unlock();
2640         switch (after_sb_1p) {
2641         case ASB_DISCARD_YOUNGER_PRI:
2642         case ASB_DISCARD_OLDER_PRI:
2643         case ASB_DISCARD_LEAST_CHG:
2644         case ASB_DISCARD_LOCAL:
2645         case ASB_DISCARD_REMOTE:
2646         case ASB_DISCARD_ZERO_CHG:
2647                 drbd_err(device, "Configuration error.\n");
2648                 break;
2649         case ASB_DISCONNECT:
2650                 break;
2651         case ASB_CONSENSUS:
2652                 hg = drbd_asb_recover_0p(device);
2653                 if (hg == -1 && device->state.role == R_SECONDARY)
2654                         rv = hg;
2655                 if (hg == 1  && device->state.role == R_PRIMARY)
2656                         rv = hg;
2657                 break;
2658         case ASB_VIOLENTLY:
2659                 rv = drbd_asb_recover_0p(device);
2660                 break;
2661         case ASB_DISCARD_SECONDARY:
2662                 return device->state.role == R_PRIMARY ? 1 : -1;
2663         case ASB_CALL_HELPER:
2664                 hg = drbd_asb_recover_0p(device);
2665                 if (hg == -1 && device->state.role == R_PRIMARY) {
2666                         enum drbd_state_rv rv2;
2667
2668                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2669                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2670                           * we do not need to wait for the after state change work either. */
2671                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2672                         if (rv2 != SS_SUCCESS) {
2673                                 drbd_khelper(device, "pri-lost-after-sb");
2674                         } else {
2675                                 drbd_warn(device, "Successfully gave up primary role.\n");
2676                                 rv = hg;
2677                         }
2678                 } else
2679                         rv = hg;
2680         }
2681
2682         return rv;
2683 }
2684
2685 static int drbd_asb_recover_2p(struct drbd_device *device) __must_hold(local)
2686 {
2687         int hg, rv = -100;
2688         enum drbd_after_sb_p after_sb_2p;
2689
2690         rcu_read_lock();
2691         after_sb_2p = rcu_dereference(first_peer_device(device)->connection->net_conf)->after_sb_2p;
2692         rcu_read_unlock();
2693         switch (after_sb_2p) {
2694         case ASB_DISCARD_YOUNGER_PRI:
2695         case ASB_DISCARD_OLDER_PRI:
2696         case ASB_DISCARD_LEAST_CHG:
2697         case ASB_DISCARD_LOCAL:
2698         case ASB_DISCARD_REMOTE:
2699         case ASB_CONSENSUS:
2700         case ASB_DISCARD_SECONDARY:
2701         case ASB_DISCARD_ZERO_CHG:
2702                 drbd_err(device, "Configuration error.\n");
2703                 break;
2704         case ASB_VIOLENTLY:
2705                 rv = drbd_asb_recover_0p(device);
2706                 break;
2707         case ASB_DISCONNECT:
2708                 break;
2709         case ASB_CALL_HELPER:
2710                 hg = drbd_asb_recover_0p(device);
2711                 if (hg == -1) {
2712                         enum drbd_state_rv rv2;
2713
2714                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2715                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2716                           * we do not need to wait for the after state change work either. */
2717                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2718                         if (rv2 != SS_SUCCESS) {
2719                                 drbd_khelper(device, "pri-lost-after-sb");
2720                         } else {
2721                                 drbd_warn(device, "Successfully gave up primary role.\n");
2722                                 rv = hg;
2723                         }
2724                 } else
2725                         rv = hg;
2726         }
2727
2728         return rv;
2729 }
2730
2731 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2732                            u64 bits, u64 flags)
2733 {
2734         if (!uuid) {
2735                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2736                 return;
2737         }
2738         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2739              text,
2740              (unsigned long long)uuid[UI_CURRENT],
2741              (unsigned long long)uuid[UI_BITMAP],
2742              (unsigned long long)uuid[UI_HISTORY_START],
2743              (unsigned long long)uuid[UI_HISTORY_END],
2744              (unsigned long long)bits,
2745              (unsigned long long)flags);
2746 }
2747
2748 /*
2749   100   after split brain try auto recover
2750     2   C_SYNC_SOURCE set BitMap
2751     1   C_SYNC_SOURCE use BitMap
2752     0   no Sync
2753    -1   C_SYNC_TARGET use BitMap
2754    -2   C_SYNC_TARGET set BitMap
2755  -100   after split brain, disconnect
2756 -1000   unrelated data
2757 -1091   requires proto 91
2758 -1096   requires proto 96
2759  */
2760 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2761 {
2762         u64 self, peer;
2763         int i, j;
2764
2765         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2766         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2767
2768         *rule_nr = 10;
2769         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2770                 return 0;
2771
2772         *rule_nr = 20;
2773         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2774              peer != UUID_JUST_CREATED)
2775                 return -2;
2776
2777         *rule_nr = 30;
2778         if (self != UUID_JUST_CREATED &&
2779             (peer == UUID_JUST_CREATED || peer == (u64)0))
2780                 return 2;
2781
2782         if (self == peer) {
2783                 int rct, dc; /* roles at crash time */
2784
2785                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2786
2787                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2788                                 return -1091;
2789
2790                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2791                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2792                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2793                                 drbd_uuid_move_history(device);
2794                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2795                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2796
2797                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2798                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2799                                 *rule_nr = 34;
2800                         } else {
2801                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2802                                 *rule_nr = 36;
2803                         }
2804
2805                         return 1;
2806                 }
2807
2808                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2809
2810                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2811                                 return -1091;
2812
2813                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2814                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2815                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2816
2817                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2818                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2819                                 device->p_uuid[UI_BITMAP] = 0UL;
2820
2821                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2822                                 *rule_nr = 35;
2823                         } else {
2824                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2825                                 *rule_nr = 37;
2826                         }
2827
2828                         return -1;
2829                 }
2830
2831                 /* Common power [off|failure] */
2832                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2833                         (device->p_uuid[UI_FLAGS] & 2);
2834                 /* lowest bit is set when we were primary,
2835                  * next bit (weight 2) is set when peer was primary */
2836                 *rule_nr = 40;
2837
2838                 switch (rct) {
2839                 case 0: /* !self_pri && !peer_pri */ return 0;
2840                 case 1: /*  self_pri && !peer_pri */ return 1;
2841                 case 2: /* !self_pri &&  peer_pri */ return -1;
2842                 case 3: /*  self_pri &&  peer_pri */
2843                         dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2844                         return dc ? -1 : 1;
2845                 }
2846         }
2847
2848         *rule_nr = 50;
2849         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2850         if (self == peer)
2851                 return -1;
2852
2853         *rule_nr = 51;
2854         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2855         if (self == peer) {
2856                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2857                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2858                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2859                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2860                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2861                            resync as sync source modifications of the peer's UUIDs. */
2862
2863                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2864                                 return -1091;
2865
2866                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2867                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2868
2869                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2870                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2871
2872                         return -1;
2873                 }
2874         }
2875
2876         *rule_nr = 60;
2877         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2878         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2879                 peer = device->p_uuid[i] & ~((u64)1);
2880                 if (self == peer)
2881                         return -2;
2882         }
2883
2884         *rule_nr = 70;
2885         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2886         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2887         if (self == peer)
2888                 return 1;
2889
2890         *rule_nr = 71;
2891         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2892         if (self == peer) {
2893                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2894                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2895                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2896                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2897                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2898                            resync as sync source modifications of our UUIDs. */
2899
2900                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2901                                 return -1091;
2902
2903                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2904                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2905
2906                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2907                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2908                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2909
2910                         return 1;
2911                 }
2912         }
2913
2914
2915         *rule_nr = 80;
2916         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2917         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2918                 self = device->ldev->md.uuid[i] & ~((u64)1);
2919                 if (self == peer)
2920                         return 2;
2921         }
2922
2923         *rule_nr = 90;
2924         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2925         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2926         if (self == peer && self != ((u64)0))
2927                 return 100;
2928
2929         *rule_nr = 100;
2930         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2931                 self = device->ldev->md.uuid[i] & ~((u64)1);
2932                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2933                         peer = device->p_uuid[j] & ~((u64)1);
2934                         if (self == peer)
2935                                 return -100;
2936                 }
2937         }
2938
2939         return -1000;
2940 }
2941
2942 /* drbd_sync_handshake() returns the new conn state on success, or
2943    CONN_MASK (-1) on failure.
2944  */
2945 static enum drbd_conns drbd_sync_handshake(struct drbd_device *device, enum drbd_role peer_role,
2946                                            enum drbd_disk_state peer_disk) __must_hold(local)
2947 {
2948         enum drbd_conns rv = C_MASK;
2949         enum drbd_disk_state mydisk;
2950         struct net_conf *nc;
2951         int hg, rule_nr, rr_conflict, tentative;
2952
2953         mydisk = device->state.disk;
2954         if (mydisk == D_NEGOTIATING)
2955                 mydisk = device->new_state_tmp.disk;
2956
2957         drbd_info(device, "drbd_sync_handshake:\n");
2958
2959         spin_lock_irq(&device->ldev->md.uuid_lock);
2960         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2961         drbd_uuid_dump(device, "peer", device->p_uuid,
2962                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2963
2964         hg = drbd_uuid_compare(device, &rule_nr);
2965         spin_unlock_irq(&device->ldev->md.uuid_lock);
2966
2967         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2968
2969         if (hg == -1000) {
2970                 drbd_alert(device, "Unrelated data, aborting!\n");
2971                 return C_MASK;
2972         }
2973         if (hg < -1000) {
2974                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2975                 return C_MASK;
2976         }
2977
2978         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2979             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2980                 int f = (hg == -100) || abs(hg) == 2;
2981                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2982                 if (f)
2983                         hg = hg*2;
2984                 drbd_info(device, "Becoming sync %s due to disk states.\n",
2985                      hg > 0 ? "source" : "target");
2986         }
2987
2988         if (abs(hg) == 100)
2989                 drbd_khelper(device, "initial-split-brain");
2990
2991         rcu_read_lock();
2992         nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
2993
2994         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2995                 int pcount = (device->state.role == R_PRIMARY)
2996                            + (peer_role == R_PRIMARY);
2997                 int forced = (hg == -100);
2998
2999                 switch (pcount) {
3000                 case 0:
3001                         hg = drbd_asb_recover_0p(device);
3002                         break;
3003                 case 1:
3004                         hg = drbd_asb_recover_1p(device);
3005                         break;
3006                 case 2:
3007                         hg = drbd_asb_recover_2p(device);
3008                         break;
3009                 }
3010                 if (abs(hg) < 100) {
3011                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3012                              "automatically solved. Sync from %s node\n",
3013                              pcount, (hg < 0) ? "peer" : "this");
3014                         if (forced) {
3015                                 drbd_warn(device, "Doing a full sync, since"
3016                                      " UUIDs where ambiguous.\n");
3017                                 hg = hg*2;
3018                         }
3019                 }
3020         }
3021
3022         if (hg == -100) {
3023                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3024                         hg = -1;
3025                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3026                         hg = 1;
3027
3028                 if (abs(hg) < 100)
3029                         drbd_warn(device, "Split-Brain detected, manually solved. "
3030                              "Sync from %s node\n",
3031                              (hg < 0) ? "peer" : "this");
3032         }
3033         rr_conflict = nc->rr_conflict;
3034         tentative = nc->tentative;
3035         rcu_read_unlock();
3036
3037         if (hg == -100) {
3038                 /* FIXME this log message is not correct if we end up here
3039                  * after an attempted attach on a diskless node.
3040                  * We just refuse to attach -- well, we drop the "connection"
3041                  * to that disk, in a way... */
3042                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3043                 drbd_khelper(device, "split-brain");
3044                 return C_MASK;
3045         }
3046
3047         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3048                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3049                 return C_MASK;
3050         }
3051
3052         if (hg < 0 && /* by intention we do not use mydisk here. */
3053             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3054                 switch (rr_conflict) {
3055                 case ASB_CALL_HELPER:
3056                         drbd_khelper(device, "pri-lost");
3057                         /* fall through */
3058                 case ASB_DISCONNECT:
3059                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3060                         return C_MASK;
3061                 case ASB_VIOLENTLY:
3062                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3063                              "assumption\n");
3064                 }
3065         }
3066
3067         if (tentative || test_bit(CONN_DRY_RUN, &first_peer_device(device)->connection->flags)) {
3068                 if (hg == 0)
3069                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3070                 else
3071                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3072                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3073                                  abs(hg) >= 2 ? "full" : "bit-map based");
3074                 return C_MASK;
3075         }
3076
3077         if (abs(hg) >= 2) {
3078                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3079                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3080                                         BM_LOCKED_SET_ALLOWED))
3081                         return C_MASK;
3082         }
3083
3084         if (hg > 0) { /* become sync source. */
3085                 rv = C_WF_BITMAP_S;
3086         } else if (hg < 0) { /* become sync target */
3087                 rv = C_WF_BITMAP_T;
3088         } else {
3089                 rv = C_CONNECTED;
3090                 if (drbd_bm_total_weight(device)) {
3091                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3092                              drbd_bm_total_weight(device));
3093                 }
3094         }
3095
3096         return rv;
3097 }
3098
3099 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3100 {
3101         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3102         if (peer == ASB_DISCARD_REMOTE)
3103                 return ASB_DISCARD_LOCAL;
3104
3105         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3106         if (peer == ASB_DISCARD_LOCAL)
3107                 return ASB_DISCARD_REMOTE;
3108
3109         /* everything else is valid if they are equal on both sides. */
3110         return peer;
3111 }
3112
3113 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3114 {
3115         struct p_protocol *p = pi->data;
3116         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3117         int p_proto, p_discard_my_data, p_two_primaries, cf;
3118         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3119         char integrity_alg[SHARED_SECRET_MAX] = "";
3120         struct crypto_hash *peer_integrity_tfm = NULL;
3121         void *int_dig_in = NULL, *int_dig_vv = NULL;
3122
3123         p_proto         = be32_to_cpu(p->protocol);
3124         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3125         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3126         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3127         p_two_primaries = be32_to_cpu(p->two_primaries);
3128         cf              = be32_to_cpu(p->conn_flags);
3129         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3130
3131         if (connection->agreed_pro_version >= 87) {
3132                 int err;
3133
3134                 if (pi->size > sizeof(integrity_alg))
3135                         return -EIO;
3136                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3137                 if (err)
3138                         return err;
3139                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3140         }
3141
3142         if (pi->cmd != P_PROTOCOL_UPDATE) {
3143                 clear_bit(CONN_DRY_RUN, &connection->flags);
3144
3145                 if (cf & CF_DRY_RUN)
3146                         set_bit(CONN_DRY_RUN, &connection->flags);
3147
3148                 rcu_read_lock();
3149                 nc = rcu_dereference(connection->net_conf);
3150
3151                 if (p_proto != nc->wire_protocol) {
3152                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3153                         goto disconnect_rcu_unlock;
3154                 }
3155
3156                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3157                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3158                         goto disconnect_rcu_unlock;
3159                 }
3160
3161                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3162                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3163                         goto disconnect_rcu_unlock;
3164                 }
3165
3166                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3167                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3168                         goto disconnect_rcu_unlock;
3169                 }
3170
3171                 if (p_discard_my_data && nc->discard_my_data) {
3172                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3173                         goto disconnect_rcu_unlock;
3174                 }
3175
3176                 if (p_two_primaries != nc->two_primaries) {
3177                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3178                         goto disconnect_rcu_unlock;
3179                 }
3180
3181                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3182                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3183                         goto disconnect_rcu_unlock;
3184                 }
3185
3186                 rcu_read_unlock();
3187         }
3188
3189         if (integrity_alg[0]) {
3190                 int hash_size;
3191
3192                 /*
3193                  * We can only change the peer data integrity algorithm
3194                  * here.  Changing our own data integrity algorithm
3195                  * requires that we send a P_PROTOCOL_UPDATE packet at
3196                  * the same time; otherwise, the peer has no way to
3197                  * tell between which packets the algorithm should
3198                  * change.
3199                  */
3200
3201                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3202                 if (!peer_integrity_tfm) {
3203                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3204                                  integrity_alg);
3205                         goto disconnect;
3206                 }
3207
3208                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3209                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3210                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3211                 if (!(int_dig_in && int_dig_vv)) {
3212                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3213                         goto disconnect;
3214                 }
3215         }
3216
3217         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3218         if (!new_net_conf) {
3219                 drbd_err(connection, "Allocation of new net_conf failed\n");
3220                 goto disconnect;
3221         }
3222
3223         mutex_lock(&connection->data.mutex);
3224         mutex_lock(&connection->resource->conf_update);
3225         old_net_conf = connection->net_conf;
3226         *new_net_conf = *old_net_conf;
3227
3228         new_net_conf->wire_protocol = p_proto;
3229         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3230         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3231         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3232         new_net_conf->two_primaries = p_two_primaries;
3233
3234         rcu_assign_pointer(connection->net_conf, new_net_conf);
3235         mutex_unlock(&connection->resource->conf_update);
3236         mutex_unlock(&connection->data.mutex);
3237
3238         crypto_free_hash(connection->peer_integrity_tfm);
3239         kfree(connection->int_dig_in);
3240         kfree(connection->int_dig_vv);
3241         connection->peer_integrity_tfm = peer_integrity_tfm;
3242         connection->int_dig_in = int_dig_in;
3243         connection->int_dig_vv = int_dig_vv;
3244
3245         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3246                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3247                           integrity_alg[0] ? integrity_alg : "(none)");
3248
3249         synchronize_rcu();
3250         kfree(old_net_conf);
3251         return 0;
3252
3253 disconnect_rcu_unlock:
3254         rcu_read_unlock();
3255 disconnect:
3256         crypto_free_hash(peer_integrity_tfm);
3257         kfree(int_dig_in);
3258         kfree(int_dig_vv);
3259         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3260         return -EIO;
3261 }
3262
3263 /* helper function
3264  * input: alg name, feature name
3265  * return: NULL (alg name was "")
3266  *         ERR_PTR(error) if something goes wrong
3267  *         or the crypto hash ptr, if it worked out ok. */
3268 static
3269 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3270                 const char *alg, const char *name)
3271 {
3272         struct crypto_hash *tfm;
3273
3274         if (!alg[0])
3275                 return NULL;
3276
3277         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3278         if (IS_ERR(tfm)) {
3279                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3280                         alg, name, PTR_ERR(tfm));
3281                 return tfm;
3282         }
3283         return tfm;
3284 }
3285
3286 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3287 {
3288         void *buffer = connection->data.rbuf;
3289         int size = pi->size;
3290
3291         while (size) {
3292                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3293                 s = drbd_recv(connection, buffer, s);
3294                 if (s <= 0) {
3295                         if (s < 0)
3296                                 return s;
3297                         break;
3298                 }
3299                 size -= s;
3300         }
3301         if (size)
3302                 return -EIO;
3303         return 0;
3304 }
3305
3306 /*
3307  * config_unknown_volume  -  device configuration command for unknown volume
3308  *
3309  * When a device is added to an existing connection, the node on which the
3310  * device is added first will send configuration commands to its peer but the
3311  * peer will not know about the device yet.  It will warn and ignore these
3312  * commands.  Once the device is added on the second node, the second node will
3313  * send the same device configuration commands, but in the other direction.
3314  *
3315  * (We can also end up here if drbd is misconfigured.)
3316  */
3317 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3318 {
3319         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3320                   cmdname(pi->cmd), pi->vnr);
3321         return ignore_remaining_packet(connection, pi);
3322 }
3323
3324 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3325 {
3326         struct drbd_device *device;
3327         struct p_rs_param_95 *p;
3328         unsigned int header_size, data_size, exp_max_sz;
3329         struct crypto_hash *verify_tfm = NULL;
3330         struct crypto_hash *csums_tfm = NULL;
3331         struct net_conf *old_net_conf, *new_net_conf = NULL;
3332         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3333         const int apv = connection->agreed_pro_version;
3334         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3335         int fifo_size = 0;
3336         int err;
3337
3338         device = vnr_to_device(connection, pi->vnr);
3339         if (!device)
3340                 return config_unknown_volume(connection, pi);
3341
3342         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3343                     : apv == 88 ? sizeof(struct p_rs_param)
3344                                         + SHARED_SECRET_MAX
3345                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3346                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3347
3348         if (pi->size > exp_max_sz) {
3349                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3350                     pi->size, exp_max_sz);
3351                 return -EIO;
3352         }
3353
3354         if (apv <= 88) {
3355                 header_size = sizeof(struct p_rs_param);
3356                 data_size = pi->size - header_size;
3357         } else if (apv <= 94) {
3358                 header_size = sizeof(struct p_rs_param_89);
3359                 data_size = pi->size - header_size;
3360                 D_ASSERT(device, data_size == 0);
3361         } else {
3362                 header_size = sizeof(struct p_rs_param_95);
3363                 data_size = pi->size - header_size;
3364                 D_ASSERT(device, data_size == 0);
3365         }
3366
3367         /* initialize verify_alg and csums_alg */
3368         p = pi->data;
3369         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3370
3371         err = drbd_recv_all(first_peer_device(device)->connection, p, header_size);
3372         if (err)
3373                 return err;
3374
3375         mutex_lock(&connection->resource->conf_update);
3376         old_net_conf = first_peer_device(device)->connection->net_conf;
3377         if (get_ldev(device)) {
3378                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3379                 if (!new_disk_conf) {
3380                         put_ldev(device);
3381                         mutex_unlock(&connection->resource->conf_update);
3382                         drbd_err(device, "Allocation of new disk_conf failed\n");
3383                         return -ENOMEM;
3384                 }
3385
3386                 old_disk_conf = device->ldev->disk_conf;
3387                 *new_disk_conf = *old_disk_conf;
3388
3389                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3390         }
3391
3392         if (apv >= 88) {
3393                 if (apv == 88) {
3394                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3395                                 drbd_err(device, "verify-alg of wrong size, "
3396                                         "peer wants %u, accepting only up to %u byte\n",
3397                                         data_size, SHARED_SECRET_MAX);
3398                                 err = -EIO;
3399                                 goto reconnect;
3400                         }
3401
3402                         err = drbd_recv_all(first_peer_device(device)->connection, p->verify_alg, data_size);
3403                         if (err)
3404                                 goto reconnect;
3405                         /* we expect NUL terminated string */
3406                         /* but just in case someone tries to be evil */
3407                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3408                         p->verify_alg[data_size-1] = 0;
3409
3410                 } else /* apv >= 89 */ {
3411                         /* we still expect NUL terminated strings */
3412                         /* but just in case someone tries to be evil */
3413                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3414                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3415                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3416                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3417                 }
3418
3419                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3420                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3421                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3422                                     old_net_conf->verify_alg, p->verify_alg);
3423                                 goto disconnect;
3424                         }
3425                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3426                                         p->verify_alg, "verify-alg");
3427                         if (IS_ERR(verify_tfm)) {
3428                                 verify_tfm = NULL;
3429                                 goto disconnect;
3430                         }
3431                 }
3432
3433                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3434                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3435                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3436                                     old_net_conf->csums_alg, p->csums_alg);
3437                                 goto disconnect;
3438                         }
3439                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3440                                         p->csums_alg, "csums-alg");
3441                         if (IS_ERR(csums_tfm)) {
3442                                 csums_tfm = NULL;
3443                                 goto disconnect;
3444                         }
3445                 }
3446
3447                 if (apv > 94 && new_disk_conf) {
3448                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3449                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3450                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3451                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3452
3453                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3454                         if (fifo_size != device->rs_plan_s->size) {
3455                                 new_plan = fifo_alloc(fifo_size);
3456                                 if (!new_plan) {
3457                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3458                                         put_ldev(device);
3459                                         goto disconnect;
3460                                 }
3461                         }
3462                 }
3463
3464                 if (verify_tfm || csums_tfm) {
3465                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3466                         if (!new_net_conf) {
3467                                 drbd_err(device, "Allocation of new net_conf failed\n");
3468                                 goto disconnect;
3469                         }
3470
3471                         *new_net_conf = *old_net_conf;
3472
3473                         if (verify_tfm) {
3474                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3475                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3476                                 crypto_free_hash(first_peer_device(device)->connection->verify_tfm);
3477                                 first_peer_device(device)->connection->verify_tfm = verify_tfm;
3478                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3479                         }
3480                         if (csums_tfm) {
3481                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3482                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3483                                 crypto_free_hash(first_peer_device(device)->connection->csums_tfm);
3484                                 first_peer_device(device)->connection->csums_tfm = csums_tfm;
3485                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3486                         }
3487                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3488                 }
3489         }
3490
3491         if (new_disk_conf) {
3492                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3493                 put_ldev(device);
3494         }
3495
3496         if (new_plan) {
3497                 old_plan = device->rs_plan_s;
3498                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3499         }
3500
3501         mutex_unlock(&connection->resource->conf_update);
3502         synchronize_rcu();
3503         if (new_net_conf)
3504                 kfree(old_net_conf);
3505         kfree(old_disk_conf);
3506         kfree(old_plan);
3507
3508         return 0;
3509
3510 reconnect:
3511         if (new_disk_conf) {
3512                 put_ldev(device);
3513                 kfree(new_disk_conf);
3514         }
3515         mutex_unlock(&connection->resource->conf_update);
3516         return -EIO;
3517
3518 disconnect:
3519         kfree(new_plan);
3520         if (new_disk_conf) {
3521                 put_ldev(device);
3522                 kfree(new_disk_conf);
3523         }
3524         mutex_unlock(&connection->resource->conf_update);
3525         /* just for completeness: actually not needed,
3526          * as this is not reached if csums_tfm was ok. */
3527         crypto_free_hash(csums_tfm);
3528         /* but free the verify_tfm again, if csums_tfm did not work out */
3529         crypto_free_hash(verify_tfm);
3530         conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3531         return -EIO;
3532 }
3533
3534 /* warn if the arguments differ by more than 12.5% */
3535 static void warn_if_differ_considerably(struct drbd_device *device,
3536         const char *s, sector_t a, sector_t b)
3537 {
3538         sector_t d;
3539         if (a == 0 || b == 0)
3540                 return;
3541         d = (a > b) ? (a - b) : (b - a);
3542         if (d > (a>>3) || d > (b>>3))
3543                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3544                      (unsigned long long)a, (unsigned long long)b);
3545 }
3546
3547 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3548 {
3549         struct drbd_device *device;
3550         struct p_sizes *p = pi->data;
3551         enum determine_dev_size dd = DS_UNCHANGED;
3552         sector_t p_size, p_usize, my_usize;
3553         int ldsc = 0; /* local disk size changed */
3554         enum dds_flags ddsf;
3555
3556         device = vnr_to_device(connection, pi->vnr);
3557         if (!device)
3558                 return config_unknown_volume(connection, pi);
3559
3560         p_size = be64_to_cpu(p->d_size);
3561         p_usize = be64_to_cpu(p->u_size);
3562
3563         /* just store the peer's disk size for now.
3564          * we still need to figure out whether we accept that. */
3565         device->p_size = p_size;
3566
3567         if (get_ldev(device)) {
3568                 rcu_read_lock();
3569                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3570                 rcu_read_unlock();
3571
3572                 warn_if_differ_considerably(device, "lower level device sizes",
3573                            p_size, drbd_get_max_capacity(device->ldev));
3574                 warn_if_differ_considerably(device, "user requested size",
3575                                             p_usize, my_usize);
3576
3577                 /* if this is the first connect, or an otherwise expected
3578                  * param exchange, choose the minimum */
3579                 if (device->state.conn == C_WF_REPORT_PARAMS)
3580                         p_usize = min_not_zero(my_usize, p_usize);
3581
3582                 /* Never shrink a device with usable data during connect.
3583                    But allow online shrinking if we are connected. */
3584                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3585                     drbd_get_capacity(device->this_bdev) &&
3586                     device->state.disk >= D_OUTDATED &&
3587                     device->state.conn < C_CONNECTED) {
3588                         drbd_err(device, "The peer's disk size is too small!\n");
3589                         conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3590                         put_ldev(device);
3591                         return -EIO;
3592                 }
3593
3594                 if (my_usize != p_usize) {
3595                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3596
3597                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3598                         if (!new_disk_conf) {
3599                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3600                                 put_ldev(device);
3601                                 return -ENOMEM;
3602                         }
3603
3604                         mutex_lock(&connection->resource->conf_update);
3605                         old_disk_conf = device->ldev->disk_conf;
3606                         *new_disk_conf = *old_disk_conf;
3607                         new_disk_conf->disk_size = p_usize;
3608
3609                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3610                         mutex_unlock(&connection->resource->conf_update);
3611                         synchronize_rcu();
3612                         kfree(old_disk_conf);
3613
3614                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3615                                  (unsigned long)my_usize);
3616                 }
3617
3618                 put_ldev(device);
3619         }
3620
3621         ddsf = be16_to_cpu(p->dds_flags);
3622         if (get_ldev(device)) {
3623                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3624                 put_ldev(device);
3625                 if (dd == DS_ERROR)
3626                         return -EIO;
3627                 drbd_md_sync(device);
3628         } else {
3629                 /* I am diskless, need to accept the peer's size. */
3630                 drbd_set_my_capacity(device, p_size);
3631         }
3632
3633         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3634         drbd_reconsider_max_bio_size(device);
3635
3636         if (get_ldev(device)) {
3637                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3638                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3639                         ldsc = 1;
3640                 }
3641
3642                 put_ldev(device);
3643         }
3644
3645         if (device->state.conn > C_WF_REPORT_PARAMS) {
3646                 if (be64_to_cpu(p->c_size) !=
3647                     drbd_get_capacity(device->this_bdev) || ldsc) {
3648                         /* we have different sizes, probably peer
3649                          * needs to know my new size... */
3650                         drbd_send_sizes(device, 0, ddsf);
3651                 }
3652                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3653                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3654                         if (device->state.pdsk >= D_INCONSISTENT &&
3655                             device->state.disk >= D_INCONSISTENT) {
3656                                 if (ddsf & DDSF_NO_RESYNC)
3657                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3658                                 else
3659                                         resync_after_online_grow(device);
3660                         } else
3661                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3662                 }
3663         }
3664
3665         return 0;
3666 }
3667
3668 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3669 {
3670         struct drbd_device *device;
3671         struct p_uuids *p = pi->data;
3672         u64 *p_uuid;
3673         int i, updated_uuids = 0;
3674
3675         device = vnr_to_device(connection, pi->vnr);
3676         if (!device)
3677                 return config_unknown_volume(connection, pi);
3678
3679         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3680         if (!p_uuid) {
3681                 drbd_err(device, "kmalloc of p_uuid failed\n");
3682                 return false;
3683         }
3684
3685         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3686                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3687
3688         kfree(device->p_uuid);
3689         device->p_uuid = p_uuid;
3690
3691         if (device->state.conn < C_CONNECTED &&
3692             device->state.disk < D_INCONSISTENT &&
3693             device->state.role == R_PRIMARY &&
3694             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3695                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3696                     (unsigned long long)device->ed_uuid);
3697                 conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3698                 return -EIO;
3699         }
3700
3701         if (get_ldev(device)) {
3702                 int skip_initial_sync =
3703                         device->state.conn == C_CONNECTED &&
3704                         first_peer_device(device)->connection->agreed_pro_version >= 90 &&
3705                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3706                         (p_uuid[UI_FLAGS] & 8);
3707                 if (skip_initial_sync) {
3708                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3709                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3710                                         "clear_n_write from receive_uuids",
3711                                         BM_LOCKED_TEST_ALLOWED);
3712                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3713                         _drbd_uuid_set(device, UI_BITMAP, 0);
3714                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3715                                         CS_VERBOSE, NULL);
3716                         drbd_md_sync(device);
3717                         updated_uuids = 1;
3718                 }
3719                 put_ldev(device);
3720         } else if (device->state.disk < D_INCONSISTENT &&
3721                    device->state.role == R_PRIMARY) {
3722                 /* I am a diskless primary, the peer just created a new current UUID
3723                    for me. */
3724                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3725         }
3726
3727         /* Before we test for the disk state, we should wait until an eventually
3728            ongoing cluster wide state change is finished. That is important if
3729            we are primary and are detaching from our disk. We need to see the
3730            new disk state... */
3731         mutex_lock(device->state_mutex);
3732         mutex_unlock(device->state_mutex);
3733         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3734                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3735
3736         if (updated_uuids)
3737                 drbd_print_uuids(device, "receiver updated UUIDs to");
3738
3739         return 0;
3740 }
3741
3742 /**
3743  * convert_state() - Converts the peer's view of the cluster state to our point of view
3744  * @ps:         The state as seen by the peer.
3745  */
3746 static union drbd_state convert_state(union drbd_state ps)
3747 {
3748         union drbd_state ms;
3749
3750         static enum drbd_conns c_tab[] = {
3751                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3752                 [C_CONNECTED] = C_CONNECTED,
3753
3754                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3755                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3756                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3757                 [C_VERIFY_S]       = C_VERIFY_T,
3758                 [C_MASK]   = C_MASK,
3759         };
3760
3761         ms.i = ps.i;
3762
3763         ms.conn = c_tab[ps.conn];
3764         ms.peer = ps.role;
3765         ms.role = ps.peer;
3766         ms.pdsk = ps.disk;
3767         ms.disk = ps.pdsk;
3768         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3769
3770         return ms;
3771 }
3772
3773 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3774 {
3775         struct drbd_device *device;
3776         struct p_req_state *p = pi->data;
3777         union drbd_state mask, val;
3778         enum drbd_state_rv rv;
3779
3780         device = vnr_to_device(connection, pi->vnr);
3781         if (!device)
3782                 return -EIO;
3783
3784         mask.i = be32_to_cpu(p->mask);
3785         val.i = be32_to_cpu(p->val);
3786
3787         if (test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags) &&
3788             mutex_is_locked(device->state_mutex)) {
3789                 drbd_send_sr_reply(device, SS_CONCURRENT_ST_CHG);
3790                 return 0;
3791         }
3792
3793         mask = convert_state(mask);
3794         val = convert_state(val);
3795
3796         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3797         drbd_send_sr_reply(device, rv);
3798
3799         drbd_md_sync(device);
3800
3801         return 0;
3802 }
3803
3804 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3805 {
3806         struct p_req_state *p = pi->data;
3807         union drbd_state mask, val;
3808         enum drbd_state_rv rv;
3809
3810         mask.i = be32_to_cpu(p->mask);
3811         val.i = be32_to_cpu(p->val);
3812
3813         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3814             mutex_is_locked(&connection->cstate_mutex)) {
3815                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3816                 return 0;
3817         }
3818
3819         mask = convert_state(mask);
3820         val = convert_state(val);
3821
3822         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3823         conn_send_sr_reply(connection, rv);
3824
3825         return 0;
3826 }
3827
3828 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3829 {
3830         struct drbd_device *device;
3831         struct p_state *p = pi->data;
3832         union drbd_state os, ns, peer_state;
3833         enum drbd_disk_state real_peer_disk;
3834         enum chg_state_flags cs_flags;
3835         int rv;
3836
3837         device = vnr_to_device(connection, pi->vnr);
3838         if (!device)
3839                 return config_unknown_volume(connection, pi);
3840
3841         peer_state.i = be32_to_cpu(p->state);
3842
3843         real_peer_disk = peer_state.disk;
3844         if (peer_state.disk == D_NEGOTIATING) {
3845                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3846                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3847         }
3848
3849         spin_lock_irq(&device->resource->req_lock);
3850  retry:
3851         os = ns = drbd_read_state(device);
3852         spin_unlock_irq(&device->resource->req_lock);
3853
3854         /* If some other part of the code (asender thread, timeout)
3855          * already decided to close the connection again,
3856          * we must not "re-establish" it here. */
3857         if (os.conn <= C_TEAR_DOWN)
3858                 return -ECONNRESET;
3859
3860         /* If this is the "end of sync" confirmation, usually the peer disk
3861          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3862          * set) resync started in PausedSyncT, or if the timing of pause-/
3863          * unpause-sync events has been "just right", the peer disk may
3864          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3865          */
3866         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3867             real_peer_disk == D_UP_TO_DATE &&
3868             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3869                 /* If we are (becoming) SyncSource, but peer is still in sync
3870                  * preparation, ignore its uptodate-ness to avoid flapping, it
3871                  * will change to inconsistent once the peer reaches active
3872                  * syncing states.
3873                  * It may have changed syncer-paused flags, however, so we
3874                  * cannot ignore this completely. */
3875                 if (peer_state.conn > C_CONNECTED &&
3876                     peer_state.conn < C_SYNC_SOURCE)
3877                         real_peer_disk = D_INCONSISTENT;
3878
3879                 /* if peer_state changes to connected at the same time,
3880                  * it explicitly notifies us that it finished resync.
3881                  * Maybe we should finish it up, too? */
3882                 else if (os.conn >= C_SYNC_SOURCE &&
3883                          peer_state.conn == C_CONNECTED) {
3884                         if (drbd_bm_total_weight(device) <= device->rs_failed)
3885                                 drbd_resync_finished(device);
3886                         return 0;
3887                 }
3888         }
3889
3890         /* explicit verify finished notification, stop sector reached. */
3891         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3892             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3893                 ov_out_of_sync_print(device);
3894                 drbd_resync_finished(device);
3895                 return 0;
3896         }
3897
3898         /* peer says his disk is inconsistent, while we think it is uptodate,
3899          * and this happens while the peer still thinks we have a sync going on,
3900          * but we think we are already done with the sync.
3901          * We ignore this to avoid flapping pdsk.
3902          * This should not happen, if the peer is a recent version of drbd. */
3903         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3904             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3905                 real_peer_disk = D_UP_TO_DATE;
3906
3907         if (ns.conn == C_WF_REPORT_PARAMS)
3908                 ns.conn = C_CONNECTED;
3909
3910         if (peer_state.conn == C_AHEAD)
3911                 ns.conn = C_BEHIND;
3912
3913         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3914             get_ldev_if_state(device, D_NEGOTIATING)) {
3915                 int cr; /* consider resync */
3916
3917                 /* if we established a new connection */
3918                 cr  = (os.conn < C_CONNECTED);
3919                 /* if we had an established connection
3920                  * and one of the nodes newly attaches a disk */
3921                 cr |= (os.conn == C_CONNECTED &&
3922                        (peer_state.disk == D_NEGOTIATING ||
3923                         os.disk == D_NEGOTIATING));
3924                 /* if we have both been inconsistent, and the peer has been
3925                  * forced to be UpToDate with --overwrite-data */
3926                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
3927                 /* if we had been plain connected, and the admin requested to
3928                  * start a sync by "invalidate" or "invalidate-remote" */
3929                 cr |= (os.conn == C_CONNECTED &&
3930                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3931                                  peer_state.conn <= C_WF_BITMAP_T));
3932
3933                 if (cr)
3934                         ns.conn = drbd_sync_handshake(device, peer_state.role, real_peer_disk);
3935
3936                 put_ldev(device);
3937                 if (ns.conn == C_MASK) {
3938                         ns.conn = C_CONNECTED;
3939                         if (device->state.disk == D_NEGOTIATING) {
3940                                 drbd_force_state(device, NS(disk, D_FAILED));
3941                         } else if (peer_state.disk == D_NEGOTIATING) {
3942                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
3943                                 peer_state.disk = D_DISKLESS;
3944                                 real_peer_disk = D_DISKLESS;
3945                         } else {
3946                                 if (test_and_clear_bit(CONN_DRY_RUN, &first_peer_device(device)->connection->flags))
3947                                         return -EIO;
3948                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
3949                                 conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3950                                 return -EIO;
3951                         }
3952                 }
3953         }
3954
3955         spin_lock_irq(&device->resource->req_lock);
3956         if (os.i != drbd_read_state(device).i)
3957                 goto retry;
3958         clear_bit(CONSIDER_RESYNC, &device->flags);
3959         ns.peer = peer_state.role;
3960         ns.pdsk = real_peer_disk;
3961         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3962         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3963                 ns.disk = device->new_state_tmp.disk;
3964         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3965         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3966             test_bit(NEW_CUR_UUID, &device->flags)) {
3967                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3968                    for temporal network outages! */
3969                 spin_unlock_irq(&device->resource->req_lock);
3970                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3971                 tl_clear(first_peer_device(device)->connection);
3972                 drbd_uuid_new_current(device);
3973                 clear_bit(NEW_CUR_UUID, &device->flags);
3974                 conn_request_state(first_peer_device(device)->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3975                 return -EIO;
3976         }
3977         rv = _drbd_set_state(device, ns, cs_flags, NULL);
3978         ns = drbd_read_state(device);
3979         spin_unlock_irq(&device->resource->req_lock);
3980
3981         if (rv < SS_SUCCESS) {
3982                 conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3983                 return -EIO;
3984         }
3985
3986         if (os.conn > C_WF_REPORT_PARAMS) {
3987                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3988                     peer_state.disk != D_NEGOTIATING ) {
3989                         /* we want resync, peer has not yet decided to sync... */
3990                         /* Nowadays only used when forcing a node into primary role and
3991                            setting its disk to UpToDate with that */
3992                         drbd_send_uuids(device);
3993                         drbd_send_current_state(device);
3994                 }
3995         }
3996
3997         clear_bit(DISCARD_MY_DATA, &device->flags);
3998
3999         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4000
4001         return 0;
4002 }
4003
4004 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4005 {
4006         struct drbd_device *device;
4007         struct p_rs_uuid *p = pi->data;
4008
4009         device = vnr_to_device(connection, pi->vnr);
4010         if (!device)
4011                 return -EIO;
4012
4013         wait_event(device->misc_wait,
4014                    device->state.conn == C_WF_SYNC_UUID ||
4015                    device->state.conn == C_BEHIND ||
4016                    device->state.conn < C_CONNECTED ||
4017                    device->state.disk < D_NEGOTIATING);
4018
4019         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4020
4021         /* Here the _drbd_uuid_ functions are right, current should
4022            _not_ be rotated into the history */
4023         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4024                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4025                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4026
4027                 drbd_print_uuids(device, "updated sync uuid");
4028                 drbd_start_resync(device, C_SYNC_TARGET);
4029
4030                 put_ldev(device);
4031         } else
4032                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4033
4034         return 0;
4035 }
4036
4037 /**
4038  * receive_bitmap_plain
4039  *
4040  * Return 0 when done, 1 when another iteration is needed, and a negative error
4041  * code upon failure.
4042  */
4043 static int
4044 receive_bitmap_plain(struct drbd_device *device, unsigned int size,
4045                      unsigned long *p, struct bm_xfer_ctx *c)
4046 {
4047         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4048                                  drbd_header_size(first_peer_device(device)->connection);
4049         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4050                                        c->bm_words - c->word_offset);
4051         unsigned int want = num_words * sizeof(*p);
4052         int err;
4053
4054         if (want != size) {
4055                 drbd_err(device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4056                 return -EIO;
4057         }
4058         if (want == 0)
4059                 return 0;
4060         err = drbd_recv_all(first_peer_device(device)->connection, p, want);
4061         if (err)
4062                 return err;
4063
4064         drbd_bm_merge_lel(device, c->word_offset, num_words, p);
4065
4066         c->word_offset += num_words;
4067         c->bit_offset = c->word_offset * BITS_PER_LONG;
4068         if (c->bit_offset > c->bm_bits)
4069                 c->bit_offset = c->bm_bits;
4070
4071         return 1;
4072 }
4073
4074 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4075 {
4076         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4077 }
4078
4079 static int dcbp_get_start(struct p_compressed_bm *p)
4080 {
4081         return (p->encoding & 0x80) != 0;
4082 }
4083
4084 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4085 {
4086         return (p->encoding >> 4) & 0x7;
4087 }
4088
4089 /**
4090  * recv_bm_rle_bits
4091  *
4092  * Return 0 when done, 1 when another iteration is needed, and a negative error
4093  * code upon failure.
4094  */
4095 static int
4096 recv_bm_rle_bits(struct drbd_device *device,
4097                 struct p_compressed_bm *p,
4098                  struct bm_xfer_ctx *c,
4099                  unsigned int len)
4100 {
4101         struct bitstream bs;
4102         u64 look_ahead;
4103         u64 rl;
4104         u64 tmp;
4105         unsigned long s = c->bit_offset;
4106         unsigned long e;
4107         int toggle = dcbp_get_start(p);
4108         int have;
4109         int bits;
4110
4111         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4112
4113         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4114         if (bits < 0)
4115                 return -EIO;
4116
4117         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4118                 bits = vli_decode_bits(&rl, look_ahead);
4119                 if (bits <= 0)
4120                         return -EIO;
4121
4122                 if (toggle) {
4123                         e = s + rl -1;
4124                         if (e >= c->bm_bits) {
4125                                 drbd_err(device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4126                                 return -EIO;
4127                         }
4128                         _drbd_bm_set_bits(device, s, e);
4129                 }
4130
4131                 if (have < bits) {
4132                         drbd_err(device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4133                                 have, bits, look_ahead,
4134                                 (unsigned int)(bs.cur.b - p->code),
4135                                 (unsigned int)bs.buf_len);
4136                         return -EIO;
4137                 }
4138                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4139                 if (likely(bits < 64))
4140                         look_ahead >>= bits;
4141                 else
4142                         look_ahead = 0;
4143                 have -= bits;
4144
4145                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4146                 if (bits < 0)
4147                         return -EIO;
4148                 look_ahead |= tmp << have;
4149                 have += bits;
4150         }
4151
4152         c->bit_offset = s;
4153         bm_xfer_ctx_bit_to_word_offset(c);
4154
4155         return (s != c->bm_bits);
4156 }
4157
4158 /**
4159  * decode_bitmap_c
4160  *
4161  * Return 0 when done, 1 when another iteration is needed, and a negative error
4162  * code upon failure.
4163  */
4164 static int
4165 decode_bitmap_c(struct drbd_device *device,
4166                 struct p_compressed_bm *p,
4167                 struct bm_xfer_ctx *c,
4168                 unsigned int len)
4169 {
4170         if (dcbp_get_code(p) == RLE_VLI_Bits)
4171                 return recv_bm_rle_bits(device, p, c, len - sizeof(*p));
4172
4173         /* other variants had been implemented for evaluation,
4174          * but have been dropped as this one turned out to be "best"
4175          * during all our tests. */
4176
4177         drbd_err(device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4178         conn_request_state(first_peer_device(device)->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4179         return -EIO;
4180 }
4181
4182 void INFO_bm_xfer_stats(struct drbd_device *device,
4183                 const char *direction, struct bm_xfer_ctx *c)
4184 {
4185         /* what would it take to transfer it "plaintext" */
4186         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4187         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4188         unsigned int plain =
4189                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4190                 c->bm_words * sizeof(unsigned long);
4191         unsigned int total = c->bytes[0] + c->bytes[1];
4192         unsigned int r;
4193
4194         /* total can not be zero. but just in case: */
4195         if (total == 0)
4196                 return;
4197
4198         /* don't report if not compressed */
4199         if (total >= plain)
4200                 return;
4201
4202         /* total < plain. check for overflow, still */
4203         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4204                                     : (1000 * total / plain);
4205
4206         if (r > 1000)
4207                 r = 1000;
4208
4209         r = 1000 - r;
4210         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4211              "total %u; compression: %u.%u%%\n",
4212                         direction,
4213                         c->bytes[1], c->packets[1],
4214                         c->bytes[0], c->packets[0],
4215                         total, r/10, r % 10);
4216 }
4217
4218 /* Since we are processing the bitfield from lower addresses to higher,
4219    it does not matter if the process it in 32 bit chunks or 64 bit
4220    chunks as long as it is little endian. (Understand it as byte stream,
4221    beginning with the lowest byte...) If we would use big endian
4222    we would need to process it from the highest address to the lowest,
4223    in order to be agnostic to the 32 vs 64 bits issue.
4224
4225    returns 0 on failure, 1 if we successfully received it. */
4226 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4227 {
4228         struct drbd_device *device;
4229         struct bm_xfer_ctx c;
4230         int err;
4231
4232         device = vnr_to_device(connection, pi->vnr);
4233         if (!device)
4234                 return -EIO;
4235
4236         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4237         /* you are supposed to send additional out-of-sync information
4238          * if you actually set bits during this phase */
4239
4240         c = (struct bm_xfer_ctx) {
4241                 .bm_bits = drbd_bm_bits(device),
4242                 .bm_words = drbd_bm_words(device),
4243         };
4244
4245         for(;;) {
4246                 if (pi->cmd == P_BITMAP)
4247                         err = receive_bitmap_plain(device, pi->size, pi->data, &c);
4248                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4249                         /* MAYBE: sanity check that we speak proto >= 90,
4250                          * and the feature is enabled! */
4251                         struct p_compressed_bm *p = pi->data;
4252
4253                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4254                                 drbd_err(device, "ReportCBitmap packet too large\n");
4255                                 err = -EIO;
4256                                 goto out;
4257                         }
4258                         if (pi->size <= sizeof(*p)) {
4259                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4260                                 err = -EIO;
4261                                 goto out;
4262                         }
4263                         err = drbd_recv_all(first_peer_device(device)->connection, p, pi->size);
4264                         if (err)
4265                                goto out;
4266                         err = decode_bitmap_c(device, p, &c, pi->size);
4267                 } else {
4268                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4269                         err = -EIO;
4270                         goto out;
4271                 }
4272
4273                 c.packets[pi->cmd == P_BITMAP]++;
4274                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4275
4276                 if (err <= 0) {
4277                         if (err < 0)
4278                                 goto out;
4279                         break;
4280                 }
4281                 err = drbd_recv_header(first_peer_device(device)->connection, pi);
4282                 if (err)
4283                         goto out;
4284         }
4285
4286         INFO_bm_xfer_stats(device, "receive", &c);
4287
4288         if (device->state.conn == C_WF_BITMAP_T) {
4289                 enum drbd_state_rv rv;
4290
4291                 err = drbd_send_bitmap(device);
4292                 if (err)
4293                         goto out;
4294                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4295                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4296                 D_ASSERT(device, rv == SS_SUCCESS);
4297         } else if (device->state.conn != C_WF_BITMAP_S) {
4298                 /* admin may have requested C_DISCONNECTING,
4299                  * other threads may have noticed network errors */
4300                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4301                     drbd_conn_str(device->state.conn));
4302         }
4303         err = 0;
4304
4305  out:
4306         drbd_bm_unlock(device);
4307         if (!err && device->state.conn == C_WF_BITMAP_S)
4308                 drbd_start_resync(device, C_SYNC_SOURCE);
4309         return err;
4310 }
4311
4312 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4313 {
4314         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4315                  pi->cmd, pi->size);
4316
4317         return ignore_remaining_packet(connection, pi);
4318 }
4319
4320 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4321 {
4322         /* Make sure we've acked all the TCP data associated
4323          * with the data requests being unplugged */
4324         drbd_tcp_quickack(connection->data.socket);
4325
4326         return 0;
4327 }
4328
4329 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4330 {
4331         struct drbd_device *device;
4332         struct p_block_desc *p = pi->data;
4333
4334         device = vnr_to_device(connection, pi->vnr);
4335         if (!device)
4336                 return -EIO;
4337
4338         switch (device->state.conn) {
4339         case C_WF_SYNC_UUID:
4340         case C_WF_BITMAP_T:
4341         case C_BEHIND:
4342                         break;
4343         default:
4344                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4345                                 drbd_conn_str(device->state.conn));
4346         }
4347
4348         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4349
4350         return 0;
4351 }
4352
4353 struct data_cmd {
4354         int expect_payload;
4355         size_t pkt_size;
4356         int (*fn)(struct drbd_connection *, struct packet_info *);
4357 };
4358
4359 static struct data_cmd drbd_cmd_handler[] = {
4360         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4361         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4362         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4363         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4364         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4365         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4366         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4367         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4368         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4369         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4370         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4371         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4372         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4373         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4374         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4375         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4376         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4377         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4378         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4379         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4380         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4381         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4382         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4383         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4384 };
4385
4386 static void drbdd(struct drbd_connection *connection)
4387 {
4388         struct packet_info pi;
4389         size_t shs; /* sub header size */
4390         int err;
4391
4392         while (get_t_state(&connection->receiver) == RUNNING) {
4393                 struct data_cmd *cmd;
4394
4395                 drbd_thread_current_set_cpu(&connection->receiver);
4396                 if (drbd_recv_header(connection, &pi))
4397                         goto err_out;
4398
4399                 cmd = &drbd_cmd_handler[pi.cmd];
4400                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4401                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4402                                  cmdname(pi.cmd), pi.cmd);
4403                         goto err_out;
4404                 }
4405
4406                 shs = cmd->pkt_size;
4407                 if (pi.size > shs && !cmd->expect_payload) {
4408                         drbd_err(connection, "No payload expected %s l:%d\n",
4409                                  cmdname(pi.cmd), pi.size);
4410                         goto err_out;
4411                 }
4412
4413                 if (shs) {
4414                         err = drbd_recv_all_warn(connection, pi.data, shs);
4415                         if (err)
4416                                 goto err_out;
4417                         pi.size -= shs;
4418                 }
4419
4420                 err = cmd->fn(connection, &pi);
4421                 if (err) {
4422                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4423                                  cmdname(pi.cmd), err, pi.size);
4424                         goto err_out;
4425                 }
4426         }
4427         return;
4428
4429     err_out:
4430         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4431 }
4432
4433 void conn_flush_workqueue(struct drbd_connection *connection)
4434 {
4435         struct drbd_wq_barrier barr;
4436
4437         barr.w.cb = w_prev_work_done;
4438         barr.w.connection = connection;
4439         init_completion(&barr.done);
4440         drbd_queue_work(&connection->sender_work, &barr.w);
4441         wait_for_completion(&barr.done);
4442 }
4443
4444 static void conn_disconnect(struct drbd_connection *connection)
4445 {
4446         struct drbd_peer_device *peer_device;
4447         enum drbd_conns oc;
4448         int vnr;
4449
4450         if (connection->cstate == C_STANDALONE)
4451                 return;
4452
4453         /* We are about to start the cleanup after connection loss.
4454          * Make sure drbd_make_request knows about that.
4455          * Usually we should be in some network failure state already,
4456          * but just in case we are not, we fix it up here.
4457          */
4458         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4459
4460         /* asender does not clean up anything. it must not interfere, either */
4461         drbd_thread_stop(&connection->asender);
4462         drbd_free_sock(connection);
4463
4464         rcu_read_lock();
4465         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4466                 struct drbd_device *device = peer_device->device;
4467                 kref_get(&device->kref);
4468                 rcu_read_unlock();
4469                 drbd_disconnected(device);
4470                 kref_put(&device->kref, drbd_destroy_device);
4471                 rcu_read_lock();
4472         }
4473         rcu_read_unlock();
4474
4475         if (!list_empty(&connection->current_epoch->list))
4476                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4477         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4478         atomic_set(&connection->current_epoch->epoch_size, 0);
4479         connection->send.seen_any_write_yet = false;
4480
4481         drbd_info(connection, "Connection closed\n");
4482
4483         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4484                 conn_try_outdate_peer_async(connection);
4485
4486         spin_lock_irq(&connection->resource->req_lock);
4487         oc = connection->cstate;
4488         if (oc >= C_UNCONNECTED)
4489                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4490
4491         spin_unlock_irq(&connection->resource->req_lock);
4492
4493         if (oc == C_DISCONNECTING)
4494                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4495 }
4496
4497 static int drbd_disconnected(struct drbd_device *device)
4498 {
4499         unsigned int i;
4500
4501         /* wait for current activity to cease. */
4502         spin_lock_irq(&device->resource->req_lock);
4503         _drbd_wait_ee_list_empty(device, &device->active_ee);
4504         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4505         _drbd_wait_ee_list_empty(device, &device->read_ee);
4506         spin_unlock_irq(&device->resource->req_lock);
4507
4508         /* We do not have data structures that would allow us to
4509          * get the rs_pending_cnt down to 0 again.
4510          *  * On C_SYNC_TARGET we do not have any data structures describing
4511          *    the pending RSDataRequest's we have sent.
4512          *  * On C_SYNC_SOURCE there is no data structure that tracks
4513          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4514          *  And no, it is not the sum of the reference counts in the
4515          *  resync_LRU. The resync_LRU tracks the whole operation including
4516          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4517          *  on the fly. */
4518         drbd_rs_cancel_all(device);
4519         device->rs_total = 0;
4520         device->rs_failed = 0;
4521         atomic_set(&device->rs_pending_cnt, 0);
4522         wake_up(&device->misc_wait);
4523
4524         del_timer_sync(&device->resync_timer);
4525         resync_timer_fn((unsigned long)device);
4526
4527         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4528          * w_make_resync_request etc. which may still be on the worker queue
4529          * to be "canceled" */
4530         drbd_flush_workqueue(device);
4531
4532         drbd_finish_peer_reqs(device);
4533
4534         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4535            might have issued a work again. The one before drbd_finish_peer_reqs() is
4536            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4537         drbd_flush_workqueue(device);
4538
4539         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4540          * again via drbd_try_clear_on_disk_bm(). */
4541         drbd_rs_cancel_all(device);
4542
4543         kfree(device->p_uuid);
4544         device->p_uuid = NULL;
4545
4546         if (!drbd_suspended(device))
4547                 tl_clear(first_peer_device(device)->connection);
4548
4549         drbd_md_sync(device);
4550
4551         /* serialize with bitmap writeout triggered by the state change,
4552          * if any. */
4553         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4554
4555         /* tcp_close and release of sendpage pages can be deferred.  I don't
4556          * want to use SO_LINGER, because apparently it can be deferred for
4557          * more than 20 seconds (longest time I checked).
4558          *
4559          * Actually we don't care for exactly when the network stack does its
4560          * put_page(), but release our reference on these pages right here.
4561          */
4562         i = drbd_free_peer_reqs(device, &device->net_ee);
4563         if (i)
4564                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4565         i = atomic_read(&device->pp_in_use_by_net);
4566         if (i)
4567                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4568         i = atomic_read(&device->pp_in_use);
4569         if (i)
4570                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4571
4572         D_ASSERT(device, list_empty(&device->read_ee));
4573         D_ASSERT(device, list_empty(&device->active_ee));
4574         D_ASSERT(device, list_empty(&device->sync_ee));
4575         D_ASSERT(device, list_empty(&device->done_ee));
4576
4577         return 0;
4578 }
4579
4580 /*
4581  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4582  * we can agree on is stored in agreed_pro_version.
4583  *
4584  * feature flags and the reserved array should be enough room for future
4585  * enhancements of the handshake protocol, and possible plugins...
4586  *
4587  * for now, they are expected to be zero, but ignored.
4588  */
4589 static int drbd_send_features(struct drbd_connection *connection)
4590 {
4591         struct drbd_socket *sock;
4592         struct p_connection_features *p;
4593
4594         sock = &connection->data;
4595         p = conn_prepare_command(connection, sock);
4596         if (!p)
4597                 return -EIO;
4598         memset(p, 0, sizeof(*p));
4599         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4600         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4601         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4602 }
4603
4604 /*
4605  * return values:
4606  *   1 yes, we have a valid connection
4607  *   0 oops, did not work out, please try again
4608  *  -1 peer talks different language,
4609  *     no point in trying again, please go standalone.
4610  */
4611 static int drbd_do_features(struct drbd_connection *connection)
4612 {
4613         /* ASSERT current == connection->receiver ... */
4614         struct p_connection_features *p;
4615         const int expect = sizeof(struct p_connection_features);
4616         struct packet_info pi;
4617         int err;
4618
4619         err = drbd_send_features(connection);
4620         if (err)
4621                 return 0;
4622
4623         err = drbd_recv_header(connection, &pi);
4624         if (err)
4625                 return 0;
4626
4627         if (pi.cmd != P_CONNECTION_FEATURES) {
4628                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4629                          cmdname(pi.cmd), pi.cmd);
4630                 return -1;
4631         }
4632
4633         if (pi.size != expect) {
4634                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4635                      expect, pi.size);
4636                 return -1;
4637         }
4638
4639         p = pi.data;
4640         err = drbd_recv_all_warn(connection, p, expect);
4641         if (err)
4642                 return 0;
4643
4644         p->protocol_min = be32_to_cpu(p->protocol_min);
4645         p->protocol_max = be32_to_cpu(p->protocol_max);
4646         if (p->protocol_max == 0)
4647                 p->protocol_max = p->protocol_min;
4648
4649         if (PRO_VERSION_MAX < p->protocol_min ||
4650             PRO_VERSION_MIN > p->protocol_max)
4651                 goto incompat;
4652
4653         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4654
4655         drbd_info(connection, "Handshake successful: "
4656              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4657
4658         return 1;
4659
4660  incompat:
4661         drbd_err(connection, "incompatible DRBD dialects: "
4662             "I support %d-%d, peer supports %d-%d\n",
4663             PRO_VERSION_MIN, PRO_VERSION_MAX,
4664             p->protocol_min, p->protocol_max);
4665         return -1;
4666 }
4667
4668 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4669 static int drbd_do_auth(struct drbd_connection *connection)
4670 {
4671         drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4672         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4673         return -1;
4674 }
4675 #else
4676 #define CHALLENGE_LEN 64
4677
4678 /* Return value:
4679         1 - auth succeeded,
4680         0 - failed, try again (network error),
4681         -1 - auth failed, don't try again.
4682 */
4683
4684 static int drbd_do_auth(struct drbd_connection *connection)
4685 {
4686         struct drbd_socket *sock;
4687         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4688         struct scatterlist sg;
4689         char *response = NULL;
4690         char *right_response = NULL;
4691         char *peers_ch = NULL;
4692         unsigned int key_len;
4693         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4694         unsigned int resp_size;
4695         struct hash_desc desc;
4696         struct packet_info pi;
4697         struct net_conf *nc;
4698         int err, rv;
4699
4700         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4701
4702         rcu_read_lock();
4703         nc = rcu_dereference(connection->net_conf);
4704         key_len = strlen(nc->shared_secret);
4705         memcpy(secret, nc->shared_secret, key_len);
4706         rcu_read_unlock();
4707
4708         desc.tfm = connection->cram_hmac_tfm;
4709         desc.flags = 0;
4710
4711         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4712         if (rv) {
4713                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4714                 rv = -1;
4715                 goto fail;
4716         }
4717
4718         get_random_bytes(my_challenge, CHALLENGE_LEN);
4719
4720         sock = &connection->data;
4721         if (!conn_prepare_command(connection, sock)) {
4722                 rv = 0;
4723                 goto fail;
4724         }
4725         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4726                                 my_challenge, CHALLENGE_LEN);
4727         if (!rv)
4728                 goto fail;
4729
4730         err = drbd_recv_header(connection, &pi);
4731         if (err) {
4732                 rv = 0;
4733                 goto fail;
4734         }
4735
4736         if (pi.cmd != P_AUTH_CHALLENGE) {
4737                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4738                          cmdname(pi.cmd), pi.cmd);
4739                 rv = 0;
4740                 goto fail;
4741         }
4742
4743         if (pi.size > CHALLENGE_LEN * 2) {
4744                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4745                 rv = -1;
4746                 goto fail;
4747         }
4748
4749         peers_ch = kmalloc(pi.size, GFP_NOIO);
4750         if (peers_ch == NULL) {
4751                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4752                 rv = -1;
4753                 goto fail;
4754         }
4755
4756         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4757         if (err) {
4758                 rv = 0;
4759                 goto fail;
4760         }
4761
4762         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4763         response = kmalloc(resp_size, GFP_NOIO);
4764         if (response == NULL) {
4765                 drbd_err(connection, "kmalloc of response failed\n");
4766                 rv = -1;
4767                 goto fail;
4768         }
4769
4770         sg_init_table(&sg, 1);
4771         sg_set_buf(&sg, peers_ch, pi.size);
4772
4773         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4774         if (rv) {
4775                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4776                 rv = -1;
4777                 goto fail;
4778         }
4779
4780         if (!conn_prepare_command(connection, sock)) {
4781                 rv = 0;
4782                 goto fail;
4783         }
4784         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4785                                 response, resp_size);
4786         if (!rv)
4787                 goto fail;
4788
4789         err = drbd_recv_header(connection, &pi);
4790         if (err) {
4791                 rv = 0;
4792                 goto fail;
4793         }
4794
4795         if (pi.cmd != P_AUTH_RESPONSE) {
4796                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4797                          cmdname(pi.cmd), pi.cmd);
4798                 rv = 0;
4799                 goto fail;
4800         }
4801
4802         if (pi.size != resp_size) {
4803                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
4804                 rv = 0;
4805                 goto fail;
4806         }
4807
4808         err = drbd_recv_all_warn(connection, response , resp_size);
4809         if (err) {
4810                 rv = 0;
4811                 goto fail;
4812         }
4813
4814         right_response = kmalloc(resp_size, GFP_NOIO);
4815         if (right_response == NULL) {
4816                 drbd_err(connection, "kmalloc of right_response failed\n");
4817                 rv = -1;
4818                 goto fail;
4819         }
4820
4821         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4822
4823         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4824         if (rv) {
4825                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4826                 rv = -1;
4827                 goto fail;
4828         }
4829
4830         rv = !memcmp(response, right_response, resp_size);
4831
4832         if (rv)
4833                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4834                      resp_size);
4835         else
4836                 rv = -1;
4837
4838  fail:
4839         kfree(peers_ch);
4840         kfree(response);
4841         kfree(right_response);
4842
4843         return rv;
4844 }
4845 #endif
4846
4847 int drbdd_init(struct drbd_thread *thi)
4848 {
4849         struct drbd_connection *connection = thi->connection;
4850         int h;
4851
4852         drbd_info(connection, "receiver (re)started\n");
4853
4854         do {
4855                 h = conn_connect(connection);
4856                 if (h == 0) {
4857                         conn_disconnect(connection);
4858                         schedule_timeout_interruptible(HZ);
4859                 }
4860                 if (h == -1) {
4861                         drbd_warn(connection, "Discarding network configuration.\n");
4862                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4863                 }
4864         } while (h == 0);
4865
4866         if (h > 0)
4867                 drbdd(connection);
4868
4869         conn_disconnect(connection);
4870
4871         drbd_info(connection, "receiver terminated\n");
4872         return 0;
4873 }
4874
4875 /* ********* acknowledge sender ******** */
4876
4877 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4878 {
4879         struct p_req_state_reply *p = pi->data;
4880         int retcode = be32_to_cpu(p->retcode);
4881
4882         if (retcode >= SS_SUCCESS) {
4883                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4884         } else {
4885                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4886                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4887                          drbd_set_st_err_str(retcode), retcode);
4888         }
4889         wake_up(&connection->ping_wait);
4890
4891         return 0;
4892 }
4893
4894 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4895 {
4896         struct drbd_device *device;
4897         struct p_req_state_reply *p = pi->data;
4898         int retcode = be32_to_cpu(p->retcode);
4899
4900         device = vnr_to_device(connection, pi->vnr);
4901         if (!device)
4902                 return -EIO;
4903
4904         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
4905                 D_ASSERT(device, connection->agreed_pro_version < 100);
4906                 return got_conn_RqSReply(connection, pi);
4907         }
4908
4909         if (retcode >= SS_SUCCESS) {
4910                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
4911         } else {
4912                 set_bit(CL_ST_CHG_FAIL, &device->flags);
4913                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
4914                         drbd_set_st_err_str(retcode), retcode);
4915         }
4916         wake_up(&device->state_wait);
4917
4918         return 0;
4919 }
4920
4921 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
4922 {
4923         return drbd_send_ping_ack(connection);
4924
4925 }
4926
4927 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
4928 {
4929         /* restore idle timeout */
4930         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
4931         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
4932                 wake_up(&connection->ping_wait);
4933
4934         return 0;
4935 }
4936
4937 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
4938 {
4939         struct drbd_device *device;
4940         struct p_block_ack *p = pi->data;
4941         sector_t sector = be64_to_cpu(p->sector);
4942         int blksize = be32_to_cpu(p->blksize);
4943
4944         device = vnr_to_device(connection, pi->vnr);
4945         if (!device)
4946                 return -EIO;
4947
4948         D_ASSERT(device, first_peer_device(device)->connection->agreed_pro_version >= 89);
4949
4950         update_peer_seq(device, be32_to_cpu(p->seq_num));
4951
4952         if (get_ldev(device)) {
4953                 drbd_rs_complete_io(device, sector);
4954                 drbd_set_in_sync(device, sector, blksize);
4955                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4956                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4957                 put_ldev(device);
4958         }
4959         dec_rs_pending(device);
4960         atomic_add(blksize >> 9, &device->rs_sect_in);
4961
4962         return 0;
4963 }
4964
4965 static int
4966 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
4967                               struct rb_root *root, const char *func,
4968                               enum drbd_req_event what, bool missing_ok)
4969 {
4970         struct drbd_request *req;
4971         struct bio_and_error m;
4972
4973         spin_lock_irq(&device->resource->req_lock);
4974         req = find_request(device, root, id, sector, missing_ok, func);
4975         if (unlikely(!req)) {
4976                 spin_unlock_irq(&device->resource->req_lock);
4977                 return -EIO;
4978         }
4979         __req_mod(req, what, &m);
4980         spin_unlock_irq(&device->resource->req_lock);
4981
4982         if (m.bio)
4983                 complete_master_bio(device, &m);
4984         return 0;
4985 }
4986
4987 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
4988 {
4989         struct drbd_device *device;
4990         struct p_block_ack *p = pi->data;
4991         sector_t sector = be64_to_cpu(p->sector);
4992         int blksize = be32_to_cpu(p->blksize);
4993         enum drbd_req_event what;
4994
4995         device = vnr_to_device(connection, pi->vnr);
4996         if (!device)
4997                 return -EIO;
4998
4999         update_peer_seq(device, be32_to_cpu(p->seq_num));
5000
5001         if (p->block_id == ID_SYNCER) {
5002                 drbd_set_in_sync(device, sector, blksize);
5003                 dec_rs_pending(device);
5004                 return 0;
5005         }
5006         switch (pi->cmd) {
5007         case P_RS_WRITE_ACK:
5008                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5009                 break;
5010         case P_WRITE_ACK:
5011                 what = WRITE_ACKED_BY_PEER;
5012                 break;
5013         case P_RECV_ACK:
5014                 what = RECV_ACKED_BY_PEER;
5015                 break;
5016         case P_SUPERSEDED:
5017                 what = CONFLICT_RESOLVED;
5018                 break;
5019         case P_RETRY_WRITE:
5020                 what = POSTPONE_WRITE;
5021                 break;
5022         default:
5023                 BUG();
5024         }
5025
5026         return validate_req_change_req_state(device, p->block_id, sector,
5027                                              &device->write_requests, __func__,
5028                                              what, false);
5029 }
5030
5031 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5032 {
5033         struct drbd_device *device;
5034         struct p_block_ack *p = pi->data;
5035         sector_t sector = be64_to_cpu(p->sector);
5036         int size = be32_to_cpu(p->blksize);
5037         int err;
5038
5039         device = vnr_to_device(connection, pi->vnr);
5040         if (!device)
5041                 return -EIO;
5042
5043         update_peer_seq(device, be32_to_cpu(p->seq_num));
5044
5045         if (p->block_id == ID_SYNCER) {
5046                 dec_rs_pending(device);
5047                 drbd_rs_failed_io(device, sector, size);
5048                 return 0;
5049         }
5050
5051         err = validate_req_change_req_state(device, p->block_id, sector,
5052                                             &device->write_requests, __func__,
5053                                             NEG_ACKED, true);
5054         if (err) {
5055                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5056                    The master bio might already be completed, therefore the
5057                    request is no longer in the collision hash. */
5058                 /* In Protocol B we might already have got a P_RECV_ACK
5059                    but then get a P_NEG_ACK afterwards. */
5060                 drbd_set_out_of_sync(device, sector, size);
5061         }
5062         return 0;
5063 }
5064
5065 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5066 {
5067         struct drbd_device *device;
5068         struct p_block_ack *p = pi->data;
5069         sector_t sector = be64_to_cpu(p->sector);
5070
5071         device = vnr_to_device(connection, pi->vnr);
5072         if (!device)
5073                 return -EIO;
5074
5075         update_peer_seq(device, be32_to_cpu(p->seq_num));
5076
5077         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5078             (unsigned long long)sector, be32_to_cpu(p->blksize));
5079
5080         return validate_req_change_req_state(device, p->block_id, sector,
5081                                              &device->read_requests, __func__,
5082                                              NEG_ACKED, false);
5083 }
5084
5085 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5086 {
5087         struct drbd_device *device;
5088         sector_t sector;
5089         int size;
5090         struct p_block_ack *p = pi->data;
5091
5092         device = vnr_to_device(connection, pi->vnr);
5093         if (!device)
5094                 return -EIO;
5095
5096         sector = be64_to_cpu(p->sector);
5097         size = be32_to_cpu(p->blksize);
5098
5099         update_peer_seq(device, be32_to_cpu(p->seq_num));
5100
5101         dec_rs_pending(device);
5102
5103         if (get_ldev_if_state(device, D_FAILED)) {
5104                 drbd_rs_complete_io(device, sector);
5105                 switch (pi->cmd) {
5106                 case P_NEG_RS_DREPLY:
5107                         drbd_rs_failed_io(device, sector, size);
5108                 case P_RS_CANCEL:
5109                         break;
5110                 default:
5111                         BUG();
5112                 }
5113                 put_ldev(device);
5114         }
5115
5116         return 0;
5117 }
5118
5119 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5120 {
5121         struct p_barrier_ack *p = pi->data;
5122         struct drbd_peer_device *peer_device;
5123         int vnr;
5124
5125         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5126
5127         rcu_read_lock();
5128         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5129                 struct drbd_device *device = peer_device->device;
5130
5131                 if (device->state.conn == C_AHEAD &&
5132                     atomic_read(&device->ap_in_flight) == 0 &&
5133                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5134                         device->start_resync_timer.expires = jiffies + HZ;
5135                         add_timer(&device->start_resync_timer);
5136                 }
5137         }
5138         rcu_read_unlock();
5139
5140         return 0;
5141 }
5142
5143 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5144 {
5145         struct drbd_device *device;
5146         struct p_block_ack *p = pi->data;
5147         struct drbd_work *w;
5148         sector_t sector;
5149         int size;
5150
5151         device = vnr_to_device(connection, pi->vnr);
5152         if (!device)
5153                 return -EIO;
5154
5155         sector = be64_to_cpu(p->sector);
5156         size = be32_to_cpu(p->blksize);
5157
5158         update_peer_seq(device, be32_to_cpu(p->seq_num));
5159
5160         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5161                 drbd_ov_out_of_sync_found(device, sector, size);
5162         else
5163                 ov_out_of_sync_print(device);
5164
5165         if (!get_ldev(device))
5166                 return 0;
5167
5168         drbd_rs_complete_io(device, sector);
5169         dec_rs_pending(device);
5170
5171         --device->ov_left;
5172
5173         /* let's advance progress step marks only for every other megabyte */
5174         if ((device->ov_left & 0x200) == 0x200)
5175                 drbd_advance_rs_marks(device, device->ov_left);
5176
5177         if (device->ov_left == 0) {
5178                 w = kmalloc(sizeof(*w), GFP_NOIO);
5179                 if (w) {
5180                         w->cb = w_ov_finished;
5181                         w->device = device;
5182                         drbd_queue_work(&first_peer_device(device)->connection->sender_work, w);
5183                 } else {
5184                         drbd_err(device, "kmalloc(w) failed.");
5185                         ov_out_of_sync_print(device);
5186                         drbd_resync_finished(device);
5187                 }
5188         }
5189         put_ldev(device);
5190         return 0;
5191 }
5192
5193 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5194 {
5195         return 0;
5196 }
5197
5198 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5199 {
5200         struct drbd_peer_device *peer_device;
5201         int vnr, not_empty = 0;
5202
5203         do {
5204                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5205                 flush_signals(current);
5206
5207                 rcu_read_lock();
5208                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5209                         struct drbd_device *device = peer_device->device;
5210                         kref_get(&device->kref);
5211                         rcu_read_unlock();
5212                         if (drbd_finish_peer_reqs(device)) {
5213                                 kref_put(&device->kref, drbd_destroy_device);
5214                                 return 1;
5215                         }
5216                         kref_put(&device->kref, drbd_destroy_device);
5217                         rcu_read_lock();
5218                 }
5219                 set_bit(SIGNAL_ASENDER, &connection->flags);
5220
5221                 spin_lock_irq(&connection->resource->req_lock);
5222                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5223                         struct drbd_device *device = peer_device->device;
5224                         not_empty = !list_empty(&device->done_ee);
5225                         if (not_empty)
5226                                 break;
5227                 }
5228                 spin_unlock_irq(&connection->resource->req_lock);
5229                 rcu_read_unlock();
5230         } while (not_empty);
5231
5232         return 0;
5233 }
5234
5235 struct asender_cmd {
5236         size_t pkt_size;
5237         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5238 };
5239
5240 static struct asender_cmd asender_tbl[] = {
5241         [P_PING]            = { 0, got_Ping },
5242         [P_PING_ACK]        = { 0, got_PingAck },
5243         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5244         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5245         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5246         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5247         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5248         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5249         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5250         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5251         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5252         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5253         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5254         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5255         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5256         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5257         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5258 };
5259
5260 int drbd_asender(struct drbd_thread *thi)
5261 {
5262         struct drbd_connection *connection = thi->connection;
5263         struct asender_cmd *cmd = NULL;
5264         struct packet_info pi;
5265         int rv;
5266         void *buf    = connection->meta.rbuf;
5267         int received = 0;
5268         unsigned int header_size = drbd_header_size(connection);
5269         int expect   = header_size;
5270         bool ping_timeout_active = false;
5271         struct net_conf *nc;
5272         int ping_timeo, tcp_cork, ping_int;
5273         struct sched_param param = { .sched_priority = 2 };
5274
5275         rv = sched_setscheduler(current, SCHED_RR, &param);
5276         if (rv < 0)
5277                 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5278
5279         while (get_t_state(thi) == RUNNING) {
5280                 drbd_thread_current_set_cpu(thi);
5281
5282                 rcu_read_lock();
5283                 nc = rcu_dereference(connection->net_conf);
5284                 ping_timeo = nc->ping_timeo;
5285                 tcp_cork = nc->tcp_cork;
5286                 ping_int = nc->ping_int;
5287                 rcu_read_unlock();
5288
5289                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5290                         if (drbd_send_ping(connection)) {
5291                                 drbd_err(connection, "drbd_send_ping has failed\n");
5292                                 goto reconnect;
5293                         }
5294                         connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5295                         ping_timeout_active = true;
5296                 }
5297
5298                 /* TODO: conditionally cork; it may hurt latency if we cork without
5299                    much to send */
5300                 if (tcp_cork)
5301                         drbd_tcp_cork(connection->meta.socket);
5302                 if (connection_finish_peer_reqs(connection)) {
5303                         drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5304                         goto reconnect;
5305                 }
5306                 /* but unconditionally uncork unless disabled */
5307                 if (tcp_cork)
5308                         drbd_tcp_uncork(connection->meta.socket);
5309
5310                 /* short circuit, recv_msg would return EINTR anyways. */
5311                 if (signal_pending(current))
5312                         continue;
5313
5314                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5315                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5316
5317                 flush_signals(current);
5318
5319                 /* Note:
5320                  * -EINTR        (on meta) we got a signal
5321                  * -EAGAIN       (on meta) rcvtimeo expired
5322                  * -ECONNRESET   other side closed the connection
5323                  * -ERESTARTSYS  (on data) we got a signal
5324                  * rv <  0       other than above: unexpected error!
5325                  * rv == expected: full header or command
5326                  * rv <  expected: "woken" by signal during receive
5327                  * rv == 0       : "connection shut down by peer"
5328                  */
5329                 if (likely(rv > 0)) {
5330                         received += rv;
5331                         buf      += rv;
5332                 } else if (rv == 0) {
5333                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5334                                 long t;
5335                                 rcu_read_lock();
5336                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5337                                 rcu_read_unlock();
5338
5339                                 t = wait_event_timeout(connection->ping_wait,
5340                                                        connection->cstate < C_WF_REPORT_PARAMS,
5341                                                        t);
5342                                 if (t)
5343                                         break;
5344                         }
5345                         drbd_err(connection, "meta connection shut down by peer.\n");
5346                         goto reconnect;
5347                 } else if (rv == -EAGAIN) {
5348                         /* If the data socket received something meanwhile,
5349                          * that is good enough: peer is still alive. */
5350                         if (time_after(connection->last_received,
5351                                 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5352                                 continue;
5353                         if (ping_timeout_active) {
5354                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5355                                 goto reconnect;
5356                         }
5357                         set_bit(SEND_PING, &connection->flags);
5358                         continue;
5359                 } else if (rv == -EINTR) {
5360                         continue;
5361                 } else {
5362                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5363                         goto reconnect;
5364                 }
5365
5366                 if (received == expect && cmd == NULL) {
5367                         if (decode_header(connection, connection->meta.rbuf, &pi))
5368                                 goto reconnect;
5369                         cmd = &asender_tbl[pi.cmd];
5370                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5371                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5372                                          cmdname(pi.cmd), pi.cmd);
5373                                 goto disconnect;
5374                         }
5375                         expect = header_size + cmd->pkt_size;
5376                         if (pi.size != expect - header_size) {
5377                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5378                                         pi.cmd, pi.size);
5379                                 goto reconnect;
5380                         }
5381                 }
5382                 if (received == expect) {
5383                         bool err;
5384
5385                         err = cmd->fn(connection, &pi);
5386                         if (err) {
5387                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5388                                 goto reconnect;
5389                         }
5390
5391                         connection->last_received = jiffies;
5392
5393                         if (cmd == &asender_tbl[P_PING_ACK]) {
5394                                 /* restore idle timeout */
5395                                 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5396                                 ping_timeout_active = false;
5397                         }
5398
5399                         buf      = connection->meta.rbuf;
5400                         received = 0;
5401                         expect   = header_size;
5402                         cmd      = NULL;
5403                 }
5404         }
5405
5406         if (0) {
5407 reconnect:
5408                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5409                 conn_md_sync(connection);
5410         }
5411         if (0) {
5412 disconnect:
5413                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5414         }
5415         clear_bit(SIGNAL_ASENDER, &connection->flags);
5416
5417         drbd_info(connection, "asender terminated\n");
5418
5419         return 0;
5420 }