Merge Linux 2.6.23
[cascardo/linux.git] / net / sunrpc / clnt.c
index 613c10e..52429b1 100644 (file)
        dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
                        __FUNCTION__, t->tk_status)
 
+/*
+ * All RPC clients are linked into this list
+ */
+static LIST_HEAD(all_clients);
+static DEFINE_SPINLOCK(rpc_client_lock);
+
 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
 
 
@@ -66,6 +72,21 @@ static void  call_connect_status(struct rpc_task *task);
 static __be32 *        call_header(struct rpc_task *task);
 static __be32 *        call_verify(struct rpc_task *task);
 
+static int     rpc_ping(struct rpc_clnt *clnt, int flags);
+
+static void rpc_register_client(struct rpc_clnt *clnt)
+{
+       spin_lock(&rpc_client_lock);
+       list_add(&clnt->cl_clients, &all_clients);
+       spin_unlock(&rpc_client_lock);
+}
+
+static void rpc_unregister_client(struct rpc_clnt *clnt)
+{
+       spin_lock(&rpc_client_lock);
+       list_del(&clnt->cl_clients);
+       spin_unlock(&rpc_client_lock);
+}
 
 static int
 rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
@@ -111,6 +132,9 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
                        program->name, servname, xprt);
 
+       err = rpciod_up();
+       if (err)
+               goto out_no_rpciod;
        err = -EINVAL;
        if (!xprt)
                goto out_no_xprt;
@@ -121,7 +145,6 @@ static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, s
        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
-       atomic_set(&clnt->cl_count, 1);
        clnt->cl_parent = clnt;
 
        clnt->cl_server = clnt->cl_inline_name;
@@ -192,6 +215,8 @@ out_no_stats:
 out_err:
        xprt_put(xprt);
 out_no_xprt:
+       rpciod_down();
+out_no_rpciod:
        return ERR_PTR(err);
 }
 
@@ -209,12 +234,31 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 {
        struct rpc_xprt *xprt;
        struct rpc_clnt *clnt;
+       struct rpc_xprtsock_create xprtargs = {
+               .proto = args->protocol,
+               .srcaddr = args->saddress,
+               .dstaddr = args->address,
+               .addrlen = args->addrsize,
+               .timeout = args->timeout
+       };
+       char servername[20];
 
-       xprt = xprt_create_transport(args->protocol, args->address,
-                                       args->addrsize, args->timeout);
+       xprt = xprt_create_transport(&xprtargs);
        if (IS_ERR(xprt))
                return (struct rpc_clnt *)xprt;
 
+       /*
+        * If the caller chooses not to specify a hostname, whip
+        * up a string representation of the passed-in address.
+        */
+       if (args->servername == NULL) {
+               struct sockaddr_in *addr =
+                                       (struct sockaddr_in *) &args->address;
+               snprintf(servername, sizeof(servername), NIPQUAD_FMT,
+                       NIPQUAD(addr->sin_addr.s_addr));
+               args->servername = servername;
+       }
+
        /*
         * By default, kernel RPC client connects from a reserved port.
         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
@@ -270,7 +314,12 @@ rpc_clone_client(struct rpc_clnt *clnt)
        new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
-       atomic_set(&new->cl_count, 1);
+       new->cl_parent = clnt;
+       /* Turn off autobind on clones */
+       new->cl_autobind = 0;
+       INIT_LIST_HEAD(&new->cl_tasks);
+       spin_lock_init(&new->cl_lock);
+       rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        new->cl_metrics = rpc_alloc_iostats(clnt);
        if (new->cl_metrics == NULL)
                goto out_no_stats;
@@ -278,17 +327,12 @@ rpc_clone_client(struct rpc_clnt *clnt)
        err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
        if (err != 0)
                goto out_no_path;
-       new->cl_parent = clnt;
-       kref_get(&clnt->cl_kref);
-       new->cl_xprt = xprt_get(clnt->cl_xprt);
-       /* Turn off autobind on clones */
-       new->cl_autobind = 0;
-       INIT_LIST_HEAD(&new->cl_tasks);
-       spin_lock_init(&new->cl_lock);
-       rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
+       xprt_get(clnt->cl_xprt);
+       kref_get(&clnt->cl_kref);
        rpc_register_client(new);
+       rpciod_up();
        return new;
 out_no_path:
        rpc_free_iostats(new->cl_metrics);
@@ -303,8 +347,7 @@ out_no_clnt:
  * Properly shut down an RPC client, terminating all outstanding
  * requests.
  */
-int
-rpc_shutdown_client(struct rpc_clnt *clnt)
+void rpc_shutdown_client(struct rpc_clnt *clnt)
 {
        dprintk("RPC:       shutting down %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
@@ -315,7 +358,7 @@ rpc_shutdown_client(struct rpc_clnt *clnt)
                        list_empty(&clnt->cl_tasks), 1*HZ);
        }
 
-       return rpc_destroy_client(clnt);
+       rpc_release_client(clnt);
 }
 
 /*
@@ -328,10 +371,6 @@ rpc_free_client(struct kref *kref)
 
        dprintk("RPC:       destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
-       if (clnt->cl_auth) {
-               rpcauth_destroy(clnt->cl_auth);
-               clnt->cl_auth = NULL;
-       }
        if (!IS_ERR(clnt->cl_dentry)) {
                rpc_rmdir(clnt->cl_dentry);
                rpc_put_mount();
@@ -347,9 +386,34 @@ out_free:
        rpc_free_iostats(clnt->cl_metrics);
        clnt->cl_metrics = NULL;
        xprt_put(clnt->cl_xprt);
+       rpciod_down();
        kfree(clnt);
 }
 
+/*
+ * Free an RPC client
+ */
+static void
+rpc_free_auth(struct kref *kref)
+{
+       struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
+
+       if (clnt->cl_auth == NULL) {
+               rpc_free_client(kref);
+               return;
+       }
+
+       /*
+        * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
+        *       release remaining GSS contexts. This mechanism ensures
+        *       that it can do so safely.
+        */
+       kref_init(kref);
+       rpcauth_release(clnt->cl_auth);
+       clnt->cl_auth = NULL;
+       kref_put(kref, rpc_free_client);
+}
+
 /*
  * Release reference to the RPC client
  */
@@ -360,19 +424,7 @@ rpc_release_client(struct rpc_clnt *clnt)
 
        if (list_empty(&clnt->cl_tasks))
                wake_up(&destroy_wait);
-       kref_put(&clnt->cl_kref, rpc_free_client);
-}
-
-/*
- * Delete an RPC client
- */
-int
-rpc_destroy_client(struct rpc_clnt *clnt)
-{
-       if (!atomic_dec_and_test(&clnt->cl_count))
-               return 1;
-       kref_put(&clnt->cl_kref, rpc_free_client);
-       return 0;
+       kref_put(&clnt->cl_kref, rpc_free_auth);
 }
 
 /**
@@ -463,73 +515,96 @@ void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
        rpc_restore_sigmask(oldset);
 }
 
-/*
- * New rpc_call implementation
+static
+struct rpc_task *rpc_do_run_task(struct rpc_clnt *clnt,
+               struct rpc_message *msg,
+               int flags,
+               const struct rpc_call_ops *ops,
+               void *data)
+{
+       struct rpc_task *task, *ret;
+       sigset_t oldset;
+
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL) {
+               rpc_release_calldata(ops, data);
+               return ERR_PTR(-ENOMEM);
+       }
+
+       /* Mask signals on synchronous RPC calls and RPCSEC_GSS upcalls */
+       rpc_task_sigmask(task, &oldset);
+       if (msg != NULL) {
+               rpc_call_setup(task, msg, 0);
+               if (task->tk_status != 0) {
+                       ret = ERR_PTR(task->tk_status);
+                       rpc_put_task(task);
+                       goto out;
+               }
+       }
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
+       ret = task;
+out:
+       rpc_restore_sigmask(&oldset);
+       return ret;
+}
+
+/**
+ * rpc_call_sync - Perform a synchronous RPC call
+ * @clnt: pointer to RPC client
+ * @msg: RPC call parameters
+ * @flags: RPC call flags
  */
 int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
 {
        struct rpc_task *task;
-       sigset_t        oldset;
-       int             status;
+       int status;
 
        BUG_ON(flags & RPC_TASK_ASYNC);
 
-       task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
-       if (task == NULL)
-               return -ENOMEM;
-
-       /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
-       rpc_task_sigmask(task, &oldset);
-
-       /* Set up the call info struct and execute the task */
-       rpc_call_setup(task, msg, 0);
-       if (task->tk_status == 0) {
-               atomic_inc(&task->tk_count);
-               rpc_execute(task);
-       }
+       task = rpc_do_run_task(clnt, msg, flags, &rpc_default_ops, NULL);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
        status = task->tk_status;
        rpc_put_task(task);
-       rpc_restore_sigmask(&oldset);
        return status;
 }
 
-/*
- * New rpc_call implementation
+/**
+ * rpc_call_async - Perform an asynchronous RPC call
+ * @clnt: pointer to RPC client
+ * @msg: RPC call parameters
+ * @flags: RPC call flags
+ * @ops: RPC call ops
+ * @data: user call data
  */
 int
 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
               const struct rpc_call_ops *tk_ops, void *data)
 {
        struct rpc_task *task;
-       sigset_t        oldset;
-       int             status;
-
-       flags |= RPC_TASK_ASYNC;
-
-       /* Create/initialize a new RPC task */
-       status = -ENOMEM;
-       if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
-               goto out_release;
-
-       /* Mask signals on GSS_AUTH upcalls */
-       rpc_task_sigmask(task, &oldset);
-
-       rpc_call_setup(task, msg, 0);
 
-       /* Set up the call info struct and execute the task */
-       status = task->tk_status;
-       if (status == 0)
-               rpc_execute(task);
-       else
-               rpc_put_task(task);
-
-       rpc_restore_sigmask(&oldset);
-       return status;
-out_release:
-       rpc_release_calldata(tk_ops, data);
-       return status;
+       task = rpc_do_run_task(clnt, msg, flags|RPC_TASK_ASYNC, tk_ops, data);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
+       rpc_put_task(task);
+       return 0;
 }
 
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt: pointer to RPC client
+ * @flags: RPC flags
+ * @ops: RPC call ops
+ * @data: user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *tk_ops,
+                                       void *data)
+{
+       return rpc_do_run_task(clnt, NULL, flags, tk_ops, data);
+}
+EXPORT_SYMBOL(rpc_run_task);
 
 void
 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
@@ -731,7 +806,7 @@ call_reserveresult(struct rpc_task *task)
 static void
 call_allocate(struct rpc_task *task)
 {
-       unsigned int slack = task->tk_auth->au_cslack;
+       unsigned int slack = task->tk_msg.rpc_cred->cr_auth->au_cslack;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
@@ -829,10 +904,8 @@ call_encode(struct rpc_task *task)
        if (encode == NULL)
                return;
 
-       lock_kernel();
        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
                        task->tk_msg.rpc_argp);
-       unlock_kernel();
        if (task->tk_status == -ENOMEM) {
                /* XXX: Is this sane? */
                rpc_delay(task, 3*HZ);
@@ -1163,10 +1236,8 @@ call_decode(struct rpc_task *task)
        task->tk_action = rpc_exit_task;
 
        if (decode) {
-               lock_kernel();
                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
                                                      task->tk_msg.rpc_resp);
-               unlock_kernel();
        }
        dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
                        task->tk_status);
@@ -1259,9 +1330,9 @@ call_verify(struct rpc_task *task)
                 * - if it isn't pointer subtraction in the NFS client may give
                 *   undefined results
                 */
-               printk(KERN_WARNING
-                      "call_verify: XDR representation not a multiple of"
-                      " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
+               dprintk("RPC: %5u %s: XDR representation not a multiple of"
+                      " 4 bytes: 0x%x\n", task->tk_pid, __FUNCTION__,
+                      task->tk_rqstp->rq_rcv_buf.len);
                goto out_eio;
        }
        if ((len -= 3) < 0)
@@ -1269,7 +1340,8 @@ call_verify(struct rpc_task *task)
        p += 1; /* skip XID */
 
        if ((n = ntohl(*p++)) != RPC_REPLY) {
-               printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
+               dprintk("RPC: %5u %s: not an RPC reply: %x\n",
+                               task->tk_pid, __FUNCTION__, n);
                goto out_garbage;
        }
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
@@ -1320,7 +1392,8 @@ call_verify(struct rpc_task *task)
                               "authentication.\n", task->tk_client->cl_server);
                        break;
                default:
-                       printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
+                       dprintk("RPC: %5u %s: unknown auth error: %x\n",
+                                       task->tk_pid, __FUNCTION__, n);
                        error = -EIO;
                }
                dprintk("RPC: %5u %s: call rejected %d\n",
@@ -1328,7 +1401,8 @@ call_verify(struct rpc_task *task)
                goto out_err;
        }
        if (!(p = rpcauth_checkverf(task, p))) {
-               printk(KERN_WARNING "call_verify: auth check failed\n");
+               dprintk("RPC: %5u %s: auth check failed\n",
+                               task->tk_pid, __FUNCTION__);
                goto out_garbage;               /* bad verifier, retry */
        }
        len = p - (__be32 *)iov->iov_base - 1;
@@ -1367,7 +1441,8 @@ call_verify(struct rpc_task *task)
                                task->tk_pid, __FUNCTION__);
                break;                  /* retry */
        default:
-               printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
+               dprintk("RPC: %5u %s: server accept status: %x\n",
+                               task->tk_pid, __FUNCTION__, n);
                /* Also retry */
        }
 
@@ -1381,14 +1456,16 @@ out_garbage:
 out_retry:
                return ERR_PTR(-EAGAIN);
        }
-       printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
 out_eio:
        error = -EIO;
 out_err:
        rpc_exit(task, error);
+       dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
+                       __FUNCTION__, error);
        return ERR_PTR(error);
 out_overflow:
-       printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
+       dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
+                       __FUNCTION__);
        goto out_garbage;
 }
 
@@ -1407,7 +1484,7 @@ static struct rpc_procinfo rpcproc_null = {
        .p_decode = rpcproc_decode_null,
 };
 
-int rpc_ping(struct rpc_clnt *clnt, int flags)
+static int rpc_ping(struct rpc_clnt *clnt, int flags)
 {
        struct rpc_message msg = {
                .rpc_proc = &rpcproc_null,
@@ -1418,3 +1495,51 @@ int rpc_ping(struct rpc_clnt *clnt, int flags)
        put_rpccred(msg.rpc_cred);
        return err;
 }
+
+struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+{
+       struct rpc_message msg = {
+               .rpc_proc = &rpcproc_null,
+               .rpc_cred = cred,
+       };
+       return rpc_do_run_task(clnt, &msg, flags, &rpc_default_ops, NULL);
+}
+EXPORT_SYMBOL(rpc_call_null);
+
+#ifdef RPC_DEBUG
+void rpc_show_tasks(void)
+{
+       struct rpc_clnt *clnt;
+       struct rpc_task *t;
+
+       spin_lock(&rpc_client_lock);
+       if (list_empty(&all_clients))
+               goto out;
+       printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
+               "-rpcwait -action- ---ops--\n");
+       list_for_each_entry(clnt, &all_clients, cl_clients) {
+               if (list_empty(&clnt->cl_tasks))
+                       continue;
+               spin_lock(&clnt->cl_lock);
+               list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
+                       const char *rpc_waitq = "none";
+
+                       if (RPC_IS_QUEUED(t))
+                               rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+
+                       printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
+                               t->tk_pid,
+                               (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
+                               t->tk_flags, t->tk_status,
+                               t->tk_client,
+                               (t->tk_client ? t->tk_client->cl_prog : 0),
+                               t->tk_rqstp, t->tk_timeout,
+                               rpc_waitq,
+                               t->tk_action, t->tk_ops);
+               }
+               spin_unlock(&clnt->cl_lock);
+       }
+out:
+       spin_unlock(&rpc_client_lock);
+}
+#endif