diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4a876785b68cd5c550dbbb1d2b63071446454081..9c6234428607fb60a6d65a3ea0c1eebfa72b6b96 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1847,14 +1847,12 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 	if (osd_req->r_result < 0)
 		obj_request->result = osd_req->r_result;
 
-	rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
-
 	/*
 	 * We support a 64-bit length, but ultimately it has to be
 	 * passed to the block layer, which just supports a 32-bit
 	 * length field.
 	 */
-	obj_request->xferred = osd_req->r_reply_op_len[0];
+	obj_request->xferred = osd_req->r_ops[0].outdata_len;
 	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
 
 	opcode = osd_req->r_ops[0].op;
@@ -5643,18 +5641,12 @@ static void rbd_sysfs_cleanup(void)
 static int rbd_slab_init(void)
 {
 	rbd_assert(!rbd_img_request_cache);
-	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
-					sizeof (struct rbd_img_request),
-					__alignof__(struct rbd_img_request),
-					0, NULL);
+	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
 	if (!rbd_img_request_cache)
 		return -ENOMEM;
 
 	rbd_assert(!rbd_obj_request_cache);
-	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
-					sizeof (struct rbd_obj_request),
-					__alignof__(struct rbd_obj_request),
-					0, NULL);
+	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
 	if (!rbd_obj_request_cache)
 		goto out_err;
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 19adeb0ef82a397f122682db3a9ba31f56df5f5d..fc5cae2a0db2dc2a4229f275f8345c6544ea13d6 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,8 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
 
 static int ceph_releasepage(struct page *page, gfp_t g)
 {
-	struct inode *inode = page->mapping ? page->mapping->host : NULL;
-	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
+	dout("%p releasepage %p idx %lu\n", page->mapping->host,
+	     page, page->index);
 	WARN_ON(PageDirty(page));
 
 	/* Can we release the page from the cache? */
@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 	for (i = 0; i < num_pages; i++) {
 		struct page *page = osd_data->pages[i];
 
-		if (rc < 0 && rc != ENOENT)
+		if (rc < 0 && rc != -ENOENT)
 			goto unlock;
 		if (bytes < (int)PAGE_CACHE_SIZE) {
 			/* zero (remainder of) page */
@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
 	struct inode *inode = req->r_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_data *osd_data;
-	unsigned wrote;
 	struct page *page;
-	int num_pages;
-	int i;
+	int num_pages, total_pages = 0;
+	int i, j;
+	int rc = req->r_result;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
-	int rc = req->r_result;
-	u64 bytes = req->r_ops[0].extent.length;
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	long writeback_stat;
-	unsigned issued = ceph_caps_issued(ci);
+	bool remove_page;
 
-	osd_data = osd_req_op_extent_osd_data(req, 0);
-	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-	num_pages = calc_pages_for((u64)osd_data->alignment,
-					(u64)osd_data->length);
-	if (rc >= 0) {
-		/*
-		 * Assume we wrote the pages we originally sent.  The
-		 * osd might reply with fewer pages if our writeback
-		 * raced with a truncation and was adjusted at the osd,
-		 * so don't believe the reply.
-		 */
-		wrote = num_pages;
-	} else {
-		wrote = 0;
+
+	dout("writepages_finish %p rc %d\n", inode, rc);
+	if (rc < 0)
 		mapping_set_error(mapping, rc);
-	}
-	dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
-	     inode, rc, bytes, wrote);
 
-	/* clean all pages */
-	for (i = 0; i < num_pages; i++) {
-		page = osd_data->pages[i];
-		BUG_ON(!page);
-		WARN_ON(!PageUptodate(page));
+	/*
+	 * We lost the cache cap, need to truncate the page before
+	 * it is unlocked, otherwise we'd truncate it later in the
+	 * page truncation thread, possibly losing some data that
+	 * raced its way in
+	 */
+	remove_page = !(ceph_caps_issued(ci) &
+			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
 
-		writeback_stat =
-			atomic_long_dec_return(&fsc->writeback_count);
-		if (writeback_stat <
-		    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
-			clear_bdi_congested(&fsc->backing_dev_info,
-					    BLK_RW_ASYNC);
+	/* clean all pages */
+	for (i = 0; i < req->r_num_ops; i++) {
+		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
+			break;
 
-		ceph_put_snap_context(page_snap_context(page));
-		page->private = 0;
-		ClearPagePrivate(page);
-		dout("unlocking %d %p\n", i, page);
-		end_page_writeback(page);
+		osd_data = osd_req_op_extent_osd_data(req, i);
+		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+		num_pages = calc_pages_for((u64)osd_data->alignment,
+					   (u64)osd_data->length);
+		total_pages += num_pages;
+		for (j = 0; j < num_pages; j++) {
+			page = osd_data->pages[j];
+			BUG_ON(!page);
+			WARN_ON(!PageUptodate(page));
+
+			if (atomic_long_dec_return(&fsc->writeback_count) <
+			     CONGESTION_OFF_THRESH(
+					fsc->mount_options->congestion_kb))
+				clear_bdi_congested(&fsc->backing_dev_info,
+						    BLK_RW_ASYNC);
+
+			ceph_put_snap_context(page_snap_context(page));
+			page->private = 0;
+			ClearPagePrivate(page);
+			dout("unlocking %p\n", page);
+			end_page_writeback(page);
+
+			if (remove_page)
+				generic_error_remove_page(inode->i_mapping,
+							  page);
 
-		/*
-		 * We lost the cache cap, need to truncate the page before
-		 * it is unlocked, otherwise we'd truncate it later in the
-		 * page truncation thread, possibly losing some data that
-		 * raced its way in
-		 */
-		if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
-			generic_error_remove_page(inode->i_mapping, page);
+			unlock_page(page);
+		}
+		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
+		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
 
-		unlock_page(page);
+		ceph_release_pages(osd_data->pages, num_pages);
 	}
-	dout("%p wrote+cleaned %d pages\n", inode, wrote);
-	ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
 
-	ceph_release_pages(osd_data->pages, num_pages);
+	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
+
+	osd_data = osd_req_op_extent_osd_data(req, 0);
 	if (osd_data->pages_from_pool)
 		mempool_free(osd_data->pages,
 			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
@@ -778,17 +778,15 @@ static int ceph_writepages_start(struct address_space *mapping,
 	while (!done && index <= end) {
 		unsigned i;
 		int first;
-		pgoff_t next;
-		int pvec_pages, locked_pages;
-		struct page **pages = NULL;
+		pgoff_t strip_unit_end = 0;
+		int num_ops = 0, op_idx;
+		int pvec_pages, locked_pages = 0;
+		struct page **pages = NULL, **data_pages;
 		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
 		struct page *page;
 		int want;
-		u64 offset, len;
-		long writeback_stat;
+		u64 offset = 0, len = 0;
 
-		next = 0;
-		locked_pages = 0;
 		max_pages = max_pages_ever;
 
 get_more_pages:
@@ -824,8 +822,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 				unlock_page(page);
 				break;
 			}
-			if (next && (page->index != next)) {
-				dout("not consecutive %p\n", page);
+			if (strip_unit_end && (page->index > strip_unit_end)) {
+				dout("end of strip unit %p\n", page);
 				unlock_page(page);
 				break;
 			}
@@ -867,36 +865,31 @@ static int ceph_writepages_start(struct address_space *mapping,
 			/*
 			 * We have something to write.  If this is
 			 * the first locked page this time through,
-			 * allocate an osd request and a page array
-			 * that it will use.
+			 * calculate max possinle write size and
+			 * allocate a page array
 			 */
 			if (locked_pages == 0) {
-				BUG_ON(pages);
+				u64 objnum;
+				u64 objoff;
+
 				/* prepare async write request */
 				offset = (u64)page_offset(page);
 				len = wsize;
-				req = ceph_osdc_new_request(&fsc->client->osdc,
-							&ci->i_layout, vino,
-							offset, &len, 0,
-							do_sync ? 2 : 1,
-							CEPH_OSD_OP_WRITE,
-							CEPH_OSD_FLAG_WRITE |
-							CEPH_OSD_FLAG_ONDISK,
-							snapc, truncate_seq,
-							truncate_size, true);
-				if (IS_ERR(req)) {
-					rc = PTR_ERR(req);
+
+				rc = ceph_calc_file_object_mapping(&ci->i_layout,
+								offset, len,
+								&objnum, &objoff,
+								&len);
+				if (rc < 0) {
 					unlock_page(page);
 					break;
 				}
 
-				if (do_sync)
-					osd_req_op_init(req, 1,
-							CEPH_OSD_OP_STARTSYNC, 0);
-
-				req->r_callback = writepages_finish;
-				req->r_inode = inode;
+				num_ops = 1 + do_sync;
+				strip_unit_end = page->index +
+					((len - 1) >> PAGE_CACHE_SHIFT);
 
+				BUG_ON(pages);
 				max_pages = calc_pages_for(0, (u64)len);
 				pages = kmalloc(max_pages * sizeof (*pages),
 						GFP_NOFS);
@@ -905,6 +898,20 @@ static int ceph_writepages_start(struct address_space *mapping,
 					pages = mempool_alloc(pool, GFP_NOFS);
 					BUG_ON(!pages);
 				}
+
+				len = 0;
+			} else if (page->index !=
+				   (offset + len) >> PAGE_CACHE_SHIFT) {
+				if (num_ops >= (pool ?  CEPH_OSD_SLAB_OPS :
+							CEPH_OSD_MAX_OPS)) {
+					redirty_page_for_writepage(wbc, page);
+					unlock_page(page);
+					break;
+				}
+
+				num_ops++;
+				offset = (u64)page_offset(page);
+				len = 0;
 			}
 
 			/* note position of first page in pvec */
@@ -913,18 +920,16 @@ static int ceph_writepages_start(struct address_space *mapping,
 			dout("%p will write page %p idx %lu\n",
 			     inode, page, page->index);
 
-			writeback_stat =
-			       atomic_long_inc_return(&fsc->writeback_count);
-			if (writeback_stat > CONGESTION_ON_THRESH(
+			if (atomic_long_inc_return(&fsc->writeback_count) >
+			    CONGESTION_ON_THRESH(
 				    fsc->mount_options->congestion_kb)) {
 				set_bdi_congested(&fsc->backing_dev_info,
 						  BLK_RW_ASYNC);
 			}
 
-			set_page_writeback(page);
 			pages[locked_pages] = page;
 			locked_pages++;
-			next = page->index + 1;
+			len += PAGE_CACHE_SIZE;
 		}
 
 		/* did we get anything? */
@@ -944,38 +949,119 @@ static int ceph_writepages_start(struct address_space *mapping,
 			/* shift unused pages over in the pvec...  we
 			 * will need to release them below. */
 			for (j = i; j < pvec_pages; j++) {
-				dout(" pvec leftover page %p\n",
-				     pvec.pages[j]);
+				dout(" pvec leftover page %p\n", pvec.pages[j]);
 				pvec.pages[j-i+first] = pvec.pages[j];
 			}
 			pvec.nr -= i-first;
 		}
 
-		/* Format the osd request message and submit the write */
+new_request:
 		offset = page_offset(pages[0]);
-		len = (u64)locked_pages << PAGE_CACHE_SHIFT;
-		if (snap_size == -1) {
-			len = min(len, (u64)i_size_read(inode) - offset);
-			 /* writepages_finish() clears writeback pages
-			  * according to the data length, so make sure
-			  * data length covers all locked pages */
-			len = max(len, 1 +
-				((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
-		} else {
-			len = min(len, snap_size - offset);
+		len = wsize;
+
+		req = ceph_osdc_new_request(&fsc->client->osdc,
+					&ci->i_layout, vino,
+					offset, &len, 0, num_ops,
+					CEPH_OSD_OP_WRITE,
+					CEPH_OSD_FLAG_WRITE |
+					CEPH_OSD_FLAG_ONDISK,
+					snapc, truncate_seq,
+					truncate_size, false);
+		if (IS_ERR(req)) {
+			req = ceph_osdc_new_request(&fsc->client->osdc,
+						&ci->i_layout, vino,
+						offset, &len, 0,
+						min(num_ops,
+						    CEPH_OSD_SLAB_OPS),
+						CEPH_OSD_OP_WRITE,
+						CEPH_OSD_FLAG_WRITE |
+						CEPH_OSD_FLAG_ONDISK,
+						snapc, truncate_seq,
+						truncate_size, true);
+			BUG_ON(IS_ERR(req));
 		}
-		dout("writepages got %d pages at %llu~%llu\n",
-		     locked_pages, offset, len);
+		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
+			     PAGE_CACHE_SIZE - offset);
+
+		req->r_callback = writepages_finish;
+		req->r_inode = inode;
 
-		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+		/* Format the osd request message and submit the write */
+		len = 0;
+		data_pages = pages;
+		op_idx = 0;
+		for (i = 0; i < locked_pages; i++) {
+			u64 cur_offset = page_offset(pages[i]);
+			if (offset + len != cur_offset) {
+				if (op_idx + do_sync + 1 == req->r_num_ops)
+					break;
+				osd_req_op_extent_dup_last(req, op_idx,
+							   cur_offset - offset);
+				dout("writepages got pages at %llu~%llu\n",
+				     offset, len);
+				osd_req_op_extent_osd_data_pages(req, op_idx,
+							data_pages, len, 0,
 							!!pool, false);
+				osd_req_op_extent_update(req, op_idx, len);
 
-		pages = NULL;	/* request message now owns the pages array */
-		pool = NULL;
+				len = 0;
+				offset = cur_offset; 
+				data_pages = pages + i;
+				op_idx++;
+			}
 
-		/* Update the write op length in case we changed it */
+			set_page_writeback(pages[i]);
+			len += PAGE_CACHE_SIZE;
+		}
+
+		if (snap_size != -1) {
+			len = min(len, snap_size - offset);
+		} else if (i == locked_pages) {
+			/* writepages_finish() clears writeback pages
+			 * according to the data length, so make sure
+			 * data length covers all locked pages */
+			u64 min_len = len + 1 - PAGE_CACHE_SIZE;
+			len = min(len, (u64)i_size_read(inode) - offset);
+			len = max(len, min_len);
+		}
+		dout("writepages got pages at %llu~%llu\n", offset, len);
+
+		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
+						 0, !!pool, false);
+		osd_req_op_extent_update(req, op_idx, len);
 
-		osd_req_op_extent_update(req, 0, len);
+		if (do_sync) {
+			op_idx++;
+			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
+		}
+		BUG_ON(op_idx + 1 != req->r_num_ops);
+
+		pool = NULL;
+		if (i < locked_pages) {
+			BUG_ON(num_ops <= req->r_num_ops);
+			num_ops -= req->r_num_ops;
+			num_ops += do_sync;
+			locked_pages -= i;
+
+			/* allocate new pages array for next request */
+			data_pages = pages;
+			pages = kmalloc(locked_pages * sizeof (*pages),
+					GFP_NOFS);
+			if (!pages) {
+				pool = fsc->wb_pagevec_pool;
+				pages = mempool_alloc(pool, GFP_NOFS);
+				BUG_ON(!pages);
+			}
+			memcpy(pages, data_pages + i,
+			       locked_pages * sizeof(*pages));
+			memset(data_pages + i, 0,
+			       locked_pages * sizeof(*pages));
+		} else {
+			BUG_ON(num_ops != req->r_num_ops);
+			index = pages[i - 1]->index + 1;
+			/* request message now owns the pages array */
+			pages = NULL;
+		}
 
 		vino = ceph_vino(inode);
 		ceph_osdc_build_request(req, offset, snapc, vino.snap,
@@ -985,9 +1071,10 @@ static int ceph_writepages_start(struct address_space *mapping,
 		BUG_ON(rc);
 		req = NULL;
 
-		/* continue? */
-		index = next;
-		wbc->nr_to_write -= locked_pages;
+		wbc->nr_to_write -= i;
+		if (pages)
+			goto new_request;
+
 		if (wbc->nr_to_write <= 0)
 			done = 1;
 
@@ -1522,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 				    ceph_vino(inode), 0, &len, 0, 1,
 				    CEPH_OSD_OP_CREATE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ceph_empty_snapc, 0, 0, false);
+				    NULL, 0, 0, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1540,9 +1627,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 				    ceph_vino(inode), 0, &len, 1, 3,
 				    CEPH_OSD_OP_WRITE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ceph_empty_snapc,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
+				    NULL, ci->i_truncate_seq,
+				    ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1663,8 +1749,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 		goto out;
 	}
 
-	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
-					 ceph_empty_snapc,
+	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!rd_req) {
 		err = -ENOMEM;
@@ -1678,8 +1763,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 		 "%llx.00000000", ci->i_vino.ino);
 	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
 
-	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
-					 ceph_empty_snapc,
+	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!wr_req) {
 		err = -ENOMEM;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6fe0ad26a7dfc0430da9e9e0fca8c312a4b2bdc0..de17bb232ff8d30906c53f34248ce99ea566562a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -991,7 +991,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
 			u32 seq, u64 flush_tid, u64 oldest_flush_tid,
 			u32 issue_seq, u32 mseq, u64 size, u64 max_size,
 			struct timespec *mtime, struct timespec *atime,
-			u64 time_warp_seq,
+			struct timespec *ctime, u64 time_warp_seq,
 			kuid_t uid, kgid_t gid, umode_t mode,
 			u64 xattr_version,
 			struct ceph_buffer *xattrs_buf,
@@ -1042,6 +1042,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
 		ceph_encode_timespec(&fc->mtime, mtime);
 	if (atime)
 		ceph_encode_timespec(&fc->atime, atime);
+	if (ctime)
+		ceph_encode_timespec(&fc->ctime, ctime);
 	fc->time_warp_seq = cpu_to_le32(time_warp_seq);
 
 	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
@@ -1116,7 +1118,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	int held, revoking, dropping, keep;
 	u64 seq, issue_seq, mseq, time_warp_seq, follows;
 	u64 size, max_size;
-	struct timespec mtime, atime;
+	struct timespec mtime, atime, ctime;
 	int wake = 0;
 	umode_t mode;
 	kuid_t uid;
@@ -1180,6 +1182,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	ci->i_requested_max_size = max_size;
 	mtime = inode->i_mtime;
 	atime = inode->i_atime;
+	ctime = inode->i_ctime;
 	time_warp_seq = ci->i_time_warp_seq;
 	uid = inode->i_uid;
 	gid = inode->i_gid;
@@ -1198,7 +1201,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
 		op, keep, want, flushing, seq,
 		flush_tid, oldest_flush_tid, issue_seq, mseq,
-		size, max_size, &mtime, &atime, time_warp_seq,
+		size, max_size, &mtime, &atime, &ctime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
 		follows, inline_data);
 	if (ret < 0) {
@@ -1320,7 +1323,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
 			     capsnap->dirty, 0, capsnap->flush_tid, 0,
 			     0, mseq, capsnap->size, 0,
 			     &capsnap->mtime, &capsnap->atime,
-			     capsnap->time_warp_seq,
+			     &capsnap->ctime, capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
 			     capsnap->xattr_version, capsnap->xattr_blob,
 			     capsnap->follows, capsnap->inline_data);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fd11fb231a2ea796e86eae933265a488ad6774e5..fadc243dfb284b90a63961f4a48c372f483c1142 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
 	if (dentry->d_fsdata)
 		return 0;
 
-	di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO);
+	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
 	if (!di)
 		return -ENOMEM;          /* oh well */
 
@@ -68,23 +68,6 @@ int ceph_init_dentry(struct dentry *dentry)
 	return 0;
 }
 
-struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
-{
-	struct inode *inode = NULL;
-
-	if (!dentry)
-		return NULL;
-
-	spin_lock(&dentry->d_lock);
-	if (!IS_ROOT(dentry)) {
-		inode = d_inode(dentry->d_parent);
-		ihold(inode);
-	}
-	spin_unlock(&dentry->d_lock);
-	return inode;
-}
-
-
 /*
  * for readdir, we encode the directory frag and offset within that
  * frag into f_pos.
@@ -624,6 +607,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
 	int op;
+	int mask;
 	int err;
 
 	dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +650,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 		return ERR_CAST(req);
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
-	/* we only need inode linkage */
-	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
+
+	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+	if (ceph_security_xattr_wanted(dir))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.getattr.mask = cpu_to_le32(mask);
+
 	req->r_locked_dir = dir;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	err = ceph_handle_snapdir(req, dentry, err);
@@ -1095,6 +1083,7 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
 static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 {
 	int valid = 0;
+	struct dentry *parent;
 	struct inode *dir;
 
 	if (flags & LOOKUP_RCU)
@@ -1103,7 +1092,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 	dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
 	     dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
 
-	dir = ceph_get_dentry_parent_inode(dentry);
+	parent = dget_parent(dentry);
+	dir = d_inode(parent);
 
 	/* always trust cached snapped dentries, snapdir dentry */
 	if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1121,13 +1111,48 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 			valid = 1;
 	}
 
+	if (!valid) {
+		struct ceph_mds_client *mdsc =
+			ceph_sb_to_client(dir->i_sb)->mdsc;
+		struct ceph_mds_request *req;
+		int op, mask, err;
+
+		op = ceph_snap(dir) == CEPH_SNAPDIR ?
+			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
+		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
+		if (!IS_ERR(req)) {
+			req->r_dentry = dget(dentry);
+			req->r_num_caps = 2;
+
+			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+			if (ceph_security_xattr_wanted(dir))
+				mask |= CEPH_CAP_XATTR_SHARED;
+			req->r_args.getattr.mask = mask;
+
+			req->r_locked_dir = dir;
+			err = ceph_mdsc_do_request(mdsc, NULL, req);
+			if (err == 0 || err == -ENOENT) {
+				if (dentry == req->r_dentry) {
+					valid = !d_unhashed(dentry);
+				} else {
+					d_invalidate(req->r_dentry);
+					err = -EAGAIN;
+				}
+			}
+			ceph_mdsc_put_request(req);
+			dout("d_revalidate %p lookup result=%d\n",
+			     dentry, err);
+		}
+	}
+
 	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
 	if (valid) {
 		ceph_dentry_lru_touch(dentry);
 	} else {
 		ceph_dir_clear_complete(dir);
 	}
-	iput(dir);
+
+	dput(parent);
 	return valid;
 }
 
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3b31723573268e924346c1bffddb7361390b2a8d..6e72c98162d5351a9ad27b581eb17737eb4a1203 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -71,12 +71,18 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
 	inode = ceph_find_inode(sb, vino);
 	if (!inode) {
 		struct ceph_mds_request *req;
+		int mask;
 
 		req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
 					       USE_ANY_MDS);
 		if (IS_ERR(req))
 			return ERR_CAST(req);
 
+		mask = CEPH_STAT_CAP_INODE;
+		if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+			mask |= CEPH_CAP_XATTR_SHARED;
+		req->r_args.getattr.mask = cpu_to_le32(mask);
+
 		req->r_ino1 = vino;
 		req->r_num_caps = 1;
 		err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@ static struct dentry *__get_parent(struct super_block *sb,
 	struct ceph_mds_request *req;
 	struct inode *inode;
 	struct dentry *dentry;
+	int mask;
 	int err;
 
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@ static struct dentry *__get_parent(struct super_block *sb,
 			.snap = CEPH_NOSNAP,
 		};
 	}
+
+	mask = CEPH_STAT_CAP_INODE;
+	if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.getattr.mask = cpu_to_le32(mask);
+
 	req->r_num_caps = 1;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	inode = req->r_target_inode;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index eb9028e8cfc5197ef1ea99524fcbc67d67b4df91..ef38f01c1795a1187f2190a05f7ebd06bd9bc69e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -157,7 +157,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 	case S_IFDIR:
 		dout("init_file %p %p 0%o (regular)\n", inode, file,
 		     inode->i_mode);
-		cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO);
+		cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
 		if (cf == NULL) {
 			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 			return -ENOMEM;
@@ -300,6 +300,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	struct ceph_mds_request *req;
 	struct dentry *dn;
 	struct ceph_acls_info acls = {};
+       int mask;
 	int err;
 
 	dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			acls.pagelist = NULL;
 		}
 	}
+
+       mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+       if (ceph_security_xattr_wanted(dir))
+               mask |= CEPH_CAP_XATTR_SHARED;
+       req->r_args.open.mask = cpu_to_le32(mask);
+
 	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -725,7 +732,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	ret = ceph_osdc_start_request(req->r_osdc, req, false);
 out:
 	if (ret < 0) {
-		BUG_ON(ret == -EOLDSNAPC);
 		req->r_result = ret;
 		ceph_aio_complete_req(req, NULL);
 	}
@@ -783,7 +789,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	int num_pages = 0;
 	int flags;
 	int ret;
-	struct timespec mtime = CURRENT_TIME;
+	struct timespec mtime = current_fs_time(inode->i_sb);
 	size_t count = iov_iter_count(iter);
 	loff_t pos = iocb->ki_pos;
 	bool write = iov_iter_rw(iter) == WRITE;
@@ -949,7 +955,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 				ret = ceph_osdc_start_request(req->r_osdc,
 							      req, false);
 			if (ret < 0) {
-				BUG_ON(ret == -EOLDSNAPC);
 				req->r_result = ret;
 				ceph_aio_complete_req(req, NULL);
 			}
@@ -988,7 +993,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 	int flags;
 	int check_caps = 0;
 	int ret;
-	struct timespec mtime = CURRENT_TIME;
+	struct timespec mtime = current_fs_time(inode->i_sb);
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e48fd8b2325701acb6be66c72a119e20df000ba6..ed58b168904a9c2425badda5ff50a05832249705 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -549,6 +549,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,
 	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
 	    (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
 		dout("size %lld -> %llu\n", inode->i_size, size);
+		if (size > 0 && S_ISDIR(inode->i_mode)) {
+			pr_err("fill_file_size non-zero size for directory\n");
+			size = 0;
+		}
 		i_size_write(inode, size);
 		inode->i_blocks = (size + (1<<9) - 1) >> 9;
 		ci->i_reported_size = size;
@@ -1261,6 +1265,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
 			     ceph_vinop(in));
+			d_invalidate(dn);
 			have_lease = false;
 		}
 
@@ -1349,15 +1354,20 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
 
 	if (!ctl->page || pgoff != page_index(ctl->page)) {
 		ceph_readdir_cache_release(ctl);
-		ctl->page  = grab_cache_page(&dir->i_data, pgoff);
+		if (idx == 0)
+			ctl->page = grab_cache_page(&dir->i_data, pgoff);
+		else
+			ctl->page = find_lock_page(&dir->i_data, pgoff);
 		if (!ctl->page) {
 			ctl->index = -1;
-			return -ENOMEM;
+			return idx == 0 ? -ENOMEM : 0;
 		}
 		/* reading/filling the cache are serialized by
 		 * i_mutex, no need to use page lock */
 		unlock_page(ctl->page);
 		ctl->dentries = kmap(ctl->page);
+		if (idx == 0)
+			memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
 	}
 
 	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1380,7 +1390,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	struct qstr dname;
 	struct dentry *dn;
 	struct inode *in;
-	int err = 0, ret, i;
+	int err = 0, skipped = 0, ret, i;
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	struct ceph_dentry_info *di;
@@ -1492,7 +1502,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		}
 
 		if (d_really_is_negative(dn)) {
-			struct dentry *realdn = splice_dentry(dn, in);
+			struct dentry *realdn;
+
+			if (ceph_security_xattr_deadlock(in)) {
+				dout(" skip splicing dn %p to inode %p"
+				     " (security xattr deadlock)\n", dn, in);
+				iput(in);
+				skipped++;
+				goto next_item;
+			}
+
+			realdn = splice_dentry(dn, in);
 			if (IS_ERR(realdn)) {
 				err = PTR_ERR(realdn);
 				d_drop(dn);
@@ -1509,7 +1529,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 				    req->r_session,
 				    req->r_request_started);
 
-		if (err == 0 && cache_ctl.index >= 0) {
+		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
 			ret = fill_readdir_cache(d_inode(parent), dn,
 						 &cache_ctl, req);
 			if (ret < 0)
@@ -1520,7 +1540,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 			dput(dn);
 	}
 out:
-	if (err == 0) {
+	if (err == 0 && skipped == 0) {
 		req->r_did_prepopulate = true;
 		req->r_readdir_cache_idx = cache_ctl.index;
 	}
@@ -1950,7 +1970,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	if (dirtied) {
 		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
 							   &prealloc_cf);
-		inode->i_ctime = CURRENT_TIME;
+		inode->i_ctime = current_fs_time(inode->i_sb);
 	}
 
 	release &= issued;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 911d64d865f176aa4ce28832207abb8651a4fec8..44852c3ae5311a3367ca081685f8baa7ecd60bed 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1729,7 +1729,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 	init_completion(&req->r_safe_completion);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
-	req->r_stamp = CURRENT_TIME;
+	req->r_stamp = current_fs_time(mdsc->fsc->sb);
 
 	req->r_op = op;
 	req->r_direct_mode = mode;
@@ -2540,6 +2540,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
 	/* insert trace into our cache */
 	mutex_lock(&req->r_fill_mutex);
+	current->journal_info = req;
 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
 	if (err == 0) {
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 			ceph_readdir_prepopulate(req, req->r_session);
 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
+	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
 
 	up_read(&mdsc->snap_rwsem);
@@ -3764,7 +3766,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
 
 	/* do we need it? */
-	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
 	mutex_lock(&mdsc->mutex);
 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
 		dout("handle_map epoch %u <= our %u\n",
@@ -3791,6 +3792,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
 
 	__wake_requests(mdsc, &mdsc->waiting_for_map);
+	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
+			  mdsc->mdsmap->m_epoch);
 
 	mutex_unlock(&mdsc->mutex);
 	schedule_delayed(mdsc);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4aa7122a8d38c18dd4fe7443fb64f46765b83280..9caaa7ffc93fe06876283367d78c3573512c7549 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,6 @@ static int cmpu64_rev(const void *a, const void *b)
 }
 
 
-struct ceph_snap_context *ceph_empty_snapc;
-
 /*
  * build the snap context for a given realm.
  */
@@ -987,17 +985,3 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 		up_write(&mdsc->snap_rwsem);
 	return;
 }
-
-int __init ceph_snap_init(void)
-{
-	ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
-	if (!ceph_empty_snapc)
-		return -ENOMEM;
-	ceph_empty_snapc->seq = 1;
-	return 0;
-}
-
-void ceph_snap_exit(void)
-{
-	ceph_put_snap_context(ceph_empty_snapc);
-}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ca4d5e8457f1e5340330bb225b4da84c88727dc5..c973043deb0ecc2d4e002b323afc4ffb6c9c7782 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -439,8 +439,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 
 	if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
 		seq_puts(m, ",dirstat");
-	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0)
-		seq_puts(m, ",norbytes");
+	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
+		seq_puts(m, ",rbytes");
 	if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
 		seq_puts(m, ",noasyncreaddir");
 	if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->monc.want_mdsmap = 1;
+	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
 
 	fsc->mount_options = fsopt;
 
@@ -793,22 +793,20 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
 	struct dentry *root;
 	int first = 0;   /* first vfsmount for this super_block */
 
-	dout("mount start\n");
+	dout("mount start %p\n", fsc);
 	mutex_lock(&fsc->client->mount_mutex);
 
-	err = __ceph_open_session(fsc->client, started);
-	if (err < 0)
-		goto out;
+	if (!fsc->sb->s_root) {
+		err = __ceph_open_session(fsc->client, started);
+		if (err < 0)
+			goto out;
 
-	dout("mount opening root\n");
-	root = open_root_dentry(fsc, "", started);
-	if (IS_ERR(root)) {
-		err = PTR_ERR(root);
-		goto out;
-	}
-	if (fsc->sb->s_root) {
-		dput(root);
-	} else {
+		dout("mount opening root\n");
+		root = open_root_dentry(fsc, "", started);
+		if (IS_ERR(root)) {
+			err = PTR_ERR(root);
+			goto out;
+		}
 		fsc->sb->s_root = root;
 		first = 1;
 
@@ -818,6 +816,7 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
 	}
 
 	if (path[0] == 0) {
+		root = fsc->sb->s_root;
 		dget(root);
 	} else {
 		dout("mount opening base mountpoint\n");
@@ -833,16 +832,14 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
 	mutex_unlock(&fsc->client->mount_mutex);
 	return root;
 
-out:
-	mutex_unlock(&fsc->client->mount_mutex);
-	return ERR_PTR(err);
-
 fail:
 	if (first) {
 		dput(fsc->sb->s_root);
 		fsc->sb->s_root = NULL;
 	}
-	goto out;
+out:
+	mutex_unlock(&fsc->client->mount_mutex);
+	return ERR_PTR(err);
 }
 
 static int ceph_set_super(struct super_block *s, void *data)
@@ -1042,19 +1039,14 @@ static int __init init_ceph(void)
 
 	ceph_flock_init();
 	ceph_xattr_init();
-	ret = ceph_snap_init();
-	if (ret)
-		goto out_xattr;
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
-		goto out_snap;
+		goto out_xattr;
 
 	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
 
 	return 0;
 
-out_snap:
-	ceph_snap_exit();
 out_xattr:
 	ceph_xattr_exit();
 	destroy_caches();
@@ -1066,7 +1058,6 @@ static void __exit exit_ceph(void)
 {
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
-	ceph_snap_exit();
 	ceph_xattr_exit();
 	destroy_caches();
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 9c458eb522458da2364238cef2e92530ace87df9..e705c4d612d763515bb3a85f603c1c7e52107cc7 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -37,8 +37,7 @@
 #define CEPH_MOUNT_OPT_FSCACHE         (1<<10) /* use fscache */
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 
-#define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES | \
-				   CEPH_MOUNT_OPT_DCACHE)
+#define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
 #define ceph_set_mount_opt(fsc, opt) \
 	(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
@@ -469,7 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_POOL_PERM	(1 << 4)  /* pool rd/wr bits are valid */
 #define CEPH_I_POOL_RD		(1 << 5)  /* can read from pool */
 #define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */
-
+#define CEPH_I_SEC_INITED	(1 << 7)  /* security initialized */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 					   long long release_count,
@@ -721,7 +720,6 @@ static inline int default_congestion_kb(void)
 
 
 /* snap.c */
-extern struct ceph_snap_context *ceph_empty_snapc;
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
 					       u64 ino);
 extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -738,8 +736,6 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 				  struct ceph_cap_snap *capsnap);
 extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
-extern int ceph_snap_init(void);
-extern void ceph_snap_exit(void);
 
 /*
  * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -808,6 +804,20 @@ extern void __init ceph_xattr_init(void);
 extern void ceph_xattr_exit(void);
 extern const struct xattr_handler *ceph_xattr_handlers[];
 
+#ifdef CONFIG_SECURITY
+extern bool ceph_security_xattr_deadlock(struct inode *in);
+extern bool ceph_security_xattr_wanted(struct inode *in);
+#else
+static inline bool ceph_security_xattr_deadlock(struct inode *in)
+{
+	return false;
+}
+static inline bool ceph_security_xattr_wanted(struct inode *in)
+{
+	return false;
+}
+#endif
+
 /* acl.c */
 struct ceph_acls_info {
 	void *default_acl;
@@ -947,7 +957,6 @@ extern void ceph_dentry_lru_touch(struct dentry *dn);
 extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
 extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
-extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
 extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
 
 /*
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 819163d8313bb3748765b5c3313dfdfd32ac9472..9410abdef3cec5fdb0ca73a905c503c10bc994fb 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -714,31 +714,62 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
 	}
 }
 
+static inline int __get_request_mask(struct inode *in) {
+	struct ceph_mds_request *req = current->journal_info;
+	int mask = 0;
+	if (req && req->r_target_inode == in) {
+		if (req->r_op == CEPH_MDS_OP_LOOKUP ||
+		    req->r_op == CEPH_MDS_OP_LOOKUPINO ||
+		    req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
+		    req->r_op == CEPH_MDS_OP_GETATTR) {
+			mask = le32_to_cpu(req->r_args.getattr.mask);
+		} else if (req->r_op == CEPH_MDS_OP_OPEN ||
+			   req->r_op == CEPH_MDS_OP_CREATE) {
+			mask = le32_to_cpu(req->r_args.open.mask);
+		}
+	}
+	return mask;
+}
+
 ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 		      size_t size)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int err;
 	struct ceph_inode_xattr *xattr;
 	struct ceph_vxattr *vxattr = NULL;
+	int req_mask;
+	int err;
 
 	if (!ceph_is_valid_xattr(name))
 		return -ENODATA;
 
 	/* let's see if a virtual xattr was requested */
 	vxattr = ceph_match_vxattr(inode, name);
-	if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
-		err = vxattr->getxattr_cb(ci, value, size);
+	if (vxattr) {
+		err = -ENODATA;
+		if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
+			err = vxattr->getxattr_cb(ci, value, size);
 		return err;
 	}
 
+	req_mask = __get_request_mask(inode);
+
 	spin_lock(&ci->i_ceph_lock);
 	dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
 	if (ci->i_xattrs.version == 0 ||
-	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+	    !((req_mask & CEPH_CAP_XATTR_SHARED) ||
+	      __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
 		spin_unlock(&ci->i_ceph_lock);
+
+		/* security module gets xattr while filling trace */
+		if (current->journal_info != NULL) {
+			pr_warn_ratelimited("sync getxattr %p "
+					    "during filling trace\n", inode);
+			return -EBUSY;
+		}
+
 		/* get xattrs from mds (if we don't already have them) */
 		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
@@ -765,6 +796,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 
 	memcpy(value, xattr->val, xattr->val_len);
 
+	if (current->journal_info != NULL &&
+	    !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+		ci->i_ceph_flags |= CEPH_I_SEC_INITED;
 out:
 	spin_unlock(&ci->i_ceph_lock);
 	return err;
@@ -999,7 +1033,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
 					       &prealloc_cf);
 		ci->i_xattrs.dirty = true;
-		inode->i_ctime = CURRENT_TIME;
+		inode->i_ctime = current_fs_time(inode->i_sb);
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
@@ -1015,7 +1049,15 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 do_sync_unlocked:
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
-	err = ceph_sync_setxattr(dentry, name, value, size, flags);
+
+	/* security module set xattr while filling trace */
+	if (current->journal_info != NULL) {
+		pr_warn_ratelimited("sync setxattr %p "
+				    "during filling trace\n", inode);
+		err = -EBUSY;
+	} else {
+		err = ceph_sync_setxattr(dentry, name, value, size, flags);
+	}
 out:
 	ceph_free_cap_flush(prealloc_cf);
 	kfree(newname);
@@ -1136,7 +1178,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
 	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
 				       &prealloc_cf);
 	ci->i_xattrs.dirty = true;
-	inode->i_ctime = CURRENT_TIME;
+	inode->i_ctime = current_fs_time(inode->i_sb);
 	spin_unlock(&ci->i_ceph_lock);
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
@@ -1164,3 +1206,25 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
 
 	return __ceph_removexattr(dentry, name);
 }
+
+#ifdef CONFIG_SECURITY
+bool ceph_security_xattr_wanted(struct inode *in)
+{
+	return in->i_security != NULL;
+}
+
+bool ceph_security_xattr_deadlock(struct inode *in)
+{
+	struct ceph_inode_info *ci;
+	bool ret;
+	if (in->i_security == NULL)
+		return false;
+	ci = ceph_inode(in);
+	spin_lock(&ci->i_ceph_lock);
+	ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
+	      !(ci->i_xattrs.version > 0 &&
+		__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
+#endif
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index 15151f3c41202c6b6229475affb654ae0bab272d..ae2f66833762cd7d63bd77eafb9aed13b6ab2308 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -105,6 +105,7 @@ static inline u64 ceph_sanitize_features(u64 features)
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT		\
 	(CEPH_FEATURE_NOSRCADDR |		\
+	 CEPH_FEATURE_SUBSCRIBE2 |		\
 	 CEPH_FEATURE_RECONNECT_SEQ |		\
 	 CEPH_FEATURE_PGID64 |			\
 	 CEPH_FEATURE_PGPOOL3 |			\
@@ -127,6 +128,7 @@ static inline u64 ceph_sanitize_features(u64 features)
 
 #define CEPH_FEATURES_REQUIRED_DEFAULT   \
 	(CEPH_FEATURE_NOSRCADDR |	 \
+	 CEPH_FEATURE_SUBSCRIBE2 |	 \
 	 CEPH_FEATURE_RECONNECT_SEQ |	 \
 	 CEPH_FEATURE_PGID64 |		 \
 	 CEPH_FEATURE_PGPOOL3 |		 \
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index d7d072a25c2717734139e98b01dd2724f2c43844..37f28bf55ce426bac3a8a533101f4afed21be53f 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -198,8 +198,8 @@ struct ceph_client_mount {
 #define CEPH_SUBSCRIBE_ONETIME    1  /* i want only 1 update after have */
 
 struct ceph_mon_subscribe_item {
-	__le64 have_version;    __le64 have;
-	__u8 onetime;
+	__le64 start;
+	__u8 flags;
 } __attribute__ ((packed));
 
 struct ceph_mon_subscribe_ack {
@@ -376,7 +376,8 @@ union ceph_mds_request_args {
 		__le32 stripe_count;         /* ... */
 		__le32 object_size;
 		__le32 file_replication;
-		__le32 unused;               /* used to be preferred osd */
+               __le32 mask;                 /* CEPH_CAP_* */
+               __le32 old_size;
 	} __attribute__ ((packed)) open;
 	struct {
 		__le32 flags;
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 3e3799cdc6e66d719fed8ae9adc8539d4e2cb0c6..e7975e4681e1a9110ee26506eaf4402ffcbad082 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -47,7 +47,6 @@ struct ceph_options {
 	unsigned long mount_timeout;		/* jiffies */
 	unsigned long osd_idle_ttl;		/* jiffies */
 	unsigned long osd_keepalive_timeout;	/* jiffies */
-	unsigned long monc_ping_timeout;	/* jiffies */
 
 	/*
 	 * any type that can't be simply compared or doesn't need need
@@ -68,7 +67,12 @@ struct ceph_options {
 #define CEPH_MOUNT_TIMEOUT_DEFAULT	msecs_to_jiffies(60 * 1000)
 #define CEPH_OSD_KEEPALIVE_DEFAULT	msecs_to_jiffies(5 * 1000)
 #define CEPH_OSD_IDLE_TTL_DEFAULT	msecs_to_jiffies(60 * 1000)
-#define CEPH_MONC_PING_TIMEOUT_DEFAULT	msecs_to_jiffies(30 * 1000)
+
+#define CEPH_MONC_HUNT_INTERVAL		msecs_to_jiffies(3 * 1000)
+#define CEPH_MONC_PING_INTERVAL		msecs_to_jiffies(10 * 1000)
+#define CEPH_MONC_PING_TIMEOUT		msecs_to_jiffies(30 * 1000)
+#define CEPH_MONC_HUNT_BACKOFF		2
+#define CEPH_MONC_HUNT_MAX_MULT		10
 
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)
 #define CEPH_MSG_MAX_MIDDLE_LEN	(16*1024*1024)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 81810dc21f061ce1acf5129f5dea5e6f7f758e65..e230e7ed60d369f202020e6c955f50c644b89dfa 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -68,18 +68,24 @@ struct ceph_mon_client {
 
 	bool hunting;
 	int cur_mon;                       /* last monitor i contacted */
-	unsigned long sub_sent, sub_renew_after;
+	unsigned long sub_renew_after;
+	unsigned long sub_renew_sent;
 	struct ceph_connection con;
 
+	bool had_a_connection;
+	int hunt_mult; /* [1..CEPH_MONC_HUNT_MAX_MULT] */
+
 	/* pending generic requests */
 	struct rb_root generic_request_tree;
 	int num_generic_requests;
 	u64 last_tid;
 
-	/* mds/osd map */
-	int want_mdsmap;
-	int want_next_osdmap; /* 1 = want, 2 = want+asked */
-	u32 have_osdmap, have_mdsmap;
+	/* subs, indexed with CEPH_SUB_* */
+	struct {
+		struct ceph_mon_subscribe_item item;
+		bool want;
+		u32 have; /* epoch */
+	} subs[3];
 
 #ifdef CONFIG_DEBUG_FS
 	struct dentry *debugfs_file;
@@ -93,14 +99,23 @@ extern int ceph_monmap_contains(struct ceph_monmap *m,
 extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
 extern void ceph_monc_stop(struct ceph_mon_client *monc);
 
+enum {
+	CEPH_SUB_MDSMAP = 0,
+	CEPH_SUB_MONMAP,
+	CEPH_SUB_OSDMAP,
+};
+
+extern const char *ceph_sub_str[];
+
 /*
  * The model here is to indicate that we need a new map of at least
- * epoch @want, and also call in when we receive a map.  We will
+ * epoch @epoch, and also call in when we receive a map.  We will
  * periodically rerequest the map from the monitor cluster until we
  * get what we want.
  */
-extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
-extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+			bool continuous);
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
 extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b485bb6d1d4cee0aff00cbe1132d55396071..4343df80671019a01e7a0b7aeec89f191670e6f0 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,8 @@ struct ceph_osd {
 };
 
 
-#define CEPH_OSD_MAX_OP	3
+#define CEPH_OSD_SLAB_OPS	2
+#define CEPH_OSD_MAX_OPS	16
 
 enum ceph_osd_data_type {
 	CEPH_OSD_DATA_TYPE_NONE = 0,
@@ -77,7 +78,10 @@ struct ceph_osd_data {
 struct ceph_osd_req_op {
 	u16 op;           /* CEPH_OSD_OP_* */
 	u32 flags;        /* CEPH_OSD_OP_FLAG_* */
-	u32 payload_len;
+	u32 indata_len;   /* request */
+	u32 outdata_len;  /* reply */
+	s32 rval;
+
 	union {
 		struct ceph_osd_data raw_data_in;
 		struct {
@@ -136,7 +140,6 @@ struct ceph_osd_request {
 
 	/* request osd ops array  */
 	unsigned int		r_num_ops;
-	struct ceph_osd_req_op	r_ops[CEPH_OSD_MAX_OP];
 
 	/* these are updated on each send */
 	__le32           *r_request_osdmap_epoch;
@@ -148,8 +151,6 @@ struct ceph_osd_request {
 	struct ceph_eversion *r_request_reassert_version;
 
 	int               r_result;
-	int               r_reply_op_len[CEPH_OSD_MAX_OP];
-	s32               r_reply_op_result[CEPH_OSD_MAX_OP];
 	int               r_got_reply;
 	int		  r_linger;
 
@@ -174,6 +175,8 @@ struct ceph_osd_request {
 	unsigned long     r_stamp;            /* send OR check time */
 
 	struct ceph_snap_context *r_snapc;    /* snap context for writes */
+
+	struct ceph_osd_req_op r_ops[];
 };
 
 struct ceph_request_redirect {
@@ -263,6 +266,8 @@ extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 					u64 truncate_size, u32 truncate_seq);
 extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 					unsigned int which, u64 length);
+extern void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
+				       unsigned int which, u64 offset_inc);
 
 extern struct ceph_osd_data *osd_req_op_extent_osd_data(
 					struct ceph_osd_request *osd_req,
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bcbec33c6a14a8f5d962fb1d803a6503a6e11f9b..dcc18c6f7cf9b96c303407c5ac5f13f564bb224d 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -361,7 +361,6 @@ ceph_parse_options(char *options, const char *dev_name,
 	opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
 	opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
 	opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
-	opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
 
 	/* get mon ip(s) */
 	/* ip1[:port1][,ip2[:port2]...] */
@@ -686,6 +685,9 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
 			return client->auth_err;
 	}
 
+	pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
+	ceph_debugfs_client_init(client);
+
 	return 0;
 }
 EXPORT_SYMBOL(__ceph_open_session);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 593dc2eabcc803342187c1e57355f31d76b17f49..b902fbc7863ef893222401a5f2b9bdcc8db87431 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p)
 	struct ceph_mon_generic_request *req;
 	struct ceph_mon_client *monc = &client->monc;
 	struct rb_node *rp;
+	int i;
 
 	mutex_lock(&monc->mutex);
 
-	if (monc->have_mdsmap)
-		seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap);
-	if (monc->have_osdmap)
-		seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap);
-	if (monc->want_next_osdmap)
-		seq_printf(s, "want next osdmap\n");
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		seq_printf(s, "have %s %u", ceph_sub_str[i],
+			   monc->subs[i].have);
+		if (monc->subs[i].want)
+			seq_printf(s, " want %llu%s",
+				   le64_to_cpu(monc->subs[i].item.start),
+				   (monc->subs[i].item.flags &
+					CEPH_SUBSCRIBE_ONETIME ?  "" : "+"));
+		seq_putc(s, '\n');
+	}
 
 	for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) {
 		__u16 op;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9382619a405b8da854c82b041a137a670399567f..1831f63536225bbe120dbc5215b26d2d6518f3fb 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -235,18 +235,12 @@ static struct workqueue_struct *ceph_msgr_wq;
 static int ceph_msgr_slab_init(void)
 {
 	BUG_ON(ceph_msg_cache);
-	ceph_msg_cache = kmem_cache_create("ceph_msg",
-					sizeof (struct ceph_msg),
-					__alignof__(struct ceph_msg), 0, NULL);
-
+	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
 	if (!ceph_msg_cache)
 		return -ENOMEM;
 
 	BUG_ON(ceph_msg_data_cache);
-	ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
-					sizeof (struct ceph_msg_data),
-					__alignof__(struct ceph_msg_data),
-					0, NULL);
+	ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
 	if (ceph_msg_data_cache)
 		return 0;
 
@@ -1221,25 +1215,19 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 static void prepare_write_message_footer(struct ceph_connection *con)
 {
 	struct ceph_msg *m = con->out_msg;
-	int v = con->out_kvec_left;
 
 	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 
 	dout("prepare_write_message_footer %p\n", con);
-	con->out_kvec[v].iov_base = &m->footer;
+	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
 	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 		if (con->ops->sign_message)
 			con->ops->sign_message(m);
 		else
 			m->footer.sig = 0;
-		con->out_kvec[v].iov_len = sizeof(m->footer);
-		con->out_kvec_bytes += sizeof(m->footer);
 	} else {
 		m->old_footer.flags = m->footer.flags;
-		con->out_kvec[v].iov_len = sizeof(m->old_footer);
-		con->out_kvec_bytes += sizeof(m->old_footer);
 	}
-	con->out_kvec_left++;
 	con->out_more = m->more_to_follow;
 	con->out_msg_done = true;
 }
@@ -2409,11 +2397,7 @@ static int read_partial_message(struct ceph_connection *con)
 	}
 
 	/* footer */
-	if (need_sign)
-		size = sizeof(m->footer);
-	else
-		size = sizeof(m->old_footer);
-
+	size = sizeof_footer(con);
 	end += size;
 	ret = read_partial(con, end, size, &m->footer);
 	if (ret <= 0)
@@ -3089,10 +3073,7 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 			con->out_skip += con_out_kvec_skip(con);
 		} else {
 			BUG_ON(!msg->data_length);
-			if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
-				con->out_skip += sizeof(msg->footer);
-			else
-				con->out_skip += sizeof(msg->old_footer);
+			con->out_skip += sizeof_footer(con);
 		}
 		/* data, middle, front */
 		if (msg->data_length)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index de85dddc3dc08cfeee81435c4475a6d975c4cd96..cf638c009cfabe9e95d114c1e187785db46f958c 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -122,51 +122,91 @@ static void __close_session(struct ceph_mon_client *monc)
 	ceph_msg_revoke(monc->m_subscribe);
 	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
 	ceph_con_close(&monc->con);
-	monc->cur_mon = -1;
+
 	monc->pending_auth = 0;
 	ceph_auth_reset(monc->auth);
 }
 
 /*
- * Open a session with a (new) monitor.
+ * Pick a new monitor at random and set cur_mon.  If we are repicking
+ * (i.e. cur_mon is already set), be sure to pick a different one.
  */
-static int __open_session(struct ceph_mon_client *monc)
+static void pick_new_mon(struct ceph_mon_client *monc)
 {
-	char r;
-	int ret;
+	int old_mon = monc->cur_mon;
 
-	if (monc->cur_mon < 0) {
-		get_random_bytes(&r, 1);
-		monc->cur_mon = r % monc->monmap->num_mon;
-		dout("open_session num=%d r=%d -> mon%d\n",
-		     monc->monmap->num_mon, r, monc->cur_mon);
-		monc->sub_sent = 0;
-		monc->sub_renew_after = jiffies;  /* i.e., expired */
-		monc->want_next_osdmap = !!monc->want_next_osdmap;
-
-		dout("open_session mon%d opening\n", monc->cur_mon);
-		ceph_con_open(&monc->con,
-			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
-			      &monc->monmap->mon_inst[monc->cur_mon].addr);
-
-		/* send an initial keepalive to ensure our timestamp is
-		 * valid by the time we are in an OPENED state */
-		ceph_con_keepalive(&monc->con);
-
-		/* initiatiate authentication handshake */
-		ret = ceph_auth_build_hello(monc->auth,
-					    monc->m_auth->front.iov_base,
-					    monc->m_auth->front_alloc_len);
-		__send_prepared_auth_request(monc, ret);
+	BUG_ON(monc->monmap->num_mon < 1);
+
+	if (monc->monmap->num_mon == 1) {
+		monc->cur_mon = 0;
 	} else {
-		dout("open_session mon%d already open\n", monc->cur_mon);
+		int max = monc->monmap->num_mon;
+		int o = -1;
+		int n;
+
+		if (monc->cur_mon >= 0) {
+			if (monc->cur_mon < monc->monmap->num_mon)
+				o = monc->cur_mon;
+			if (o >= 0)
+				max--;
+		}
+
+		n = prandom_u32() % max;
+		if (o >= 0 && n >= o)
+			n++;
+
+		monc->cur_mon = n;
 	}
-	return 0;
+
+	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
+	     monc->cur_mon, monc->monmap->num_mon);
+}
+
+/*
+ * Open a session with a new monitor.
+ */
+static void __open_session(struct ceph_mon_client *monc)
+{
+	int ret;
+
+	pick_new_mon(monc);
+
+	monc->hunting = true;
+	if (monc->had_a_connection) {
+		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
+		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
+			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
+	}
+
+	monc->sub_renew_after = jiffies; /* i.e., expired */
+	monc->sub_renew_sent = 0;
+
+	dout("%s opening mon%d\n", __func__, monc->cur_mon);
+	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
+		      &monc->monmap->mon_inst[monc->cur_mon].addr);
+
+	/*
+	 * send an initial keepalive to ensure our timestamp is valid
+	 * by the time we are in an OPENED state
+	 */
+	ceph_con_keepalive(&monc->con);
+
+	/* initiate authentication handshake */
+	ret = ceph_auth_build_hello(monc->auth,
+				    monc->m_auth->front.iov_base,
+				    monc->m_auth->front_alloc_len);
+	BUG_ON(ret <= 0);
+	__send_prepared_auth_request(monc, ret);
 }
 
-static bool __sub_expired(struct ceph_mon_client *monc)
+static void reopen_session(struct ceph_mon_client *monc)
 {
-	return time_after_eq(jiffies, monc->sub_renew_after);
+	if (!monc->hunting)
+		pr_info("mon%d %s session lost, hunting for new mon\n",
+		    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
+
+	__close_session(monc);
+	__open_session(monc);
 }
 
 /*
@@ -174,74 +214,70 @@ static bool __sub_expired(struct ceph_mon_client *monc)
  */
 static void __schedule_delayed(struct ceph_mon_client *monc)
 {
-	struct ceph_options *opt = monc->client->options;
 	unsigned long delay;
 
-	if (monc->cur_mon < 0 || __sub_expired(monc)) {
-		delay = 10 * HZ;
-	} else {
-		delay = 20 * HZ;
-		if (opt->monc_ping_timeout > 0)
-			delay = min(delay, opt->monc_ping_timeout / 3);
-	}
+	if (monc->hunting)
+		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
+	else
+		delay = CEPH_MONC_PING_INTERVAL;
+
 	dout("__schedule_delayed after %lu\n", delay);
-	schedule_delayed_work(&monc->delayed_work,
-			      round_jiffies_relative(delay));
+	mod_delayed_work(system_wq, &monc->delayed_work,
+			 round_jiffies_relative(delay));
 }
 
+const char *ceph_sub_str[] = {
+	[CEPH_SUB_MDSMAP] = "mdsmap",
+	[CEPH_SUB_MONMAP] = "monmap",
+	[CEPH_SUB_OSDMAP] = "osdmap",
+};
+
 /*
- * Send subscribe request for mdsmap and/or osdmap.
+ * Send subscribe request for one or more maps, according to
+ * monc->subs.
  */
 static void __send_subscribe(struct ceph_mon_client *monc)
 {
-	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
-	     (unsigned int)monc->sub_sent, __sub_expired(monc),
-	     monc->want_next_osdmap);
-	if ((__sub_expired(monc) && !monc->sub_sent) ||
-	    monc->want_next_osdmap == 1) {
-		struct ceph_msg *msg = monc->m_subscribe;
-		struct ceph_mon_subscribe_item *i;
-		void *p, *end;
-		int num;
-
-		p = msg->front.iov_base;
-		end = p + msg->front_alloc_len;
-
-		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
-		ceph_encode_32(&p, num);
-
-		if (monc->want_next_osdmap) {
-			dout("__send_subscribe to 'osdmap' %u\n",
-			     (unsigned int)monc->have_osdmap);
-			ceph_encode_string(&p, end, "osdmap", 6);
-			i = p;
-			i->have = cpu_to_le64(monc->have_osdmap);
-			i->onetime = 1;
-			p += sizeof(*i);
-			monc->want_next_osdmap = 2;  /* requested */
-		}
-		if (monc->want_mdsmap) {
-			dout("__send_subscribe to 'mdsmap' %u+\n",
-			     (unsigned int)monc->have_mdsmap);
-			ceph_encode_string(&p, end, "mdsmap", 6);
-			i = p;
-			i->have = cpu_to_le64(monc->have_mdsmap);
-			i->onetime = 0;
-			p += sizeof(*i);
-		}
-		ceph_encode_string(&p, end, "monmap", 6);
-		i = p;
-		i->have = 0;
-		i->onetime = 0;
-		p += sizeof(*i);
-
-		msg->front.iov_len = p - msg->front.iov_base;
-		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		ceph_msg_revoke(msg);
-		ceph_con_send(&monc->con, ceph_msg_get(msg));
-
-		monc->sub_sent = jiffies | 1;  /* never 0 */
+	struct ceph_msg *msg = monc->m_subscribe;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	int num = 0;
+	int i;
+
+	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
+
+	BUG_ON(monc->cur_mon < 0);
+
+	if (!monc->sub_renew_sent)
+		monc->sub_renew_sent = jiffies | 1; /* never 0 */
+
+	msg->hdr.version = cpu_to_le16(2);
+
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		if (monc->subs[i].want)
+			num++;
 	}
+	BUG_ON(num < 1); /* monmap sub is always there */
+	ceph_encode_32(&p, num);
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		const char *s = ceph_sub_str[i];
+
+		if (!monc->subs[i].want)
+			continue;
+
+		dout("%s %s start %llu flags 0x%x\n", __func__, s,
+		     le64_to_cpu(monc->subs[i].item.start),
+		     monc->subs[i].item.flags);
+		ceph_encode_string(&p, end, s, strlen(s));
+		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
+		p += sizeof(monc->subs[i].item);
+	}
+
+	BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
+	msg->front.iov_len = p - msg->front.iov_base;
+	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+	ceph_msg_revoke(msg);
+	ceph_con_send(&monc->con, ceph_msg_get(msg));
 }
 
 static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,15 +291,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
 	seconds = le32_to_cpu(h->duration);
 
 	mutex_lock(&monc->mutex);
-	if (monc->hunting) {
-		pr_info("mon%d %s session established\n",
-			monc->cur_mon,
-			ceph_pr_addr(&monc->con.peer_addr.in_addr));
-		monc->hunting = false;
+	if (monc->sub_renew_sent) {
+		monc->sub_renew_after = monc->sub_renew_sent +
+					    (seconds >> 1) * HZ - 1;
+		dout("%s sent %lu duration %d renew after %lu\n", __func__,
+		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
+		monc->sub_renew_sent = 0;
+	} else {
+		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
+		     monc->sub_renew_sent, monc->sub_renew_after);
 	}
-	dout("handle_subscribe_ack after %d seconds\n", seconds);
-	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
-	monc->sub_sent = 0;
 	mutex_unlock(&monc->mutex);
 	return;
 bad:
@@ -272,36 +309,82 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
 }
 
 /*
- * Keep track of which maps we have
+ * Register interest in a map
+ *
+ * @sub: one of CEPH_SUB_*
+ * @epoch: X for "every map since X", or 0 for "just the latest"
  */
-int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
+				 u32 epoch, bool continuous)
+{
+	__le64 start = cpu_to_le64(epoch);
+	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
+
+	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
+	     epoch, continuous);
+
+	if (monc->subs[sub].want &&
+	    monc->subs[sub].item.start == start &&
+	    monc->subs[sub].item.flags == flags)
+		return false;
+
+	monc->subs[sub].item.start = start;
+	monc->subs[sub].item.flags = flags;
+	monc->subs[sub].want = true;
+
+	return true;
+}
+
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+			bool continuous)
 {
+	bool need_request;
+
 	mutex_lock(&monc->mutex);
-	monc->have_mdsmap = got;
+	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
 	mutex_unlock(&monc->mutex);
-	return 0;
+
+	return need_request;
 }
-EXPORT_SYMBOL(ceph_monc_got_mdsmap);
+EXPORT_SYMBOL(ceph_monc_want_map);
 
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+/*
+ * Keep track of which maps we have
+ *
+ * @sub: one of CEPH_SUB_*
+ */
+static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
+				u32 epoch)
+{
+	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
+
+	if (monc->subs[sub].want) {
+		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
+			monc->subs[sub].want = false;
+		else
+			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
+	}
+
+	monc->subs[sub].have = epoch;
+}
+
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
 {
 	mutex_lock(&monc->mutex);
-	monc->have_osdmap = got;
-	monc->want_next_osdmap = 0;
+	__ceph_monc_got_map(monc, sub, epoch);
 	mutex_unlock(&monc->mutex);
-	return 0;
 }
+EXPORT_SYMBOL(ceph_monc_got_map);
 
 /*
  * Register interest in the next osdmap
  */
 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 {
-	dout("request_next_osdmap have %u\n", monc->have_osdmap);
+	dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
 	mutex_lock(&monc->mutex);
-	if (!monc->want_next_osdmap)
-		monc->want_next_osdmap = 1;
-	if (monc->want_next_osdmap < 2)
+	if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
+				 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
 		__send_subscribe(monc);
 	mutex_unlock(&monc->mutex);
 }
@@ -320,15 +403,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
 	long ret;
 
 	mutex_lock(&monc->mutex);
-	while (monc->have_osdmap < epoch) {
+	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
 		mutex_unlock(&monc->mutex);
 
 		if (timeout && time_after_eq(jiffies, started + timeout))
 			return -ETIMEDOUT;
 
 		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
-						monc->have_osdmap >= epoch,
-						ceph_timeout_jiffies(timeout));
+				     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
+				     ceph_timeout_jiffies(timeout));
 		if (ret < 0)
 			return ret;
 
@@ -341,11 +424,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
- *
+ * Open a session with a random monitor.  Request monmap and osdmap,
+ * which are waited upon in __ceph_open_session().
  */
 int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
 	mutex_lock(&monc->mutex);
+	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
+	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
 	__open_session(monc);
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
@@ -353,29 +439,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
 }
 EXPORT_SYMBOL(ceph_monc_open_session);
 
-/*
- * We require the fsid and global_id in order to initialize our
- * debugfs dir.
- */
-static bool have_debugfs_info(struct ceph_mon_client *monc)
-{
-	dout("have_debugfs_info fsid %d globalid %lld\n",
-	     (int)monc->client->have_fsid, monc->auth->global_id);
-	return monc->client->have_fsid && monc->auth->global_id > 0;
-}
-
 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 				 struct ceph_msg *msg)
 {
 	struct ceph_client *client = monc->client;
 	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
 	void *p, *end;
-	int had_debugfs_info, init_debugfs = 0;
 
 	mutex_lock(&monc->mutex);
 
-	had_debugfs_info = have_debugfs_info(monc);
-
 	dout("handle_monmap\n");
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -395,29 +467,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 	client->monc.monmap = monmap;
 	kfree(old);
 
-	if (!client->have_fsid) {
-		client->have_fsid = true;
-		if (!had_debugfs_info && have_debugfs_info(monc)) {
-			pr_info("client%lld fsid %pU\n",
-				ceph_client_id(monc->client),
-				&monc->client->fsid);
-			init_debugfs = 1;
-		}
-		mutex_unlock(&monc->mutex);
-
-		if (init_debugfs) {
-			/*
-			 * do debugfs initialization without mutex to avoid
-			 * creating a locking dependency
-			 */
-			ceph_debugfs_client_init(monc->client);
-		}
+	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
+	client->have_fsid = true;
 
-		goto out_unlocked;
-	}
 out:
 	mutex_unlock(&monc->mutex);
-out_unlocked:
 	wake_up_all(&client->auth_wq);
 }
 
@@ -745,18 +799,15 @@ static void delayed_work(struct work_struct *work)
 	dout("monc delayed_work\n");
 	mutex_lock(&monc->mutex);
 	if (monc->hunting) {
-		__close_session(monc);
-		__open_session(monc);  /* continue hunting */
+		dout("%s continuing hunt\n", __func__);
+		reopen_session(monc);
 	} else {
-		struct ceph_options *opt = monc->client->options;
 		int is_auth = ceph_auth_is_authenticated(monc->auth);
 		if (ceph_con_keepalive_expired(&monc->con,
-					       opt->monc_ping_timeout)) {
+					       CEPH_MONC_PING_TIMEOUT)) {
 			dout("monc keepalive timeout\n");
 			is_auth = 0;
-			__close_session(monc);
-			monc->hunting = true;
-			__open_session(monc);
+			reopen_session(monc);
 		}
 
 		if (!monc->hunting) {
@@ -764,8 +815,14 @@ static void delayed_work(struct work_struct *work)
 			__validate_auth(monc);
 		}
 
-		if (is_auth)
-			__send_subscribe(monc);
+		if (is_auth) {
+			unsigned long now = jiffies;
+
+			dout("%s renew subs? now %lu renew after %lu\n",
+			     __func__, now, monc->sub_renew_after);
+			if (time_after_eq(now, monc->sub_renew_after))
+				__send_subscribe(monc);
+		}
 	}
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
@@ -852,18 +909,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 		      &monc->client->msgr);
 
 	monc->cur_mon = -1;
-	monc->hunting = true;
-	monc->sub_renew_after = jiffies;
-	monc->sub_sent = 0;
+	monc->had_a_connection = false;
+	monc->hunt_mult = 1;
 
 	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
 	monc->generic_request_tree = RB_ROOT;
 	monc->num_generic_requests = 0;
 	monc->last_tid = 0;
 
-	monc->have_mdsmap = 0;
-	monc->have_osdmap = 0;
-	monc->want_next_osdmap = 1;
 	return 0;
 
 out_auth_reply:
@@ -888,7 +941,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 
 	mutex_lock(&monc->mutex);
 	__close_session(monc);
-
+	monc->cur_mon = -1;
 	mutex_unlock(&monc->mutex);
 
 	/*
@@ -910,26 +963,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 }
 EXPORT_SYMBOL(ceph_monc_stop);
 
+static void finish_hunting(struct ceph_mon_client *monc)
+{
+	if (monc->hunting) {
+		dout("%s found mon%d\n", __func__, monc->cur_mon);
+		monc->hunting = false;
+		monc->had_a_connection = true;
+		monc->hunt_mult /= 2; /* reduce by 50% */
+		if (monc->hunt_mult < 1)
+			monc->hunt_mult = 1;
+	}
+}
+
 static void handle_auth_reply(struct ceph_mon_client *monc,
 			      struct ceph_msg *msg)
 {
 	int ret;
 	int was_auth = 0;
-	int had_debugfs_info, init_debugfs = 0;
 
 	mutex_lock(&monc->mutex);
-	had_debugfs_info = have_debugfs_info(monc);
 	was_auth = ceph_auth_is_authenticated(monc->auth);
 	monc->pending_auth = 0;
 	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
 				     msg->front.iov_len,
 				     monc->m_auth->front.iov_base,
 				     monc->m_auth->front_alloc_len);
+	if (ret > 0) {
+		__send_prepared_auth_request(monc, ret);
+		goto out;
+	}
+
+	finish_hunting(monc);
+
 	if (ret < 0) {
 		monc->client->auth_err = ret;
-		wake_up_all(&monc->client->auth_wq);
-	} else if (ret > 0) {
-		__send_prepared_auth_request(monc, ret);
 	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
 		dout("authenticated, starting session\n");
 
@@ -939,23 +1006,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 
 		__send_subscribe(monc);
 		__resend_generic_request(monc);
-	}
 
-	if (!had_debugfs_info && have_debugfs_info(monc)) {
-		pr_info("client%lld fsid %pU\n",
-			ceph_client_id(monc->client),
-			&monc->client->fsid);
-		init_debugfs = 1;
+		pr_info("mon%d %s session established\n", monc->cur_mon,
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 	}
-	mutex_unlock(&monc->mutex);
 
-	if (init_debugfs) {
-		/*
-		 * do debugfs initialization without mutex to avoid
-		 * creating a locking dependency
-		 */
-		ceph_debugfs_client_init(monc->client);
-	}
+out:
+	mutex_unlock(&monc->mutex);
+	if (monc->client->auth_err < 0)
+		wake_up_all(&monc->client->auth_wq);
 }
 
 static int __validate_auth(struct ceph_mon_client *monc)
@@ -1096,29 +1155,17 @@ static void mon_fault(struct ceph_connection *con)
 {
 	struct ceph_mon_client *monc = con->private;
 
-	if (!monc)
-		return;
-
-	dout("mon_fault\n");
 	mutex_lock(&monc->mutex);
-	if (!con->private)
-		goto out;
-
-	if (!monc->hunting)
-		pr_info("mon%d %s session lost, "
-			"hunting for new mon\n", monc->cur_mon,
-			ceph_pr_addr(&monc->con.peer_addr.in_addr));
-
-	__close_session(monc);
-	if (!monc->hunting) {
-		/* start hunting */
-		monc->hunting = true;
-		__open_session(monc);
-	} else {
-		/* already hunting, let's wait a bit */
-		__schedule_delayed(monc);
+	dout("%s mon%d\n", __func__, monc->cur_mon);
+	if (monc->cur_mon >= 0) {
+		if (!monc->hunting) {
+			dout("%s hunting for new mon\n", __func__);
+			reopen_session(monc);
+			__schedule_delayed(monc);
+		} else {
+			dout("%s already hunting\n", __func__);
+		}
 	}
-out:
 	mutex_unlock(&monc->mutex);
 }
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5bc053778feddd0e35fe73480317be25345da61c..32355d9d0103a827a82f3bfe27493d9a2b47b339 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref)
 	ceph_put_snap_context(req->r_snapc);
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
-	else
+	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
 		kmem_cache_free(ceph_osd_request_cache, req);
-
+	else
+		kfree(req);
 }
 
 void ceph_osdc_get_request(struct ceph_osd_request *req)
@@ -369,28 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	struct ceph_msg *msg;
 	size_t msg_size;
 
-	BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
-	BUG_ON(num_ops > CEPH_OSD_MAX_OP);
-
-	msg_size = 4 + 4 + 8 + 8 + 4+8;
-	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
-	msg_size += 1 + 8 + 4 + 4;     /* pg_t */
-	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
-	msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
-	msg_size += 8;  /* snapid */
-	msg_size += 8;  /* snap_seq */
-	msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
-	msg_size += 4;
-
 	if (use_mempool) {
+		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
-		memset(req, 0, sizeof(*req));
+	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
+		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
 	} else {
-		req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
+		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
+		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
+			      gfp_flags);
 	}
-	if (req == NULL)
+	if (unlikely(!req))
 		return NULL;
 
+	/* req only, each op is zeroed in _osd_req_op_init() */
+	memset(req, 0, sizeof(*req));
+
 	req->r_osdc = osdc;
 	req->r_mempool = use_mempool;
 	req->r_num_ops = num_ops;
@@ -408,18 +403,36 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	req->r_base_oloc.pool = -1;
 	req->r_target_oloc.pool = -1;
 
+	msg_size = OSD_OPREPLY_FRONT_LEN;
+	if (num_ops > CEPH_OSD_SLAB_OPS) {
+		/* ceph_osd_op and rval */
+		msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
+			    (sizeof(struct ceph_osd_op) + 4);
+	}
+
 	/* create reply message */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
+		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
+				   gfp_flags, true);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
 	}
 	req->r_reply = msg;
 
+	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
+	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
+	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+	msg_size += 1 + 8 + 4 + 4; /* pgid */
+	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
+	msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
+	msg_size += 8; /* snapid */
+	msg_size += 8; /* snap_seq */
+	msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+	msg_size += 4; /* retry_attempt */
+
 	/* create request message; allow space for oid */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
@@ -498,7 +511,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
 		payload_len += length;
 
-	op->payload_len = payload_len;
+	op->indata_len = payload_len;
 }
 EXPORT_SYMBOL(osd_req_op_extent_init);
 
@@ -517,10 +530,32 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 	BUG_ON(length > previous);
 
 	op->extent.length = length;
-	op->payload_len -= previous - length;
+	op->indata_len -= previous - length;
 }
 EXPORT_SYMBOL(osd_req_op_extent_update);
 
+void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
+				unsigned int which, u64 offset_inc)
+{
+	struct ceph_osd_req_op *op, *prev_op;
+
+	BUG_ON(which + 1 >= osd_req->r_num_ops);
+
+	prev_op = &osd_req->r_ops[which];
+	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
+	/* dup previous one */
+	op->indata_len = prev_op->indata_len;
+	op->outdata_len = prev_op->outdata_len;
+	op->extent = prev_op->extent;
+	/* adjust offset */
+	op->extent.offset += offset_inc;
+	op->extent.length -= offset_inc;
+
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+		op->indata_len -= offset_inc;
+}
+EXPORT_SYMBOL(osd_req_op_extent_dup_last);
+
 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 			u16 opcode, const char *class, const char *method)
 {
@@ -554,7 +589,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
 
 	op->cls.argc = 0;	/* currently unused */
 
-	op->payload_len = payload_len;
+	op->indata_len = payload_len;
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
 
@@ -587,7 +622,7 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 	op->xattr.cmp_mode = cmp_mode;
 
 	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
-	op->payload_len = payload_len;
+	op->indata_len = payload_len;
 	return 0;
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
@@ -707,7 +742,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
 			dst->cls.indata_len = cpu_to_le32(data_length);
 			ceph_osdc_msg_data_add(req->r_request, osd_data);
-			src->payload_len += data_length;
+			src->indata_len += data_length;
 			request_data_len += data_length;
 		}
 		osd_data = &src->cls.response_data;
@@ -750,7 +785,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 
 	dst->op = cpu_to_le16(src->op);
 	dst->flags = cpu_to_le32(src->flags);
-	dst->payload_len = cpu_to_le32(src->payload_len);
+	dst->payload_len = cpu_to_le32(src->indata_len);
 
 	return request_data_len;
 }
@@ -1810,7 +1845,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
 	ceph_decode_need(&p, end, 4, bad_put);
 	numops = ceph_decode_32(&p);
-	if (numops > CEPH_OSD_MAX_OP)
+	if (numops > CEPH_OSD_MAX_OPS)
 		goto bad_put;
 	if (numops != req->r_num_ops)
 		goto bad_put;
@@ -1821,7 +1856,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		int len;
 
 		len = le32_to_cpu(op->payload_len);
-		req->r_reply_op_len[i] = len;
+		req->r_ops[i].outdata_len = len;
 		dout(" op %d has %d bytes\n", i, len);
 		payload_len += len;
 		p += sizeof(*op);
@@ -1836,7 +1871,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
 	retry_attempt = ceph_decode_32(&p);
 	for (i = 0; i < numops; i++)
-		req->r_reply_op_result[i] = ceph_decode_32(&p);
+		req->r_ops[i].rval = ceph_decode_32(&p);
 
 	if (le16_to_cpu(msg->hdr.version) >= 6) {
 		p += 8 + 4; /* skip replay_version */
@@ -2187,7 +2222,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		goto bad;
 done:
 	downgrade_write(&osdc->map_sem);
-	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
+	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+			  osdc->osdmap->epoch);
 
 	/*
 	 * subscribe to subsequent osdmap updates if full to ensure
@@ -2646,8 +2682,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
 
 	err = -ENOMEM;
-	osdc->req_mempool = mempool_create_kmalloc_pool(10,
-					sizeof(struct ceph_osd_request));
+	osdc->req_mempool = mempool_create_slab_pool(10,
+						     ceph_osd_request_cache);
 	if (!osdc->req_mempool)
 		goto out;
 
@@ -2782,11 +2818,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages);
 
 int ceph_osdc_setup(void)
 {
+	size_t size = sizeof(struct ceph_osd_request) +
+	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
+
 	BUG_ON(ceph_osd_request_cache);
-	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
-					sizeof (struct ceph_osd_request),
-					__alignof__(struct ceph_osd_request),
-					0, NULL);
+	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
+						   0, 0, NULL);
 
 	return ceph_osd_request_cache ? 0 : -ENOMEM;
 }