diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 8cc8b08eb3dd7597d08b82dfb77a180a88ec046d..b5268127c55ef23dcb046a1024b5a01de2cb135f 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -382,6 +382,7 @@ int ceph_con_in_msg_alloc(struct ceph_connection *con,
 			  struct ceph_msg_header *hdr, int *skip);
 void ceph_con_get_out_msg(struct ceph_connection *con);
 
+/* messenger_v1.c */
 int ceph_con_v1_try_read(struct ceph_connection *con);
 int ceph_con_v1_try_write(struct ceph_connection *con);
 void ceph_con_v1_revoke(struct ceph_connection *con);
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index ce09bb4fb2499dfac943ca504d0e49a3317c2d0d..df02bd8d6c7b355920184b929500f1019ca0507e 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -14,4 +14,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_strings.o ceph_hash.o \
-	pagevec.o snapshot.o string_table.o
+	pagevec.o snapshot.o string_table.o \
+	messenger_v1.o
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4ca7d9b594c74b2f90f2f8ecab65e9dca26be65a..544cfdbe52d652ff501dab2dd76ad4cffd2bee8c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -137,12 +137,6 @@ bool ceph_con_flag_test_and_set(struct ceph_connection *con,
 
 static struct kmem_cache	*ceph_msg_cache;
 
-/* static tag bytes (protocol control messages) */
-static char tag_msg = CEPH_MSGR_TAG_MSG;
-static char tag_ack = CEPH_MSGR_TAG_ACK;
-static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
-static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
-
 #ifdef CONFIG_LOCKDEP
 static struct lock_class_key socket_class;
 #endif
@@ -477,96 +471,6 @@ int ceph_tcp_connect(struct ceph_connection *con)
 	return 0;
 }
 
-/*
- * If @buf is NULL, discard up to @len bytes.
- */
-static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
-{
-	struct kvec iov = {buf, len};
-	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-	int r;
-
-	if (!buf)
-		msg.msg_flags |= MSG_TRUNC;
-
-	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
-	r = sock_recvmsg(sock, &msg, msg.msg_flags);
-	if (r == -EAGAIN)
-		r = 0;
-	return r;
-}
-
-static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
-		     int page_offset, size_t length)
-{
-	struct bio_vec bvec = {
-		.bv_page = page,
-		.bv_offset = page_offset,
-		.bv_len = length
-	};
-	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-	int r;
-
-	BUG_ON(page_offset + length > PAGE_SIZE);
-	iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
-	r = sock_recvmsg(sock, &msg, msg.msg_flags);
-	if (r == -EAGAIN)
-		r = 0;
-	return r;
-}
-
-/*
- * write something.  @more is true if caller will be sending more data
- * shortly.
- */
-static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
-			    size_t kvlen, size_t len, bool more)
-{
-	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-	int r;
-
-	if (more)
-		msg.msg_flags |= MSG_MORE;
-	else
-		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
-
-	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
-	if (r == -EAGAIN)
-		r = 0;
-	return r;
-}
-
-/*
- * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
- */
-static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
-			     int offset, size_t size, int more)
-{
-	ssize_t (*sendpage)(struct socket *sock, struct page *page,
-			    int offset, size_t size, int flags);
-	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
-	int ret;
-
-	/*
-	 * sendpage cannot properly handle pages with page_count == 0,
-	 * we need to fall back to sendmsg if that's the case.
-	 *
-	 * Same goes for slab pages: skb_can_coalesce() allows
-	 * coalescing neighboring slab objects into a single frag which
-	 * triggers one of hardened usercopy checks.
-	 */
-	if (sendpage_ok(page))
-		sendpage = sock->ops->sendpage;
-	else
-		sendpage = sock_no_sendpage;
-
-	ret = sendpage(sock, page, offset, size, flags);
-	if (ret == -EAGAIN)
-		ret = 0;
-
-	return ret;
-}
-
 /*
  * Shutdown/close the socket for the given connection.
  */
@@ -593,11 +497,6 @@ int ceph_con_close_socket(struct ceph_connection *con)
 	return rc;
 }
 
-void ceph_con_v1_reset_protocol(struct ceph_connection *con)
-{
-	con->out_skip = 0;
-}
-
 static void ceph_con_reset_protocol(struct ceph_connection *con)
 {
 	dout("%s con %p\n", __func__, con);
@@ -636,12 +535,6 @@ static void ceph_msg_remove_list(struct list_head *head)
 	}
 }
 
-void ceph_con_v1_reset_session(struct ceph_connection *con)
-{
-	con->connect_seq = 0;
-	con->peer_global_seq = 0;
-}
-
 void ceph_con_reset_session(struct ceph_connection *con)
 {
 	dout("%s con %p\n", __func__, con);
@@ -702,11 +595,6 @@ void ceph_con_open(struct ceph_connection *con,
 }
 EXPORT_SYMBOL(ceph_con_open);
 
-bool ceph_con_v1_opened(struct ceph_connection *con)
-{
-	return con->connect_seq;
-}
-
 /*
  * return true if this connection ever successfully opened
  */
@@ -739,7 +627,6 @@ void ceph_con_init(struct ceph_connection *con, void *private,
 }
 EXPORT_SYMBOL(ceph_con_init);
 
-
 /*
  * We maintain a global counter to order connection attempts.  Get
  * a unique seq greater than @gt.
@@ -805,50 +692,6 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
 	}
 }
 
-static void con_out_kvec_reset(struct ceph_connection *con)
-{
-	BUG_ON(con->out_skip);
-
-	con->out_kvec_left = 0;
-	con->out_kvec_bytes = 0;
-	con->out_kvec_cur = &con->out_kvec[0];
-}
-
-static void con_out_kvec_add(struct ceph_connection *con,
-				size_t size, void *data)
-{
-	int index = con->out_kvec_left;
-
-	BUG_ON(con->out_skip);
-	BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
-
-	con->out_kvec[index].iov_len = size;
-	con->out_kvec[index].iov_base = data;
-	con->out_kvec_left++;
-	con->out_kvec_bytes += size;
-}
-
-/*
- * Chop off a kvec from the end.  Return residual number of bytes for
- * that kvec, i.e. how many bytes would have been written if the kvec
- * hadn't been nuked.
- */
-static int con_out_kvec_skip(struct ceph_connection *con)
-{
-	int off = con->out_kvec_cur - con->out_kvec;
-	int skip = 0;
-
-	if (con->out_kvec_bytes > 0) {
-		skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
-		BUG_ON(con->out_kvec_bytes < skip);
-		BUG_ON(!con->out_kvec_left);
-		con->out_kvec_bytes -= skip;
-		con->out_kvec_left--;
-	}
-
-	return skip;
-}
-
 #ifdef CONFIG_BLOCK
 
 /*
@@ -1260,307 +1103,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
 	cursor->need_crc = new_piece;
 }
 
-static size_t sizeof_footer(struct ceph_connection *con)
-{
-	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
-	    sizeof(struct ceph_msg_footer) :
-	    sizeof(struct ceph_msg_footer_old);
-}
-
-static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
-{
-	/* Initialize data cursor */
-
-	ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
-}
-
-/*
- * Prepare footer for currently outgoing message, and finish things
- * off.  Assumes out_kvec* are already valid.. we just add on to the end.
- */
-static void prepare_write_message_footer(struct ceph_connection *con)
-{
-	struct ceph_msg *m = con->out_msg;
-
-	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
-
-	dout("prepare_write_message_footer %p\n", con);
-	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
-	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
-		if (con->ops->sign_message)
-			con->ops->sign_message(m);
-		else
-			m->footer.sig = 0;
-	} else {
-		m->old_footer.flags = m->footer.flags;
-	}
-	con->out_more = m->more_to_follow;
-	con->out_msg_done = true;
-}
-
-/*
- * Prepare headers for the next outgoing message.
- */
-static void prepare_write_message(struct ceph_connection *con)
-{
-	struct ceph_msg *m;
-	u32 crc;
-
-	con_out_kvec_reset(con);
-	con->out_msg_done = false;
-
-	/* Sneak an ack in there first?  If we can get it into the same
-	 * TCP packet that's a good thing. */
-	if (con->in_seq > con->in_seq_acked) {
-		con->in_seq_acked = con->in_seq;
-		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
-		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-		con_out_kvec_add(con, sizeof (con->out_temp_ack),
-			&con->out_temp_ack);
-	}
-
-	ceph_con_get_out_msg(con);
-	m = con->out_msg;
-
-	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
-	     m, con->out_seq, le16_to_cpu(m->hdr.type),
-	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
-	     m->data_length);
-	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
-	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
-
-	/* tag + hdr + front + middle */
-	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-	con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
-	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
-
-	if (m->middle)
-		con_out_kvec_add(con, m->middle->vec.iov_len,
-			m->middle->vec.iov_base);
-
-	/* fill in hdr crc and finalize hdr */
-	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
-	con->out_msg->hdr.crc = cpu_to_le32(crc);
-	memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
-
-	/* fill in front and middle crc, footer */
-	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
-	con->out_msg->footer.front_crc = cpu_to_le32(crc);
-	if (m->middle) {
-		crc = crc32c(0, m->middle->vec.iov_base,
-				m->middle->vec.iov_len);
-		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
-	} else
-		con->out_msg->footer.middle_crc = 0;
-	dout("%s front_crc %u middle_crc %u\n", __func__,
-	     le32_to_cpu(con->out_msg->footer.front_crc),
-	     le32_to_cpu(con->out_msg->footer.middle_crc));
-	con->out_msg->footer.flags = 0;
-
-	/* is there a data payload? */
-	con->out_msg->footer.data_crc = 0;
-	if (m->data_length) {
-		prepare_message_data(con->out_msg, m->data_length);
-		con->out_more = 1;  /* data + footer will follow */
-	} else {
-		/* no, queue up footer too and be done */
-		prepare_write_message_footer(con);
-	}
-
-	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
-}
-
-/*
- * Prepare an ack.
- */
-static void prepare_write_ack(struct ceph_connection *con)
-{
-	dout("prepare_write_ack %p %llu -> %llu\n", con,
-	     con->in_seq_acked, con->in_seq);
-	con->in_seq_acked = con->in_seq;
-
-	con_out_kvec_reset(con);
-
-	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
-
-	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-	con_out_kvec_add(con, sizeof (con->out_temp_ack),
-				&con->out_temp_ack);
-
-	con->out_more = 1;  /* more will follow.. eventually.. */
-	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
-}
-
-/*
- * Prepare to share the seq during handshake
- */
-static void prepare_write_seq(struct ceph_connection *con)
-{
-	dout("prepare_write_seq %p %llu -> %llu\n", con,
-	     con->in_seq_acked, con->in_seq);
-	con->in_seq_acked = con->in_seq;
-
-	con_out_kvec_reset(con);
-
-	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-	con_out_kvec_add(con, sizeof (con->out_temp_ack),
-			 &con->out_temp_ack);
-
-	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
-}
-
-/*
- * Prepare to write keepalive byte.
- */
-static void prepare_write_keepalive(struct ceph_connection *con)
-{
-	dout("prepare_write_keepalive %p\n", con);
-	con_out_kvec_reset(con);
-	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
-		struct timespec64 now;
-
-		ktime_get_real_ts64(&now);
-		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
-		ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
-		con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
-				 &con->out_temp_keepalive2);
-	} else {
-		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
-	}
-	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
-}
-
-/*
- * Connection negotiation.
- */
-
-static int get_connect_authorizer(struct ceph_connection *con)
-{
-	struct ceph_auth_handshake *auth;
-	int auth_proto;
-
-	if (!con->ops->get_authorizer) {
-		con->auth = NULL;
-		con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
-		con->out_connect.authorizer_len = 0;
-		return 0;
-	}
-
-	auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
-	if (IS_ERR(auth))
-		return PTR_ERR(auth);
-
-	con->auth = auth;
-	con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
-	con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
-	return 0;
-}
-
-/*
- * We connected to a peer and are saying hello.
- */
-static void prepare_write_banner(struct ceph_connection *con)
-{
-	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
-	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
-					&con->msgr->my_enc_addr);
-
-	con->out_more = 0;
-	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
-}
-
-static void __prepare_write_connect(struct ceph_connection *con)
-{
-	con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
-	if (con->auth)
-		con_out_kvec_add(con, con->auth->authorizer_buf_len,
-				 con->auth->authorizer_buf);
-
-	con->out_more = 0;
-	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
-}
-
-static int prepare_write_connect(struct ceph_connection *con)
-{
-	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
-	int proto;
-	int ret;
-
-	switch (con->peer_name.type) {
-	case CEPH_ENTITY_TYPE_MON:
-		proto = CEPH_MONC_PROTOCOL;
-		break;
-	case CEPH_ENTITY_TYPE_OSD:
-		proto = CEPH_OSDC_PROTOCOL;
-		break;
-	case CEPH_ENTITY_TYPE_MDS:
-		proto = CEPH_MDSC_PROTOCOL;
-		break;
-	default:
-		BUG();
-	}
-
-	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
-	     con->connect_seq, global_seq, proto);
-
-	con->out_connect.features =
-	    cpu_to_le64(from_msgr(con->msgr)->supported_features);
-	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
-	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
-	con->out_connect.global_seq = cpu_to_le32(global_seq);
-	con->out_connect.protocol_version = cpu_to_le32(proto);
-	con->out_connect.flags = 0;
-
-	ret = get_connect_authorizer(con);
-	if (ret)
-		return ret;
-
-	__prepare_write_connect(con);
-	return 0;
-}
-
-/*
- * write as much of pending kvecs to the socket as we can.
- *  1 -> done
- *  0 -> socket full, but more to do
- * <0 -> error
- */
-static int write_partial_kvec(struct ceph_connection *con)
-{
-	int ret;
-
-	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
-	while (con->out_kvec_bytes > 0) {
-		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
-				       con->out_kvec_left, con->out_kvec_bytes,
-				       con->out_more);
-		if (ret <= 0)
-			goto out;
-		con->out_kvec_bytes -= ret;
-		if (con->out_kvec_bytes == 0)
-			break;            /* done */
-
-		/* account for full iov entries consumed */
-		while (ret >= con->out_kvec_cur->iov_len) {
-			BUG_ON(!con->out_kvec_left);
-			ret -= con->out_kvec_cur->iov_len;
-			con->out_kvec_cur++;
-			con->out_kvec_left--;
-		}
-		/* and for a partially-consumed entry */
-		if (ret) {
-			con->out_kvec_cur->iov_len -= ret;
-			con->out_kvec_cur->iov_base += ret;
-		}
-	}
-	con->out_kvec_left = 0;
-	ret = 1;
-out:
-	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
-	     con->out_kvec_bytes, con->out_kvec_left, ret);
-	return ret;  /* done! */
-}
-
 u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
 		     unsigned int length)
 {
@@ -1573,256 +1115,6 @@ u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
 
 	return crc;
 }
-/*
- * Write as much message data payload as we can.  If we finish, queue
- * up the footer.
- *  1 -> done, footer is now queued in out_kvec[].
- *  0 -> socket full, but more to do
- * <0 -> error
- */
-static int write_partial_message_data(struct ceph_connection *con)
-{
-	struct ceph_msg *msg = con->out_msg;
-	struct ceph_msg_data_cursor *cursor = &msg->cursor;
-	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
-	u32 crc;
-
-	dout("%s %p msg %p\n", __func__, con, msg);
-
-	if (!msg->num_data_items)
-		return -EINVAL;
-
-	/*
-	 * Iterate through each page that contains data to be
-	 * written, and send as much as possible for each.
-	 *
-	 * If we are calculating the data crc (the default), we will
-	 * need to map the page.  If we have no pages, they have
-	 * been revoked, so use the zero page.
-	 */
-	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
-	while (cursor->total_resid) {
-		struct page *page;
-		size_t page_offset;
-		size_t length;
-		int ret;
-
-		if (!cursor->resid) {
-			ceph_msg_data_advance(cursor, 0);
-			continue;
-		}
-
-		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
-		if (length == cursor->total_resid)
-			more = MSG_MORE;
-		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
-					more);
-		if (ret <= 0) {
-			if (do_datacrc)
-				msg->footer.data_crc = cpu_to_le32(crc);
-
-			return ret;
-		}
-		if (do_datacrc && cursor->need_crc)
-			crc = ceph_crc32c_page(crc, page, page_offset, length);
-		ceph_msg_data_advance(cursor, (size_t)ret);
-	}
-
-	dout("%s %p msg %p done\n", __func__, con, msg);
-
-	/* prepare and queue up footer, too */
-	if (do_datacrc)
-		msg->footer.data_crc = cpu_to_le32(crc);
-	else
-		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
-	con_out_kvec_reset(con);
-	prepare_write_message_footer(con);
-
-	return 1;	/* must return > 0 to indicate success */
-}
-
-/*
- * write some zeros
- */
-static int write_partial_skip(struct ceph_connection *con)
-{
-	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
-	int ret;
-
-	dout("%s %p %d left\n", __func__, con, con->out_skip);
-	while (con->out_skip > 0) {
-		size_t size = min(con->out_skip, (int) PAGE_SIZE);
-
-		if (size == con->out_skip)
-			more = MSG_MORE;
-		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
-					more);
-		if (ret <= 0)
-			goto out;
-		con->out_skip -= ret;
-	}
-	ret = 1;
-out:
-	return ret;
-}
-
-/*
- * Prepare to read connection handshake, or an ack.
- */
-static void prepare_read_banner(struct ceph_connection *con)
-{
-	dout("prepare_read_banner %p\n", con);
-	con->in_base_pos = 0;
-}
-
-static void prepare_read_connect(struct ceph_connection *con)
-{
-	dout("prepare_read_connect %p\n", con);
-	con->in_base_pos = 0;
-}
-
-static void prepare_read_ack(struct ceph_connection *con)
-{
-	dout("prepare_read_ack %p\n", con);
-	con->in_base_pos = 0;
-}
-
-static void prepare_read_seq(struct ceph_connection *con)
-{
-	dout("prepare_read_seq %p\n", con);
-	con->in_base_pos = 0;
-	con->in_tag = CEPH_MSGR_TAG_SEQ;
-}
-
-static void prepare_read_tag(struct ceph_connection *con)
-{
-	dout("prepare_read_tag %p\n", con);
-	con->in_base_pos = 0;
-	con->in_tag = CEPH_MSGR_TAG_READY;
-}
-
-static void prepare_read_keepalive_ack(struct ceph_connection *con)
-{
-	dout("prepare_read_keepalive_ack %p\n", con);
-	con->in_base_pos = 0;
-}
-
-/*
- * Prepare to read a message.
- */
-static int prepare_read_message(struct ceph_connection *con)
-{
-	dout("prepare_read_message %p\n", con);
-	BUG_ON(con->in_msg != NULL);
-	con->in_base_pos = 0;
-	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
-	return 0;
-}
-
-
-static int read_partial(struct ceph_connection *con,
-			int end, int size, void *object)
-{
-	while (con->in_base_pos < end) {
-		int left = end - con->in_base_pos;
-		int have = size - left;
-		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
-		if (ret <= 0)
-			return ret;
-		con->in_base_pos += ret;
-	}
-	return 1;
-}
-
-
-/*
- * Read all or part of the connect-side handshake on a new connection
- */
-static int read_partial_banner(struct ceph_connection *con)
-{
-	int size;
-	int end;
-	int ret;
-
-	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
-
-	/* peer's banner */
-	size = strlen(CEPH_BANNER);
-	end = size;
-	ret = read_partial(con, end, size, con->in_banner);
-	if (ret <= 0)
-		goto out;
-
-	size = sizeof (con->actual_peer_addr);
-	end += size;
-	ret = read_partial(con, end, size, &con->actual_peer_addr);
-	if (ret <= 0)
-		goto out;
-	ceph_decode_banner_addr(&con->actual_peer_addr);
-
-	size = sizeof (con->peer_addr_for_me);
-	end += size;
-	ret = read_partial(con, end, size, &con->peer_addr_for_me);
-	if (ret <= 0)
-		goto out;
-	ceph_decode_banner_addr(&con->peer_addr_for_me);
-
-out:
-	return ret;
-}
-
-static int read_partial_connect(struct ceph_connection *con)
-{
-	int size;
-	int end;
-	int ret;
-
-	dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
-
-	size = sizeof (con->in_reply);
-	end = size;
-	ret = read_partial(con, end, size, &con->in_reply);
-	if (ret <= 0)
-		goto out;
-
-	if (con->auth) {
-		size = le32_to_cpu(con->in_reply.authorizer_len);
-		if (size > con->auth->authorizer_reply_buf_len) {
-			pr_err("authorizer reply too big: %d > %zu\n", size,
-			       con->auth->authorizer_reply_buf_len);
-			ret = -EINVAL;
-			goto out;
-		}
-
-		end += size;
-		ret = read_partial(con, end, size,
-				   con->auth->authorizer_reply_buf);
-		if (ret <= 0)
-			goto out;
-	}
-
-	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
-	     con, (int)con->in_reply.tag,
-	     le32_to_cpu(con->in_reply.connect_seq),
-	     le32_to_cpu(con->in_reply.global_seq));
-out:
-	return ret;
-}
-
-/*
- * Verify the hello banner looks okay.
- */
-static int verify_hello(struct ceph_connection *con)
-{
-	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
-		pr_err("connect to %s got bad banner\n",
-		       ceph_pr_addr(&con->peer_addr));
-		con->error_msg = "protocol error, bad banner";
-		return -1;
-	}
-	return 0;
-}
 
 bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
 {
@@ -2032,492 +1324,6 @@ bad:
 	return ret;
 }
 
-static int process_banner(struct ceph_connection *con)
-{
-	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
-
-	dout("process_banner on %p\n", con);
-
-	if (verify_hello(con) < 0)
-		return -1;
-
-	/*
-	 * Make sure the other end is who we wanted.  note that the other
-	 * end may not yet know their ip address, so if it's 0.0.0.0, give
-	 * them the benefit of the doubt.
-	 */
-	if (memcmp(&con->peer_addr, &con->actual_peer_addr,
-		   sizeof(con->peer_addr)) != 0 &&
-	    !(ceph_addr_is_blank(&con->actual_peer_addr) &&
-	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
-			ceph_pr_addr(&con->peer_addr),
-			le32_to_cpu(con->peer_addr.nonce),
-			ceph_pr_addr(&con->actual_peer_addr),
-			le32_to_cpu(con->actual_peer_addr.nonce));
-		con->error_msg = "wrong peer at address";
-		return -1;
-	}
-
-	/*
-	 * did we learn our address?
-	 */
-	if (ceph_addr_is_blank(my_addr)) {
-		memcpy(&my_addr->in_addr,
-		       &con->peer_addr_for_me.in_addr,
-		       sizeof(con->peer_addr_for_me.in_addr));
-		ceph_addr_set_port(my_addr, 0);
-		ceph_encode_my_addr(con->msgr);
-		dout("process_banner learned my addr is %s\n",
-		     ceph_pr_addr(my_addr));
-	}
-
-	return 0;
-}
-
-static int process_connect(struct ceph_connection *con)
-{
-	u64 sup_feat = from_msgr(con->msgr)->supported_features;
-	u64 req_feat = from_msgr(con->msgr)->required_features;
-	u64 server_feat = le64_to_cpu(con->in_reply.features);
-	int ret;
-
-	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
-
-	if (con->auth) {
-		int len = le32_to_cpu(con->in_reply.authorizer_len);
-
-		/*
-		 * Any connection that defines ->get_authorizer()
-		 * should also define ->add_authorizer_challenge() and
-		 * ->verify_authorizer_reply().
-		 *
-		 * See get_connect_authorizer().
-		 */
-		if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
-			ret = con->ops->add_authorizer_challenge(
-				    con, con->auth->authorizer_reply_buf, len);
-			if (ret < 0)
-				return ret;
-
-			con_out_kvec_reset(con);
-			__prepare_write_connect(con);
-			prepare_read_connect(con);
-			return 0;
-		}
-
-		if (len) {
-			ret = con->ops->verify_authorizer_reply(con);
-			if (ret < 0) {
-				con->error_msg = "bad authorize reply";
-				return ret;
-			}
-		}
-	}
-
-	switch (con->in_reply.tag) {
-	case CEPH_MSGR_TAG_FEATURES:
-		pr_err("%s%lld %s feature set mismatch,"
-		       " my %llx < server's %llx, missing %llx\n",
-		       ENTITY_NAME(con->peer_name),
-		       ceph_pr_addr(&con->peer_addr),
-		       sup_feat, server_feat, server_feat & ~sup_feat);
-		con->error_msg = "missing required protocol features";
-		return -1;
-
-	case CEPH_MSGR_TAG_BADPROTOVER:
-		pr_err("%s%lld %s protocol version mismatch,"
-		       " my %d != server's %d\n",
-		       ENTITY_NAME(con->peer_name),
-		       ceph_pr_addr(&con->peer_addr),
-		       le32_to_cpu(con->out_connect.protocol_version),
-		       le32_to_cpu(con->in_reply.protocol_version));
-		con->error_msg = "protocol version mismatch";
-		return -1;
-
-	case CEPH_MSGR_TAG_BADAUTHORIZER:
-		con->auth_retry++;
-		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
-		     con->auth_retry);
-		if (con->auth_retry == 2) {
-			con->error_msg = "connect authorization failure";
-			return -1;
-		}
-		con_out_kvec_reset(con);
-		ret = prepare_write_connect(con);
-		if (ret < 0)
-			return ret;
-		prepare_read_connect(con);
-		break;
-
-	case CEPH_MSGR_TAG_RESETSESSION:
-		/*
-		 * If we connected with a large connect_seq but the peer
-		 * has no record of a session with us (no connection, or
-		 * connect_seq == 0), they will send RESETSESION to indicate
-		 * that they must have reset their session, and may have
-		 * dropped messages.
-		 */
-		dout("process_connect got RESET peer seq %u\n",
-		     le32_to_cpu(con->in_reply.connect_seq));
-		pr_info("%s%lld %s session reset\n",
-			ENTITY_NAME(con->peer_name),
-			ceph_pr_addr(&con->peer_addr));
-		ceph_con_reset_session(con);
-		con_out_kvec_reset(con);
-		ret = prepare_write_connect(con);
-		if (ret < 0)
-			return ret;
-		prepare_read_connect(con);
-
-		/* Tell ceph about it. */
-		mutex_unlock(&con->mutex);
-		if (con->ops->peer_reset)
-			con->ops->peer_reset(con);
-		mutex_lock(&con->mutex);
-		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
-			return -EAGAIN;
-		break;
-
-	case CEPH_MSGR_TAG_RETRY_SESSION:
-		/*
-		 * If we sent a smaller connect_seq than the peer has, try
-		 * again with a larger value.
-		 */
-		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
-		     le32_to_cpu(con->out_connect.connect_seq),
-		     le32_to_cpu(con->in_reply.connect_seq));
-		con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
-		con_out_kvec_reset(con);
-		ret = prepare_write_connect(con);
-		if (ret < 0)
-			return ret;
-		prepare_read_connect(con);
-		break;
-
-	case CEPH_MSGR_TAG_RETRY_GLOBAL:
-		/*
-		 * If we sent a smaller global_seq than the peer has, try
-		 * again with a larger value.
-		 */
-		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
-		     con->peer_global_seq,
-		     le32_to_cpu(con->in_reply.global_seq));
-		ceph_get_global_seq(con->msgr,
-				    le32_to_cpu(con->in_reply.global_seq));
-		con_out_kvec_reset(con);
-		ret = prepare_write_connect(con);
-		if (ret < 0)
-			return ret;
-		prepare_read_connect(con);
-		break;
-
-	case CEPH_MSGR_TAG_SEQ:
-	case CEPH_MSGR_TAG_READY:
-		if (req_feat & ~server_feat) {
-			pr_err("%s%lld %s protocol feature mismatch,"
-			       " my required %llx > server's %llx, need %llx\n",
-			       ENTITY_NAME(con->peer_name),
-			       ceph_pr_addr(&con->peer_addr),
-			       req_feat, server_feat, req_feat & ~server_feat);
-			con->error_msg = "missing required protocol features";
-			return -1;
-		}
-
-		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
-		con->state = CEPH_CON_S_OPEN;
-		con->auth_retry = 0;    /* we authenticated; clear flag */
-		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
-		con->connect_seq++;
-		con->peer_features = server_feat;
-		dout("process_connect got READY gseq %d cseq %d (%d)\n",
-		     con->peer_global_seq,
-		     le32_to_cpu(con->in_reply.connect_seq),
-		     con->connect_seq);
-		WARN_ON(con->connect_seq !=
-			le32_to_cpu(con->in_reply.connect_seq));
-
-		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
-			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
-
-		con->delay = 0;      /* reset backoff memory */
-
-		if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
-			prepare_write_seq(con);
-			prepare_read_seq(con);
-		} else {
-			prepare_read_tag(con);
-		}
-		break;
-
-	case CEPH_MSGR_TAG_WAIT:
-		/*
-		 * If there is a connection race (we are opening
-		 * connections to each other), one of us may just have
-		 * to WAIT.  This shouldn't happen if we are the
-		 * client.
-		 */
-		con->error_msg = "protocol error, got WAIT as client";
-		return -1;
-
-	default:
-		con->error_msg = "protocol error, garbage tag during connect";
-		return -1;
-	}
-	return 0;
-}
-
-
-/*
- * read (part of) an ack
- */
-static int read_partial_ack(struct ceph_connection *con)
-{
-	int size = sizeof (con->in_temp_ack);
-	int end = size;
-
-	return read_partial(con, end, size, &con->in_temp_ack);
-}
-
-/*
- * We can finally discard anything that's been acked.
- */
-static void process_ack(struct ceph_connection *con)
-{
-	u64 ack = le64_to_cpu(con->in_temp_ack);
-
-	if (con->in_tag == CEPH_MSGR_TAG_ACK)
-		ceph_con_discard_sent(con, ack);
-	else
-		ceph_con_discard_requeued(con, ack);
-
-	prepare_read_tag(con);
-}
-
-
-static int read_partial_message_section(struct ceph_connection *con,
-					struct kvec *section,
-					unsigned int sec_len, u32 *crc)
-{
-	int ret, left;
-
-	BUG_ON(!section);
-
-	while (section->iov_len < sec_len) {
-		BUG_ON(section->iov_base == NULL);
-		left = sec_len - section->iov_len;
-		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
-				       section->iov_len, left);
-		if (ret <= 0)
-			return ret;
-		section->iov_len += ret;
-	}
-	if (section->iov_len == sec_len)
-		*crc = crc32c(0, section->iov_base, section->iov_len);
-
-	return 1;
-}
-
-static int read_partial_msg_data(struct ceph_connection *con)
-{
-	struct ceph_msg *msg = con->in_msg;
-	struct ceph_msg_data_cursor *cursor = &msg->cursor;
-	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-	struct page *page;
-	size_t page_offset;
-	size_t length;
-	u32 crc = 0;
-	int ret;
-
-	if (!msg->num_data_items)
-		return -EIO;
-
-	if (do_datacrc)
-		crc = con->in_data_crc;
-	while (cursor->total_resid) {
-		if (!cursor->resid) {
-			ceph_msg_data_advance(cursor, 0);
-			continue;
-		}
-
-		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
-		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
-		if (ret <= 0) {
-			if (do_datacrc)
-				con->in_data_crc = crc;
-
-			return ret;
-		}
-
-		if (do_datacrc)
-			crc = ceph_crc32c_page(crc, page, page_offset, ret);
-		ceph_msg_data_advance(cursor, (size_t)ret);
-	}
-	if (do_datacrc)
-		con->in_data_crc = crc;
-
-	return 1;	/* must return > 0 to indicate success */
-}
-
-/*
- * read (part of) a message.
- */
-static int read_partial_message(struct ceph_connection *con)
-{
-	struct ceph_msg *m = con->in_msg;
-	int size;
-	int end;
-	int ret;
-	unsigned int front_len, middle_len, data_len;
-	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
-	u64 seq;
-	u32 crc;
-
-	dout("read_partial_message con %p msg %p\n", con, m);
-
-	/* header */
-	size = sizeof (con->in_hdr);
-	end = size;
-	ret = read_partial(con, end, size, &con->in_hdr);
-	if (ret <= 0)
-		return ret;
-
-	crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
-	if (cpu_to_le32(crc) != con->in_hdr.crc) {
-		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
-		       crc, con->in_hdr.crc);
-		return -EBADMSG;
-	}
-
-	front_len = le32_to_cpu(con->in_hdr.front_len);
-	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
-		return -EIO;
-	middle_len = le32_to_cpu(con->in_hdr.middle_len);
-	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
-		return -EIO;
-	data_len = le32_to_cpu(con->in_hdr.data_len);
-	if (data_len > CEPH_MSG_MAX_DATA_LEN)
-		return -EIO;
-
-	/* verify seq# */
-	seq = le64_to_cpu(con->in_hdr.seq);
-	if ((s64)seq - (s64)con->in_seq < 1) {
-		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
-			ENTITY_NAME(con->peer_name),
-			ceph_pr_addr(&con->peer_addr),
-			seq, con->in_seq + 1);
-		con->in_base_pos = -front_len - middle_len - data_len -
-			sizeof_footer(con);
-		con->in_tag = CEPH_MSGR_TAG_READY;
-		return 1;
-	} else if ((s64)seq - (s64)con->in_seq > 1) {
-		pr_err("read_partial_message bad seq %lld expected %lld\n",
-		       seq, con->in_seq + 1);
-		con->error_msg = "bad message sequence # for incoming message";
-		return -EBADE;
-	}
-
-	/* allocate message? */
-	if (!con->in_msg) {
-		int skip = 0;
-
-		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
-		     front_len, data_len);
-		ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip);
-		if (ret < 0)
-			return ret;
-
-		BUG_ON(!con->in_msg ^ skip);
-		if (skip) {
-			/* skip this message */
-			dout("alloc_msg said skip message\n");
-			con->in_base_pos = -front_len - middle_len - data_len -
-				sizeof_footer(con);
-			con->in_tag = CEPH_MSGR_TAG_READY;
-			con->in_seq++;
-			return 1;
-		}
-
-		BUG_ON(!con->in_msg);
-		BUG_ON(con->in_msg->con != con);
-		m = con->in_msg;
-		m->front.iov_len = 0;    /* haven't read it yet */
-		if (m->middle)
-			m->middle->vec.iov_len = 0;
-
-		/* prepare for data payload, if any */
-
-		if (data_len)
-			prepare_message_data(con->in_msg, data_len);
-	}
-
-	/* front */
-	ret = read_partial_message_section(con, &m->front, front_len,
-					   &con->in_front_crc);
-	if (ret <= 0)
-		return ret;
-
-	/* middle */
-	if (m->middle) {
-		ret = read_partial_message_section(con, &m->middle->vec,
-						   middle_len,
-						   &con->in_middle_crc);
-		if (ret <= 0)
-			return ret;
-	}
-
-	/* (page) data */
-	if (data_len) {
-		ret = read_partial_msg_data(con);
-		if (ret <= 0)
-			return ret;
-	}
-
-	/* footer */
-	size = sizeof_footer(con);
-	end += size;
-	ret = read_partial(con, end, size, &m->footer);
-	if (ret <= 0)
-		return ret;
-
-	if (!need_sign) {
-		m->footer.flags = m->old_footer.flags;
-		m->footer.sig = 0;
-	}
-
-	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
-	     m, front_len, m->footer.front_crc, middle_len,
-	     m->footer.middle_crc, data_len, m->footer.data_crc);
-
-	/* crc ok? */
-	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
-		pr_err("read_partial_message %p front crc %u != exp. %u\n",
-		       m, con->in_front_crc, m->footer.front_crc);
-		return -EBADMSG;
-	}
-	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
-		pr_err("read_partial_message %p middle crc %u != exp %u\n",
-		       m, con->in_middle_crc, m->footer.middle_crc);
-		return -EBADMSG;
-	}
-	if (do_datacrc &&
-	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
-	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
-		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
-		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
-		return -EBADMSG;
-	}
-
-	if (need_sign && con->ops->check_message_signature &&
-	    con->ops->check_message_signature(m)) {
-		pr_err("read_partial_message %p signature check failed\n", m);
-		return -EBADMSG;
-	}
-
-	return 1; /* done! */
-}
-
 /*
  * Process message.  This happens in the worker thread.  The callback should
  * be careful not to do anything that waits on other incoming messages or it
@@ -2551,263 +1357,6 @@ void ceph_con_process_message(struct ceph_connection *con)
 	mutex_lock(&con->mutex);
 }
 
-static int read_keepalive_ack(struct ceph_connection *con)
-{
-	struct ceph_timespec ceph_ts;
-	size_t size = sizeof(ceph_ts);
-	int ret = read_partial(con, size, size, &ceph_ts);
-	if (ret <= 0)
-		return ret;
-	ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
-	prepare_read_tag(con);
-	return 1;
-}
-
-/*
- * Write something to the socket.  Called in a worker thread when the
- * socket appears to be writeable and we have something ready to send.
- */
-int ceph_con_v1_try_write(struct ceph_connection *con)
-{
-	int ret = 1;
-
-	dout("try_write start %p state %d\n", con, con->state);
-	if (con->state != CEPH_CON_S_PREOPEN &&
-	    con->state != CEPH_CON_S_V1_BANNER &&
-	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
-	    con->state != CEPH_CON_S_OPEN)
-		return 0;
-
-	/* open the socket first? */
-	if (con->state == CEPH_CON_S_PREOPEN) {
-		BUG_ON(con->sock);
-		con->state = CEPH_CON_S_V1_BANNER;
-
-		con_out_kvec_reset(con);
-		prepare_write_banner(con);
-		prepare_read_banner(con);
-
-		BUG_ON(con->in_msg);
-		con->in_tag = CEPH_MSGR_TAG_READY;
-		dout("try_write initiating connect on %p new state %d\n",
-		     con, con->state);
-		ret = ceph_tcp_connect(con);
-		if (ret < 0) {
-			con->error_msg = "connect error";
-			goto out;
-		}
-	}
-
-more:
-	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
-	BUG_ON(!con->sock);
-
-	/* kvec data queued? */
-	if (con->out_kvec_left) {
-		ret = write_partial_kvec(con);
-		if (ret <= 0)
-			goto out;
-	}
-	if (con->out_skip) {
-		ret = write_partial_skip(con);
-		if (ret <= 0)
-			goto out;
-	}
-
-	/* msg pages? */
-	if (con->out_msg) {
-		if (con->out_msg_done) {
-			ceph_msg_put(con->out_msg);
-			con->out_msg = NULL;   /* we're done with this one */
-			goto do_next;
-		}
-
-		ret = write_partial_message_data(con);
-		if (ret == 1)
-			goto more;  /* we need to send the footer, too! */
-		if (ret == 0)
-			goto out;
-		if (ret < 0) {
-			dout("try_write write_partial_message_data err %d\n",
-			     ret);
-			goto out;
-		}
-	}
-
-do_next:
-	if (con->state == CEPH_CON_S_OPEN) {
-		if (ceph_con_flag_test_and_clear(con,
-				CEPH_CON_F_KEEPALIVE_PENDING)) {
-			prepare_write_keepalive(con);
-			goto more;
-		}
-		/* is anything else pending? */
-		if (!list_empty(&con->out_queue)) {
-			prepare_write_message(con);
-			goto more;
-		}
-		if (con->in_seq > con->in_seq_acked) {
-			prepare_write_ack(con);
-			goto more;
-		}
-	}
-
-	/* Nothing to do! */
-	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
-	dout("try_write nothing else to write.\n");
-	ret = 0;
-out:
-	dout("try_write done on %p ret %d\n", con, ret);
-	return ret;
-}
-
-/*
- * Read what we can from the socket.
- */
-int ceph_con_v1_try_read(struct ceph_connection *con)
-{
-	int ret = -1;
-
-more:
-	dout("try_read start %p state %d\n", con, con->state);
-	if (con->state != CEPH_CON_S_V1_BANNER &&
-	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
-	    con->state != CEPH_CON_S_OPEN)
-		return 0;
-
-	BUG_ON(!con->sock);
-
-	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
-	     con->in_base_pos);
-
-	if (con->state == CEPH_CON_S_V1_BANNER) {
-		ret = read_partial_banner(con);
-		if (ret <= 0)
-			goto out;
-		ret = process_banner(con);
-		if (ret < 0)
-			goto out;
-
-		con->state = CEPH_CON_S_V1_CONNECT_MSG;
-
-		/*
-		 * Received banner is good, exchange connection info.
-		 * Do not reset out_kvec, as sending our banner raced
-		 * with receiving peer banner after connect completed.
-		 */
-		ret = prepare_write_connect(con);
-		if (ret < 0)
-			goto out;
-		prepare_read_connect(con);
-
-		/* Send connection info before awaiting response */
-		goto out;
-	}
-
-	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
-		ret = read_partial_connect(con);
-		if (ret <= 0)
-			goto out;
-		ret = process_connect(con);
-		if (ret < 0)
-			goto out;
-		goto more;
-	}
-
-	WARN_ON(con->state != CEPH_CON_S_OPEN);
-
-	if (con->in_base_pos < 0) {
-		/*
-		 * skipping + discarding content.
-		 */
-		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
-		if (ret <= 0)
-			goto out;
-		dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
-		con->in_base_pos += ret;
-		if (con->in_base_pos)
-			goto more;
-	}
-	if (con->in_tag == CEPH_MSGR_TAG_READY) {
-		/*
-		 * what's next?
-		 */
-		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
-		if (ret <= 0)
-			goto out;
-		dout("try_read got tag %d\n", (int)con->in_tag);
-		switch (con->in_tag) {
-		case CEPH_MSGR_TAG_MSG:
-			prepare_read_message(con);
-			break;
-		case CEPH_MSGR_TAG_ACK:
-			prepare_read_ack(con);
-			break;
-		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
-			prepare_read_keepalive_ack(con);
-			break;
-		case CEPH_MSGR_TAG_CLOSE:
-			ceph_con_close_socket(con);
-			con->state = CEPH_CON_S_CLOSED;
-			goto out;
-		default:
-			goto bad_tag;
-		}
-	}
-	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
-		ret = read_partial_message(con);
-		if (ret <= 0) {
-			switch (ret) {
-			case -EBADMSG:
-				con->error_msg = "bad crc/signature";
-				fallthrough;
-			case -EBADE:
-				ret = -EIO;
-				break;
-			case -EIO:
-				con->error_msg = "io error";
-				break;
-			}
-			goto out;
-		}
-		if (con->in_tag == CEPH_MSGR_TAG_READY)
-			goto more;
-		ceph_con_process_message(con);
-		if (con->state == CEPH_CON_S_OPEN)
-			prepare_read_tag(con);
-		goto more;
-	}
-	if (con->in_tag == CEPH_MSGR_TAG_ACK ||
-	    con->in_tag == CEPH_MSGR_TAG_SEQ) {
-		/*
-		 * the final handshake seq exchange is semantically
-		 * equivalent to an ACK
-		 */
-		ret = read_partial_ack(con);
-		if (ret <= 0)
-			goto out;
-		process_ack(con);
-		goto more;
-	}
-	if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
-		ret = read_keepalive_ack(con);
-		if (ret <= 0)
-			goto out;
-		goto more;
-	}
-
-out:
-	dout("try_read done on %p ret %d\n", con, ret);
-	return ret;
-
-bad_tag:
-	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
-	con->error_msg = "protocol error, garbage tag";
-	ret = -1;
-	goto out;
-}
-
-
 /*
  * Atomically queue work on a connection after the specified delay.
  * Bump @con reference to avoid races with connection teardown.
@@ -3026,7 +1575,6 @@ static void con_fault(struct ceph_connection *con)
 	}
 }
 
-
 void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
 {
 	u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
@@ -3131,29 +1679,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 }
 EXPORT_SYMBOL(ceph_con_send);
 
-void ceph_con_v1_revoke(struct ceph_connection *con)
-{
-	struct ceph_msg *msg = con->out_msg;
-
-	WARN_ON(con->out_skip);
-	/* footer */
-	if (con->out_msg_done) {
-		con->out_skip += con_out_kvec_skip(con);
-	} else {
-		WARN_ON(!msg->data_length);
-		con->out_skip += sizeof_footer(con);
-	}
-	/* data, middle, front */
-	if (msg->data_length)
-		con->out_skip += msg->cursor.total_resid;
-	if (msg->middle)
-		con->out_skip += con_out_kvec_skip(con);
-	con->out_skip += con_out_kvec_skip(con);
-
-	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
-	     con->out_kvec_bytes, con->out_skip);
-}
-
 /*
  * Revoke a message that was previously queued for send
  */
@@ -3191,26 +1716,6 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 	mutex_unlock(&con->mutex);
 }
 
-void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
-{
-	unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
-	unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
-	unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
-
-	/* skip rest of message */
-	con->in_base_pos = con->in_base_pos -
-			sizeof(struct ceph_msg_header) -
-			front_len -
-			middle_len -
-			data_len -
-			sizeof(struct ceph_msg_footer);
-
-	con->in_tag = CEPH_MSGR_TAG_READY;
-	con->in_seq++;
-
-	dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
-}
-
 /*
  * Revoke a message that we may be reading data into
  */
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
new file mode 100644
index 0000000000000000000000000000000000000000..899038a9678e37c92f2a38288054a210808fa8d9
--- /dev/null
+++ b/net/ceph/messenger_v1.c
@@ -0,0 +1,1502 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/bvec.h>
+#include <linux/crc32c.h>
+#include <linux/net.h>
+#include <linux/socket.h>
+#include <net/sock.h>
+
+#include <linux/ceph/ceph_features.h>
+#include <linux/ceph/decode.h>
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/messenger.h>
+
+/* static tag bytes (protocol control messages) */
+static char tag_msg = CEPH_MSGR_TAG_MSG;
+static char tag_ack = CEPH_MSGR_TAG_ACK;
+static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
+
+/*
+ * If @buf is NULL, discard up to @len bytes.
+ */
+static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
+{
+	struct kvec iov = {buf, len};
+	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
+	int r;
+
+	if (!buf)
+		msg.msg_flags |= MSG_TRUNC;
+
+	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
+	r = sock_recvmsg(sock, &msg, msg.msg_flags);
+	if (r == -EAGAIN)
+		r = 0;
+	return r;
+}
+
+static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
+		     int page_offset, size_t length)
+{
+	struct bio_vec bvec = {
+		.bv_page = page,
+		.bv_offset = page_offset,
+		.bv_len = length
+	};
+	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
+	int r;
+
+	BUG_ON(page_offset + length > PAGE_SIZE);
+	iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
+	r = sock_recvmsg(sock, &msg, msg.msg_flags);
+	if (r == -EAGAIN)
+		r = 0;
+	return r;
+}
+
+/*
+ * write something.  @more is true if caller will be sending more data
+ * shortly.
+ */
+static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
+			    size_t kvlen, size_t len, bool more)
+{
+	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
+	int r;
+
+	if (more)
+		msg.msg_flags |= MSG_MORE;
+	else
+		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
+
+	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
+	if (r == -EAGAIN)
+		r = 0;
+	return r;
+}
+
+/*
+ * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
+ */
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+			     int offset, size_t size, int more)
+{
+	ssize_t (*sendpage)(struct socket *sock, struct page *page,
+			    int offset, size_t size, int flags);
+	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
+	int ret;
+
+	/*
+	 * sendpage cannot properly handle pages with page_count == 0,
+	 * we need to fall back to sendmsg if that's the case.
+	 *
+	 * Same goes for slab pages: skb_can_coalesce() allows
+	 * coalescing neighboring slab objects into a single frag which
+	 * triggers one of hardened usercopy checks.
+	 */
+	if (sendpage_ok(page))
+		sendpage = sock->ops->sendpage;
+	else
+		sendpage = sock_no_sendpage;
+
+	ret = sendpage(sock, page, offset, size, flags);
+	if (ret == -EAGAIN)
+		ret = 0;
+
+	return ret;
+}
+
+static void con_out_kvec_reset(struct ceph_connection *con)
+{
+	BUG_ON(con->out_skip);
+
+	con->out_kvec_left = 0;
+	con->out_kvec_bytes = 0;
+	con->out_kvec_cur = &con->out_kvec[0];
+}
+
+static void con_out_kvec_add(struct ceph_connection *con,
+				size_t size, void *data)
+{
+	int index = con->out_kvec_left;
+
+	BUG_ON(con->out_skip);
+	BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
+
+	con->out_kvec[index].iov_len = size;
+	con->out_kvec[index].iov_base = data;
+	con->out_kvec_left++;
+	con->out_kvec_bytes += size;
+}
+
+/*
+ * Chop off a kvec from the end.  Return residual number of bytes for
+ * that kvec, i.e. how many bytes would have been written if the kvec
+ * hadn't been nuked.
+ */
+static int con_out_kvec_skip(struct ceph_connection *con)
+{
+	int off = con->out_kvec_cur - con->out_kvec;
+	int skip = 0;
+
+	if (con->out_kvec_bytes > 0) {
+		skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
+		BUG_ON(con->out_kvec_bytes < skip);
+		BUG_ON(!con->out_kvec_left);
+		con->out_kvec_bytes -= skip;
+		con->out_kvec_left--;
+	}
+
+	return skip;
+}
+
+static size_t sizeof_footer(struct ceph_connection *con)
+{
+	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
+	    sizeof(struct ceph_msg_footer) :
+	    sizeof(struct ceph_msg_footer_old);
+}
+
+static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+{
+	/* Initialize data cursor */
+
+	ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
+}
+
+/*
+ * Prepare footer for currently outgoing message, and finish things
+ * off.  Assumes out_kvec* are already valid.. we just add on to the end.
+ */
+static void prepare_write_message_footer(struct ceph_connection *con)
+{
+	struct ceph_msg *m = con->out_msg;
+
+	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+
+	dout("prepare_write_message_footer %p\n", con);
+	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
+	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
+		if (con->ops->sign_message)
+			con->ops->sign_message(m);
+		else
+			m->footer.sig = 0;
+	} else {
+		m->old_footer.flags = m->footer.flags;
+	}
+	con->out_more = m->more_to_follow;
+	con->out_msg_done = true;
+}
+
+/*
+ * Prepare headers for the next outgoing message.
+ */
+static void prepare_write_message(struct ceph_connection *con)
+{
+	struct ceph_msg *m;
+	u32 crc;
+
+	con_out_kvec_reset(con);
+	con->out_msg_done = false;
+
+	/* Sneak an ack in there first?  If we can get it into the same
+	 * TCP packet that's a good thing. */
+	if (con->in_seq > con->in_seq_acked) {
+		con->in_seq_acked = con->in_seq;
+		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
+		con_out_kvec_add(con, sizeof (con->out_temp_ack),
+			&con->out_temp_ack);
+	}
+
+	ceph_con_get_out_msg(con);
+	m = con->out_msg;
+
+	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
+	     m, con->out_seq, le16_to_cpu(m->hdr.type),
+	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
+	     m->data_length);
+	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
+	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
+
+	/* tag + hdr + front + middle */
+	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
+	con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
+	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
+
+	if (m->middle)
+		con_out_kvec_add(con, m->middle->vec.iov_len,
+			m->middle->vec.iov_base);
+
+	/* fill in hdr crc and finalize hdr */
+	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
+	con->out_msg->hdr.crc = cpu_to_le32(crc);
+	memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
+
+	/* fill in front and middle crc, footer */
+	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
+	con->out_msg->footer.front_crc = cpu_to_le32(crc);
+	if (m->middle) {
+		crc = crc32c(0, m->middle->vec.iov_base,
+				m->middle->vec.iov_len);
+		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
+	} else
+		con->out_msg->footer.middle_crc = 0;
+	dout("%s front_crc %u middle_crc %u\n", __func__,
+	     le32_to_cpu(con->out_msg->footer.front_crc),
+	     le32_to_cpu(con->out_msg->footer.middle_crc));
+	con->out_msg->footer.flags = 0;
+
+	/* is there a data payload? */
+	con->out_msg->footer.data_crc = 0;
+	if (m->data_length) {
+		prepare_message_data(con->out_msg, m->data_length);
+		con->out_more = 1;  /* data + footer will follow */
+	} else {
+		/* no, queue up footer too and be done */
+		prepare_write_message_footer(con);
+	}
+
+	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+}
+
+/*
+ * Prepare an ack.
+ */
+static void prepare_write_ack(struct ceph_connection *con)
+{
+	dout("prepare_write_ack %p %llu -> %llu\n", con,
+	     con->in_seq_acked, con->in_seq);
+	con->in_seq_acked = con->in_seq;
+
+	con_out_kvec_reset(con);
+
+	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+
+	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
+	con_out_kvec_add(con, sizeof (con->out_temp_ack),
+				&con->out_temp_ack);
+
+	con->out_more = 1;  /* more will follow.. eventually.. */
+	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+}
+
+/*
+ * Prepare to share the seq during handshake
+ */
+static void prepare_write_seq(struct ceph_connection *con)
+{
+	dout("prepare_write_seq %p %llu -> %llu\n", con,
+	     con->in_seq_acked, con->in_seq);
+	con->in_seq_acked = con->in_seq;
+
+	con_out_kvec_reset(con);
+
+	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
+	con_out_kvec_add(con, sizeof (con->out_temp_ack),
+			 &con->out_temp_ack);
+
+	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+}
+
+/*
+ * Prepare to write keepalive byte.
+ */
+static void prepare_write_keepalive(struct ceph_connection *con)
+{
+	dout("prepare_write_keepalive %p\n", con);
+	con_out_kvec_reset(con);
+	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
+		struct timespec64 now;
+
+		ktime_get_real_ts64(&now);
+		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
+		ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
+		con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
+				 &con->out_temp_keepalive2);
+	} else {
+		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
+	}
+	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+}
+
+/*
+ * Connection negotiation.
+ */
+
+static int get_connect_authorizer(struct ceph_connection *con)
+{
+	struct ceph_auth_handshake *auth;
+	int auth_proto;
+
+	if (!con->ops->get_authorizer) {
+		con->auth = NULL;
+		con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
+		con->out_connect.authorizer_len = 0;
+		return 0;
+	}
+
+	auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
+	if (IS_ERR(auth))
+		return PTR_ERR(auth);
+
+	con->auth = auth;
+	con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
+	con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
+	return 0;
+}
+
+/*
+ * We connected to a peer and are saying hello.
+ */
+static void prepare_write_banner(struct ceph_connection *con)
+{
+	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
+	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+					&con->msgr->my_enc_addr);
+
+	con->out_more = 0;
+	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+}
+
+static void __prepare_write_connect(struct ceph_connection *con)
+{
+	con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
+	if (con->auth)
+		con_out_kvec_add(con, con->auth->authorizer_buf_len,
+				 con->auth->authorizer_buf);
+
+	con->out_more = 0;
+	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
+}
+
+static int prepare_write_connect(struct ceph_connection *con)
+{
+	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
+	int proto;
+	int ret;
+
+	switch (con->peer_name.type) {
+	case CEPH_ENTITY_TYPE_MON:
+		proto = CEPH_MONC_PROTOCOL;
+		break;
+	case CEPH_ENTITY_TYPE_OSD:
+		proto = CEPH_OSDC_PROTOCOL;
+		break;
+	case CEPH_ENTITY_TYPE_MDS:
+		proto = CEPH_MDSC_PROTOCOL;
+		break;
+	default:
+		BUG();
+	}
+
+	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
+	     con->connect_seq, global_seq, proto);
+
+	con->out_connect.features =
+	    cpu_to_le64(from_msgr(con->msgr)->supported_features);
+	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
+	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
+	con->out_connect.global_seq = cpu_to_le32(global_seq);
+	con->out_connect.protocol_version = cpu_to_le32(proto);
+	con->out_connect.flags = 0;
+
+	ret = get_connect_authorizer(con);
+	if (ret)
+		return ret;
+
+	__prepare_write_connect(con);
+	return 0;
+}
+
+/*
+ * write as much of pending kvecs to the socket as we can.
+ *  1 -> done
+ *  0 -> socket full, but more to do
+ * <0 -> error
+ */
+static int write_partial_kvec(struct ceph_connection *con)
+{
+	int ret;
+
+	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
+	while (con->out_kvec_bytes > 0) {
+		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+				       con->out_kvec_left, con->out_kvec_bytes,
+				       con->out_more);
+		if (ret <= 0)
+			goto out;
+		con->out_kvec_bytes -= ret;
+		if (con->out_kvec_bytes == 0)
+			break;            /* done */
+
+		/* account for full iov entries consumed */
+		while (ret >= con->out_kvec_cur->iov_len) {
+			BUG_ON(!con->out_kvec_left);
+			ret -= con->out_kvec_cur->iov_len;
+			con->out_kvec_cur++;
+			con->out_kvec_left--;
+		}
+		/* and for a partially-consumed entry */
+		if (ret) {
+			con->out_kvec_cur->iov_len -= ret;
+			con->out_kvec_cur->iov_base += ret;
+		}
+	}
+	con->out_kvec_left = 0;
+	ret = 1;
+out:
+	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
+	     con->out_kvec_bytes, con->out_kvec_left, ret);
+	return ret;  /* done! */
+}
+
+/*
+ * Write as much message data payload as we can.  If we finish, queue
+ * up the footer.
+ *  1 -> done, footer is now queued in out_kvec[].
+ *  0 -> socket full, but more to do
+ * <0 -> error
+ */
+static int write_partial_message_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->out_msg;
+	struct ceph_msg_data_cursor *cursor = &msg->cursor;
+	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
+	u32 crc;
+
+	dout("%s %p msg %p\n", __func__, con, msg);
+
+	if (!msg->num_data_items)
+		return -EINVAL;
+
+	/*
+	 * Iterate through each page that contains data to be
+	 * written, and send as much as possible for each.
+	 *
+	 * If we are calculating the data crc (the default), we will
+	 * need to map the page.  If we have no pages, they have
+	 * been revoked, so use the zero page.
+	 */
+	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
+	while (cursor->total_resid) {
+		struct page *page;
+		size_t page_offset;
+		size_t length;
+		int ret;
+
+		if (!cursor->resid) {
+			ceph_msg_data_advance(cursor, 0);
+			continue;
+		}
+
+		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
+		if (length == cursor->total_resid)
+			more = MSG_MORE;
+		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
+					more);
+		if (ret <= 0) {
+			if (do_datacrc)
+				msg->footer.data_crc = cpu_to_le32(crc);
+
+			return ret;
+		}
+		if (do_datacrc && cursor->need_crc)
+			crc = ceph_crc32c_page(crc, page, page_offset, length);
+		ceph_msg_data_advance(cursor, (size_t)ret);
+	}
+
+	dout("%s %p msg %p done\n", __func__, con, msg);
+
+	/* prepare and queue up footer, too */
+	if (do_datacrc)
+		msg->footer.data_crc = cpu_to_le32(crc);
+	else
+		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
+	con_out_kvec_reset(con);
+	prepare_write_message_footer(con);
+
+	return 1;	/* must return > 0 to indicate success */
+}
+
+/*
+ * write some zeros
+ */
+static int write_partial_skip(struct ceph_connection *con)
+{
+	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
+	int ret;
+
+	dout("%s %p %d left\n", __func__, con, con->out_skip);
+	while (con->out_skip > 0) {
+		size_t size = min(con->out_skip, (int) PAGE_SIZE);
+
+		if (size == con->out_skip)
+			more = MSG_MORE;
+		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
+					more);
+		if (ret <= 0)
+			goto out;
+		con->out_skip -= ret;
+	}
+	ret = 1;
+out:
+	return ret;
+}
+
+/*
+ * Prepare to read connection handshake, or an ack.
+ */
+static void prepare_read_banner(struct ceph_connection *con)
+{
+	dout("prepare_read_banner %p\n", con);
+	con->in_base_pos = 0;
+}
+
+static void prepare_read_connect(struct ceph_connection *con)
+{
+	dout("prepare_read_connect %p\n", con);
+	con->in_base_pos = 0;
+}
+
+static void prepare_read_ack(struct ceph_connection *con)
+{
+	dout("prepare_read_ack %p\n", con);
+	con->in_base_pos = 0;
+}
+
+static void prepare_read_seq(struct ceph_connection *con)
+{
+	dout("prepare_read_seq %p\n", con);
+	con->in_base_pos = 0;
+	con->in_tag = CEPH_MSGR_TAG_SEQ;
+}
+
+static void prepare_read_tag(struct ceph_connection *con)
+{
+	dout("prepare_read_tag %p\n", con);
+	con->in_base_pos = 0;
+	con->in_tag = CEPH_MSGR_TAG_READY;
+}
+
+static void prepare_read_keepalive_ack(struct ceph_connection *con)
+{
+	dout("prepare_read_keepalive_ack %p\n", con);
+	con->in_base_pos = 0;
+}
+
+/*
+ * Prepare to read a message.
+ */
+static int prepare_read_message(struct ceph_connection *con)
+{
+	dout("prepare_read_message %p\n", con);
+	BUG_ON(con->in_msg != NULL);
+	con->in_base_pos = 0;
+	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
+	return 0;
+}
+
+static int read_partial(struct ceph_connection *con,
+			int end, int size, void *object)
+{
+	while (con->in_base_pos < end) {
+		int left = end - con->in_base_pos;
+		int have = size - left;
+		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
+		if (ret <= 0)
+			return ret;
+		con->in_base_pos += ret;
+	}
+	return 1;
+}
+
+/*
+ * Read all or part of the connect-side handshake on a new connection
+ */
+static int read_partial_banner(struct ceph_connection *con)
+{
+	int size;
+	int end;
+	int ret;
+
+	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
+
+	/* peer's banner */
+	size = strlen(CEPH_BANNER);
+	end = size;
+	ret = read_partial(con, end, size, con->in_banner);
+	if (ret <= 0)
+		goto out;
+
+	size = sizeof (con->actual_peer_addr);
+	end += size;
+	ret = read_partial(con, end, size, &con->actual_peer_addr);
+	if (ret <= 0)
+		goto out;
+	ceph_decode_banner_addr(&con->actual_peer_addr);
+
+	size = sizeof (con->peer_addr_for_me);
+	end += size;
+	ret = read_partial(con, end, size, &con->peer_addr_for_me);
+	if (ret <= 0)
+		goto out;
+	ceph_decode_banner_addr(&con->peer_addr_for_me);
+
+out:
+	return ret;
+}
+
+static int read_partial_connect(struct ceph_connection *con)
+{
+	int size;
+	int end;
+	int ret;
+
+	dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
+
+	size = sizeof (con->in_reply);
+	end = size;
+	ret = read_partial(con, end, size, &con->in_reply);
+	if (ret <= 0)
+		goto out;
+
+	if (con->auth) {
+		size = le32_to_cpu(con->in_reply.authorizer_len);
+		if (size > con->auth->authorizer_reply_buf_len) {
+			pr_err("authorizer reply too big: %d > %zu\n", size,
+			       con->auth->authorizer_reply_buf_len);
+			ret = -EINVAL;
+			goto out;
+		}
+
+		end += size;
+		ret = read_partial(con, end, size,
+				   con->auth->authorizer_reply_buf);
+		if (ret <= 0)
+			goto out;
+	}
+
+	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
+	     con, (int)con->in_reply.tag,
+	     le32_to_cpu(con->in_reply.connect_seq),
+	     le32_to_cpu(con->in_reply.global_seq));
+out:
+	return ret;
+}
+
+/*
+ * Verify the hello banner looks okay.
+ */
+static int verify_hello(struct ceph_connection *con)
+{
+	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+		pr_err("connect to %s got bad banner\n",
+		       ceph_pr_addr(&con->peer_addr));
+		con->error_msg = "protocol error, bad banner";
+		return -1;
+	}
+	return 0;
+}
+
+static int process_banner(struct ceph_connection *con)
+{
+	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
+
+	dout("process_banner on %p\n", con);
+
+	if (verify_hello(con) < 0)
+		return -1;
+
+	/*
+	 * Make sure the other end is who we wanted.  note that the other
+	 * end may not yet know their ip address, so if it's 0.0.0.0, give
+	 * them the benefit of the doubt.
+	 */
+	if (memcmp(&con->peer_addr, &con->actual_peer_addr,
+		   sizeof(con->peer_addr)) != 0 &&
+	    !(ceph_addr_is_blank(&con->actual_peer_addr) &&
+	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
+		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
+			ceph_pr_addr(&con->peer_addr),
+			le32_to_cpu(con->peer_addr.nonce),
+			ceph_pr_addr(&con->actual_peer_addr),
+			le32_to_cpu(con->actual_peer_addr.nonce));
+		con->error_msg = "wrong peer at address";
+		return -1;
+	}
+
+	/*
+	 * did we learn our address?
+	 */
+	if (ceph_addr_is_blank(my_addr)) {
+		memcpy(&my_addr->in_addr,
+		       &con->peer_addr_for_me.in_addr,
+		       sizeof(con->peer_addr_for_me.in_addr));
+		ceph_addr_set_port(my_addr, 0);
+		ceph_encode_my_addr(con->msgr);
+		dout("process_banner learned my addr is %s\n",
+		     ceph_pr_addr(my_addr));
+	}
+
+	return 0;
+}
+
+static int process_connect(struct ceph_connection *con)
+{
+	u64 sup_feat = from_msgr(con->msgr)->supported_features;
+	u64 req_feat = from_msgr(con->msgr)->required_features;
+	u64 server_feat = le64_to_cpu(con->in_reply.features);
+	int ret;
+
+	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
+
+	if (con->auth) {
+		int len = le32_to_cpu(con->in_reply.authorizer_len);
+
+		/*
+		 * Any connection that defines ->get_authorizer()
+		 * should also define ->add_authorizer_challenge() and
+		 * ->verify_authorizer_reply().
+		 *
+		 * See get_connect_authorizer().
+		 */
+		if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
+			ret = con->ops->add_authorizer_challenge(
+				    con, con->auth->authorizer_reply_buf, len);
+			if (ret < 0)
+				return ret;
+
+			con_out_kvec_reset(con);
+			__prepare_write_connect(con);
+			prepare_read_connect(con);
+			return 0;
+		}
+
+		if (len) {
+			ret = con->ops->verify_authorizer_reply(con);
+			if (ret < 0) {
+				con->error_msg = "bad authorize reply";
+				return ret;
+			}
+		}
+	}
+
+	switch (con->in_reply.tag) {
+	case CEPH_MSGR_TAG_FEATURES:
+		pr_err("%s%lld %s feature set mismatch,"
+		       " my %llx < server's %llx, missing %llx\n",
+		       ENTITY_NAME(con->peer_name),
+		       ceph_pr_addr(&con->peer_addr),
+		       sup_feat, server_feat, server_feat & ~sup_feat);
+		con->error_msg = "missing required protocol features";
+		return -1;
+
+	case CEPH_MSGR_TAG_BADPROTOVER:
+		pr_err("%s%lld %s protocol version mismatch,"
+		       " my %d != server's %d\n",
+		       ENTITY_NAME(con->peer_name),
+		       ceph_pr_addr(&con->peer_addr),
+		       le32_to_cpu(con->out_connect.protocol_version),
+		       le32_to_cpu(con->in_reply.protocol_version));
+		con->error_msg = "protocol version mismatch";
+		return -1;
+
+	case CEPH_MSGR_TAG_BADAUTHORIZER:
+		con->auth_retry++;
+		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
+		     con->auth_retry);
+		if (con->auth_retry == 2) {
+			con->error_msg = "connect authorization failure";
+			return -1;
+		}
+		con_out_kvec_reset(con);
+		ret = prepare_write_connect(con);
+		if (ret < 0)
+			return ret;
+		prepare_read_connect(con);
+		break;
+
+	case CEPH_MSGR_TAG_RESETSESSION:
+		/*
+		 * If we connected with a large connect_seq but the peer
+		 * has no record of a session with us (no connection, or
+		 * connect_seq == 0), they will send RESETSESION to indicate
+		 * that they must have reset their session, and may have
+		 * dropped messages.
+		 */
+		dout("process_connect got RESET peer seq %u\n",
+		     le32_to_cpu(con->in_reply.connect_seq));
+		pr_info("%s%lld %s session reset\n",
+			ENTITY_NAME(con->peer_name),
+			ceph_pr_addr(&con->peer_addr));
+		ceph_con_reset_session(con);
+		con_out_kvec_reset(con);
+		ret = prepare_write_connect(con);
+		if (ret < 0)
+			return ret;
+		prepare_read_connect(con);
+
+		/* Tell ceph about it. */
+		mutex_unlock(&con->mutex);
+		if (con->ops->peer_reset)
+			con->ops->peer_reset(con);
+		mutex_lock(&con->mutex);
+		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
+			return -EAGAIN;
+		break;
+
+	case CEPH_MSGR_TAG_RETRY_SESSION:
+		/*
+		 * If we sent a smaller connect_seq than the peer has, try
+		 * again with a larger value.
+		 */
+		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
+		     le32_to_cpu(con->out_connect.connect_seq),
+		     le32_to_cpu(con->in_reply.connect_seq));
+		con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
+		con_out_kvec_reset(con);
+		ret = prepare_write_connect(con);
+		if (ret < 0)
+			return ret;
+		prepare_read_connect(con);
+		break;
+
+	case CEPH_MSGR_TAG_RETRY_GLOBAL:
+		/*
+		 * If we sent a smaller global_seq than the peer has, try
+		 * again with a larger value.
+		 */
+		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
+		     con->peer_global_seq,
+		     le32_to_cpu(con->in_reply.global_seq));
+		ceph_get_global_seq(con->msgr,
+				    le32_to_cpu(con->in_reply.global_seq));
+		con_out_kvec_reset(con);
+		ret = prepare_write_connect(con);
+		if (ret < 0)
+			return ret;
+		prepare_read_connect(con);
+		break;
+
+	case CEPH_MSGR_TAG_SEQ:
+	case CEPH_MSGR_TAG_READY:
+		if (req_feat & ~server_feat) {
+			pr_err("%s%lld %s protocol feature mismatch,"
+			       " my required %llx > server's %llx, need %llx\n",
+			       ENTITY_NAME(con->peer_name),
+			       ceph_pr_addr(&con->peer_addr),
+			       req_feat, server_feat, req_feat & ~server_feat);
+			con->error_msg = "missing required protocol features";
+			return -1;
+		}
+
+		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
+		con->state = CEPH_CON_S_OPEN;
+		con->auth_retry = 0;    /* we authenticated; clear flag */
+		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
+		con->connect_seq++;
+		con->peer_features = server_feat;
+		dout("process_connect got READY gseq %d cseq %d (%d)\n",
+		     con->peer_global_seq,
+		     le32_to_cpu(con->in_reply.connect_seq),
+		     con->connect_seq);
+		WARN_ON(con->connect_seq !=
+			le32_to_cpu(con->in_reply.connect_seq));
+
+		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
+			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
+
+		con->delay = 0;      /* reset backoff memory */
+
+		if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
+			prepare_write_seq(con);
+			prepare_read_seq(con);
+		} else {
+			prepare_read_tag(con);
+		}
+		break;
+
+	case CEPH_MSGR_TAG_WAIT:
+		/*
+		 * If there is a connection race (we are opening
+		 * connections to each other), one of us may just have
+		 * to WAIT.  This shouldn't happen if we are the
+		 * client.
+		 */
+		con->error_msg = "protocol error, got WAIT as client";
+		return -1;
+
+	default:
+		con->error_msg = "protocol error, garbage tag during connect";
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * read (part of) an ack
+ */
+static int read_partial_ack(struct ceph_connection *con)
+{
+	int size = sizeof (con->in_temp_ack);
+	int end = size;
+
+	return read_partial(con, end, size, &con->in_temp_ack);
+}
+
+/*
+ * We can finally discard anything that's been acked.
+ */
+static void process_ack(struct ceph_connection *con)
+{
+	u64 ack = le64_to_cpu(con->in_temp_ack);
+
+	if (con->in_tag == CEPH_MSGR_TAG_ACK)
+		ceph_con_discard_sent(con, ack);
+	else
+		ceph_con_discard_requeued(con, ack);
+
+	prepare_read_tag(con);
+}
+
+static int read_partial_message_section(struct ceph_connection *con,
+					struct kvec *section,
+					unsigned int sec_len, u32 *crc)
+{
+	int ret, left;
+
+	BUG_ON(!section);
+
+	while (section->iov_len < sec_len) {
+		BUG_ON(section->iov_base == NULL);
+		left = sec_len - section->iov_len;
+		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
+				       section->iov_len, left);
+		if (ret <= 0)
+			return ret;
+		section->iov_len += ret;
+	}
+	if (section->iov_len == sec_len)
+		*crc = crc32c(0, section->iov_base, section->iov_len);
+
+	return 1;
+}
+
+static int read_partial_msg_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->in_msg;
+	struct ceph_msg_data_cursor *cursor = &msg->cursor;
+	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+	struct page *page;
+	size_t page_offset;
+	size_t length;
+	u32 crc = 0;
+	int ret;
+
+	if (!msg->num_data_items)
+		return -EIO;
+
+	if (do_datacrc)
+		crc = con->in_data_crc;
+	while (cursor->total_resid) {
+		if (!cursor->resid) {
+			ceph_msg_data_advance(cursor, 0);
+			continue;
+		}
+
+		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
+		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+		if (ret <= 0) {
+			if (do_datacrc)
+				con->in_data_crc = crc;
+
+			return ret;
+		}
+
+		if (do_datacrc)
+			crc = ceph_crc32c_page(crc, page, page_offset, ret);
+		ceph_msg_data_advance(cursor, (size_t)ret);
+	}
+	if (do_datacrc)
+		con->in_data_crc = crc;
+
+	return 1;	/* must return > 0 to indicate success */
+}
+
+/*
+ * read (part of) a message.
+ */
+static int read_partial_message(struct ceph_connection *con)
+{
+	struct ceph_msg *m = con->in_msg;
+	int size;
+	int end;
+	int ret;
+	unsigned int front_len, middle_len, data_len;
+	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
+	u64 seq;
+	u32 crc;
+
+	dout("read_partial_message con %p msg %p\n", con, m);
+
+	/* header */
+	size = sizeof (con->in_hdr);
+	end = size;
+	ret = read_partial(con, end, size, &con->in_hdr);
+	if (ret <= 0)
+		return ret;
+
+	crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
+	if (cpu_to_le32(crc) != con->in_hdr.crc) {
+		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
+		       crc, con->in_hdr.crc);
+		return -EBADMSG;
+	}
+
+	front_len = le32_to_cpu(con->in_hdr.front_len);
+	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
+		return -EIO;
+	middle_len = le32_to_cpu(con->in_hdr.middle_len);
+	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
+		return -EIO;
+	data_len = le32_to_cpu(con->in_hdr.data_len);
+	if (data_len > CEPH_MSG_MAX_DATA_LEN)
+		return -EIO;
+
+	/* verify seq# */
+	seq = le64_to_cpu(con->in_hdr.seq);
+	if ((s64)seq - (s64)con->in_seq < 1) {
+		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
+			ENTITY_NAME(con->peer_name),
+			ceph_pr_addr(&con->peer_addr),
+			seq, con->in_seq + 1);
+		con->in_base_pos = -front_len - middle_len - data_len -
+			sizeof_footer(con);
+		con->in_tag = CEPH_MSGR_TAG_READY;
+		return 1;
+	} else if ((s64)seq - (s64)con->in_seq > 1) {
+		pr_err("read_partial_message bad seq %lld expected %lld\n",
+		       seq, con->in_seq + 1);
+		con->error_msg = "bad message sequence # for incoming message";
+		return -EBADE;
+	}
+
+	/* allocate message? */
+	if (!con->in_msg) {
+		int skip = 0;
+
+		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
+		     front_len, data_len);
+		ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip);
+		if (ret < 0)
+			return ret;
+
+		BUG_ON(!con->in_msg ^ skip);
+		if (skip) {
+			/* skip this message */
+			dout("alloc_msg said skip message\n");
+			con->in_base_pos = -front_len - middle_len - data_len -
+				sizeof_footer(con);
+			con->in_tag = CEPH_MSGR_TAG_READY;
+			con->in_seq++;
+			return 1;
+		}
+
+		BUG_ON(!con->in_msg);
+		BUG_ON(con->in_msg->con != con);
+		m = con->in_msg;
+		m->front.iov_len = 0;    /* haven't read it yet */
+		if (m->middle)
+			m->middle->vec.iov_len = 0;
+
+		/* prepare for data payload, if any */
+
+		if (data_len)
+			prepare_message_data(con->in_msg, data_len);
+	}
+
+	/* front */
+	ret = read_partial_message_section(con, &m->front, front_len,
+					   &con->in_front_crc);
+	if (ret <= 0)
+		return ret;
+
+	/* middle */
+	if (m->middle) {
+		ret = read_partial_message_section(con, &m->middle->vec,
+						   middle_len,
+						   &con->in_middle_crc);
+		if (ret <= 0)
+			return ret;
+	}
+
+	/* (page) data */
+	if (data_len) {
+		ret = read_partial_msg_data(con);
+		if (ret <= 0)
+			return ret;
+	}
+
+	/* footer */
+	size = sizeof_footer(con);
+	end += size;
+	ret = read_partial(con, end, size, &m->footer);
+	if (ret <= 0)
+		return ret;
+
+	if (!need_sign) {
+		m->footer.flags = m->old_footer.flags;
+		m->footer.sig = 0;
+	}
+
+	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
+	     m, front_len, m->footer.front_crc, middle_len,
+	     m->footer.middle_crc, data_len, m->footer.data_crc);
+
+	/* crc ok? */
+	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
+		pr_err("read_partial_message %p front crc %u != exp. %u\n",
+		       m, con->in_front_crc, m->footer.front_crc);
+		return -EBADMSG;
+	}
+	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
+		pr_err("read_partial_message %p middle crc %u != exp %u\n",
+		       m, con->in_middle_crc, m->footer.middle_crc);
+		return -EBADMSG;
+	}
+	if (do_datacrc &&
+	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
+	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
+		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
+		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
+		return -EBADMSG;
+	}
+
+	if (need_sign && con->ops->check_message_signature &&
+	    con->ops->check_message_signature(m)) {
+		pr_err("read_partial_message %p signature check failed\n", m);
+		return -EBADMSG;
+	}
+
+	return 1; /* done! */
+}
+
+static int read_keepalive_ack(struct ceph_connection *con)
+{
+	struct ceph_timespec ceph_ts;
+	size_t size = sizeof(ceph_ts);
+	int ret = read_partial(con, size, size, &ceph_ts);
+	if (ret <= 0)
+		return ret;
+	ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
+	prepare_read_tag(con);
+	return 1;
+}
+
+/*
+ * Read what we can from the socket.
+ */
+int ceph_con_v1_try_read(struct ceph_connection *con)
+{
+	int ret = -1;
+
+more:
+	dout("try_read start %p state %d\n", con, con->state);
+	if (con->state != CEPH_CON_S_V1_BANNER &&
+	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
+	    con->state != CEPH_CON_S_OPEN)
+		return 0;
+
+	BUG_ON(!con->sock);
+
+	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
+	     con->in_base_pos);
+
+	if (con->state == CEPH_CON_S_V1_BANNER) {
+		ret = read_partial_banner(con);
+		if (ret <= 0)
+			goto out;
+		ret = process_banner(con);
+		if (ret < 0)
+			goto out;
+
+		con->state = CEPH_CON_S_V1_CONNECT_MSG;
+
+		/*
+		 * Received banner is good, exchange connection info.
+		 * Do not reset out_kvec, as sending our banner raced
+		 * with receiving peer banner after connect completed.
+		 */
+		ret = prepare_write_connect(con);
+		if (ret < 0)
+			goto out;
+		prepare_read_connect(con);
+
+		/* Send connection info before awaiting response */
+		goto out;
+	}
+
+	if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
+		ret = read_partial_connect(con);
+		if (ret <= 0)
+			goto out;
+		ret = process_connect(con);
+		if (ret < 0)
+			goto out;
+		goto more;
+	}
+
+	WARN_ON(con->state != CEPH_CON_S_OPEN);
+
+	if (con->in_base_pos < 0) {
+		/*
+		 * skipping + discarding content.
+		 */
+		ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
+		if (ret <= 0)
+			goto out;
+		dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
+		con->in_base_pos += ret;
+		if (con->in_base_pos)
+			goto more;
+	}
+	if (con->in_tag == CEPH_MSGR_TAG_READY) {
+		/*
+		 * what's next?
+		 */
+		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+		if (ret <= 0)
+			goto out;
+		dout("try_read got tag %d\n", (int)con->in_tag);
+		switch (con->in_tag) {
+		case CEPH_MSGR_TAG_MSG:
+			prepare_read_message(con);
+			break;
+		case CEPH_MSGR_TAG_ACK:
+			prepare_read_ack(con);
+			break;
+		case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+			prepare_read_keepalive_ack(con);
+			break;
+		case CEPH_MSGR_TAG_CLOSE:
+			ceph_con_close_socket(con);
+			con->state = CEPH_CON_S_CLOSED;
+			goto out;
+		default:
+			goto bad_tag;
+		}
+	}
+	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
+		ret = read_partial_message(con);
+		if (ret <= 0) {
+			switch (ret) {
+			case -EBADMSG:
+				con->error_msg = "bad crc/signature";
+				fallthrough;
+			case -EBADE:
+				ret = -EIO;
+				break;
+			case -EIO:
+				con->error_msg = "io error";
+				break;
+			}
+			goto out;
+		}
+		if (con->in_tag == CEPH_MSGR_TAG_READY)
+			goto more;
+		ceph_con_process_message(con);
+		if (con->state == CEPH_CON_S_OPEN)
+			prepare_read_tag(con);
+		goto more;
+	}
+	if (con->in_tag == CEPH_MSGR_TAG_ACK ||
+	    con->in_tag == CEPH_MSGR_TAG_SEQ) {
+		/*
+		 * the final handshake seq exchange is semantically
+		 * equivalent to an ACK
+		 */
+		ret = read_partial_ack(con);
+		if (ret <= 0)
+			goto out;
+		process_ack(con);
+		goto more;
+	}
+	if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+		ret = read_keepalive_ack(con);
+		if (ret <= 0)
+			goto out;
+		goto more;
+	}
+
+out:
+	dout("try_read done on %p ret %d\n", con, ret);
+	return ret;
+
+bad_tag:
+	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
+	con->error_msg = "protocol error, garbage tag";
+	ret = -1;
+	goto out;
+}
+
+/*
+ * Write something to the socket.  Called in a worker thread when the
+ * socket appears to be writeable and we have something ready to send.
+ */
+int ceph_con_v1_try_write(struct ceph_connection *con)
+{
+	int ret = 1;
+
+	dout("try_write start %p state %d\n", con, con->state);
+	if (con->state != CEPH_CON_S_PREOPEN &&
+	    con->state != CEPH_CON_S_V1_BANNER &&
+	    con->state != CEPH_CON_S_V1_CONNECT_MSG &&
+	    con->state != CEPH_CON_S_OPEN)
+		return 0;
+
+	/* open the socket first? */
+	if (con->state == CEPH_CON_S_PREOPEN) {
+		BUG_ON(con->sock);
+		con->state = CEPH_CON_S_V1_BANNER;
+
+		con_out_kvec_reset(con);
+		prepare_write_banner(con);
+		prepare_read_banner(con);
+
+		BUG_ON(con->in_msg);
+		con->in_tag = CEPH_MSGR_TAG_READY;
+		dout("try_write initiating connect on %p new state %d\n",
+		     con, con->state);
+		ret = ceph_tcp_connect(con);
+		if (ret < 0) {
+			con->error_msg = "connect error";
+			goto out;
+		}
+	}
+
+more:
+	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
+	BUG_ON(!con->sock);
+
+	/* kvec data queued? */
+	if (con->out_kvec_left) {
+		ret = write_partial_kvec(con);
+		if (ret <= 0)
+			goto out;
+	}
+	if (con->out_skip) {
+		ret = write_partial_skip(con);
+		if (ret <= 0)
+			goto out;
+	}
+
+	/* msg pages? */
+	if (con->out_msg) {
+		if (con->out_msg_done) {
+			ceph_msg_put(con->out_msg);
+			con->out_msg = NULL;   /* we're done with this one */
+			goto do_next;
+		}
+
+		ret = write_partial_message_data(con);
+		if (ret == 1)
+			goto more;  /* we need to send the footer, too! */
+		if (ret == 0)
+			goto out;
+		if (ret < 0) {
+			dout("try_write write_partial_message_data err %d\n",
+			     ret);
+			goto out;
+		}
+	}
+
+do_next:
+	if (con->state == CEPH_CON_S_OPEN) {
+		if (ceph_con_flag_test_and_clear(con,
+				CEPH_CON_F_KEEPALIVE_PENDING)) {
+			prepare_write_keepalive(con);
+			goto more;
+		}
+		/* is anything else pending? */
+		if (!list_empty(&con->out_queue)) {
+			prepare_write_message(con);
+			goto more;
+		}
+		if (con->in_seq > con->in_seq_acked) {
+			prepare_write_ack(con);
+			goto more;
+		}
+	}
+
+	/* Nothing to do! */
+	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+	dout("try_write nothing else to write.\n");
+	ret = 0;
+out:
+	dout("try_write done on %p ret %d\n", con, ret);
+	return ret;
+}
+
+void ceph_con_v1_revoke(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->out_msg;
+
+	WARN_ON(con->out_skip);
+	/* footer */
+	if (con->out_msg_done) {
+		con->out_skip += con_out_kvec_skip(con);
+	} else {
+		WARN_ON(!msg->data_length);
+		con->out_skip += sizeof_footer(con);
+	}
+	/* data, middle, front */
+	if (msg->data_length)
+		con->out_skip += msg->cursor.total_resid;
+	if (msg->middle)
+		con->out_skip += con_out_kvec_skip(con);
+	con->out_skip += con_out_kvec_skip(con);
+
+	dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
+	     con->out_kvec_bytes, con->out_skip);
+}
+
+void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
+{
+	unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+	unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+	unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
+
+	/* skip rest of message */
+	con->in_base_pos = con->in_base_pos -
+			sizeof(struct ceph_msg_header) -
+			front_len -
+			middle_len -
+			data_len -
+			sizeof(struct ceph_msg_footer);
+
+	con->in_tag = CEPH_MSGR_TAG_READY;
+	con->in_seq++;
+
+	dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
+}
+
+bool ceph_con_v1_opened(struct ceph_connection *con)
+{
+	return con->connect_seq;
+}
+
+void ceph_con_v1_reset_session(struct ceph_connection *con)
+{
+	con->connect_seq = 0;
+	con->peer_global_seq = 0;
+}
+
+void ceph_con_v1_reset_protocol(struct ceph_connection *con)
+{
+	con->out_skip = 0;
+}