Merge tag 'gpio-v4.8-1' of git://git.kernel.org/pub/scm/linux/kernel/git/linusw/linux...
[cascardo/linux.git] / drivers / staging / lustre / lnet / selftest / framework.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/selftest/framework.c
33  *
34  * Author: Isaac Huang <isaac@clusterfs.com>
35  * Author: Liang Zhen  <liangzhen@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LNET
39
40 #include "selftest.h"
41
42 lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
43
44 static int session_timeout = 100;
45 module_param(session_timeout, int, 0444);
46 MODULE_PARM_DESC(session_timeout, "test session timeout in seconds (100 by default, 0 == never)");
47
48 static int rpc_timeout = 64;
49 module_param(rpc_timeout, int, 0644);
50 MODULE_PARM_DESC(rpc_timeout, "rpc timeout in seconds (64 by default, 0 == never)");
51
/*
 * In-place byte-swap helpers for wire structures that arrived from a
 * peer of the opposite endianness.  Each takes an lvalue (not a pointer).
 */

/* process id: 64-bit nid, 32-bit pid */
#define sfw_unpack_id(x)			\
do {						\
	__swab64s(&(x).nid);			\
	__swab32s(&(x).pid);			\
} while (0)

/* session id: 64-bit nid, 64-bit stamp */
#define sfw_unpack_sid(s)			\
do {						\
	__swab64s(&(s).ses_nid);		\
	__swab64s(&(s).ses_stamp);		\
} while (0)

/* framework counters: all 32-bit */
#define sfw_unpack_fw_counters(c)		\
do {						\
	__swab32s(&(c).running_ms);		\
	__swab32s(&(c).active_batches);		\
	__swab32s(&(c).zombie_sessions);	\
	__swab32s(&(c).brw_errors);		\
	__swab32s(&(c).ping_errors);		\
} while (0)

/* RPC counters: 32-bit counts, 64-bit bulk byte totals */
#define sfw_unpack_rpc_counters(c)		\
do {						\
	__swab32s(&(c).errors);			\
	__swab32s(&(c).rpcs_sent);		\
	__swab32s(&(c).rpcs_rcvd);		\
	__swab32s(&(c).rpcs_dropped);		\
	__swab32s(&(c).rpcs_expired);		\
	__swab64s(&(c).bulk_get);		\
	__swab64s(&(c).bulk_put);		\
} while (0)

/* LNet counters: 32-bit counts, 64-bit lengths */
#define sfw_unpack_lnet_counters(c)		\
do {						\
	__swab32s(&(c).errors);			\
	__swab32s(&(c).msgs_max);		\
	__swab32s(&(c).msgs_alloc);		\
	__swab32s(&(c).send_count);		\
	__swab32s(&(c).recv_count);		\
	__swab32s(&(c).drop_count);		\
	__swab32s(&(c).route_count);		\
	__swab64s(&(c).send_length);		\
	__swab64s(&(c).recv_length);		\
	__swab64s(&(c).drop_length);		\
	__swab64s(&(c).route_length);		\
} while (0)

/* Non-zero while a test instance / batch still has active users. */
#define sfw_test_active(inst)	(atomic_read(&(inst)->tsi_nactive))
#define sfw_batch_active(bat)	(atomic_read(&(bat)->bat_nactive))
101
102 static struct smoketest_framework {
103         struct list_head  fw_zombie_rpcs;     /* RPCs to be recycled */
104         struct list_head  fw_zombie_sessions; /* stopping sessions */
105         struct list_head  fw_tests;           /* registered test cases */
106         atomic_t          fw_nzombies;        /* # zombie sessions */
107         spinlock_t        fw_lock;            /* serialise */
108         struct sfw_session        *fw_session;        /* _the_ session */
109         int               fw_shuttingdown;    /* shutdown in progress */
110         struct srpc_server_rpc *fw_active_srpc;/* running RPC */
111 } sfw_data;
112
113 /* forward ref's */
114 int sfw_stop_batch(struct sfw_batch *tsb, int force);
115 void sfw_destroy_session(struct sfw_session *sn);
116
117 static inline struct sfw_test_case *
118 sfw_find_test_case(int id)
119 {
120         struct sfw_test_case *tsc;
121
122         LASSERT(id <= SRPC_SERVICE_MAX_ID);
123         LASSERT(id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
124
125         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
126                 if (tsc->tsc_srv_service->sv_id == id)
127                         return tsc;
128         }
129
130         return NULL;
131 }
132
133 static int
134 sfw_register_test(struct srpc_service *service, struct sfw_test_client_ops *cliops)
135 {
136         struct sfw_test_case *tsc;
137
138         if (sfw_find_test_case(service->sv_id)) {
139                 CERROR("Failed to register test %s (%d)\n",
140                        service->sv_name, service->sv_id);
141                 return -EEXIST;
142         }
143
144         LIBCFS_ALLOC(tsc, sizeof(struct sfw_test_case));
145         if (!tsc)
146                 return -ENOMEM;
147
148         tsc->tsc_cli_ops = cliops;
149         tsc->tsc_srv_service = service;
150
151         list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
152         return 0;
153 }
154
155 static void
156 sfw_add_session_timer(void)
157 {
158         struct sfw_session *sn = sfw_data.fw_session;
159         struct stt_timer *timer = &sn->sn_timer;
160
161         LASSERT(!sfw_data.fw_shuttingdown);
162
163         if (!sn || !sn->sn_timeout)
164                 return;
165
166         LASSERT(!sn->sn_timer_active);
167
168         sn->sn_timer_active = 1;
169         timer->stt_expires = ktime_get_real_seconds() + sn->sn_timeout;
170         stt_add_timer(timer);
171 }
172
173 static int
174 sfw_del_session_timer(void)
175 {
176         struct sfw_session *sn = sfw_data.fw_session;
177
178         if (!sn || !sn->sn_timer_active)
179                 return 0;
180
181         LASSERT(sn->sn_timeout);
182
183         if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
184                 sn->sn_timer_active = 0;
185                 return 0;
186         }
187
188         return EBUSY; /* racing with sfw_session_expired() */
189 }
190
191 static void
192 sfw_deactivate_session(void)
193 __must_hold(&sfw_data.fw_lock)
194 {
195         struct sfw_session *sn = sfw_data.fw_session;
196         int nactive = 0;
197         struct sfw_batch *tsb;
198         struct sfw_test_case *tsc;
199
200         if (!sn)
201                 return;
202
203         LASSERT(!sn->sn_timer_active);
204
205         sfw_data.fw_session = NULL;
206         atomic_inc(&sfw_data.fw_nzombies);
207         list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
208
209         spin_unlock(&sfw_data.fw_lock);
210
211         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
212                 srpc_abort_service(tsc->tsc_srv_service);
213         }
214
215         spin_lock(&sfw_data.fw_lock);
216
217         list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
218                 if (sfw_batch_active(tsb)) {
219                         nactive++;
220                         sfw_stop_batch(tsb, 1);
221                 }
222         }
223
224         if (nactive)
225                 return; /* wait for active batches to stop */
226
227         list_del_init(&sn->sn_list);
228         spin_unlock(&sfw_data.fw_lock);
229
230         sfw_destroy_session(sn);
231
232         spin_lock(&sfw_data.fw_lock);
233 }
234
235 static void
236 sfw_session_expired(void *data)
237 {
238         struct sfw_session *sn = data;
239
240         spin_lock(&sfw_data.fw_lock);
241
242         LASSERT(sn->sn_timer_active);
243         LASSERT(sn == sfw_data.fw_session);
244
245         CWARN("Session expired! sid: %s-%llu, name: %s\n",
246               libcfs_nid2str(sn->sn_id.ses_nid),
247               sn->sn_id.ses_stamp, &sn->sn_name[0]);
248
249         sn->sn_timer_active = 0;
250         sfw_deactivate_session();
251
252         spin_unlock(&sfw_data.fw_lock);
253 }
254
255 static inline void
256 sfw_init_session(struct sfw_session *sn, lst_sid_t sid,
257                  unsigned features, const char *name)
258 {
259         struct stt_timer *timer = &sn->sn_timer;
260
261         memset(sn, 0, sizeof(struct sfw_session));
262         INIT_LIST_HEAD(&sn->sn_list);
263         INIT_LIST_HEAD(&sn->sn_batches);
264         atomic_set(&sn->sn_refcount, 1);        /* +1 for caller */
265         atomic_set(&sn->sn_brw_errors, 0);
266         atomic_set(&sn->sn_ping_errors, 0);
267         strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name));
268
269         sn->sn_timer_active = 0;
270         sn->sn_id = sid;
271         sn->sn_features = features;
272         sn->sn_timeout = session_timeout;
273         sn->sn_started = cfs_time_current();
274
275         timer->stt_data = sn;
276         timer->stt_func = sfw_session_expired;
277         INIT_LIST_HEAD(&timer->stt_list);
278 }
279
280 /* completion handler for incoming framework RPCs */
281 static void
282 sfw_server_rpc_done(struct srpc_server_rpc *rpc)
283 {
284         struct srpc_service *sv = rpc->srpc_scd->scd_svc;
285         int status = rpc->srpc_status;
286
287         CDEBUG(D_NET, "Incoming framework RPC done: service %s, peer %s, status %s:%d\n",
288                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
289                swi_state2str(rpc->srpc_wi.swi_state),
290                status);
291
292         if (rpc->srpc_bulk)
293                 sfw_free_pages(rpc);
294 }
295
296 static void
297 sfw_client_rpc_fini(struct srpc_client_rpc *rpc)
298 {
299         LASSERT(!rpc->crpc_bulk.bk_niov);
300         LASSERT(list_empty(&rpc->crpc_list));
301         LASSERT(!atomic_read(&rpc->crpc_refcount));
302
303         CDEBUG(D_NET, "Outgoing framework RPC done: service %d, peer %s, status %s:%d:%d\n",
304                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
305                swi_state2str(rpc->crpc_wi.swi_state),
306                rpc->crpc_aborted, rpc->crpc_status);
307
308         spin_lock(&sfw_data.fw_lock);
309
310         /* my callers must finish all RPCs before shutting me down */
311         LASSERT(!sfw_data.fw_shuttingdown);
312         list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
313
314         spin_unlock(&sfw_data.fw_lock);
315 }
316
317 static struct sfw_batch *
318 sfw_find_batch(lst_bid_t bid)
319 {
320         struct sfw_session *sn = sfw_data.fw_session;
321         struct sfw_batch *bat;
322
323         LASSERT(sn);
324
325         list_for_each_entry(bat, &sn->sn_batches, bat_list) {
326                 if (bat->bat_id.bat_id == bid.bat_id)
327                         return bat;
328         }
329
330         return NULL;
331 }
332
333 static struct sfw_batch *
334 sfw_bid2batch(lst_bid_t bid)
335 {
336         struct sfw_session *sn = sfw_data.fw_session;
337         struct sfw_batch *bat;
338
339         LASSERT(sn);
340
341         bat = sfw_find_batch(bid);
342         if (bat)
343                 return bat;
344
345         LIBCFS_ALLOC(bat, sizeof(struct sfw_batch));
346         if (!bat)
347                 return NULL;
348
349         bat->bat_error = 0;
350         bat->bat_session = sn;
351         bat->bat_id = bid;
352         atomic_set(&bat->bat_nactive, 0);
353         INIT_LIST_HEAD(&bat->bat_tests);
354
355         list_add_tail(&bat->bat_list, &sn->sn_batches);
356         return bat;
357 }
358
359 static int
360 sfw_get_stats(struct srpc_stat_reqst *request, struct srpc_stat_reply *reply)
361 {
362         struct sfw_session *sn = sfw_data.fw_session;
363         sfw_counters_t *cnt = &reply->str_fw;
364         struct sfw_batch *bat;
365
366         reply->str_sid = !sn ? LST_INVALID_SID : sn->sn_id;
367
368         if (request->str_sid.ses_nid == LNET_NID_ANY) {
369                 reply->str_status = EINVAL;
370                 return 0;
371         }
372
373         if (!sn || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
374                 reply->str_status = ESRCH;
375                 return 0;
376         }
377
378         lnet_counters_get(&reply->str_lnet);
379         srpc_get_counters(&reply->str_rpc);
380
381         /*
382          * send over the msecs since the session was started
383          * with 32 bits to send, this is ~49 days
384          */
385         cnt->running_ms = jiffies_to_msecs(jiffies - sn->sn_started);
386         cnt->brw_errors = atomic_read(&sn->sn_brw_errors);
387         cnt->ping_errors = atomic_read(&sn->sn_ping_errors);
388         cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
389
390         cnt->active_batches = 0;
391         list_for_each_entry(bat, &sn->sn_batches, bat_list) {
392                 if (atomic_read(&bat->bat_nactive) > 0)
393                         cnt->active_batches++;
394         }
395
396         reply->str_status = 0;
397         return 0;
398 }
399
400 int
401 sfw_make_session(struct srpc_mksn_reqst *request, struct srpc_mksn_reply *reply)
402 {
403         struct sfw_session *sn = sfw_data.fw_session;
404         struct srpc_msg *msg = container_of(request, struct srpc_msg,
405                                        msg_body.mksn_reqst);
406         int cplen = 0;
407
408         if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
409                 reply->mksn_sid = !sn ? LST_INVALID_SID : sn->sn_id;
410                 reply->mksn_status = EINVAL;
411                 return 0;
412         }
413
414         if (sn) {
415                 reply->mksn_status = 0;
416                 reply->mksn_sid = sn->sn_id;
417                 reply->mksn_timeout = sn->sn_timeout;
418
419                 if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
420                         atomic_inc(&sn->sn_refcount);
421                         return 0;
422                 }
423
424                 if (!request->mksn_force) {
425                         reply->mksn_status = EBUSY;
426                         cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0],
427                                         sizeof(reply->mksn_name));
428                         if (cplen >= sizeof(reply->mksn_name))
429                                 return -E2BIG;
430                         return 0;
431                 }
432         }
433
434         /*
435          * reject the request if it requires unknown features
436          * NB: old version will always accept all features because it's not
437          * aware of srpc_msg::msg_ses_feats, it's a defect but it's also
438          * harmless because it will return zero feature to console, and it's
439          * console's responsibility to make sure all nodes in a session have
440          * same feature mask.
441          */
442         if (msg->msg_ses_feats & ~LST_FEATS_MASK) {
443                 reply->mksn_status = EPROTO;
444                 return 0;
445         }
446
447         /* brand new or create by force */
448         LIBCFS_ALLOC(sn, sizeof(struct sfw_session));
449         if (!sn) {
450                 CERROR("dropping RPC mksn under memory pressure\n");
451                 return -ENOMEM;
452         }
453
454         sfw_init_session(sn, request->mksn_sid,
455                          msg->msg_ses_feats, &request->mksn_name[0]);
456
457         spin_lock(&sfw_data.fw_lock);
458
459         sfw_deactivate_session();
460         LASSERT(!sfw_data.fw_session);
461         sfw_data.fw_session = sn;
462
463         spin_unlock(&sfw_data.fw_lock);
464
465         reply->mksn_status = 0;
466         reply->mksn_sid = sn->sn_id;
467         reply->mksn_timeout = sn->sn_timeout;
468         return 0;
469 }
470
471 static int
472 sfw_remove_session(struct srpc_rmsn_reqst *request, struct srpc_rmsn_reply *reply)
473 {
474         struct sfw_session *sn = sfw_data.fw_session;
475
476         reply->rmsn_sid = !sn ? LST_INVALID_SID : sn->sn_id;
477
478         if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
479                 reply->rmsn_status = EINVAL;
480                 return 0;
481         }
482
483         if (!sn || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
484                 reply->rmsn_status = !sn ? ESRCH : EBUSY;
485                 return 0;
486         }
487
488         if (!atomic_dec_and_test(&sn->sn_refcount)) {
489                 reply->rmsn_status = 0;
490                 return 0;
491         }
492
493         spin_lock(&sfw_data.fw_lock);
494         sfw_deactivate_session();
495         spin_unlock(&sfw_data.fw_lock);
496
497         reply->rmsn_status = 0;
498         reply->rmsn_sid = LST_INVALID_SID;
499         LASSERT(!sfw_data.fw_session);
500         return 0;
501 }
502
503 static int
504 sfw_debug_session(struct srpc_debug_reqst *request, struct srpc_debug_reply *reply)
505 {
506         struct sfw_session *sn = sfw_data.fw_session;
507
508         if (!sn) {
509                 reply->dbg_status = ESRCH;
510                 reply->dbg_sid = LST_INVALID_SID;
511                 return 0;
512         }
513
514         reply->dbg_status = 0;
515         reply->dbg_sid = sn->sn_id;
516         reply->dbg_timeout = sn->sn_timeout;
517         if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name))
518             >= sizeof(reply->dbg_name))
519                 return -E2BIG;
520
521         return 0;
522 }
523
524 static void
525 sfw_test_rpc_fini(struct srpc_client_rpc *rpc)
526 {
527         struct sfw_test_unit *tsu = rpc->crpc_priv;
528         struct sfw_test_instance *tsi = tsu->tsu_instance;
529
530         /* Called with hold of tsi->tsi_lock */
531         LASSERT(list_empty(&rpc->crpc_list));
532         list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
533 }
534
535 static inline int
536 sfw_test_buffers(struct sfw_test_instance *tsi)
537 {
538         struct sfw_test_case *tsc;
539         struct srpc_service *svc;
540         int nbuf;
541
542         LASSERT(tsi);
543         tsc = sfw_find_test_case(tsi->tsi_service);
544         LASSERT(tsc);
545         svc = tsc->tsc_srv_service;
546         LASSERT(svc);
547
548         nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
549         return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
550 }
551
552 static int
553 sfw_load_test(struct sfw_test_instance *tsi)
554 {
555         struct sfw_test_case *tsc;
556         struct srpc_service *svc;
557         int nbuf;
558         int rc;
559
560         LASSERT(tsi);
561         tsc = sfw_find_test_case(tsi->tsi_service);
562         nbuf = sfw_test_buffers(tsi);
563         LASSERT(tsc);
564         svc = tsc->tsc_srv_service;
565
566         if (tsi->tsi_is_client) {
567                 tsi->tsi_ops = tsc->tsc_cli_ops;
568                 return 0;
569         }
570
571         rc = srpc_service_add_buffers(svc, nbuf);
572         if (rc) {
573                 CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
574                       svc->sv_name, nbuf, rc);
575                 /*
576                  * NB: this error handler is not strictly correct, because
577                  * it may release more buffers than already allocated,
578                  * but it doesn't matter because request portal should
579                  * be lazy portal and will grow buffers if necessary.
580                  */
581                 srpc_service_remove_buffers(svc, nbuf);
582                 return -ENOMEM;
583         }
584
585         CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
586                nbuf * (srpc_serv_is_framework(svc) ?
587                        2 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
588         return 0;
589 }
590
591 static void
592 sfw_unload_test(struct sfw_test_instance *tsi)
593 {
594         struct sfw_test_case *tsc;
595
596         LASSERT(tsi);
597         tsc = sfw_find_test_case(tsi->tsi_service);
598         LASSERT(tsc);
599
600         if (tsi->tsi_is_client)
601                 return;
602
603         /*
604          * shrink buffers, because request portal is lazy portal
605          * which can grow buffers at runtime so we may leave
606          * some buffers behind, but never mind...
607          */
608         srpc_service_remove_buffers(tsc->tsc_srv_service,
609                                     sfw_test_buffers(tsi));
610 }
611
612 static void
613 sfw_destroy_test_instance(struct sfw_test_instance *tsi)
614 {
615         struct srpc_client_rpc *rpc;
616         struct sfw_test_unit *tsu;
617
618         if (!tsi->tsi_is_client)
619                 goto clean;
620
621         tsi->tsi_ops->tso_fini(tsi);
622
623         LASSERT(!tsi->tsi_stopping);
624         LASSERT(list_empty(&tsi->tsi_active_rpcs));
625         LASSERT(!sfw_test_active(tsi));
626
627         while (!list_empty(&tsi->tsi_units)) {
628                 tsu = list_entry(tsi->tsi_units.next,
629                                  struct sfw_test_unit, tsu_list);
630                 list_del(&tsu->tsu_list);
631                 LIBCFS_FREE(tsu, sizeof(*tsu));
632         }
633
634         while (!list_empty(&tsi->tsi_free_rpcs)) {
635                 rpc = list_entry(tsi->tsi_free_rpcs.next,
636                                  struct srpc_client_rpc, crpc_list);
637                 list_del(&rpc->crpc_list);
638                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
639         }
640
641 clean:
642         sfw_unload_test(tsi);
643         LIBCFS_FREE(tsi, sizeof(*tsi));
644 }
645
646 static void
647 sfw_destroy_batch(struct sfw_batch *tsb)
648 {
649         struct sfw_test_instance *tsi;
650
651         LASSERT(!sfw_batch_active(tsb));
652         LASSERT(list_empty(&tsb->bat_list));
653
654         while (!list_empty(&tsb->bat_tests)) {
655                 tsi = list_entry(tsb->bat_tests.next,
656                                  struct sfw_test_instance, tsi_list);
657                 list_del_init(&tsi->tsi_list);
658                 sfw_destroy_test_instance(tsi);
659         }
660
661         LIBCFS_FREE(tsb, sizeof(struct sfw_batch));
662 }
663
664 void
665 sfw_destroy_session(struct sfw_session *sn)
666 {
667         struct sfw_batch *batch;
668
669         LASSERT(list_empty(&sn->sn_list));
670         LASSERT(sn != sfw_data.fw_session);
671
672         while (!list_empty(&sn->sn_batches)) {
673                 batch = list_entry(sn->sn_batches.next,
674                                    struct sfw_batch, bat_list);
675                 list_del_init(&batch->bat_list);
676                 sfw_destroy_batch(batch);
677         }
678
679         LIBCFS_FREE(sn, sizeof(*sn));
680         atomic_dec(&sfw_data.fw_nzombies);
681 }
682
683 static void
684 sfw_unpack_addtest_req(struct srpc_msg *msg)
685 {
686         struct srpc_test_reqst *req = &msg->msg_body.tes_reqst;
687
688         LASSERT(msg->msg_type == SRPC_MSG_TEST_REQST);
689         LASSERT(req->tsr_is_client);
690
691         if (msg->msg_magic == SRPC_MSG_MAGIC)
692                 return; /* no flipping needed */
693
694         LASSERT(msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
695
696         if (req->tsr_service == SRPC_SERVICE_BRW) {
697                 if (!(msg->msg_ses_feats & LST_FEAT_BULK_LEN)) {
698                         struct test_bulk_req *bulk = &req->tsr_u.bulk_v0;
699
700                         __swab32s(&bulk->blk_opc);
701                         __swab32s(&bulk->blk_npg);
702                         __swab32s(&bulk->blk_flags);
703
704                 } else {
705                         struct test_bulk_req_v1 *bulk = &req->tsr_u.bulk_v1;
706
707                         __swab16s(&bulk->blk_opc);
708                         __swab16s(&bulk->blk_flags);
709                         __swab32s(&bulk->blk_offset);
710                         __swab32s(&bulk->blk_len);
711                 }
712
713                 return;
714         }
715
716         if (req->tsr_service == SRPC_SERVICE_PING) {
717                 struct test_ping_req *ping = &req->tsr_u.ping;
718
719                 __swab32s(&ping->png_size);
720                 __swab32s(&ping->png_flags);
721                 return;
722         }
723
724         LBUG();
725 }
726
727 static int
728 sfw_add_test_instance(struct sfw_batch *tsb, struct srpc_server_rpc *rpc)
729 {
730         struct srpc_msg *msg = &rpc->srpc_reqstbuf->buf_msg;
731         struct srpc_test_reqst *req = &msg->msg_body.tes_reqst;
732         struct srpc_bulk *bk = rpc->srpc_bulk;
733         int ndest = req->tsr_ndest;
734         struct sfw_test_unit *tsu;
735         struct sfw_test_instance *tsi;
736         int i;
737         int rc;
738
739         LIBCFS_ALLOC(tsi, sizeof(*tsi));
740         if (!tsi) {
741                 CERROR("Can't allocate test instance for batch: %llu\n",
742                        tsb->bat_id.bat_id);
743                 return -ENOMEM;
744         }
745
746         spin_lock_init(&tsi->tsi_lock);
747         atomic_set(&tsi->tsi_nactive, 0);
748         INIT_LIST_HEAD(&tsi->tsi_units);
749         INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
750         INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
751
752         tsi->tsi_stopping = 0;
753         tsi->tsi_batch = tsb;
754         tsi->tsi_loop = req->tsr_loop;
755         tsi->tsi_concur = req->tsr_concur;
756         tsi->tsi_service = req->tsr_service;
757         tsi->tsi_is_client = !!(req->tsr_is_client);
758         tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
759
760         rc = sfw_load_test(tsi);
761         if (rc) {
762                 LIBCFS_FREE(tsi, sizeof(*tsi));
763                 return rc;
764         }
765
766         LASSERT(!sfw_batch_active(tsb));
767
768         if (!tsi->tsi_is_client) {
769                 /* it's test server, just add it to tsb */
770                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
771                 return 0;
772         }
773
774         LASSERT(bk);
775         LASSERT(bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
776         LASSERT((unsigned int)bk->bk_len >=
777                 sizeof(lnet_process_id_packed_t) * ndest);
778
779         sfw_unpack_addtest_req(msg);
780         memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
781
782         for (i = 0; i < ndest; i++) {
783                 lnet_process_id_packed_t *dests;
784                 lnet_process_id_packed_t id;
785                 int j;
786
787                 dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page);
788                 LASSERT(dests);         /* my pages are within KVM always */
789                 id = dests[i % SFW_ID_PER_PAGE];
790                 if (msg->msg_magic != SRPC_MSG_MAGIC)
791                         sfw_unpack_id(id);
792
793                 for (j = 0; j < tsi->tsi_concur; j++) {
794                         LIBCFS_ALLOC(tsu, sizeof(struct sfw_test_unit));
795                         if (!tsu) {
796                                 rc = -ENOMEM;
797                                 CERROR("Can't allocate tsu for %d\n",
798                                        tsi->tsi_service);
799                                 goto error;
800                         }
801
802                         tsu->tsu_dest.nid = id.nid;
803                         tsu->tsu_dest.pid = id.pid;
804                         tsu->tsu_instance = tsi;
805                         tsu->tsu_private = NULL;
806                         list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
807                 }
808         }
809
810         rc = tsi->tsi_ops->tso_init(tsi);
811         if (!rc) {
812                 list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
813                 return 0;
814         }
815
816 error:
817         LASSERT(rc);
818         sfw_destroy_test_instance(tsi);
819         return rc;
820 }
821
822 static void
823 sfw_test_unit_done(struct sfw_test_unit *tsu)
824 {
825         struct sfw_test_instance *tsi = tsu->tsu_instance;
826         struct sfw_batch *tsb = tsi->tsi_batch;
827         struct sfw_session *sn = tsb->bat_session;
828
829         LASSERT(sfw_test_active(tsi));
830
831         if (!atomic_dec_and_test(&tsi->tsi_nactive))
832                 return;
833
834         /* the test instance is done */
835         spin_lock(&tsi->tsi_lock);
836
837         tsi->tsi_stopping = 0;
838
839         spin_unlock(&tsi->tsi_lock);
840
841         spin_lock(&sfw_data.fw_lock);
842
843         if (!atomic_dec_and_test(&tsb->bat_nactive) ||  /* tsb still active */
844             sn == sfw_data.fw_session) {                /* sn also active */
845                 spin_unlock(&sfw_data.fw_lock);
846                 return;
847         }
848
849         LASSERT(!list_empty(&sn->sn_list)); /* I'm a zombie! */
850
851         list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
852                 if (sfw_batch_active(tsb)) {
853                         spin_unlock(&sfw_data.fw_lock);
854                         return;
855                 }
856         }
857
858         list_del_init(&sn->sn_list);
859         spin_unlock(&sfw_data.fw_lock);
860
861         sfw_destroy_session(sn);
862 }
863
864 static void
865 sfw_test_rpc_done(struct srpc_client_rpc *rpc)
866 {
867         struct sfw_test_unit *tsu = rpc->crpc_priv;
868         struct sfw_test_instance *tsi = tsu->tsu_instance;
869         int done = 0;
870
871         tsi->tsi_ops->tso_done_rpc(tsu, rpc);
872
873         spin_lock(&tsi->tsi_lock);
874
875         LASSERT(sfw_test_active(tsi));
876         LASSERT(!list_empty(&rpc->crpc_list));
877
878         list_del_init(&rpc->crpc_list);
879
880         /* batch is stopping or loop is done or get error */
881         if (tsi->tsi_stopping || !tsu->tsu_loop ||
882             (rpc->crpc_status && tsi->tsi_stoptsu_onerr))
883                 done = 1;
884
885         /* dec ref for poster */
886         srpc_client_rpc_decref(rpc);
887
888         spin_unlock(&tsi->tsi_lock);
889
890         if (!done) {
891                 swi_schedule_workitem(&tsu->tsu_worker);
892                 return;
893         }
894
895         sfw_test_unit_done(tsu);
896 }
897
898 int
899 sfw_create_test_rpc(struct sfw_test_unit *tsu, lnet_process_id_t peer,
900                     unsigned features, int nblk, int blklen,
901                     struct srpc_client_rpc **rpcpp)
902 {
903         struct srpc_client_rpc *rpc = NULL;
904         struct sfw_test_instance *tsi = tsu->tsu_instance;
905
906         spin_lock(&tsi->tsi_lock);
907
908         LASSERT(sfw_test_active(tsi));
909                 /* pick request from buffer */
910         rpc = list_first_entry_or_null(&tsi->tsi_free_rpcs,
911                                        struct srpc_client_rpc, crpc_list);
912         if (rpc) {
913                 LASSERT(nblk == rpc->crpc_bulk.bk_niov);
914                 list_del_init(&rpc->crpc_list);
915         }
916
917         spin_unlock(&tsi->tsi_lock);
918
919         if (!rpc) {
920                 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
921                                              blklen, sfw_test_rpc_done,
922                                              sfw_test_rpc_fini, tsu);
923         } else {
924                 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
925                                      blklen, sfw_test_rpc_done,
926                                      sfw_test_rpc_fini, tsu);
927         }
928
929         if (!rpc) {
930                 CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
931                 return -ENOMEM;
932         }
933
934         rpc->crpc_reqstmsg.msg_ses_feats = features;
935         *rpcpp = rpc;
936
937         return 0;
938 }
939
940 static int
941 sfw_run_test(struct swi_workitem *wi)
942 {
943         struct sfw_test_unit *tsu = wi->swi_workitem.wi_data;
944         struct sfw_test_instance *tsi = tsu->tsu_instance;
945         struct srpc_client_rpc *rpc = NULL;
946
947         LASSERT(wi == &tsu->tsu_worker);
948
949         if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc)) {
950                 LASSERT(!rpc);
951                 goto test_done;
952         }
953
954         LASSERT(rpc);
955
956         spin_lock(&tsi->tsi_lock);
957
958         if (tsi->tsi_stopping) {
959                 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
960                 spin_unlock(&tsi->tsi_lock);
961                 goto test_done;
962         }
963
964         if (tsu->tsu_loop > 0)
965                 tsu->tsu_loop--;
966
967         list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
968         spin_unlock(&tsi->tsi_lock);
969
970         spin_lock(&rpc->crpc_lock);
971         rpc->crpc_timeout = rpc_timeout;
972         srpc_post_rpc(rpc);
973         spin_unlock(&rpc->crpc_lock);
974         return 0;
975
976 test_done:
977         /*
978          * No one can schedule me now since:
979          * - previous RPC, if any, has done and
980          * - no new RPC is initiated.
981          * - my batch is still active; no one can run it again now.
982          * Cancel pending schedules and prevent future schedule attempts:
983          */
984         swi_exit_workitem(wi);
985         sfw_test_unit_done(tsu);
986         return 1;
987 }
988
989 static int
990 sfw_run_batch(struct sfw_batch *tsb)
991 {
992         struct swi_workitem *wi;
993         struct sfw_test_unit *tsu;
994         struct sfw_test_instance *tsi;
995
996         if (sfw_batch_active(tsb)) {
997                 CDEBUG(D_NET, "Batch already active: %llu (%d)\n",
998                        tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
999                 return 0;
1000         }
1001
1002         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1003                 if (!tsi->tsi_is_client) /* skip server instances */
1004                         continue;
1005
1006                 LASSERT(!tsi->tsi_stopping);
1007                 LASSERT(!sfw_test_active(tsi));
1008
1009                 atomic_inc(&tsb->bat_nactive);
1010
1011                 list_for_each_entry(tsu, &tsi->tsi_units, tsu_list) {
1012                         atomic_inc(&tsi->tsi_nactive);
1013                         tsu->tsu_loop = tsi->tsi_loop;
1014                         wi = &tsu->tsu_worker;
1015                         swi_init_workitem(wi, tsu, sfw_run_test,
1016                                           lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1017                         swi_schedule_workitem(wi);
1018                 }
1019         }
1020
1021         return 0;
1022 }
1023
1024 int
1025 sfw_stop_batch(struct sfw_batch *tsb, int force)
1026 {
1027         struct sfw_test_instance *tsi;
1028         struct srpc_client_rpc *rpc;
1029
1030         if (!sfw_batch_active(tsb)) {
1031                 CDEBUG(D_NET, "Batch %llu inactive\n", tsb->bat_id.bat_id);
1032                 return 0;
1033         }
1034
1035         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1036                 spin_lock(&tsi->tsi_lock);
1037
1038                 if (!tsi->tsi_is_client ||
1039                     !sfw_test_active(tsi) || tsi->tsi_stopping) {
1040                         spin_unlock(&tsi->tsi_lock);
1041                         continue;
1042                 }
1043
1044                 tsi->tsi_stopping = 1;
1045
1046                 if (!force) {
1047                         spin_unlock(&tsi->tsi_lock);
1048                         continue;
1049                 }
1050
1051                 /* abort launched rpcs in the test */
1052                 list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) {
1053                         spin_lock(&rpc->crpc_lock);
1054
1055                         srpc_abort_rpc(rpc, -EINTR);
1056
1057                         spin_unlock(&rpc->crpc_lock);
1058                 }
1059
1060                 spin_unlock(&tsi->tsi_lock);
1061         }
1062
1063         return 0;
1064 }
1065
1066 static int
1067 sfw_query_batch(struct sfw_batch *tsb, int testidx, struct srpc_batch_reply *reply)
1068 {
1069         struct sfw_test_instance *tsi;
1070
1071         if (testidx < 0)
1072                 return -EINVAL;
1073
1074         if (!testidx) {
1075                 reply->bar_active = atomic_read(&tsb->bat_nactive);
1076                 return 0;
1077         }
1078
1079         list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1080                 if (testidx-- > 1)
1081                         continue;
1082
1083                 reply->bar_active = atomic_read(&tsi->tsi_nactive);
1084                 return 0;
1085         }
1086
1087         return -ENOENT;
1088 }
1089
1090 void
1091 sfw_free_pages(struct srpc_server_rpc *rpc)
1092 {
1093         srpc_free_bulk(rpc->srpc_bulk);
1094         rpc->srpc_bulk = NULL;
1095 }
1096
1097 int
1098 sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1099                 int sink)
1100 {
1101         LASSERT(!rpc->srpc_bulk);
1102         LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1103
1104         rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1105         if (!rpc->srpc_bulk)
1106                 return -ENOMEM;
1107
1108         return 0;
1109 }
1110
1111 static int
1112 sfw_add_test(struct srpc_server_rpc *rpc)
1113 {
1114         struct sfw_session *sn = sfw_data.fw_session;
1115         struct srpc_test_reply *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1116         struct srpc_test_reqst *request;
1117         int rc;
1118         struct sfw_batch *bat;
1119
1120         request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1121         reply->tsr_sid = !sn ? LST_INVALID_SID : sn->sn_id;
1122
1123         if (!request->tsr_loop ||
1124             !request->tsr_concur ||
1125             request->tsr_sid.ses_nid == LNET_NID_ANY ||
1126             request->tsr_ndest > SFW_MAX_NDESTS ||
1127             (request->tsr_is_client && !request->tsr_ndest) ||
1128             request->tsr_concur > SFW_MAX_CONCUR ||
1129             request->tsr_service > SRPC_SERVICE_MAX_ID ||
1130             request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1131                 reply->tsr_status = EINVAL;
1132                 return 0;
1133         }
1134
1135         if (!sn || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1136             !sfw_find_test_case(request->tsr_service)) {
1137                 reply->tsr_status = ENOENT;
1138                 return 0;
1139         }
1140
1141         bat = sfw_bid2batch(request->tsr_bid);
1142         if (!bat) {
1143                 CERROR("dropping RPC %s from %s under memory pressure\n",
1144                        rpc->srpc_scd->scd_svc->sv_name,
1145                        libcfs_id2str(rpc->srpc_peer));
1146                 return -ENOMEM;
1147         }
1148
1149         if (sfw_batch_active(bat)) {
1150                 reply->tsr_status = EBUSY;
1151                 return 0;
1152         }
1153
1154         if (request->tsr_is_client && !rpc->srpc_bulk) {
1155                 /* rpc will be resumed later in sfw_bulk_ready */
1156                 int npg = sfw_id_pages(request->tsr_ndest);
1157                 int len;
1158
1159                 if (!(sn->sn_features & LST_FEAT_BULK_LEN)) {
1160                         len = npg * PAGE_SIZE;
1161
1162                 } else {
1163                         len = sizeof(lnet_process_id_packed_t) *
1164                               request->tsr_ndest;
1165                 }
1166
1167                 return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1168         }
1169
1170         rc = sfw_add_test_instance(bat, rpc);
1171         CDEBUG(!rc ? D_NET : D_WARNING,
1172                "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1173                !rc ? "Added" : "Failed to add", request->tsr_service,
1174                request->tsr_is_client ? "client" : "server",
1175                request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1176
1177         reply->tsr_status = (rc < 0) ? -rc : rc;
1178         return 0;
1179 }
1180
1181 static int
1182 sfw_control_batch(struct srpc_batch_reqst *request, struct srpc_batch_reply *reply)
1183 {
1184         struct sfw_session *sn = sfw_data.fw_session;
1185         int rc = 0;
1186         struct sfw_batch *bat;
1187
1188         reply->bar_sid = !sn ? LST_INVALID_SID : sn->sn_id;
1189
1190         if (!sn || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1191                 reply->bar_status = ESRCH;
1192                 return 0;
1193         }
1194
1195         bat = sfw_find_batch(request->bar_bid);
1196         if (!bat) {
1197                 reply->bar_status = ENOENT;
1198                 return 0;
1199         }
1200
1201         switch (request->bar_opc) {
1202         case SRPC_BATCH_OPC_RUN:
1203                 rc = sfw_run_batch(bat);
1204                 break;
1205
1206         case SRPC_BATCH_OPC_STOP:
1207                 rc = sfw_stop_batch(bat, request->bar_arg);
1208                 break;
1209
1210         case SRPC_BATCH_OPC_QUERY:
1211                 rc = sfw_query_batch(bat, request->bar_testidx, reply);
1212                 break;
1213
1214         default:
1215                 return -EINVAL; /* drop it */
1216         }
1217
1218         reply->bar_status = (rc < 0) ? -rc : rc;
1219         return 0;
1220 }
1221
1222 static int
1223 sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1224 {
1225         struct srpc_service *sv = rpc->srpc_scd->scd_svc;
1226         struct srpc_msg *reply = &rpc->srpc_replymsg;
1227         struct srpc_msg *request = &rpc->srpc_reqstbuf->buf_msg;
1228         unsigned features = LST_FEATS_MASK;
1229         int rc = 0;
1230
1231         LASSERT(!sfw_data.fw_active_srpc);
1232         LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1233
1234         spin_lock(&sfw_data.fw_lock);
1235
1236         if (sfw_data.fw_shuttingdown) {
1237                 spin_unlock(&sfw_data.fw_lock);
1238                 return -ESHUTDOWN;
1239         }
1240
1241         /* Remove timer to avoid racing with it or expiring active session */
1242         if (sfw_del_session_timer()) {
1243                 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1244                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1245                 spin_unlock(&sfw_data.fw_lock);
1246                 return -EAGAIN;
1247         }
1248
1249         sfw_data.fw_active_srpc = rpc;
1250         spin_unlock(&sfw_data.fw_lock);
1251
1252         sfw_unpack_message(request);
1253         LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1254
1255         /* rpc module should have checked this */
1256         LASSERT(request->msg_version == SRPC_MSG_VERSION);
1257
1258         if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1259             sv->sv_id != SRPC_SERVICE_DEBUG) {
1260                 struct sfw_session *sn = sfw_data.fw_session;
1261
1262                 if (sn &&
1263                     sn->sn_features != request->msg_ses_feats) {
1264                         CNETERR("Features of framework RPC don't match features of current session: %x/%x\n",
1265                                 request->msg_ses_feats, sn->sn_features);
1266                         reply->msg_body.reply.status = EPROTO;
1267                         reply->msg_body.reply.sid = sn->sn_id;
1268                         goto out;
1269                 }
1270
1271         } else if (request->msg_ses_feats & ~LST_FEATS_MASK) {
1272                 /*
1273                  * NB: at this point, old version will ignore features and
1274                  * create new session anyway, so console should be able
1275                  * to handle this
1276                  */
1277                 reply->msg_body.reply.status = EPROTO;
1278                 goto out;
1279         }
1280
1281         switch (sv->sv_id) {
1282         default:
1283                 LBUG();
1284         case SRPC_SERVICE_TEST:
1285                 rc = sfw_add_test(rpc);
1286                 break;
1287
1288         case SRPC_SERVICE_BATCH:
1289                 rc = sfw_control_batch(&request->msg_body.bat_reqst,
1290                                        &reply->msg_body.bat_reply);
1291                 break;
1292
1293         case SRPC_SERVICE_QUERY_STAT:
1294                 rc = sfw_get_stats(&request->msg_body.stat_reqst,
1295                                    &reply->msg_body.stat_reply);
1296                 break;
1297
1298         case SRPC_SERVICE_DEBUG:
1299                 rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1300                                        &reply->msg_body.dbg_reply);
1301                 break;
1302
1303         case SRPC_SERVICE_MAKE_SESSION:
1304                 rc = sfw_make_session(&request->msg_body.mksn_reqst,
1305                                       &reply->msg_body.mksn_reply);
1306                 break;
1307
1308         case SRPC_SERVICE_REMOVE_SESSION:
1309                 rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1310                                         &reply->msg_body.rmsn_reply);
1311                 break;
1312         }
1313
1314         if (sfw_data.fw_session)
1315                 features = sfw_data.fw_session->sn_features;
1316  out:
1317         reply->msg_ses_feats = features;
1318         rpc->srpc_done = sfw_server_rpc_done;
1319         spin_lock(&sfw_data.fw_lock);
1320
1321         if (!sfw_data.fw_shuttingdown)
1322                 sfw_add_session_timer();
1323
1324         sfw_data.fw_active_srpc = NULL;
1325         spin_unlock(&sfw_data.fw_lock);
1326         return rc;
1327 }
1328
1329 static int
1330 sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1331 {
1332         struct srpc_service *sv = rpc->srpc_scd->scd_svc;
1333         int rc;
1334
1335         LASSERT(rpc->srpc_bulk);
1336         LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1337         LASSERT(!sfw_data.fw_active_srpc);
1338         LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1339
1340         spin_lock(&sfw_data.fw_lock);
1341
1342         if (status) {
1343                 CERROR("Bulk transfer failed for RPC: service %s, peer %s, status %d\n",
1344                        sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1345                 spin_unlock(&sfw_data.fw_lock);
1346                 return -EIO;
1347         }
1348
1349         if (sfw_data.fw_shuttingdown) {
1350                 spin_unlock(&sfw_data.fw_lock);
1351                 return -ESHUTDOWN;
1352         }
1353
1354         if (sfw_del_session_timer()) {
1355                 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1356                        sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1357                 spin_unlock(&sfw_data.fw_lock);
1358                 return -EAGAIN;
1359         }
1360
1361         sfw_data.fw_active_srpc = rpc;
1362         spin_unlock(&sfw_data.fw_lock);
1363
1364         rc = sfw_add_test(rpc);
1365
1366         spin_lock(&sfw_data.fw_lock);
1367
1368         if (!sfw_data.fw_shuttingdown)
1369                 sfw_add_session_timer();
1370
1371         sfw_data.fw_active_srpc = NULL;
1372         spin_unlock(&sfw_data.fw_lock);
1373         return rc;
1374 }
1375
1376 struct srpc_client_rpc *
1377 sfw_create_rpc(lnet_process_id_t peer, int service,
1378                unsigned features, int nbulkiov, int bulklen,
1379                void (*done)(struct srpc_client_rpc *), void *priv)
1380 {
1381         struct srpc_client_rpc *rpc = NULL;
1382
1383         spin_lock(&sfw_data.fw_lock);
1384
1385         LASSERT(!sfw_data.fw_shuttingdown);
1386         LASSERT(service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1387
1388         if (!nbulkiov && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1389                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1390                                  struct srpc_client_rpc, crpc_list);
1391                 list_del(&rpc->crpc_list);
1392
1393                 srpc_init_client_rpc(rpc, peer, service, 0, 0,
1394                                      done, sfw_client_rpc_fini, priv);
1395         }
1396
1397         spin_unlock(&sfw_data.fw_lock);
1398
1399         if (!rpc) {
1400                 rpc = srpc_create_client_rpc(peer, service,
1401                                              nbulkiov, bulklen, done,
1402                                              nbulkiov ?  NULL :
1403                                              sfw_client_rpc_fini,
1404                                              priv);
1405         }
1406
1407         if (rpc) /* "session" is concept in framework */
1408                 rpc->crpc_reqstmsg.msg_ses_feats = features;
1409
1410         return rpc;
1411 }
1412
1413 void
1414 sfw_unpack_message(struct srpc_msg *msg)
1415 {
1416         if (msg->msg_magic == SRPC_MSG_MAGIC)
1417                 return; /* no flipping needed */
1418
1419         /* srpc module should guarantee I wouldn't get crap */
1420         LASSERT(msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1421
1422         if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1423                 struct srpc_stat_reqst *req = &msg->msg_body.stat_reqst;
1424
1425                 __swab32s(&req->str_type);
1426                 __swab64s(&req->str_rpyid);
1427                 sfw_unpack_sid(req->str_sid);
1428                 return;
1429         }
1430
1431         if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1432                 struct srpc_stat_reply *rep = &msg->msg_body.stat_reply;
1433
1434                 __swab32s(&rep->str_status);
1435                 sfw_unpack_sid(rep->str_sid);
1436                 sfw_unpack_fw_counters(rep->str_fw);
1437                 sfw_unpack_rpc_counters(rep->str_rpc);
1438                 sfw_unpack_lnet_counters(rep->str_lnet);
1439                 return;
1440         }
1441
1442         if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1443                 struct srpc_mksn_reqst *req = &msg->msg_body.mksn_reqst;
1444
1445                 __swab64s(&req->mksn_rpyid);
1446                 __swab32s(&req->mksn_force);
1447                 sfw_unpack_sid(req->mksn_sid);
1448                 return;
1449         }
1450
1451         if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1452                 struct srpc_mksn_reply *rep = &msg->msg_body.mksn_reply;
1453
1454                 __swab32s(&rep->mksn_status);
1455                 __swab32s(&rep->mksn_timeout);
1456                 sfw_unpack_sid(rep->mksn_sid);
1457                 return;
1458         }
1459
1460         if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1461                 struct srpc_rmsn_reqst *req = &msg->msg_body.rmsn_reqst;
1462
1463                 __swab64s(&req->rmsn_rpyid);
1464                 sfw_unpack_sid(req->rmsn_sid);
1465                 return;
1466         }
1467
1468         if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1469                 struct srpc_rmsn_reply *rep = &msg->msg_body.rmsn_reply;
1470
1471                 __swab32s(&rep->rmsn_status);
1472                 sfw_unpack_sid(rep->rmsn_sid);
1473                 return;
1474         }
1475
1476         if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1477                 struct srpc_debug_reqst *req = &msg->msg_body.dbg_reqst;
1478
1479                 __swab64s(&req->dbg_rpyid);
1480                 __swab32s(&req->dbg_flags);
1481                 sfw_unpack_sid(req->dbg_sid);
1482                 return;
1483         }
1484
1485         if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1486                 struct srpc_debug_reply *rep = &msg->msg_body.dbg_reply;
1487
1488                 __swab32s(&rep->dbg_nbatch);
1489                 __swab32s(&rep->dbg_timeout);
1490                 sfw_unpack_sid(rep->dbg_sid);
1491                 return;
1492         }
1493
1494         if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1495                 struct srpc_batch_reqst *req = &msg->msg_body.bat_reqst;
1496
1497                 __swab32s(&req->bar_opc);
1498                 __swab64s(&req->bar_rpyid);
1499                 __swab32s(&req->bar_testidx);
1500                 __swab32s(&req->bar_arg);
1501                 sfw_unpack_sid(req->bar_sid);
1502                 __swab64s(&req->bar_bid.bat_id);
1503                 return;
1504         }
1505
1506         if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1507                 struct srpc_batch_reply *rep = &msg->msg_body.bat_reply;
1508
1509                 __swab32s(&rep->bar_status);
1510                 sfw_unpack_sid(rep->bar_sid);
1511                 return;
1512         }
1513
1514         if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1515                 struct srpc_test_reqst *req = &msg->msg_body.tes_reqst;
1516
1517                 __swab64s(&req->tsr_rpyid);
1518                 __swab64s(&req->tsr_bulkid);
1519                 __swab32s(&req->tsr_loop);
1520                 __swab32s(&req->tsr_ndest);
1521                 __swab32s(&req->tsr_concur);
1522                 __swab32s(&req->tsr_service);
1523                 sfw_unpack_sid(req->tsr_sid);
1524                 __swab64s(&req->tsr_bid.bat_id);
1525                 return;
1526         }
1527
1528         if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1529                 struct srpc_test_reply *rep = &msg->msg_body.tes_reply;
1530
1531                 __swab32s(&rep->tsr_status);
1532                 sfw_unpack_sid(rep->tsr_sid);
1533                 return;
1534         }
1535
1536         if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1537                 struct srpc_join_reqst *req = &msg->msg_body.join_reqst;
1538
1539                 __swab64s(&req->join_rpyid);
1540                 sfw_unpack_sid(req->join_sid);
1541                 return;
1542         }
1543
1544         if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1545                 struct srpc_join_reply *rep = &msg->msg_body.join_reply;
1546
1547                 __swab32s(&rep->join_status);
1548                 __swab32s(&rep->join_timeout);
1549                 sfw_unpack_sid(rep->join_sid);
1550                 return;
1551         }
1552
1553         LBUG();
1554 }
1555
1556 void
1557 sfw_abort_rpc(struct srpc_client_rpc *rpc)
1558 {
1559         LASSERT(atomic_read(&rpc->crpc_refcount) > 0);
1560         LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1561
1562         spin_lock(&rpc->crpc_lock);
1563         srpc_abort_rpc(rpc, -EINTR);
1564         spin_unlock(&rpc->crpc_lock);
1565 }
1566
1567 void
1568 sfw_post_rpc(struct srpc_client_rpc *rpc)
1569 {
1570         spin_lock(&rpc->crpc_lock);
1571
1572         LASSERT(!rpc->crpc_closed);
1573         LASSERT(!rpc->crpc_aborted);
1574         LASSERT(list_empty(&rpc->crpc_list));
1575         LASSERT(!sfw_data.fw_shuttingdown);
1576
1577         rpc->crpc_timeout = rpc_timeout;
1578         srpc_post_rpc(rpc);
1579
1580         spin_unlock(&rpc->crpc_lock);
1581 }
1582
1583 static struct srpc_service sfw_services[] = {
1584         {
1585                 /* sv_id */    SRPC_SERVICE_DEBUG,
1586                 /* sv_name */  "debug",
1587                 0
1588         },
1589         {
1590                 /* sv_id */    SRPC_SERVICE_QUERY_STAT,
1591                 /* sv_name */  "query stats",
1592                 0
1593         },
1594         {
1595                 /* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1596                 /* sv_name */  "make session",
1597                 0
1598         },
1599         {
1600                 /* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1601                 /* sv_name */  "remove session",
1602                 0
1603         },
1604         {
1605                 /* sv_id */    SRPC_SERVICE_BATCH,
1606                 /* sv_name */  "batch service",
1607                 0
1608         },
1609         {
1610                 /* sv_id */    SRPC_SERVICE_TEST,
1611                 /* sv_name */  "test service",
1612                 0
1613         },
1614         {
1615                 /* sv_id */    0,
1616                 /* sv_name */  NULL,
1617                 0
1618         }
1619 };
1620
1621 int
1622 sfw_startup(void)
1623 {
1624         int i;
1625         int rc;
1626         int error;
1627         struct srpc_service *sv;
1628         struct sfw_test_case *tsc;
1629
1630         if (session_timeout < 0) {
1631                 CERROR("Session timeout must be non-negative: %d\n",
1632                        session_timeout);
1633                 return -EINVAL;
1634         }
1635
1636         if (rpc_timeout < 0) {
1637                 CERROR("RPC timeout must be non-negative: %d\n",
1638                        rpc_timeout);
1639                 return -EINVAL;
1640         }
1641
1642         if (!session_timeout)
1643                 CWARN("Zero session_timeout specified - test sessions never expire.\n");
1644
1645         if (!rpc_timeout)
1646                 CWARN("Zero rpc_timeout specified - test RPC never expire.\n");
1647
1648         memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1649
1650         sfw_data.fw_session = NULL;
1651         sfw_data.fw_active_srpc = NULL;
1652         spin_lock_init(&sfw_data.fw_lock);
1653         atomic_set(&sfw_data.fw_nzombies, 0);
1654         INIT_LIST_HEAD(&sfw_data.fw_tests);
1655         INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1656         INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1657
1658         brw_init_test_client();
1659         brw_init_test_service();
1660         rc = sfw_register_test(&brw_test_service, &brw_test_client);
1661         LASSERT(!rc);
1662
1663         ping_init_test_client();
1664         ping_init_test_service();
1665         rc = sfw_register_test(&ping_test_service, &ping_test_client);
1666         LASSERT(!rc);
1667
1668         error = 0;
1669         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1670                 sv = tsc->tsc_srv_service;
1671
1672                 rc = srpc_add_service(sv);
1673                 LASSERT(rc != -EBUSY);
1674                 if (rc) {
1675                         CWARN("Failed to add %s service: %d\n",
1676                               sv->sv_name, rc);
1677                         error = rc;
1678                 }
1679         }
1680
1681         for (i = 0; ; i++) {
1682                 sv = &sfw_services[i];
1683                 if (!sv->sv_name)
1684                         break;
1685
1686                 sv->sv_bulk_ready = NULL;
1687                 sv->sv_handler = sfw_handle_server_rpc;
1688                 sv->sv_wi_total = SFW_FRWK_WI_MAX;
1689                 if (sv->sv_id == SRPC_SERVICE_TEST)
1690                         sv->sv_bulk_ready = sfw_bulk_ready;
1691
1692                 rc = srpc_add_service(sv);
1693                 LASSERT(rc != -EBUSY);
1694                 if (rc) {
1695                         CWARN("Failed to add %s service: %d\n",
1696                               sv->sv_name, rc);
1697                         error = rc;
1698                 }
1699
1700                 /* about to sfw_shutdown, no need to add buffer */
1701                 if (error)
1702                         continue;
1703
1704                 rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1705                 if (rc) {
1706                         CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
1707                               sv->sv_name, sv->sv_wi_total, rc);
1708                         error = -ENOMEM;
1709                 }
1710         }
1711
1712         if (error)
1713                 sfw_shutdown();
1714         return error;
1715 }
1716
1717 void
1718 sfw_shutdown(void)
1719 {
1720         struct srpc_service *sv;
1721         struct sfw_test_case    *tsc;
1722         int i;
1723
1724         spin_lock(&sfw_data.fw_lock);
1725
1726         sfw_data.fw_shuttingdown = 1;
1727         lst_wait_until(!sfw_data.fw_active_srpc, sfw_data.fw_lock,
1728                        "waiting for active RPC to finish.\n");
1729
1730         if (sfw_del_session_timer())
1731                 lst_wait_until(!sfw_data.fw_session, sfw_data.fw_lock,
1732                                "waiting for session timer to explode.\n");
1733
1734         sfw_deactivate_session();
1735         lst_wait_until(!atomic_read(&sfw_data.fw_nzombies),
1736                        sfw_data.fw_lock,
1737                        "waiting for %d zombie sessions to die.\n",
1738                        atomic_read(&sfw_data.fw_nzombies));
1739
1740         spin_unlock(&sfw_data.fw_lock);
1741
1742         for (i = 0; ; i++) {
1743                 sv = &sfw_services[i];
1744                 if (!sv->sv_name)
1745                         break;
1746
1747                 srpc_shutdown_service(sv);
1748                 srpc_remove_service(sv);
1749         }
1750
1751         list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1752                 sv = tsc->tsc_srv_service;
1753                 srpc_shutdown_service(sv);
1754                 srpc_remove_service(sv);
1755         }
1756
1757         while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1758                 struct srpc_client_rpc *rpc;
1759
1760                 rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1761                                  struct srpc_client_rpc, crpc_list);
1762                 list_del(&rpc->crpc_list);
1763
1764                 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1765         }
1766
1767         for (i = 0; ; i++) {
1768                 sv = &sfw_services[i];
1769                 if (!sv->sv_name)
1770                         break;
1771
1772                 srpc_wait_service_shutdown(sv);
1773         }
1774
1775         while (!list_empty(&sfw_data.fw_tests)) {
1776                 tsc = list_entry(sfw_data.fw_tests.next,
1777                                  struct sfw_test_case, tsc_list);
1778
1779                 srpc_wait_service_shutdown(tsc->tsc_srv_service);
1780
1781                 list_del(&tsc->tsc_list);
1782                 LIBCFS_FREE(tsc, sizeof(*tsc));
1783         }
1784 }