mirror of
https://github.com/torvalds/linux.git
synced 2026-03-08 01:24:47 +01:00
for-7.0/io_uring-20260206
-----BEGIN PGP SIGNATURE----- iQJEBAABCAAuFiEEwPw5LcreJtl1+l5K99NY+ylx4KYFAmmGJxsQHGF4Ym9lQGtl cm5lbC5kawAKCRD301j7KXHgpk6+EACamMdw6WU4VVNjUtjT93FuXxor4ioyhowJ myRtKG3ZvYrE63Z8F1dCQE28RXi9n6MhGxabCq8WZVGkhTv27DuaBkDjU4T8oCnP EYhs5a3sdRXfKuIlqVbxuiFdmiPHEP0vh3/MviKx9Ju3/Po3OEWKBalNMevfGkS4 bRNp9IQkAYNSRhGma2ni9Rnc5welWmhpsxUKFdGtPRX53ZlYegiZxKlfKMB4/SQ+ 7XAWKhy9dOGVo4DpLof7mCX6hMeX+FoNkJzF6cTMO/IF//lCLjI9BN4SMiI6mmEN RY6PLJiFraoQx8wdr3J1LtBCNXzzj6cPk6PNHKtsodoafe2oYFNLNgfAa9pHDzfM 12kvy58au0cQG6TnS2eNlqM2GN116mJi+k00E+UW4iaXXtpqcdcBrLlS+Q5hJ78C 9MBLQofv7D06C6kbpxV2pVS1u4oxefjl19wWLqLKx/VytCHrsaTm50n1r0k7YLCc plvPkQRQobqpp2GtcaXcfmsi1Vfu4jzMBAN+rTN4/te0kudNqL9+hPvrejIMEURc 2AcktMAHC8wjpr93dFASXiWh/fdyhV4e2a/D/ML4PXxhnCfnGx5s5Tp/pGjePHEU dLZm9vadmr/Yrdgycf9gQ8mz9IxI9FNJCKbI7lf7+/KJXe7DwngOa6VHNblWBRHv YoX6bG1yQQ== =Q248 -----END PGP SIGNATURE----- Merge tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux Pull io_uring updates from Jens Axboe: - Clean up the IORING_SETUP_R_DISABLED and submitter task checking, mostly just in preparation for relaxing the locking for SINGLE_ISSUER in the future. - Improve IOPOLL by using a doubly linked list to manage completions. Previously it was singly listed, which meant that to complete request N in the chain 0..N-1 had to have completed first. With a doubly linked list we can complete whatever request completes in that order, rather than need to wait for a consecutive range to be available. This reduces latencies. - Improve the restriction setup and checking. Mostly in preparation for adding further features on top of that. Coming in a separate pull request. - Split out task_work and wait handling into separate files. These are mostly nicely abstracted already, but still remained in the io_uring.c file which is on the larger side. - Use GFP_KERNEL_ACCOUNT in a few more spots, where appropriate. - Ensure even the idle io-wq worker exits if a task no longer has any rings open. - Add support for a non-circular submission queue. By default, the SQ ring keeps moving around, even if only a few entries are used for each submission. This can be wasteful in terms of cachelines. If IORING_SETUP_SQ_REWIND is set for the ring when created, each submission will start at offset 0 instead of where we last left off doing submissions. - Various little cleanups * tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux: (30 commits) io_uring/kbuf: fix memory leak if io_buffer_add_list fails io_uring: Add SPDX id lines to remaining source files io_uring: allow io-wq workers to exit when unused io_uring/io-wq: add exit-on-idle state io_uring/net: don't continue send bundle if poll was required for retry io_uring/rsrc: use GFP_KERNEL_ACCOUNT consistently io_uring/futex: use GFP_KERNEL_ACCOUNT for futex data allocation io_uring/io-wq: handle !sysctl_hung_task_timeout_secs io_uring: fix bad indentation for setup flags if statement io_uring/rsrc: take unsigned index in io_rsrc_node_lookup() io_uring: introduce non-circular SQ io_uring: split out CQ waiting code into wait.c io_uring: split out task work code into tw.c io_uring/io-wq: don't trigger hung task for syzbot craziness io_uring: add IO_URING_EXIT_WAIT_MAX definition io_uring/sync: validate passed in offset io_uring/eventfd: remove unused ctx->evfd_last_cq_tail member io_uring/timeout: annotate data race in io_flush_timeouts() io_uring/uring_cmd: explicitly disallow cancelations for IOPOLL io_uring: fix IOPOLL with passthrough I/O ...
This commit is contained in:
commit
f5d4feed17
35 changed files with 1074 additions and 915 deletions
|
|
@ -8,12 +8,14 @@ endif
|
|||
|
||||
obj-$(CONFIG_IO_URING) += io_uring.o opdef.o kbuf.o rsrc.o notif.o \
|
||||
tctx.o filetable.o rw.o poll.o \
|
||||
eventfd.o uring_cmd.o openclose.o \
|
||||
sqpoll.o xattr.o nop.o fs.o splice.o \
|
||||
sync.o msg_ring.o advise.o openclose.o \
|
||||
statx.o timeout.o cancel.o \
|
||||
waitid.o register.o truncate.o \
|
||||
memmap.o alloc_cache.o query.o
|
||||
tw.o wait.o eventfd.o uring_cmd.o \
|
||||
openclose.o sqpoll.o xattr.o nop.o \
|
||||
fs.o splice.o sync.o msg_ring.o \
|
||||
advise.o openclose.o statx.o timeout.o \
|
||||
cancel.o waitid.o register.o \
|
||||
truncate.o memmap.o alloc_cache.o \
|
||||
query.o
|
||||
|
||||
obj-$(CONFIG_IO_URING_ZCRX) += zcrx.o
|
||||
obj-$(CONFIG_IO_WQ) += io-wq.o
|
||||
obj-$(CONFIG_FUTEX) += futex.o
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
#ifndef IOU_ALLOC_CACHE_H
|
||||
#define IOU_ALLOC_CACHE_H
|
||||
|
||||
#include <linux/io_uring_types.h>
|
||||
#include <linux/kasan.h>
|
||||
|
||||
/*
|
||||
* Don't allow the cache to grow beyond this size.
|
||||
|
|
|
|||
|
|
@ -2,10 +2,8 @@
|
|||
#include <linux/kernel.h>
|
||||
#include <linux/errno.h>
|
||||
#include <linux/fs.h>
|
||||
#include <linux/file.h>
|
||||
#include <linux/mm.h>
|
||||
#include <linux/slab.h>
|
||||
#include <linux/namei.h>
|
||||
#include <linux/nospec.h>
|
||||
#include <linux/io_uring.h>
|
||||
|
||||
|
|
@ -21,6 +19,7 @@
|
|||
#include "waitid.h"
|
||||
#include "futex.h"
|
||||
#include "cancel.h"
|
||||
#include "wait.h"
|
||||
|
||||
struct io_cancel {
|
||||
struct file *file;
|
||||
|
|
@ -539,7 +538,7 @@ __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
|
|||
/* SQPOLL thread does its own polling */
|
||||
if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
|
||||
is_sqpoll_thread) {
|
||||
while (!wq_list_empty(&ctx->iopoll_list)) {
|
||||
while (!list_empty(&ctx->iopoll_list)) {
|
||||
io_iopoll_try_reap_events(ctx);
|
||||
ret = true;
|
||||
cond_resched();
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
#include <asm/ioctls.h>
|
||||
#include <linux/io_uring/net.h>
|
||||
#include <linux/errqueue.h>
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
|
||||
struct io_ring_ctx;
|
||||
int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
#ifndef IOU_FILE_TABLE_H
|
||||
#define IOU_FILE_TABLE_H
|
||||
|
||||
#include <linux/file.h>
|
||||
#include <linux/io_uring_types.h>
|
||||
#include "rsrc.h"
|
||||
|
||||
|
|
|
|||
|
|
@ -186,7 +186,7 @@ int io_futexv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
|
|||
return -EINVAL;
|
||||
|
||||
ifd = kzalloc(struct_size_t(struct io_futexv_data, futexv, iof->futex_nr),
|
||||
GFP_KERNEL);
|
||||
GFP_KERNEL_ACCOUNT);
|
||||
if (!ifd)
|
||||
return -ENOMEM;
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
#include <linux/task_work.h>
|
||||
#include <linux/audit.h>
|
||||
#include <linux/mmu_context.h>
|
||||
#include <linux/sched/sysctl.h>
|
||||
#include <uapi/linux/io_uring.h>
|
||||
|
||||
#include "io-wq.h"
|
||||
|
|
@ -34,6 +35,7 @@ enum {
|
|||
|
||||
enum {
|
||||
IO_WQ_BIT_EXIT = 0, /* wq exiting */
|
||||
IO_WQ_BIT_EXIT_ON_IDLE = 1, /* allow all workers to exit on idle */
|
||||
};
|
||||
|
||||
enum {
|
||||
|
|
@ -706,9 +708,13 @@ static int io_wq_worker(void *data)
|
|||
raw_spin_lock(&acct->workers_lock);
|
||||
/*
|
||||
* Last sleep timed out. Exit if we're not the last worker,
|
||||
* or if someone modified our affinity.
|
||||
* or if someone modified our affinity. If wq is marked
|
||||
* idle-exit, drop the worker as well. This is used to avoid
|
||||
* keeping io-wq workers around for tasks that no longer have
|
||||
* any active io_uring instances.
|
||||
*/
|
||||
if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
|
||||
if ((last_timeout && (exit_mask || acct->nr_workers > 1)) ||
|
||||
test_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state)) {
|
||||
acct->nr_workers--;
|
||||
raw_spin_unlock(&acct->workers_lock);
|
||||
__set_current_state(TASK_RUNNING);
|
||||
|
|
@ -963,6 +969,24 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
|
|||
return false;
|
||||
}
|
||||
|
||||
void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable)
|
||||
{
|
||||
if (!wq->task)
|
||||
return;
|
||||
|
||||
if (!enable) {
|
||||
clear_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state);
|
||||
return;
|
||||
}
|
||||
|
||||
if (test_and_set_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state))
|
||||
return;
|
||||
|
||||
rcu_read_lock();
|
||||
io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
|
||||
rcu_read_unlock();
|
||||
}
|
||||
|
||||
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
|
||||
{
|
||||
do {
|
||||
|
|
@ -1313,6 +1337,8 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)
|
|||
|
||||
static void io_wq_exit_workers(struct io_wq *wq)
|
||||
{
|
||||
unsigned long timeout, warn_timeout;
|
||||
|
||||
if (!wq->task)
|
||||
return;
|
||||
|
||||
|
|
@ -1322,7 +1348,26 @@ static void io_wq_exit_workers(struct io_wq *wq)
|
|||
io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
|
||||
rcu_read_unlock();
|
||||
io_worker_ref_put(wq);
|
||||
wait_for_completion(&wq->worker_done);
|
||||
|
||||
/*
|
||||
* Shut up hung task complaint, see for example
|
||||
*
|
||||
* https://lore.kernel.org/all/696fc9e7.a70a0220.111c58.0006.GAE@google.com/
|
||||
*
|
||||
* where completely overloading the system with tons of long running
|
||||
* io-wq items can easily trigger the hung task timeout. Only sleep
|
||||
* uninterruptibly for half that time, and warn if we exceeded end
|
||||
* up waiting more than IO_URING_EXIT_WAIT_MAX.
|
||||
*/
|
||||
timeout = sysctl_hung_task_timeout_secs * HZ / 2;
|
||||
if (!timeout)
|
||||
timeout = MAX_SCHEDULE_TIMEOUT;
|
||||
warn_timeout = jiffies + IO_URING_EXIT_WAIT_MAX;
|
||||
do {
|
||||
if (wait_for_completion_timeout(&wq->worker_done, timeout))
|
||||
break;
|
||||
WARN_ON_ONCE(time_after(jiffies, warn_timeout));
|
||||
} while (1);
|
||||
|
||||
spin_lock_irq(&wq->hash->wait.lock);
|
||||
list_del_init(&wq->wait.entry);
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
#ifndef INTERNAL_IO_WQ_H
|
||||
#define INTERNAL_IO_WQ_H
|
||||
|
||||
|
|
@ -41,6 +42,7 @@ struct io_wq_data {
|
|||
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
|
||||
void io_wq_exit_start(struct io_wq *wq);
|
||||
void io_wq_put_and_exit(struct io_wq *wq);
|
||||
void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable);
|
||||
|
||||
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
|
||||
void io_wq_hash_work(struct io_wq_work *work, void *val);
|
||||
|
|
|
|||
|
|
@ -40,37 +40,25 @@
|
|||
* Copyright (c) 2018-2019 Christoph Hellwig
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/init.h>
|
||||
#include <linux/errno.h>
|
||||
#include <linux/syscalls.h>
|
||||
#include <net/compat.h>
|
||||
#include <linux/refcount.h>
|
||||
#include <linux/uio.h>
|
||||
#include <linux/bits.h>
|
||||
|
||||
#include <linux/sched/signal.h>
|
||||
#include <linux/fs.h>
|
||||
#include <linux/file.h>
|
||||
#include <linux/mm.h>
|
||||
#include <linux/mman.h>
|
||||
#include <linux/percpu.h>
|
||||
#include <linux/slab.h>
|
||||
#include <linux/bvec.h>
|
||||
#include <linux/net.h>
|
||||
#include <net/sock.h>
|
||||
#include <linux/anon_inodes.h>
|
||||
#include <linux/sched/mm.h>
|
||||
#include <linux/uaccess.h>
|
||||
#include <linux/nospec.h>
|
||||
#include <linux/fsnotify.h>
|
||||
#include <linux/fadvise.h>
|
||||
#include <linux/task_work.h>
|
||||
#include <linux/io_uring.h>
|
||||
#include <linux/io_uring/cmd.h>
|
||||
#include <linux/audit.h>
|
||||
#include <linux/security.h>
|
||||
#include <linux/jump_label.h>
|
||||
#include <asm/shmparam.h>
|
||||
|
||||
#define CREATE_TRACE_POINTS
|
||||
#include <trace/events/io_uring.h>
|
||||
|
|
@ -105,6 +93,7 @@
|
|||
#include "rw.h"
|
||||
#include "alloc_cache.h"
|
||||
#include "eventfd.h"
|
||||
#include "wait.h"
|
||||
|
||||
#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
|
||||
IOSQE_IO_HARDLINK | IOSQE_ASYNC)
|
||||
|
|
@ -122,19 +111,10 @@
|
|||
|
||||
#define IO_COMPL_BATCH 32
|
||||
#define IO_REQ_ALLOC_BATCH 8
|
||||
#define IO_LOCAL_TW_DEFAULT_MAX 20
|
||||
|
||||
/* requests with any of those set should undergo io_disarm_next() */
|
||||
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
|
||||
|
||||
/*
|
||||
* No waiters. It's larger than any valid value of the tw counter
|
||||
* so that tests against ->cq_wait_nr would fail and skip wake_up().
|
||||
*/
|
||||
#define IO_CQ_WAKE_INIT (-1U)
|
||||
/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
|
||||
#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)
|
||||
|
||||
static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags);
|
||||
static void __io_req_caches_free(struct io_ring_ctx *ctx);
|
||||
|
||||
|
|
@ -187,16 +167,6 @@ static void io_poison_req(struct io_kiocb *req)
|
|||
req->link = IO_URING_PTR_POISON;
|
||||
}
|
||||
|
||||
static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
|
||||
}
|
||||
|
||||
static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
|
||||
}
|
||||
|
||||
static inline void req_fail_link_node(struct io_kiocb *req, int res)
|
||||
{
|
||||
req_set_fail(req);
|
||||
|
|
@ -217,38 +187,6 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
|
|||
complete(&ctx->ref_comp);
|
||||
}
|
||||
|
||||
/*
|
||||
* Terminate the request if either of these conditions are true:
|
||||
*
|
||||
* 1) It's being executed by the original task, but that task is marked
|
||||
* with PF_EXITING as it's exiting.
|
||||
* 2) PF_KTHREAD is set, in which case the invoker of the task_work is
|
||||
* our fallback task_work.
|
||||
* 3) The ring has been closed and is going away.
|
||||
*/
|
||||
static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs);
|
||||
}
|
||||
|
||||
static __cold void io_fallback_req_func(struct work_struct *work)
|
||||
{
|
||||
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
|
||||
fallback_work.work);
|
||||
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
|
||||
struct io_kiocb *req, *tmp;
|
||||
struct io_tw_state ts = {};
|
||||
|
||||
percpu_ref_get(&ctx->refs);
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
ts.cancel = io_should_terminate_tw(ctx);
|
||||
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
|
||||
req->io_task_work.func((struct io_tw_req){req}, ts);
|
||||
io_submit_flush_completions(ctx);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
percpu_ref_put(&ctx->refs);
|
||||
}
|
||||
|
||||
static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
|
||||
{
|
||||
unsigned int hash_buckets;
|
||||
|
|
@ -334,7 +272,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
|
|||
init_waitqueue_head(&ctx->poll_wq);
|
||||
spin_lock_init(&ctx->completion_lock);
|
||||
raw_spin_lock_init(&ctx->timeout_lock);
|
||||
INIT_WQ_LIST(&ctx->iopoll_list);
|
||||
INIT_LIST_HEAD(&ctx->iopoll_list);
|
||||
INIT_LIST_HEAD(&ctx->defer_list);
|
||||
INIT_LIST_HEAD(&ctx->timeout_list);
|
||||
INIT_LIST_HEAD(&ctx->ltimeout_list);
|
||||
|
|
@ -643,7 +581,7 @@ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
|
|||
__io_cqring_overflow_flush(ctx, true);
|
||||
}
|
||||
|
||||
static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
|
||||
void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
|
||||
{
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
__io_cqring_overflow_flush(ctx, false);
|
||||
|
|
@ -1083,336 +1021,6 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
|
|||
return nxt;
|
||||
}
|
||||
|
||||
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
|
||||
{
|
||||
if (!ctx)
|
||||
return;
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
|
||||
io_submit_flush_completions(ctx);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
percpu_ref_put(&ctx->refs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Run queued task_work, returning the number of entries processed in *count.
|
||||
* If more entries than max_entries are available, stop processing once this
|
||||
* is reached and return the rest of the list.
|
||||
*/
|
||||
struct llist_node *io_handle_tw_list(struct llist_node *node,
|
||||
unsigned int *count,
|
||||
unsigned int max_entries)
|
||||
{
|
||||
struct io_ring_ctx *ctx = NULL;
|
||||
struct io_tw_state ts = { };
|
||||
|
||||
do {
|
||||
struct llist_node *next = node->next;
|
||||
struct io_kiocb *req = container_of(node, struct io_kiocb,
|
||||
io_task_work.node);
|
||||
|
||||
if (req->ctx != ctx) {
|
||||
ctx_flush_and_put(ctx, ts);
|
||||
ctx = req->ctx;
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
percpu_ref_get(&ctx->refs);
|
||||
ts.cancel = io_should_terminate_tw(ctx);
|
||||
}
|
||||
INDIRECT_CALL_2(req->io_task_work.func,
|
||||
io_poll_task_func, io_req_rw_complete,
|
||||
(struct io_tw_req){req}, ts);
|
||||
node = next;
|
||||
(*count)++;
|
||||
if (unlikely(need_resched())) {
|
||||
ctx_flush_and_put(ctx, ts);
|
||||
ctx = NULL;
|
||||
cond_resched();
|
||||
}
|
||||
} while (node && *count < max_entries);
|
||||
|
||||
ctx_flush_and_put(ctx, ts);
|
||||
return node;
|
||||
}
|
||||
|
||||
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
|
||||
{
|
||||
struct io_ring_ctx *last_ctx = NULL;
|
||||
struct io_kiocb *req;
|
||||
|
||||
while (node) {
|
||||
req = container_of(node, struct io_kiocb, io_task_work.node);
|
||||
node = node->next;
|
||||
if (last_ctx != req->ctx) {
|
||||
if (last_ctx) {
|
||||
if (sync)
|
||||
flush_delayed_work(&last_ctx->fallback_work);
|
||||
percpu_ref_put(&last_ctx->refs);
|
||||
}
|
||||
last_ctx = req->ctx;
|
||||
percpu_ref_get(&last_ctx->refs);
|
||||
}
|
||||
if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
|
||||
schedule_delayed_work(&last_ctx->fallback_work, 1);
|
||||
}
|
||||
|
||||
if (last_ctx) {
|
||||
if (sync)
|
||||
flush_delayed_work(&last_ctx->fallback_work);
|
||||
percpu_ref_put(&last_ctx->refs);
|
||||
}
|
||||
}
|
||||
|
||||
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
|
||||
{
|
||||
struct llist_node *node = llist_del_all(&tctx->task_list);
|
||||
|
||||
__io_fallback_tw(node, sync);
|
||||
}
|
||||
|
||||
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
|
||||
unsigned int max_entries,
|
||||
unsigned int *count)
|
||||
{
|
||||
struct llist_node *node;
|
||||
|
||||
node = llist_del_all(&tctx->task_list);
|
||||
if (node) {
|
||||
node = llist_reverse_order(node);
|
||||
node = io_handle_tw_list(node, count, max_entries);
|
||||
}
|
||||
|
||||
/* relaxed read is enough as only the task itself sets ->in_cancel */
|
||||
if (unlikely(atomic_read(&tctx->in_cancel)))
|
||||
io_uring_drop_tctx_refs(current);
|
||||
|
||||
trace_io_uring_task_work_run(tctx, *count);
|
||||
return node;
|
||||
}
|
||||
|
||||
void tctx_task_work(struct callback_head *cb)
|
||||
{
|
||||
struct io_uring_task *tctx;
|
||||
struct llist_node *ret;
|
||||
unsigned int count = 0;
|
||||
|
||||
tctx = container_of(cb, struct io_uring_task, task_work);
|
||||
ret = tctx_task_work_run(tctx, UINT_MAX, &count);
|
||||
/* can't happen */
|
||||
WARN_ON_ONCE(ret);
|
||||
}
|
||||
|
||||
static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
|
||||
{
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
unsigned nr_wait, nr_tw, nr_tw_prev;
|
||||
struct llist_node *head;
|
||||
|
||||
/* See comment above IO_CQ_WAKE_INIT */
|
||||
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
|
||||
|
||||
/*
|
||||
* We don't know how many requests there are in the link and whether
|
||||
* they can even be queued lazily, fall back to non-lazy.
|
||||
*/
|
||||
if (req->flags & IO_REQ_LINK_FLAGS)
|
||||
flags &= ~IOU_F_TWQ_LAZY_WAKE;
|
||||
|
||||
guard(rcu)();
|
||||
|
||||
head = READ_ONCE(ctx->work_llist.first);
|
||||
do {
|
||||
nr_tw_prev = 0;
|
||||
if (head) {
|
||||
struct io_kiocb *first_req = container_of(head,
|
||||
struct io_kiocb,
|
||||
io_task_work.node);
|
||||
/*
|
||||
* Might be executed at any moment, rely on
|
||||
* SLAB_TYPESAFE_BY_RCU to keep it alive.
|
||||
*/
|
||||
nr_tw_prev = READ_ONCE(first_req->nr_tw);
|
||||
}
|
||||
|
||||
/*
|
||||
* Theoretically, it can overflow, but that's fine as one of
|
||||
* previous adds should've tried to wake the task.
|
||||
*/
|
||||
nr_tw = nr_tw_prev + 1;
|
||||
if (!(flags & IOU_F_TWQ_LAZY_WAKE))
|
||||
nr_tw = IO_CQ_WAKE_FORCE;
|
||||
|
||||
req->nr_tw = nr_tw;
|
||||
req->io_task_work.node.next = head;
|
||||
} while (!try_cmpxchg(&ctx->work_llist.first, &head,
|
||||
&req->io_task_work.node));
|
||||
|
||||
/*
|
||||
* cmpxchg implies a full barrier, which pairs with the barrier
|
||||
* in set_current_state() on the io_cqring_wait() side. It's used
|
||||
* to ensure that either we see updated ->cq_wait_nr, or waiters
|
||||
* going to sleep will observe the work added to the list, which
|
||||
* is similar to the wait/wawke task state sync.
|
||||
*/
|
||||
|
||||
if (!head) {
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
if (ctx->has_evfd)
|
||||
io_eventfd_signal(ctx, false);
|
||||
}
|
||||
|
||||
nr_wait = atomic_read(&ctx->cq_wait_nr);
|
||||
/* not enough or no one is waiting */
|
||||
if (nr_tw < nr_wait)
|
||||
return;
|
||||
/* the previous add has already woken it up */
|
||||
if (nr_tw_prev >= nr_wait)
|
||||
return;
|
||||
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
|
||||
}
|
||||
|
||||
static void io_req_normal_work_add(struct io_kiocb *req)
|
||||
{
|
||||
struct io_uring_task *tctx = req->tctx;
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
|
||||
/* task_work already pending, we're done */
|
||||
if (!llist_add(&req->io_task_work.node, &tctx->task_list))
|
||||
return;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
|
||||
/* SQPOLL doesn't need the task_work added, it'll run it itself */
|
||||
if (ctx->flags & IORING_SETUP_SQPOLL) {
|
||||
__set_notify_signal(tctx->task);
|
||||
return;
|
||||
}
|
||||
|
||||
if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
|
||||
return;
|
||||
|
||||
io_fallback_tw(tctx, false);
|
||||
}
|
||||
|
||||
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
|
||||
{
|
||||
if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)
|
||||
io_req_local_work_add(req, flags);
|
||||
else
|
||||
io_req_normal_work_add(req);
|
||||
}
|
||||
|
||||
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
|
||||
{
|
||||
if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
|
||||
return;
|
||||
__io_req_task_work_add(req, flags);
|
||||
}
|
||||
|
||||
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
|
||||
{
|
||||
struct llist_node *node = llist_del_all(&ctx->work_llist);
|
||||
|
||||
__io_fallback_tw(node, false);
|
||||
node = llist_del_all(&ctx->retry_llist);
|
||||
__io_fallback_tw(node, false);
|
||||
}
|
||||
|
||||
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
|
||||
int min_events)
|
||||
{
|
||||
if (!io_local_work_pending(ctx))
|
||||
return false;
|
||||
if (events < min_events)
|
||||
return true;
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
return false;
|
||||
}
|
||||
|
||||
static int __io_run_local_work_loop(struct llist_node **node,
|
||||
io_tw_token_t tw,
|
||||
int events)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
while (*node) {
|
||||
struct llist_node *next = (*node)->next;
|
||||
struct io_kiocb *req = container_of(*node, struct io_kiocb,
|
||||
io_task_work.node);
|
||||
INDIRECT_CALL_2(req->io_task_work.func,
|
||||
io_poll_task_func, io_req_rw_complete,
|
||||
(struct io_tw_req){req}, tw);
|
||||
*node = next;
|
||||
if (++ret >= events)
|
||||
break;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
|
||||
int min_events, int max_events)
|
||||
{
|
||||
struct llist_node *node;
|
||||
unsigned int loops = 0;
|
||||
int ret = 0;
|
||||
|
||||
if (WARN_ON_ONCE(ctx->submitter_task != current))
|
||||
return -EEXIST;
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
again:
|
||||
tw.cancel = io_should_terminate_tw(ctx);
|
||||
min_events -= ret;
|
||||
ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
|
||||
if (ctx->retry_llist.first)
|
||||
goto retry_done;
|
||||
|
||||
/*
|
||||
* llists are in reverse order, flip it back the right way before
|
||||
* running the pending items.
|
||||
*/
|
||||
node = llist_reverse_order(llist_del_all(&ctx->work_llist));
|
||||
ret += __io_run_local_work_loop(&node, tw, max_events - ret);
|
||||
ctx->retry_llist.first = node;
|
||||
loops++;
|
||||
|
||||
if (io_run_local_work_continue(ctx, ret, min_events))
|
||||
goto again;
|
||||
retry_done:
|
||||
io_submit_flush_completions(ctx);
|
||||
if (io_run_local_work_continue(ctx, ret, min_events))
|
||||
goto again;
|
||||
|
||||
trace_io_uring_local_work_run(ctx, ret, loops);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
|
||||
int min_events)
|
||||
{
|
||||
struct io_tw_state ts = {};
|
||||
|
||||
if (!io_local_work_pending(ctx))
|
||||
return 0;
|
||||
return __io_run_local_work(ctx, ts, min_events,
|
||||
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
|
||||
}
|
||||
|
||||
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
|
||||
{
|
||||
struct io_tw_state ts = {};
|
||||
int ret;
|
||||
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
ret = __io_run_local_work(ctx, ts, min_events, max_events);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void io_req_task_cancel(struct io_tw_req tw_req, io_tw_token_t tw)
|
||||
{
|
||||
struct io_kiocb *req = tw_req.req;
|
||||
|
|
@ -1545,13 +1153,6 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx)
|
|||
ctx->submit_state.cq_flush = false;
|
||||
}
|
||||
|
||||
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
|
||||
{
|
||||
/* See comment at the top of this file */
|
||||
smp_rmb();
|
||||
return __io_cqring_events(ctx);
|
||||
}
|
||||
|
||||
/*
|
||||
* We can't just wait for polled events to come to us, we have to actively
|
||||
* find and complete them.
|
||||
|
|
@ -1562,7 +1163,7 @@ __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
|
|||
return;
|
||||
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
while (!wq_list_empty(&ctx->iopoll_list)) {
|
||||
while (!list_empty(&ctx->iopoll_list)) {
|
||||
/* let it sleep and repeat later if can't complete a request */
|
||||
if (io_do_iopoll(ctx, true) == 0)
|
||||
break;
|
||||
|
|
@ -1627,21 +1228,18 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events)
|
|||
* forever, while the workqueue is stuck trying to acquire the
|
||||
* very same mutex.
|
||||
*/
|
||||
if (wq_list_empty(&ctx->iopoll_list) ||
|
||||
io_task_work_pending(ctx)) {
|
||||
if (list_empty(&ctx->iopoll_list) || io_task_work_pending(ctx)) {
|
||||
u32 tail = ctx->cached_cq_tail;
|
||||
|
||||
(void) io_run_local_work_locked(ctx, min_events);
|
||||
|
||||
if (task_work_pending(current) ||
|
||||
wq_list_empty(&ctx->iopoll_list)) {
|
||||
if (task_work_pending(current) || list_empty(&ctx->iopoll_list)) {
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
io_run_task_work();
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
}
|
||||
/* some requests don't go through iopoll_list */
|
||||
if (tail != ctx->cached_cq_tail ||
|
||||
wq_list_empty(&ctx->iopoll_list))
|
||||
if (tail != ctx->cached_cq_tail || list_empty(&ctx->iopoll_list))
|
||||
break;
|
||||
}
|
||||
ret = io_do_iopoll(ctx, !min_events);
|
||||
|
|
@ -1684,25 +1282,17 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
|
|||
* how we do polling eventually, not spinning if we're on potentially
|
||||
* different devices.
|
||||
*/
|
||||
if (wq_list_empty(&ctx->iopoll_list)) {
|
||||
if (list_empty(&ctx->iopoll_list)) {
|
||||
ctx->poll_multi_queue = false;
|
||||
} else if (!ctx->poll_multi_queue) {
|
||||
struct io_kiocb *list_req;
|
||||
|
||||
list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
|
||||
comp_list);
|
||||
list_req = list_first_entry(&ctx->iopoll_list, struct io_kiocb, iopoll_node);
|
||||
if (list_req->file != req->file)
|
||||
ctx->poll_multi_queue = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* For fast devices, IO may have already completed. If it has, add
|
||||
* it to the front so we find it first.
|
||||
*/
|
||||
if (READ_ONCE(req->iopoll_completed))
|
||||
wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
|
||||
else
|
||||
wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
|
||||
list_add_tail(&req->iopoll_node, &ctx->iopoll_list);
|
||||
|
||||
if (unlikely(needs_lock)) {
|
||||
/*
|
||||
|
|
@ -2080,6 +1670,8 @@ static inline bool io_check_restriction(struct io_ring_ctx *ctx,
|
|||
struct io_kiocb *req,
|
||||
unsigned int sqe_flags)
|
||||
{
|
||||
if (!ctx->op_restricted)
|
||||
return true;
|
||||
if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
|
||||
return false;
|
||||
|
||||
|
|
@ -2181,8 +1773,8 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
|||
io_init_drain(ctx);
|
||||
}
|
||||
}
|
||||
if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
|
||||
if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
|
||||
if (unlikely(ctx->op_restricted || ctx->drain_active || ctx->drain_next)) {
|
||||
if (!io_check_restriction(ctx, req, sqe_flags))
|
||||
return io_init_fail_req(req, -EACCES);
|
||||
/* knock it to the slow queue path, will be drained there */
|
||||
if (ctx->drain_active)
|
||||
|
|
@ -2354,12 +1946,16 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
|
|||
{
|
||||
struct io_rings *rings = ctx->rings;
|
||||
|
||||
/*
|
||||
* Ensure any loads from the SQEs are done at this point,
|
||||
* since once we write the new head, the application could
|
||||
* write new data to them.
|
||||
*/
|
||||
smp_store_release(&rings->sq.head, ctx->cached_sq_head);
|
||||
if (ctx->flags & IORING_SETUP_SQ_REWIND) {
|
||||
ctx->cached_sq_head = 0;
|
||||
} else {
|
||||
/*
|
||||
* Ensure any loads from the SQEs are done at this point,
|
||||
* since once we write the new head, the application could
|
||||
* write new data to them.
|
||||
*/
|
||||
smp_store_release(&rings->sq.head, ctx->cached_sq_head);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -2405,10 +2001,15 @@ static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
|
|||
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
|
||||
__must_hold(&ctx->uring_lock)
|
||||
{
|
||||
unsigned int entries = io_sqring_entries(ctx);
|
||||
unsigned int entries;
|
||||
unsigned int left;
|
||||
int ret;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_SQ_REWIND)
|
||||
entries = ctx->sq_entries;
|
||||
else
|
||||
entries = io_sqring_entries(ctx);
|
||||
|
||||
entries = min(nr, entries);
|
||||
if (unlikely(!entries))
|
||||
return 0;
|
||||
|
|
@ -2453,308 +2054,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
|
||||
int wake_flags, void *key)
|
||||
{
|
||||
struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
|
||||
|
||||
/*
|
||||
* Cannot safely flush overflowed CQEs from here, ensure we wake up
|
||||
* the task, and the next invocation will do it.
|
||||
*/
|
||||
if (io_should_wake(iowq) || io_has_work(iowq->ctx))
|
||||
return autoremove_wake_function(curr, mode, wake_flags, key);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx)
|
||||
{
|
||||
if (io_local_work_pending(ctx)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
|
||||
return 0;
|
||||
}
|
||||
if (io_run_task_work() > 0)
|
||||
return 0;
|
||||
if (task_sigpending(current))
|
||||
return -EINTR;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool current_pending_io(void)
|
||||
{
|
||||
struct io_uring_task *tctx = current->io_uring;
|
||||
|
||||
if (!tctx)
|
||||
return false;
|
||||
return percpu_counter_read_positive(&tctx->inflight);
|
||||
}
|
||||
|
||||
static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
|
||||
{
|
||||
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
|
||||
|
||||
WRITE_ONCE(iowq->hit_timeout, 1);
|
||||
iowq->min_timeout = 0;
|
||||
wake_up_process(iowq->wq.private);
|
||||
return HRTIMER_NORESTART;
|
||||
}
|
||||
|
||||
/*
|
||||
* Doing min_timeout portion. If we saw any timeouts, events, or have work,
|
||||
* wake up. If not, and we have a normal timeout, switch to that and keep
|
||||
* sleeping.
|
||||
*/
|
||||
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
|
||||
{
|
||||
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
|
||||
struct io_ring_ctx *ctx = iowq->ctx;
|
||||
|
||||
/* no general timeout, or shorter (or equal), we are done */
|
||||
if (iowq->timeout == KTIME_MAX ||
|
||||
ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
|
||||
goto out_wake;
|
||||
/* work we may need to run, wake function will see if we need to wake */
|
||||
if (io_has_work(ctx))
|
||||
goto out_wake;
|
||||
/* got events since we started waiting, min timeout is done */
|
||||
if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail))
|
||||
goto out_wake;
|
||||
/* if we have any events and min timeout expired, we're done */
|
||||
if (io_cqring_events(ctx))
|
||||
goto out_wake;
|
||||
|
||||
/*
|
||||
* If using deferred task_work running and application is waiting on
|
||||
* more than one request, ensure we reset it now where we are switching
|
||||
* to normal sleeps. Any request completion post min_wait should wake
|
||||
* the task and return.
|
||||
*/
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
|
||||
atomic_set(&ctx->cq_wait_nr, 1);
|
||||
smp_mb();
|
||||
if (!llist_empty(&ctx->work_llist))
|
||||
goto out_wake;
|
||||
}
|
||||
|
||||
/* any generated CQE posted past this time should wake us up */
|
||||
iowq->cq_tail = iowq->cq_min_tail;
|
||||
|
||||
hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
|
||||
hrtimer_set_expires(timer, iowq->timeout);
|
||||
return HRTIMER_RESTART;
|
||||
out_wake:
|
||||
return io_cqring_timer_wakeup(timer);
|
||||
}
|
||||
|
||||
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
|
||||
clockid_t clock_id, ktime_t start_time)
|
||||
{
|
||||
ktime_t timeout;
|
||||
|
||||
if (iowq->min_timeout) {
|
||||
timeout = ktime_add_ns(iowq->min_timeout, start_time);
|
||||
hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
|
||||
HRTIMER_MODE_ABS);
|
||||
} else {
|
||||
timeout = iowq->timeout;
|
||||
hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
|
||||
HRTIMER_MODE_ABS);
|
||||
}
|
||||
|
||||
hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
|
||||
hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
|
||||
|
||||
if (!READ_ONCE(iowq->hit_timeout))
|
||||
schedule();
|
||||
|
||||
hrtimer_cancel(&iowq->t);
|
||||
destroy_hrtimer_on_stack(&iowq->t);
|
||||
__set_current_state(TASK_RUNNING);
|
||||
|
||||
return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
|
||||
}
|
||||
|
||||
struct ext_arg {
|
||||
size_t argsz;
|
||||
struct timespec64 ts;
|
||||
const sigset_t __user *sig;
|
||||
ktime_t min_time;
|
||||
bool ts_set;
|
||||
bool iowait;
|
||||
};
|
||||
|
||||
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
|
||||
struct io_wait_queue *iowq,
|
||||
struct ext_arg *ext_arg,
|
||||
ktime_t start_time)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
/*
|
||||
* Mark us as being in io_wait if we have pending requests, so cpufreq
|
||||
* can take into account that the task is waiting for IO - turns out
|
||||
* to be important for low QD IO.
|
||||
*/
|
||||
if (ext_arg->iowait && current_pending_io())
|
||||
current->in_iowait = 1;
|
||||
if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
|
||||
ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
|
||||
else
|
||||
schedule();
|
||||
current->in_iowait = 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* If this returns > 0, the caller should retry */
|
||||
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
|
||||
struct io_wait_queue *iowq,
|
||||
struct ext_arg *ext_arg,
|
||||
ktime_t start_time)
|
||||
{
|
||||
if (unlikely(READ_ONCE(ctx->check_cq)))
|
||||
return 1;
|
||||
if (unlikely(io_local_work_pending(ctx)))
|
||||
return 1;
|
||||
if (unlikely(task_work_pending(current)))
|
||||
return 1;
|
||||
if (unlikely(task_sigpending(current)))
|
||||
return -EINTR;
|
||||
if (unlikely(io_should_wake(iowq)))
|
||||
return 0;
|
||||
|
||||
return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait until events become available, if we don't already have some. The
|
||||
* application must reap them itself, as they reside on the shared cq ring.
|
||||
*/
|
||||
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
|
||||
struct ext_arg *ext_arg)
|
||||
{
|
||||
struct io_wait_queue iowq;
|
||||
struct io_rings *rings = ctx->rings;
|
||||
ktime_t start_time;
|
||||
int ret;
|
||||
|
||||
min_events = min_t(int, min_events, ctx->cq_entries);
|
||||
|
||||
if (!io_allowed_run_tw(ctx))
|
||||
return -EEXIST;
|
||||
if (io_local_work_pending(ctx))
|
||||
io_run_local_work(ctx, min_events,
|
||||
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
|
||||
io_run_task_work();
|
||||
|
||||
if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
|
||||
io_cqring_do_overflow_flush(ctx);
|
||||
if (__io_cqring_events_user(ctx) >= min_events)
|
||||
return 0;
|
||||
|
||||
init_waitqueue_func_entry(&iowq.wq, io_wake_function);
|
||||
iowq.wq.private = current;
|
||||
INIT_LIST_HEAD(&iowq.wq.entry);
|
||||
iowq.ctx = ctx;
|
||||
iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
|
||||
iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail);
|
||||
iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
|
||||
iowq.hit_timeout = 0;
|
||||
iowq.min_timeout = ext_arg->min_time;
|
||||
iowq.timeout = KTIME_MAX;
|
||||
start_time = io_get_time(ctx);
|
||||
|
||||
if (ext_arg->ts_set) {
|
||||
iowq.timeout = timespec64_to_ktime(ext_arg->ts);
|
||||
if (!(flags & IORING_ENTER_ABS_TIMER))
|
||||
iowq.timeout = ktime_add(iowq.timeout, start_time);
|
||||
}
|
||||
|
||||
if (ext_arg->sig) {
|
||||
#ifdef CONFIG_COMPAT
|
||||
if (in_compat_syscall())
|
||||
ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
|
||||
ext_arg->argsz);
|
||||
else
|
||||
#endif
|
||||
ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
|
||||
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
io_napi_busy_loop(ctx, &iowq);
|
||||
|
||||
trace_io_uring_cqring_wait(ctx, min_events);
|
||||
do {
|
||||
unsigned long check_cq;
|
||||
int nr_wait;
|
||||
|
||||
/* if min timeout has been hit, don't reset wait count */
|
||||
if (!iowq.hit_timeout)
|
||||
nr_wait = (int) iowq.cq_tail -
|
||||
READ_ONCE(ctx->rings->cq.tail);
|
||||
else
|
||||
nr_wait = 1;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
|
||||
atomic_set(&ctx->cq_wait_nr, nr_wait);
|
||||
set_current_state(TASK_INTERRUPTIBLE);
|
||||
} else {
|
||||
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
|
||||
TASK_INTERRUPTIBLE);
|
||||
}
|
||||
|
||||
ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
|
||||
__set_current_state(TASK_RUNNING);
|
||||
atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
|
||||
|
||||
/*
|
||||
* Run task_work after scheduling and before io_should_wake().
|
||||
* If we got woken because of task_work being processed, run it
|
||||
* now rather than let the caller do another wait loop.
|
||||
*/
|
||||
if (io_local_work_pending(ctx))
|
||||
io_run_local_work(ctx, nr_wait, nr_wait);
|
||||
io_run_task_work();
|
||||
|
||||
/*
|
||||
* Non-local task_work will be run on exit to userspace, but
|
||||
* if we're using DEFER_TASKRUN, then we could have waited
|
||||
* with a timeout for a number of requests. If the timeout
|
||||
* hits, we could have some requests ready to process. Ensure
|
||||
* this break is _after_ we have run task_work, to avoid
|
||||
* deferring running potentially pending requests until the
|
||||
* next time we wait for events.
|
||||
*/
|
||||
if (ret < 0)
|
||||
break;
|
||||
|
||||
check_cq = READ_ONCE(ctx->check_cq);
|
||||
if (unlikely(check_cq)) {
|
||||
/* let the caller flush overflows, retry */
|
||||
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
|
||||
io_cqring_do_overflow_flush(ctx);
|
||||
if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
|
||||
ret = -EBADR;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (io_should_wake(&iowq)) {
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
cond_resched();
|
||||
} while (1);
|
||||
|
||||
if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
|
||||
finish_wait(&ctx->cq_wait, &iowq.wq);
|
||||
restore_saved_sigmask_unless(ret == -EINTR);
|
||||
|
||||
return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
|
||||
}
|
||||
|
||||
static void io_rings_free(struct io_ring_ctx *ctx)
|
||||
{
|
||||
io_free_region(ctx->user, &ctx->sq_region);
|
||||
|
|
@ -2984,7 +2283,7 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb)
|
|||
static __cold void io_ring_exit_work(struct work_struct *work)
|
||||
{
|
||||
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
|
||||
unsigned long timeout = jiffies + HZ * 60 * 5;
|
||||
unsigned long timeout = jiffies + IO_URING_EXIT_WAIT_MAX;
|
||||
unsigned long interval = HZ / 20;
|
||||
struct io_tctx_exit exit;
|
||||
struct io_tctx_node *node;
|
||||
|
|
@ -3256,7 +2555,11 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
|
|||
|
||||
ctx = file->private_data;
|
||||
ret = -EBADFD;
|
||||
if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
|
||||
/*
|
||||
* Keep IORING_SETUP_R_DISABLED check before submitter_task load
|
||||
* in io_uring_add_tctx_node() -> __io_uring_add_tctx_node_from_submit()
|
||||
*/
|
||||
if (unlikely(smp_load_acquire(&ctx->flags) & IORING_SETUP_R_DISABLED))
|
||||
goto out;
|
||||
|
||||
/*
|
||||
|
|
@ -3439,6 +2742,12 @@ static int io_uring_sanitise_params(struct io_uring_params *p)
|
|||
if (flags & ~IORING_SETUP_FLAGS)
|
||||
return -EINVAL;
|
||||
|
||||
if (flags & IORING_SETUP_SQ_REWIND) {
|
||||
if ((flags & IORING_SETUP_SQPOLL) ||
|
||||
!(flags & IORING_SETUP_NO_SQARRAY))
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
/* There is no way to mmap rings without a real fd */
|
||||
if ((flags & IORING_SETUP_REGISTERED_FD_ONLY) &&
|
||||
!(flags & IORING_SETUP_NO_MMAP))
|
||||
|
|
@ -3661,13 +2970,8 @@ static __cold int io_uring_create(struct io_ctx_config *config)
|
|||
}
|
||||
|
||||
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
|
||||
&& !(ctx->flags & IORING_SETUP_R_DISABLED)) {
|
||||
/*
|
||||
* Unlike io_register_enable_rings(), don't need WRITE_ONCE()
|
||||
* since ctx isn't yet accessible from other tasks
|
||||
*/
|
||||
&& !(ctx->flags & IORING_SETUP_R_DISABLED))
|
||||
ctx->submitter_task = get_task_struct(current);
|
||||
}
|
||||
|
||||
file = io_uring_get_file(ctx);
|
||||
if (IS_ERR(file)) {
|
||||
|
|
|
|||
|
|
@ -1,16 +1,17 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
#ifndef IOU_CORE_H
|
||||
#define IOU_CORE_H
|
||||
|
||||
#include <linux/errno.h>
|
||||
#include <linux/lockdep.h>
|
||||
#include <linux/resume_user_mode.h>
|
||||
#include <linux/kasan.h>
|
||||
#include <linux/poll.h>
|
||||
#include <linux/io_uring_types.h>
|
||||
#include <uapi/linux/eventpoll.h>
|
||||
#include "alloc_cache.h"
|
||||
#include "io-wq.h"
|
||||
#include "slist.h"
|
||||
#include "tw.h"
|
||||
#include "opdef.h"
|
||||
|
||||
#ifndef CREATE_TRACE_POINTS
|
||||
|
|
@ -69,7 +70,8 @@ struct io_ctx_config {
|
|||
IORING_SETUP_NO_SQARRAY |\
|
||||
IORING_SETUP_HYBRID_IOPOLL |\
|
||||
IORING_SETUP_CQE_MIXED |\
|
||||
IORING_SETUP_SQE_MIXED)
|
||||
IORING_SETUP_SQE_MIXED |\
|
||||
IORING_SETUP_SQ_REWIND)
|
||||
|
||||
#define IORING_ENTER_FLAGS (IORING_ENTER_GETEVENTS |\
|
||||
IORING_ENTER_SQ_WAKEUP |\
|
||||
|
|
@ -89,6 +91,14 @@ struct io_ctx_config {
|
|||
IOSQE_BUFFER_SELECT |\
|
||||
IOSQE_CQE_SKIP_SUCCESS)
|
||||
|
||||
#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
|
||||
|
||||
/*
|
||||
* Complaint timeout for io_uring cancelation exits, and for io-wq exit
|
||||
* worker waiting.
|
||||
*/
|
||||
#define IO_URING_EXIT_WAIT_MAX (HZ * 60 * 5)
|
||||
|
||||
enum {
|
||||
IOU_COMPLETE = 0,
|
||||
|
||||
|
|
@ -151,8 +161,6 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
|
|||
int io_prepare_config(struct io_ctx_config *config);
|
||||
|
||||
bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow, bool cqe32);
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx);
|
||||
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
|
||||
void io_req_defer_failed(struct io_kiocb *req, s32 res);
|
||||
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
|
||||
void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
|
||||
|
|
@ -166,15 +174,10 @@ struct file *io_file_get_normal(struct io_kiocb *req, int fd);
|
|||
struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
|
||||
unsigned issue_flags);
|
||||
|
||||
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags);
|
||||
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
|
||||
void io_req_task_queue(struct io_kiocb *req);
|
||||
void io_req_task_complete(struct io_tw_req tw_req, io_tw_token_t tw);
|
||||
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
|
||||
void io_req_task_submit(struct io_tw_req tw_req, io_tw_token_t tw);
|
||||
struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
|
||||
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
|
||||
void tctx_task_work(struct callback_head *cb);
|
||||
__cold void io_uring_drop_tctx_refs(struct task_struct *task);
|
||||
|
||||
int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
|
||||
|
|
@ -227,11 +230,6 @@ static inline bool io_is_compat(struct io_ring_ctx *ctx)
|
|||
return IS_ENABLED(CONFIG_COMPAT) && unlikely(ctx->compat);
|
||||
}
|
||||
|
||||
static inline void io_req_task_work_add(struct io_kiocb *req)
|
||||
{
|
||||
__io_req_task_work_add(req, 0);
|
||||
}
|
||||
|
||||
static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
|
||||
{
|
||||
if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
|
||||
|
|
@ -456,59 +454,6 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
|
|||
return min(entries, ctx->sq_entries);
|
||||
}
|
||||
|
||||
static inline int io_run_task_work(void)
|
||||
{
|
||||
bool ret = false;
|
||||
|
||||
/*
|
||||
* Always check-and-clear the task_work notification signal. With how
|
||||
* signaling works for task_work, we can find it set with nothing to
|
||||
* run. We need to clear it for that case, like get_signal() does.
|
||||
*/
|
||||
if (test_thread_flag(TIF_NOTIFY_SIGNAL))
|
||||
clear_notify_signal();
|
||||
/*
|
||||
* PF_IO_WORKER never returns to userspace, so check here if we have
|
||||
* notify work that needs processing.
|
||||
*/
|
||||
if (current->flags & PF_IO_WORKER) {
|
||||
if (test_thread_flag(TIF_NOTIFY_RESUME)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
resume_user_mode_work(NULL);
|
||||
}
|
||||
if (current->io_uring) {
|
||||
unsigned int count = 0;
|
||||
|
||||
__set_current_state(TASK_RUNNING);
|
||||
tctx_task_work_run(current->io_uring, UINT_MAX, &count);
|
||||
if (count)
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
if (task_work_pending(current)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
task_work_run();
|
||||
ret = true;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
|
||||
}
|
||||
|
||||
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return task_work_pending(current) || io_local_work_pending(ctx);
|
||||
}
|
||||
|
||||
static inline void io_tw_lock(struct io_ring_ctx *ctx, io_tw_token_t tw)
|
||||
{
|
||||
lockdep_assert_held(&ctx->uring_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Don't complete immediately but use deferred completion infrastructure.
|
||||
* Protected by ->uring_lock and can only be used either with
|
||||
|
|
@ -566,17 +511,6 @@ static inline bool io_alloc_req(struct io_ring_ctx *ctx, struct io_kiocb **req)
|
|||
return true;
|
||||
}
|
||||
|
||||
static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return likely(ctx->submitter_task == current);
|
||||
}
|
||||
|
||||
static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) ||
|
||||
ctx->submitter_task == current);
|
||||
}
|
||||
|
||||
static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res)
|
||||
{
|
||||
io_req_set_res(req, res, 0);
|
||||
|
|
|
|||
|
|
@ -669,8 +669,9 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
|
|||
bl->buf_ring = br;
|
||||
if (reg.flags & IOU_PBUF_RING_INC)
|
||||
bl->flags |= IOBL_INC;
|
||||
io_buffer_add_list(ctx, bl, reg.bgid);
|
||||
return 0;
|
||||
ret = io_buffer_add_list(ctx, bl, reg.bgid);
|
||||
if (!ret)
|
||||
return 0;
|
||||
fail:
|
||||
io_free_region(ctx->user, &bl->region);
|
||||
kfree(bl);
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ struct page **io_pin_pages(unsigned long uaddr, unsigned long len, int *npages)
|
|||
if (WARN_ON_ONCE(nr_pages > INT_MAX))
|
||||
return ERR_PTR(-EOVERFLOW);
|
||||
|
||||
pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL);
|
||||
pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL_ACCOUNT);
|
||||
if (!pages)
|
||||
return ERR_PTR(-ENOMEM);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
#ifndef IO_URING_MEMMAP_H
|
||||
#define IO_URING_MEMMAP_H
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
#include <linux/device.h>
|
||||
#include <linux/init.h>
|
||||
#include <linux/kernel.h>
|
||||
|
|
|
|||
|
|
@ -80,13 +80,9 @@ static void io_msg_tw_complete(struct io_tw_req tw_req, io_tw_token_t tw)
|
|||
percpu_ref_put(&ctx->refs);
|
||||
}
|
||||
|
||||
static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
||||
static void io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
||||
int res, u32 cflags, u64 user_data)
|
||||
{
|
||||
if (!READ_ONCE(ctx->submitter_task)) {
|
||||
kfree_rcu(req, rcu_head);
|
||||
return -EOWNERDEAD;
|
||||
}
|
||||
req->opcode = IORING_OP_NOP;
|
||||
req->cqe.user_data = user_data;
|
||||
io_req_set_res(req, res, cflags);
|
||||
|
|
@ -95,7 +91,6 @@ static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req,
|
|||
req->tctx = NULL;
|
||||
req->io_task_work.func = io_msg_tw_complete;
|
||||
io_req_task_work_add_remote(req, IOU_F_TWQ_LAZY_WAKE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int io_msg_data_remote(struct io_ring_ctx *target_ctx,
|
||||
|
|
@ -111,8 +106,8 @@ static int io_msg_data_remote(struct io_ring_ctx *target_ctx,
|
|||
if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
|
||||
flags = msg->cqe_flags;
|
||||
|
||||
return io_msg_remote_post(target_ctx, target, msg->len, flags,
|
||||
msg->user_data);
|
||||
io_msg_remote_post(target_ctx, target, msg->len, flags, msg->user_data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int __io_msg_ring_data(struct io_ring_ctx *target_ctx,
|
||||
|
|
@ -125,7 +120,11 @@ static int __io_msg_ring_data(struct io_ring_ctx *target_ctx,
|
|||
return -EINVAL;
|
||||
if (!(msg->flags & IORING_MSG_RING_FLAGS_PASS) && msg->dst_fd)
|
||||
return -EINVAL;
|
||||
if (target_ctx->flags & IORING_SETUP_R_DISABLED)
|
||||
/*
|
||||
* Keep IORING_SETUP_R_DISABLED check before submitter_task load
|
||||
* in io_msg_data_remote() -> io_req_task_work_add_remote()
|
||||
*/
|
||||
if (smp_load_acquire(&target_ctx->flags) & IORING_SETUP_R_DISABLED)
|
||||
return -EBADFD;
|
||||
|
||||
if (io_msg_need_remote(target_ctx))
|
||||
|
|
@ -223,10 +222,7 @@ static int io_msg_fd_remote(struct io_kiocb *req)
|
|||
{
|
||||
struct io_ring_ctx *ctx = req->file->private_data;
|
||||
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
|
||||
struct task_struct *task = READ_ONCE(ctx->submitter_task);
|
||||
|
||||
if (unlikely(!task))
|
||||
return -EOWNERDEAD;
|
||||
struct task_struct *task = ctx->submitter_task;
|
||||
|
||||
init_task_work(&msg->tw, io_msg_tw_fd_complete);
|
||||
if (task_work_add(task, &msg->tw, TWA_SIGNAL))
|
||||
|
|
@ -245,7 +241,11 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
|
|||
return -EINVAL;
|
||||
if (target_ctx == ctx)
|
||||
return -EINVAL;
|
||||
if (target_ctx->flags & IORING_SETUP_R_DISABLED)
|
||||
/*
|
||||
* Keep IORING_SETUP_R_DISABLED check before submitter_task load
|
||||
* in io_msg_fd_remote()
|
||||
*/
|
||||
if (smp_load_acquire(&target_ctx->flags) & IORING_SETUP_R_DISABLED)
|
||||
return -EBADFD;
|
||||
if (!msg->src_file) {
|
||||
int ret = io_msg_grab_file(req, issue_flags);
|
||||
|
|
|
|||
|
|
@ -515,7 +515,11 @@ static inline bool io_send_finish(struct io_kiocb *req,
|
|||
|
||||
cflags = io_put_kbufs(req, sel->val, sel->buf_list, io_bundle_nbufs(kmsg, sel->val));
|
||||
|
||||
if (bundle_finished || req->flags & REQ_F_BL_EMPTY)
|
||||
/*
|
||||
* Don't start new bundles if the buffer list is empty, or if the
|
||||
* current operation needed to go through polling to complete.
|
||||
*/
|
||||
if (bundle_finished || req->flags & (REQ_F_BL_EMPTY | REQ_F_POLLED))
|
||||
goto finish;
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/errno.h>
|
||||
#include <linux/file.h>
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
#ifndef IOU_REQ_REF_H
|
||||
#define IOU_REQ_REF_H
|
||||
|
||||
|
|
|
|||
|
|
@ -103,6 +103,10 @@ static int io_register_personality(struct io_ring_ctx *ctx)
|
|||
return id;
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns number of restrictions parsed and added on success, or < 0 for
|
||||
* an error.
|
||||
*/
|
||||
static __cold int io_parse_restrictions(void __user *arg, unsigned int nr_args,
|
||||
struct io_restriction *restrictions)
|
||||
{
|
||||
|
|
@ -129,25 +133,31 @@ static __cold int io_parse_restrictions(void __user *arg, unsigned int nr_args,
|
|||
if (res[i].register_op >= IORING_REGISTER_LAST)
|
||||
goto err;
|
||||
__set_bit(res[i].register_op, restrictions->register_op);
|
||||
restrictions->reg_registered = true;
|
||||
break;
|
||||
case IORING_RESTRICTION_SQE_OP:
|
||||
if (res[i].sqe_op >= IORING_OP_LAST)
|
||||
goto err;
|
||||
__set_bit(res[i].sqe_op, restrictions->sqe_op);
|
||||
restrictions->op_registered = true;
|
||||
break;
|
||||
case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
|
||||
restrictions->sqe_flags_allowed = res[i].sqe_flags;
|
||||
restrictions->op_registered = true;
|
||||
break;
|
||||
case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
|
||||
restrictions->sqe_flags_required = res[i].sqe_flags;
|
||||
restrictions->op_registered = true;
|
||||
break;
|
||||
default:
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
||||
ret = nr_args;
|
||||
if (!nr_args) {
|
||||
restrictions->op_registered = true;
|
||||
restrictions->reg_registered = true;
|
||||
}
|
||||
err:
|
||||
kfree(res);
|
||||
return ret;
|
||||
|
|
@ -163,16 +173,20 @@ static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
|
|||
return -EBADFD;
|
||||
|
||||
/* We allow only a single restrictions registration */
|
||||
if (ctx->restrictions.registered)
|
||||
if (ctx->restrictions.op_registered || ctx->restrictions.reg_registered)
|
||||
return -EBUSY;
|
||||
|
||||
ret = io_parse_restrictions(arg, nr_args, &ctx->restrictions);
|
||||
/* Reset all restrictions if an error happened */
|
||||
if (ret != 0)
|
||||
if (ret < 0) {
|
||||
memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
|
||||
else
|
||||
ctx->restrictions.registered = true;
|
||||
return ret;
|
||||
return ret;
|
||||
}
|
||||
if (ctx->restrictions.op_registered)
|
||||
ctx->op_restricted = 1;
|
||||
if (ctx->restrictions.reg_registered)
|
||||
ctx->reg_restricted = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int io_register_enable_rings(struct io_ring_ctx *ctx)
|
||||
|
|
@ -180,8 +194,8 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
|
|||
if (!(ctx->flags & IORING_SETUP_R_DISABLED))
|
||||
return -EBADFD;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) {
|
||||
WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
|
||||
if (ctx->flags & IORING_SETUP_SINGLE_ISSUER) {
|
||||
ctx->submitter_task = get_task_struct(current);
|
||||
/*
|
||||
* Lazy activation attempts would fail if it was polled before
|
||||
* submitter_task is set.
|
||||
|
|
@ -190,10 +204,8 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
|
|||
io_activate_pollwq(ctx);
|
||||
}
|
||||
|
||||
if (ctx->restrictions.registered)
|
||||
ctx->restricted = 1;
|
||||
|
||||
ctx->flags &= ~IORING_SETUP_R_DISABLED;
|
||||
/* Keep submitter_task store before clearing IORING_SETUP_R_DISABLED */
|
||||
smp_store_release(&ctx->flags, ctx->flags & ~IORING_SETUP_R_DISABLED);
|
||||
if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
|
||||
wake_up(&ctx->sq_data->wait);
|
||||
return 0;
|
||||
|
|
@ -625,7 +637,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
|
|||
if (ctx->submitter_task && ctx->submitter_task != current)
|
||||
return -EEXIST;
|
||||
|
||||
if (ctx->restricted) {
|
||||
if (ctx->reg_restricted && !(ctx->flags & IORING_SETUP_R_DISABLED)) {
|
||||
opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
|
||||
if (!test_bit(opcode, ctx->restrictions.register_op))
|
||||
return -EACCES;
|
||||
|
|
|
|||
|
|
@ -1329,7 +1329,7 @@ void io_vec_free(struct iou_vec *iv)
|
|||
|
||||
int io_vec_realloc(struct iou_vec *iv, unsigned nr_entries)
|
||||
{
|
||||
gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
|
||||
gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_NOWARN;
|
||||
struct iovec *iov;
|
||||
|
||||
iov = kmalloc_array(nr_entries, sizeof(iov[0]), gfp);
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ bool io_check_coalesce_buffer(struct page **page_array, int nr_pages,
|
|||
struct io_imu_folio_data *data);
|
||||
|
||||
static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data,
|
||||
int index)
|
||||
unsigned int index)
|
||||
{
|
||||
if (index < data->nr)
|
||||
return data->nodes[array_index_nospec(index, data->nr)];
|
||||
|
|
|
|||
|
|
@ -1303,12 +1303,13 @@ static int io_uring_hybrid_poll(struct io_kiocb *req,
|
|||
struct io_comp_batch *iob, unsigned int poll_flags)
|
||||
{
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
u64 runtime, sleep_time;
|
||||
u64 runtime, sleep_time, iopoll_start;
|
||||
int ret;
|
||||
|
||||
iopoll_start = READ_ONCE(req->iopoll_start);
|
||||
sleep_time = io_hybrid_iopoll_delay(ctx, req);
|
||||
ret = io_uring_classic_poll(req, iob, poll_flags);
|
||||
runtime = ktime_get_ns() - req->iopoll_start - sleep_time;
|
||||
runtime = ktime_get_ns() - iopoll_start - sleep_time;
|
||||
|
||||
/*
|
||||
* Use minimum sleep time if we're polling devices with different
|
||||
|
|
@ -1322,9 +1323,9 @@ static int io_uring_hybrid_poll(struct io_kiocb *req,
|
|||
|
||||
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
|
||||
{
|
||||
struct io_wq_work_node *pos, *start, *prev;
|
||||
unsigned int poll_flags = 0;
|
||||
DEFINE_IO_COMP_BATCH(iob);
|
||||
struct io_kiocb *req, *tmp;
|
||||
int nr_events = 0;
|
||||
|
||||
/*
|
||||
|
|
@ -1334,8 +1335,7 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
|
|||
if (ctx->poll_multi_queue || force_nonspin)
|
||||
poll_flags |= BLK_POLL_ONESHOT;
|
||||
|
||||
wq_list_for_each(pos, start, &ctx->iopoll_list) {
|
||||
struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
|
||||
list_for_each_entry(req, &ctx->iopoll_list, iopoll_node) {
|
||||
int ret;
|
||||
|
||||
/*
|
||||
|
|
@ -1364,31 +1364,20 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
|
|||
|
||||
if (!rq_list_empty(&iob.req_list))
|
||||
iob.complete(&iob);
|
||||
else if (!pos)
|
||||
return 0;
|
||||
|
||||
prev = start;
|
||||
wq_list_for_each_resume(pos, prev) {
|
||||
struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
|
||||
|
||||
list_for_each_entry_safe(req, tmp, &ctx->iopoll_list, iopoll_node) {
|
||||
/* order with io_complete_rw_iopoll(), e.g. ->result updates */
|
||||
if (!smp_load_acquire(&req->iopoll_completed))
|
||||
break;
|
||||
continue;
|
||||
list_del(&req->iopoll_node);
|
||||
wq_list_add_tail(&req->comp_list, &ctx->submit_state.compl_reqs);
|
||||
nr_events++;
|
||||
req->cqe.flags = io_put_kbuf(req, req->cqe.res, NULL);
|
||||
if (req->opcode != IORING_OP_URING_CMD)
|
||||
io_req_rw_cleanup(req, 0);
|
||||
}
|
||||
if (unlikely(!nr_events))
|
||||
return 0;
|
||||
|
||||
pos = start ? start->next : ctx->iopoll_list.first;
|
||||
wq_list_cut(&ctx->iopoll_list, prev, start);
|
||||
|
||||
if (WARN_ON_ONCE(!wq_list_empty(&ctx->submit_state.compl_reqs)))
|
||||
return 0;
|
||||
ctx->submit_state.compl_reqs.first = pos;
|
||||
__io_submit_flush_completions(ctx);
|
||||
if (nr_events)
|
||||
__io_submit_flush_completions(ctx);
|
||||
return nr_events;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
/* SPDX-License-Identifier: GPL-2.0 */
|
||||
#ifndef INTERNAL_IO_SLIST_H
|
||||
#define INTERNAL_IO_SLIST_H
|
||||
|
||||
|
|
@ -9,9 +10,6 @@
|
|||
#define wq_list_for_each(pos, prv, head) \
|
||||
for (pos = (head)->first, prv = NULL; pos; prv = pos, pos = (pos)->next)
|
||||
|
||||
#define wq_list_for_each_resume(pos, prv) \
|
||||
for (; pos; prv = pos, pos = (pos)->next)
|
||||
|
||||
#define wq_list_empty(list) (READ_ONCE((list)->first) == NULL)
|
||||
|
||||
#define INIT_WQ_LIST(list) do { \
|
||||
|
|
@ -43,15 +41,6 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node,
|
|||
}
|
||||
}
|
||||
|
||||
static inline void wq_list_add_head(struct io_wq_work_node *node,
|
||||
struct io_wq_work_list *list)
|
||||
{
|
||||
node->next = list->first;
|
||||
if (!node->next)
|
||||
list->last = node;
|
||||
WRITE_ONCE(list->first, node);
|
||||
}
|
||||
|
||||
static inline void wq_list_cut(struct io_wq_work_list *list,
|
||||
struct io_wq_work_node *last,
|
||||
struct io_wq_work_node *prev)
|
||||
|
|
|
|||
|
|
@ -212,7 +212,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
|
|||
if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
|
||||
to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;
|
||||
|
||||
if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
|
||||
if (to_submit || !list_empty(&ctx->iopoll_list)) {
|
||||
const struct cred *creds = NULL;
|
||||
|
||||
io_sq_start_worktime(ist);
|
||||
|
|
@ -221,7 +221,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
|
|||
creds = override_creds(ctx->sq_creds);
|
||||
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
if (!wq_list_empty(&ctx->iopoll_list))
|
||||
if (!list_empty(&ctx->iopoll_list))
|
||||
io_do_iopoll(ctx, true);
|
||||
|
||||
/*
|
||||
|
|
@ -344,7 +344,7 @@ static int io_sq_thread(void *data)
|
|||
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
|
||||
int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);
|
||||
|
||||
if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
|
||||
if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
|
||||
sqt_spin = true;
|
||||
}
|
||||
if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
|
||||
|
|
@ -379,7 +379,7 @@ static int io_sq_thread(void *data)
|
|||
atomic_or(IORING_SQ_NEED_WAKEUP,
|
||||
&ctx->rings->sq_flags);
|
||||
if ((ctx->flags & IORING_SETUP_IOPOLL) &&
|
||||
!wq_list_empty(&ctx->iopoll_list)) {
|
||||
!list_empty(&ctx->iopoll_list)) {
|
||||
needs_sched = false;
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,6 +62,8 @@ int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
|
|||
return -EINVAL;
|
||||
|
||||
sync->off = READ_ONCE(sqe->off);
|
||||
if (sync->off < 0)
|
||||
return -EINVAL;
|
||||
sync->len = READ_ONCE(sqe->len);
|
||||
req->flags |= REQ_F_FORCE_ASYNC;
|
||||
return 0;
|
||||
|
|
|
|||
|
|
@ -122,6 +122,14 @@ int __io_uring_add_tctx_node(struct io_ring_ctx *ctx)
|
|||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Re-activate io-wq keepalive on any new io_uring usage. The wq may have
|
||||
* been marked for idle-exit when the task temporarily had no active
|
||||
* io_uring instances.
|
||||
*/
|
||||
if (tctx->io_wq)
|
||||
io_wq_set_exit_on_idle(tctx->io_wq, false);
|
||||
if (!xa_load(&tctx->xa, (unsigned long)ctx)) {
|
||||
node = kmalloc(sizeof(*node), GFP_KERNEL);
|
||||
if (!node)
|
||||
|
|
@ -183,6 +191,9 @@ __cold void io_uring_del_tctx_node(unsigned long index)
|
|||
if (tctx->last == node->ctx)
|
||||
tctx->last = NULL;
|
||||
kfree(node);
|
||||
|
||||
if (xa_empty(&tctx->xa) && tctx->io_wq)
|
||||
io_wq_set_exit_on_idle(tctx->io_wq, true);
|
||||
}
|
||||
|
||||
__cold void io_uring_clean_tctx(struct io_uring_task *tctx)
|
||||
|
|
|
|||
|
|
@ -130,7 +130,7 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx)
|
|||
u32 seq;
|
||||
|
||||
raw_spin_lock_irq(&ctx->timeout_lock);
|
||||
seq = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts);
|
||||
seq = READ_ONCE(ctx->cached_cq_tail) - atomic_read(&ctx->cq_timeouts);
|
||||
|
||||
list_for_each_entry_safe(timeout, tmp, &ctx->timeout_list, list) {
|
||||
struct io_kiocb *req = cmd_to_io_kiocb(timeout);
|
||||
|
|
|
|||
355
io_uring/tw.c
Normal file
355
io_uring/tw.c
Normal file
|
|
@ -0,0 +1,355 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
/*
|
||||
* Task work handling for io_uring
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/errno.h>
|
||||
#include <linux/sched/signal.h>
|
||||
#include <linux/io_uring.h>
|
||||
#include <linux/indirect_call_wrapper.h>
|
||||
|
||||
#include "io_uring.h"
|
||||
#include "tctx.h"
|
||||
#include "poll.h"
|
||||
#include "rw.h"
|
||||
#include "eventfd.h"
|
||||
#include "wait.h"
|
||||
|
||||
void io_fallback_req_func(struct work_struct *work)
|
||||
{
|
||||
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
|
||||
fallback_work.work);
|
||||
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
|
||||
struct io_kiocb *req, *tmp;
|
||||
struct io_tw_state ts = {};
|
||||
|
||||
percpu_ref_get(&ctx->refs);
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
ts.cancel = io_should_terminate_tw(ctx);
|
||||
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
|
||||
req->io_task_work.func((struct io_tw_req){req}, ts);
|
||||
io_submit_flush_completions(ctx);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
percpu_ref_put(&ctx->refs);
|
||||
}
|
||||
|
||||
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
|
||||
{
|
||||
if (!ctx)
|
||||
return;
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
|
||||
io_submit_flush_completions(ctx);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
percpu_ref_put(&ctx->refs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Run queued task_work, returning the number of entries processed in *count.
|
||||
* If more entries than max_entries are available, stop processing once this
|
||||
* is reached and return the rest of the list.
|
||||
*/
|
||||
struct llist_node *io_handle_tw_list(struct llist_node *node,
|
||||
unsigned int *count,
|
||||
unsigned int max_entries)
|
||||
{
|
||||
struct io_ring_ctx *ctx = NULL;
|
||||
struct io_tw_state ts = { };
|
||||
|
||||
do {
|
||||
struct llist_node *next = node->next;
|
||||
struct io_kiocb *req = container_of(node, struct io_kiocb,
|
||||
io_task_work.node);
|
||||
|
||||
if (req->ctx != ctx) {
|
||||
ctx_flush_and_put(ctx, ts);
|
||||
ctx = req->ctx;
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
percpu_ref_get(&ctx->refs);
|
||||
ts.cancel = io_should_terminate_tw(ctx);
|
||||
}
|
||||
INDIRECT_CALL_2(req->io_task_work.func,
|
||||
io_poll_task_func, io_req_rw_complete,
|
||||
(struct io_tw_req){req}, ts);
|
||||
node = next;
|
||||
(*count)++;
|
||||
if (unlikely(need_resched())) {
|
||||
ctx_flush_and_put(ctx, ts);
|
||||
ctx = NULL;
|
||||
cond_resched();
|
||||
}
|
||||
} while (node && *count < max_entries);
|
||||
|
||||
ctx_flush_and_put(ctx, ts);
|
||||
return node;
|
||||
}
|
||||
|
||||
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
|
||||
{
|
||||
struct io_ring_ctx *last_ctx = NULL;
|
||||
struct io_kiocb *req;
|
||||
|
||||
while (node) {
|
||||
req = container_of(node, struct io_kiocb, io_task_work.node);
|
||||
node = node->next;
|
||||
if (last_ctx != req->ctx) {
|
||||
if (last_ctx) {
|
||||
if (sync)
|
||||
flush_delayed_work(&last_ctx->fallback_work);
|
||||
percpu_ref_put(&last_ctx->refs);
|
||||
}
|
||||
last_ctx = req->ctx;
|
||||
percpu_ref_get(&last_ctx->refs);
|
||||
}
|
||||
if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
|
||||
schedule_delayed_work(&last_ctx->fallback_work, 1);
|
||||
}
|
||||
|
||||
if (last_ctx) {
|
||||
if (sync)
|
||||
flush_delayed_work(&last_ctx->fallback_work);
|
||||
percpu_ref_put(&last_ctx->refs);
|
||||
}
|
||||
}
|
||||
|
||||
static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
|
||||
{
|
||||
struct llist_node *node = llist_del_all(&tctx->task_list);
|
||||
|
||||
__io_fallback_tw(node, sync);
|
||||
}
|
||||
|
||||
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
|
||||
unsigned int max_entries,
|
||||
unsigned int *count)
|
||||
{
|
||||
struct llist_node *node;
|
||||
|
||||
node = llist_del_all(&tctx->task_list);
|
||||
if (node) {
|
||||
node = llist_reverse_order(node);
|
||||
node = io_handle_tw_list(node, count, max_entries);
|
||||
}
|
||||
|
||||
/* relaxed read is enough as only the task itself sets ->in_cancel */
|
||||
if (unlikely(atomic_read(&tctx->in_cancel)))
|
||||
io_uring_drop_tctx_refs(current);
|
||||
|
||||
trace_io_uring_task_work_run(tctx, *count);
|
||||
return node;
|
||||
}
|
||||
|
||||
void tctx_task_work(struct callback_head *cb)
|
||||
{
|
||||
struct io_uring_task *tctx;
|
||||
struct llist_node *ret;
|
||||
unsigned int count = 0;
|
||||
|
||||
tctx = container_of(cb, struct io_uring_task, task_work);
|
||||
ret = tctx_task_work_run(tctx, UINT_MAX, &count);
|
||||
/* can't happen */
|
||||
WARN_ON_ONCE(ret);
|
||||
}
|
||||
|
||||
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
|
||||
{
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
unsigned nr_wait, nr_tw, nr_tw_prev;
|
||||
struct llist_node *head;
|
||||
|
||||
/* See comment above IO_CQ_WAKE_INIT */
|
||||
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
|
||||
|
||||
/*
|
||||
* We don't know how many requests there are in the link and whether
|
||||
* they can even be queued lazily, fall back to non-lazy.
|
||||
*/
|
||||
if (req->flags & IO_REQ_LINK_FLAGS)
|
||||
flags &= ~IOU_F_TWQ_LAZY_WAKE;
|
||||
|
||||
guard(rcu)();
|
||||
|
||||
head = READ_ONCE(ctx->work_llist.first);
|
||||
do {
|
||||
nr_tw_prev = 0;
|
||||
if (head) {
|
||||
struct io_kiocb *first_req = container_of(head,
|
||||
struct io_kiocb,
|
||||
io_task_work.node);
|
||||
/*
|
||||
* Might be executed at any moment, rely on
|
||||
* SLAB_TYPESAFE_BY_RCU to keep it alive.
|
||||
*/
|
||||
nr_tw_prev = READ_ONCE(first_req->nr_tw);
|
||||
}
|
||||
|
||||
/*
|
||||
* Theoretically, it can overflow, but that's fine as one of
|
||||
* previous adds should've tried to wake the task.
|
||||
*/
|
||||
nr_tw = nr_tw_prev + 1;
|
||||
if (!(flags & IOU_F_TWQ_LAZY_WAKE))
|
||||
nr_tw = IO_CQ_WAKE_FORCE;
|
||||
|
||||
req->nr_tw = nr_tw;
|
||||
req->io_task_work.node.next = head;
|
||||
} while (!try_cmpxchg(&ctx->work_llist.first, &head,
|
||||
&req->io_task_work.node));
|
||||
|
||||
/*
|
||||
* cmpxchg implies a full barrier, which pairs with the barrier
|
||||
* in set_current_state() on the io_cqring_wait() side. It's used
|
||||
* to ensure that either we see updated ->cq_wait_nr, or waiters
|
||||
* going to sleep will observe the work added to the list, which
|
||||
* is similar to the wait/wawke task state sync.
|
||||
*/
|
||||
|
||||
if (!head) {
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
if (ctx->has_evfd)
|
||||
io_eventfd_signal(ctx, false);
|
||||
}
|
||||
|
||||
nr_wait = atomic_read(&ctx->cq_wait_nr);
|
||||
/* not enough or no one is waiting */
|
||||
if (nr_tw < nr_wait)
|
||||
return;
|
||||
/* the previous add has already woken it up */
|
||||
if (nr_tw_prev >= nr_wait)
|
||||
return;
|
||||
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
|
||||
}
|
||||
|
||||
void io_req_normal_work_add(struct io_kiocb *req)
|
||||
{
|
||||
struct io_uring_task *tctx = req->tctx;
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
|
||||
/* task_work already pending, we're done */
|
||||
if (!llist_add(&req->io_task_work.node, &tctx->task_list))
|
||||
return;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
|
||||
/* SQPOLL doesn't need the task_work added, it'll run it itself */
|
||||
if (ctx->flags & IORING_SETUP_SQPOLL) {
|
||||
__set_notify_signal(tctx->task);
|
||||
return;
|
||||
}
|
||||
|
||||
if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
|
||||
return;
|
||||
|
||||
io_fallback_tw(tctx, false);
|
||||
}
|
||||
|
||||
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
|
||||
{
|
||||
if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
|
||||
return;
|
||||
__io_req_task_work_add(req, flags);
|
||||
}
|
||||
|
||||
void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
|
||||
{
|
||||
struct llist_node *node = llist_del_all(&ctx->work_llist);
|
||||
|
||||
__io_fallback_tw(node, false);
|
||||
node = llist_del_all(&ctx->retry_llist);
|
||||
__io_fallback_tw(node, false);
|
||||
}
|
||||
|
||||
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
|
||||
int min_events)
|
||||
{
|
||||
if (!io_local_work_pending(ctx))
|
||||
return false;
|
||||
if (events < min_events)
|
||||
return true;
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
return false;
|
||||
}
|
||||
|
||||
static int __io_run_local_work_loop(struct llist_node **node,
|
||||
io_tw_token_t tw,
|
||||
int events)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
while (*node) {
|
||||
struct llist_node *next = (*node)->next;
|
||||
struct io_kiocb *req = container_of(*node, struct io_kiocb,
|
||||
io_task_work.node);
|
||||
INDIRECT_CALL_2(req->io_task_work.func,
|
||||
io_poll_task_func, io_req_rw_complete,
|
||||
(struct io_tw_req){req}, tw);
|
||||
*node = next;
|
||||
if (++ret >= events)
|
||||
break;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
|
||||
int min_events, int max_events)
|
||||
{
|
||||
struct llist_node *node;
|
||||
unsigned int loops = 0;
|
||||
int ret = 0;
|
||||
|
||||
if (WARN_ON_ONCE(ctx->submitter_task != current))
|
||||
return -EEXIST;
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
again:
|
||||
tw.cancel = io_should_terminate_tw(ctx);
|
||||
min_events -= ret;
|
||||
ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
|
||||
if (ctx->retry_llist.first)
|
||||
goto retry_done;
|
||||
|
||||
/*
|
||||
* llists are in reverse order, flip it back the right way before
|
||||
* running the pending items.
|
||||
*/
|
||||
node = llist_reverse_order(llist_del_all(&ctx->work_llist));
|
||||
ret += __io_run_local_work_loop(&node, tw, max_events - ret);
|
||||
ctx->retry_llist.first = node;
|
||||
loops++;
|
||||
|
||||
if (io_run_local_work_continue(ctx, ret, min_events))
|
||||
goto again;
|
||||
retry_done:
|
||||
io_submit_flush_completions(ctx);
|
||||
if (io_run_local_work_continue(ctx, ret, min_events))
|
||||
goto again;
|
||||
|
||||
trace_io_uring_local_work_run(ctx, ret, loops);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
|
||||
{
|
||||
struct io_tw_state ts = {};
|
||||
|
||||
if (!io_local_work_pending(ctx))
|
||||
return 0;
|
||||
return __io_run_local_work(ctx, ts, min_events,
|
||||
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
|
||||
}
|
||||
|
||||
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
|
||||
{
|
||||
struct io_tw_state ts = {};
|
||||
int ret;
|
||||
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
ret = __io_run_local_work(ctx, ts, min_events, max_events);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
return ret;
|
||||
}
|
||||
116
io_uring/tw.h
Normal file
116
io_uring/tw.h
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
#ifndef IOU_TW_H
|
||||
#define IOU_TW_H
|
||||
|
||||
#include <linux/sched.h>
|
||||
#include <linux/percpu-refcount.h>
|
||||
#include <linux/io_uring_types.h>
|
||||
|
||||
#define IO_LOCAL_TW_DEFAULT_MAX 20
|
||||
|
||||
/*
|
||||
* Terminate the request if either of these conditions are true:
|
||||
*
|
||||
* 1) It's being executed by the original task, but that task is marked
|
||||
* with PF_EXITING as it's exiting.
|
||||
* 2) PF_KTHREAD is set, in which case the invoker of the task_work is
|
||||
* our fallback task_work.
|
||||
* 3) The ring has been closed and is going away.
|
||||
*/
|
||||
static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs);
|
||||
}
|
||||
|
||||
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
|
||||
struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
|
||||
void tctx_task_work(struct callback_head *cb);
|
||||
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx);
|
||||
|
||||
__cold void io_fallback_req_func(struct work_struct *work);
|
||||
__cold void io_move_task_work_from_local(struct io_ring_ctx *ctx);
|
||||
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events);
|
||||
|
||||
void io_req_local_work_add(struct io_kiocb *req, unsigned flags);
|
||||
void io_req_normal_work_add(struct io_kiocb *req);
|
||||
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
|
||||
|
||||
static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
|
||||
{
|
||||
if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)
|
||||
io_req_local_work_add(req, flags);
|
||||
else
|
||||
io_req_normal_work_add(req);
|
||||
}
|
||||
|
||||
static inline void io_req_task_work_add(struct io_kiocb *req)
|
||||
{
|
||||
__io_req_task_work_add(req, 0);
|
||||
}
|
||||
|
||||
static inline int io_run_task_work(void)
|
||||
{
|
||||
bool ret = false;
|
||||
|
||||
/*
|
||||
* Always check-and-clear the task_work notification signal. With how
|
||||
* signaling works for task_work, we can find it set with nothing to
|
||||
* run. We need to clear it for that case, like get_signal() does.
|
||||
*/
|
||||
if (test_thread_flag(TIF_NOTIFY_SIGNAL))
|
||||
clear_notify_signal();
|
||||
/*
|
||||
* PF_IO_WORKER never returns to userspace, so check here if we have
|
||||
* notify work that needs processing.
|
||||
*/
|
||||
if (current->flags & PF_IO_WORKER) {
|
||||
if (test_thread_flag(TIF_NOTIFY_RESUME)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
resume_user_mode_work(NULL);
|
||||
}
|
||||
if (current->io_uring) {
|
||||
unsigned int count = 0;
|
||||
|
||||
__set_current_state(TASK_RUNNING);
|
||||
tctx_task_work_run(current->io_uring, UINT_MAX, &count);
|
||||
if (count)
|
||||
ret = true;
|
||||
}
|
||||
}
|
||||
if (task_work_pending(current)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
task_work_run();
|
||||
ret = true;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
|
||||
}
|
||||
|
||||
static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return task_work_pending(current) || io_local_work_pending(ctx);
|
||||
}
|
||||
|
||||
static inline void io_tw_lock(struct io_ring_ctx *ctx, io_tw_token_t tw)
|
||||
{
|
||||
lockdep_assert_held(&ctx->uring_lock);
|
||||
}
|
||||
|
||||
static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return likely(ctx->submitter_task == current);
|
||||
}
|
||||
|
||||
static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) ||
|
||||
ctx->submitter_task == current);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
@ -104,6 +104,15 @@ void io_uring_cmd_mark_cancelable(struct io_uring_cmd *cmd,
|
|||
struct io_kiocb *req = cmd_to_io_kiocb(cmd);
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
|
||||
/*
|
||||
* Doing cancelations on IOPOLL requests are not supported. Both
|
||||
* because they can't get canceled in the block stack, but also
|
||||
* because iopoll completion data overlaps with the hash_node used
|
||||
* for tracking.
|
||||
*/
|
||||
if (ctx->flags & IORING_SETUP_IOPOLL)
|
||||
return;
|
||||
|
||||
if (!(cmd->flags & IORING_URING_CMD_CANCELABLE)) {
|
||||
cmd->flags |= IORING_URING_CMD_CANCELABLE;
|
||||
io_ring_submit_lock(ctx, issue_flags);
|
||||
|
|
|
|||
308
io_uring/wait.c
Normal file
308
io_uring/wait.c
Normal file
|
|
@ -0,0 +1,308 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
/*
|
||||
* Waiting for completion events
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/sched/signal.h>
|
||||
#include <linux/io_uring.h>
|
||||
|
||||
#include <trace/events/io_uring.h>
|
||||
|
||||
#include <uapi/linux/io_uring.h>
|
||||
|
||||
#include "io_uring.h"
|
||||
#include "napi.h"
|
||||
#include "wait.h"
|
||||
|
||||
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
|
||||
int wake_flags, void *key)
|
||||
{
|
||||
struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
|
||||
|
||||
/*
|
||||
* Cannot safely flush overflowed CQEs from here, ensure we wake up
|
||||
* the task, and the next invocation will do it.
|
||||
*/
|
||||
if (io_should_wake(iowq) || io_has_work(iowq->ctx))
|
||||
return autoremove_wake_function(curr, mode, wake_flags, key);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx)
|
||||
{
|
||||
if (io_local_work_pending(ctx)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
|
||||
return 0;
|
||||
}
|
||||
if (io_run_task_work() > 0)
|
||||
return 0;
|
||||
if (task_sigpending(current))
|
||||
return -EINTR;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool current_pending_io(void)
|
||||
{
|
||||
struct io_uring_task *tctx = current->io_uring;
|
||||
|
||||
if (!tctx)
|
||||
return false;
|
||||
return percpu_counter_read_positive(&tctx->inflight);
|
||||
}
|
||||
|
||||
static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
|
||||
{
|
||||
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
|
||||
|
||||
WRITE_ONCE(iowq->hit_timeout, 1);
|
||||
iowq->min_timeout = 0;
|
||||
wake_up_process(iowq->wq.private);
|
||||
return HRTIMER_NORESTART;
|
||||
}
|
||||
|
||||
/*
|
||||
* Doing min_timeout portion. If we saw any timeouts, events, or have work,
|
||||
* wake up. If not, and we have a normal timeout, switch to that and keep
|
||||
* sleeping.
|
||||
*/
|
||||
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
|
||||
{
|
||||
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
|
||||
struct io_ring_ctx *ctx = iowq->ctx;
|
||||
|
||||
/* no general timeout, or shorter (or equal), we are done */
|
||||
if (iowq->timeout == KTIME_MAX ||
|
||||
ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
|
||||
goto out_wake;
|
||||
/* work we may need to run, wake function will see if we need to wake */
|
||||
if (io_has_work(ctx))
|
||||
goto out_wake;
|
||||
/* got events since we started waiting, min timeout is done */
|
||||
if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail))
|
||||
goto out_wake;
|
||||
/* if we have any events and min timeout expired, we're done */
|
||||
if (io_cqring_events(ctx))
|
||||
goto out_wake;
|
||||
|
||||
/*
|
||||
* If using deferred task_work running and application is waiting on
|
||||
* more than one request, ensure we reset it now where we are switching
|
||||
* to normal sleeps. Any request completion post min_wait should wake
|
||||
* the task and return.
|
||||
*/
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
|
||||
atomic_set(&ctx->cq_wait_nr, 1);
|
||||
smp_mb();
|
||||
if (!llist_empty(&ctx->work_llist))
|
||||
goto out_wake;
|
||||
}
|
||||
|
||||
/* any generated CQE posted past this time should wake us up */
|
||||
iowq->cq_tail = iowq->cq_min_tail;
|
||||
|
||||
hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
|
||||
hrtimer_set_expires(timer, iowq->timeout);
|
||||
return HRTIMER_RESTART;
|
||||
out_wake:
|
||||
return io_cqring_timer_wakeup(timer);
|
||||
}
|
||||
|
||||
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
|
||||
clockid_t clock_id, ktime_t start_time)
|
||||
{
|
||||
ktime_t timeout;
|
||||
|
||||
if (iowq->min_timeout) {
|
||||
timeout = ktime_add_ns(iowq->min_timeout, start_time);
|
||||
hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
|
||||
HRTIMER_MODE_ABS);
|
||||
} else {
|
||||
timeout = iowq->timeout;
|
||||
hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
|
||||
HRTIMER_MODE_ABS);
|
||||
}
|
||||
|
||||
hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
|
||||
hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
|
||||
|
||||
if (!READ_ONCE(iowq->hit_timeout))
|
||||
schedule();
|
||||
|
||||
hrtimer_cancel(&iowq->t);
|
||||
destroy_hrtimer_on_stack(&iowq->t);
|
||||
__set_current_state(TASK_RUNNING);
|
||||
|
||||
return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
|
||||
}
|
||||
|
||||
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
|
||||
struct io_wait_queue *iowq,
|
||||
struct ext_arg *ext_arg,
|
||||
ktime_t start_time)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
/*
|
||||
* Mark us as being in io_wait if we have pending requests, so cpufreq
|
||||
* can take into account that the task is waiting for IO - turns out
|
||||
* to be important for low QD IO.
|
||||
*/
|
||||
if (ext_arg->iowait && current_pending_io())
|
||||
current->in_iowait = 1;
|
||||
if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
|
||||
ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
|
||||
else
|
||||
schedule();
|
||||
current->in_iowait = 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* If this returns > 0, the caller should retry */
|
||||
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
|
||||
struct io_wait_queue *iowq,
|
||||
struct ext_arg *ext_arg,
|
||||
ktime_t start_time)
|
||||
{
|
||||
if (unlikely(READ_ONCE(ctx->check_cq)))
|
||||
return 1;
|
||||
if (unlikely(io_local_work_pending(ctx)))
|
||||
return 1;
|
||||
if (unlikely(task_work_pending(current)))
|
||||
return 1;
|
||||
if (unlikely(task_sigpending(current)))
|
||||
return -EINTR;
|
||||
if (unlikely(io_should_wake(iowq)))
|
||||
return 0;
|
||||
|
||||
return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait until events become available, if we don't already have some. The
|
||||
* application must reap them itself, as they reside on the shared cq ring.
|
||||
*/
|
||||
int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
|
||||
struct ext_arg *ext_arg)
|
||||
{
|
||||
struct io_wait_queue iowq;
|
||||
struct io_rings *rings = ctx->rings;
|
||||
ktime_t start_time;
|
||||
int ret;
|
||||
|
||||
min_events = min_t(int, min_events, ctx->cq_entries);
|
||||
|
||||
if (!io_allowed_run_tw(ctx))
|
||||
return -EEXIST;
|
||||
if (io_local_work_pending(ctx))
|
||||
io_run_local_work(ctx, min_events,
|
||||
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
|
||||
io_run_task_work();
|
||||
|
||||
if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
|
||||
io_cqring_do_overflow_flush(ctx);
|
||||
if (__io_cqring_events_user(ctx) >= min_events)
|
||||
return 0;
|
||||
|
||||
init_waitqueue_func_entry(&iowq.wq, io_wake_function);
|
||||
iowq.wq.private = current;
|
||||
INIT_LIST_HEAD(&iowq.wq.entry);
|
||||
iowq.ctx = ctx;
|
||||
iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
|
||||
iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail);
|
||||
iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
|
||||
iowq.hit_timeout = 0;
|
||||
iowq.min_timeout = ext_arg->min_time;
|
||||
iowq.timeout = KTIME_MAX;
|
||||
start_time = io_get_time(ctx);
|
||||
|
||||
if (ext_arg->ts_set) {
|
||||
iowq.timeout = timespec64_to_ktime(ext_arg->ts);
|
||||
if (!(flags & IORING_ENTER_ABS_TIMER))
|
||||
iowq.timeout = ktime_add(iowq.timeout, start_time);
|
||||
}
|
||||
|
||||
if (ext_arg->sig) {
|
||||
#ifdef CONFIG_COMPAT
|
||||
if (in_compat_syscall())
|
||||
ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
|
||||
ext_arg->argsz);
|
||||
else
|
||||
#endif
|
||||
ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
|
||||
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
io_napi_busy_loop(ctx, &iowq);
|
||||
|
||||
trace_io_uring_cqring_wait(ctx, min_events);
|
||||
do {
|
||||
unsigned long check_cq;
|
||||
int nr_wait;
|
||||
|
||||
/* if min timeout has been hit, don't reset wait count */
|
||||
if (!iowq.hit_timeout)
|
||||
nr_wait = (int) iowq.cq_tail -
|
||||
READ_ONCE(ctx->rings->cq.tail);
|
||||
else
|
||||
nr_wait = 1;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
|
||||
atomic_set(&ctx->cq_wait_nr, nr_wait);
|
||||
set_current_state(TASK_INTERRUPTIBLE);
|
||||
} else {
|
||||
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
|
||||
TASK_INTERRUPTIBLE);
|
||||
}
|
||||
|
||||
ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
|
||||
__set_current_state(TASK_RUNNING);
|
||||
atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
|
||||
|
||||
/*
|
||||
* Run task_work after scheduling and before io_should_wake().
|
||||
* If we got woken because of task_work being processed, run it
|
||||
* now rather than let the caller do another wait loop.
|
||||
*/
|
||||
if (io_local_work_pending(ctx))
|
||||
io_run_local_work(ctx, nr_wait, nr_wait);
|
||||
io_run_task_work();
|
||||
|
||||
/*
|
||||
* Non-local task_work will be run on exit to userspace, but
|
||||
* if we're using DEFER_TASKRUN, then we could have waited
|
||||
* with a timeout for a number of requests. If the timeout
|
||||
* hits, we could have some requests ready to process. Ensure
|
||||
* this break is _after_ we have run task_work, to avoid
|
||||
* deferring running potentially pending requests until the
|
||||
* next time we wait for events.
|
||||
*/
|
||||
if (ret < 0)
|
||||
break;
|
||||
|
||||
check_cq = READ_ONCE(ctx->check_cq);
|
||||
if (unlikely(check_cq)) {
|
||||
/* let the caller flush overflows, retry */
|
||||
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
|
||||
io_cqring_do_overflow_flush(ctx);
|
||||
if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
|
||||
ret = -EBADR;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (io_should_wake(&iowq)) {
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
cond_resched();
|
||||
} while (1);
|
||||
|
||||
if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
|
||||
finish_wait(&ctx->cq_wait, &iowq.wq);
|
||||
restore_saved_sigmask_unless(ret == -EINTR);
|
||||
|
||||
return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
|
||||
}
|
||||
49
io_uring/wait.h
Normal file
49
io_uring/wait.h
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
// SPDX-License-Identifier: GPL-2.0
|
||||
#ifndef IOU_WAIT_H
|
||||
#define IOU_WAIT_H
|
||||
|
||||
#include <linux/io_uring_types.h>
|
||||
|
||||
/*
|
||||
* No waiters. It's larger than any valid value of the tw counter
|
||||
* so that tests against ->cq_wait_nr would fail and skip wake_up().
|
||||
*/
|
||||
#define IO_CQ_WAKE_INIT (-1U)
|
||||
/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
|
||||
#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)
|
||||
|
||||
struct ext_arg {
|
||||
size_t argsz;
|
||||
struct timespec64 ts;
|
||||
const sigset_t __user *sig;
|
||||
ktime_t min_time;
|
||||
bool ts_set;
|
||||
bool iowait;
|
||||
};
|
||||
|
||||
int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
|
||||
struct ext_arg *ext_arg);
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx);
|
||||
void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx);
|
||||
|
||||
static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
|
||||
}
|
||||
|
||||
static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
|
||||
}
|
||||
|
||||
/*
|
||||
* Reads the tail/head of the CQ ring while providing an acquire ordering,
|
||||
* see comment at top of io_uring.c.
|
||||
*/
|
||||
static inline unsigned io_cqring_events(struct io_ring_ctx *ctx)
|
||||
{
|
||||
smp_rmb();
|
||||
return __io_cqring_events(ctx);
|
||||
}
|
||||
|
||||
#endif
|
||||
Loading…
Add table
Add a link
Reference in a new issue