diff --git a/fs/io-wq.c b/fs/io-wq.c
index e92c4724480cadc8756360802ee9d44e748c0f32..414beb5438836cf4c28935bbdfd376e74078e19f 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -925,6 +925,24 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 	return match->nr_running && !match->cancel_all;
 }
 
+static inline void io_wqe_remove_pending(struct io_wqe *wqe,
+					 struct io_wq_work *work,
+					 struct io_wq_work_node *prev)
+{
+	unsigned int hash = io_get_work_hash(work);
+	struct io_wq_work *prev_work = NULL;
+
+	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
+		if (prev)
+			prev_work = container_of(prev, struct io_wq_work, list);
+		if (prev_work && io_get_work_hash(prev_work) == hash)
+			wqe->hash_tail[hash] = prev_work;
+		else
+			wqe->hash_tail[hash] = NULL;
+	}
+	wq_list_del(&wqe->work_list, &work->list, prev);
+}
+
 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 				       struct io_cb_cancel_data *match)
 {
@@ -938,8 +956,7 @@ retry:
 		work = container_of(node, struct io_wq_work, list);
 		if (!match->fn(work, match->data))
 			continue;
-
-		wq_list_del(&wqe->work_list, node, prev);
+		io_wqe_remove_pending(wqe, work, prev);
 		spin_unlock_irqrestore(&wqe->lock, flags);
 		io_run_cancel(work, wqe);
 		match->nr_pending++;
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8a53af8e5fe2558e5c1051c3c007274253a2e423..ce69bd9b0838e316b369fe607327a74c19c046a1 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1150,7 +1150,7 @@ static void io_prep_async_work(struct io_kiocb *req)
 	io_req_init_async(req);
 
 	if (req->flags & REQ_F_ISREG) {
-		if (def->hash_reg_file)
+		if (def->hash_reg_file || (req->ctx->flags & IORING_SETUP_IOPOLL))
 			io_wq_hash_work(&req->work, file_inode(req->file));
 	} else {
 		if (def->unbound_nonreg_file)
@@ -1746,7 +1746,8 @@ static struct io_kiocb *io_req_find_next(struct io_kiocb *req)
 	return __io_req_find_next(req);
 }
 
-static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb)
+static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb,
+				bool twa_signal_ok)
 {
 	struct task_struct *tsk = req->task;
 	struct io_ring_ctx *ctx = req->ctx;
@@ -1759,7 +1760,7 @@ static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb)
 	 * will do the job.
 	 */
 	notify = 0;
-	if (!(ctx->flags & IORING_SETUP_SQPOLL))
+	if (!(ctx->flags & IORING_SETUP_SQPOLL) && twa_signal_ok)
 		notify = TWA_SIGNAL;
 
 	ret = task_work_add(tsk, cb, notify);
@@ -1819,7 +1820,7 @@ static void io_req_task_queue(struct io_kiocb *req)
 	init_task_work(&req->task_work, io_req_task_submit);
 	percpu_ref_get(&req->ctx->refs);
 
-	ret = io_req_task_work_add(req, &req->task_work);
+	ret = io_req_task_work_add(req, &req->task_work, true);
 	if (unlikely(ret)) {
 		struct task_struct *tsk;
 
@@ -2048,6 +2049,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 
 		req = list_first_entry(done, struct io_kiocb, inflight_entry);
 		if (READ_ONCE(req->result) == -EAGAIN) {
+			req->result = 0;
 			req->iopoll_completed = 0;
 			list_move_tail(&req->inflight_entry, &again);
 			continue;
@@ -2293,22 +2295,6 @@ end_req:
 	io_req_complete(req, ret);
 	return false;
 }
-
-static void io_rw_resubmit(struct callback_head *cb)
-{
-	struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
-	struct io_ring_ctx *ctx = req->ctx;
-	int err;
-
-	err = io_sq_thread_acquire_mm(ctx, req);
-
-	if (io_resubmit_prep(req, err)) {
-		refcount_inc(&req->refs);
-		io_queue_async_work(req);
-	}
-
-	percpu_ref_put(&ctx->refs);
-}
 #endif
 
 static bool io_rw_reissue(struct io_kiocb *req, long res)
@@ -2319,12 +2305,14 @@ static bool io_rw_reissue(struct io_kiocb *req, long res)
 	if ((res != -EAGAIN && res != -EOPNOTSUPP) || io_wq_current_is_worker())
 		return false;
 
-	init_task_work(&req->task_work, io_rw_resubmit);
-	percpu_ref_get(&req->ctx->refs);
+	ret = io_sq_thread_acquire_mm(req->ctx, req);
 
-	ret = io_req_task_work_add(req, &req->task_work);
-	if (!ret)
+	if (io_resubmit_prep(req, ret)) {
+		refcount_inc(&req->refs);
+		io_queue_async_work(req);
 		return true;
+	}
+
 #endif
 	return false;
 }
@@ -2865,6 +2853,11 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
 	return iov_iter_count(&req->io->rw.iter);
 }
 
+static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
+{
+	return kiocb->ki_filp->f_mode & FMODE_STREAM ? NULL : &kiocb->ki_pos;
+}
+
 /*
  * For files that don't have ->read_iter() and ->write_iter(), handle them
  * by looping over ->read() or ->write() manually.
@@ -2900,10 +2893,10 @@ static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
 
 		if (rw == READ) {
 			nr = file->f_op->read(file, iovec.iov_base,
-					      iovec.iov_len, &kiocb->ki_pos);
+					      iovec.iov_len, io_kiocb_ppos(kiocb));
 		} else {
 			nr = file->f_op->write(file, iovec.iov_base,
-					       iovec.iov_len, &kiocb->ki_pos);
+					       iovec.iov_len, io_kiocb_ppos(kiocb));
 		}
 
 		if (iov_iter_is_bvec(iter))
@@ -3044,7 +3037,7 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
 
 	/* submit ref gets dropped, acquire a new one */
 	refcount_inc(&req->refs);
-	ret = io_req_task_work_add(req, &req->task_work);
+	ret = io_req_task_work_add(req, &req->task_work, true);
 	if (unlikely(ret)) {
 		struct task_struct *tsk;
 
@@ -3125,6 +3118,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
 	ret = io_import_iovec(READ, req, &iovec, iter, !force_nonblock);
 	if (ret < 0)
 		return ret;
+	iov_count = iov_iter_count(iter);
 	io_size = ret;
 	req->result = io_size;
 	ret = 0;
@@ -3137,8 +3131,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
 	if (force_nonblock && !io_file_supports_async(req->file, READ))
 		goto copy_iov;
 
-	iov_count = iov_iter_count(iter);
-	ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
+	ret = rw_verify_area(READ, req->file, io_kiocb_ppos(kiocb), iov_count);
 	if (unlikely(ret))
 		goto out_free;
 
@@ -3150,14 +3143,18 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
 		ret = 0;
 		goto out_free;
 	} else if (ret == -EAGAIN) {
-		if (!force_nonblock)
+		/* IOPOLL retry should happen for io-wq threads */
+		if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
 			goto done;
+		/* some cases will consume bytes even on error returns */
+		iov_iter_revert(iter, iov_count - iov_iter_count(iter));
 		ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
 		if (ret)
 			goto out_free;
 		return -EAGAIN;
 	} else if (ret < 0) {
-		goto out_free;
+		/* make sure -ERESTARTSYS -> -EINTR is done */
+		goto done;
 	}
 
 	/* read it all, or we did blocking attempt. no retry. */
@@ -3241,6 +3238,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
 	ret = io_import_iovec(WRITE, req, &iovec, iter, !force_nonblock);
 	if (ret < 0)
 		return ret;
+	iov_count = iov_iter_count(iter);
 	io_size = ret;
 	req->result = io_size;
 
@@ -3257,8 +3255,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
 	    (req->flags & REQ_F_ISREG))
 		goto copy_iov;
 
-	iov_count = iov_iter_count(iter);
-	ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
+	ret = rw_verify_area(WRITE, req->file, io_kiocb_ppos(kiocb), iov_count);
 	if (unlikely(ret))
 		goto out_free;
 
@@ -3291,9 +3288,14 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
 	if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
 		ret2 = -EAGAIN;
 	if (!force_nonblock || ret2 != -EAGAIN) {
+		/* IOPOLL retry should happen for io-wq threads */
+		if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN)
+			goto copy_iov;
 		kiocb_done(kiocb, ret2, cs);
 	} else {
 copy_iov:
+		/* some cases will consume bytes even on error returns */
+		iov_iter_revert(iter, iov_count - iov_iter_count(iter));
 		ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
 		if (!ret)
 			return -EAGAIN;
@@ -4566,6 +4568,7 @@ struct io_poll_table {
 static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
 			   __poll_t mask, task_work_func_t func)
 {
+	bool twa_signal_ok;
 	int ret;
 
 	/* for instances that support it check for an event match first: */
@@ -4580,13 +4583,21 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
 	init_task_work(&req->task_work, func);
 	percpu_ref_get(&req->ctx->refs);
 
+	/*
+	 * If we using the signalfd wait_queue_head for this wakeup, then
+	 * it's not safe to use TWA_SIGNAL as we could be recursing on the
+	 * tsk->sighand->siglock on doing the wakeup. Should not be needed
+	 * either, as the normal wakeup will suffice.
+	 */
+	twa_signal_ok = (poll->head != &req->task->sighand->signalfd_wqh);
+
 	/*
 	 * If this fails, then the task is exiting. When a task exits, the
 	 * work gets canceled, so just cancel this request as well instead
 	 * of executing it. We can't safely execute it anyway, as we may not
 	 * have the needed state needed for it anyway.
 	 */
-	ret = io_req_task_work_add(req, &req->task_work);
+	ret = io_req_task_work_add(req, &req->task_work, twa_signal_ok);
 	if (unlikely(ret)) {
 		struct task_struct *tsk;
 
@@ -4875,12 +4886,20 @@ static bool io_arm_poll_handler(struct io_kiocb *req)
 	struct async_poll *apoll;
 	struct io_poll_table ipt;
 	__poll_t mask, ret;
+	int rw;
 
 	if (!req->file || !file_can_poll(req->file))
 		return false;
 	if (req->flags & REQ_F_POLLED)
 		return false;
-	if (!def->pollin && !def->pollout)
+	if (def->pollin)
+		rw = READ;
+	else if (def->pollout)
+		rw = WRITE;
+	else
+		return false;
+	/* if we can't nonblock try, then no point in arming a poll handler */
+	if (!io_file_supports_async(req->file, rw))
 		return false;
 
 	apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
@@ -7433,9 +7452,6 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
 {
 	int ret;
 
-	mmgrab(current->mm);
-	ctx->sqo_mm = current->mm;
-
 	if (ctx->flags & IORING_SETUP_SQPOLL) {
 		ret = -EPERM;
 		if (!capable(CAP_SYS_ADMIN))
@@ -7480,10 +7496,6 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
 	return 0;
 err:
 	io_finish_async(ctx);
-	if (ctx->sqo_mm) {
-		mmdrop(ctx->sqo_mm);
-		ctx->sqo_mm = NULL;
-	}
 	return ret;
 }
 
@@ -8533,6 +8545,9 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
 	ctx->user = user;
 	ctx->creds = get_current_cred();
 
+	mmgrab(current->mm);
+	ctx->sqo_mm = current->mm;
+
 	/*
 	 * Account memory _before_ installing the file descriptor. Once
 	 * the descriptor is installed, it can get closed at any time. Also