Skip to main content

tokio/io/bsd/
poll_aio.rs

1//! Use POSIX AIO futures with Tokio.
2
3use crate::io::interest::Interest;
4use crate::runtime::io::{ReadyEvent, Registration};
5use crate::runtime::scheduler;
6use mio::event::Source;
7use mio::Registry;
8use mio::Token;
9use std::fmt;
10use std::io;
11use std::ops::{Deref, DerefMut};
12use std::os::fd::{AsFd, BorrowedFd};
13use std::os::unix::io::{AsRawFd, RawFd};
14use std::task::{ready, Context, Poll};
15
/// The POSIX AIO counterpart of [`mio::event::Source`].
///
/// A consumer of Tokio hands an implementor of this trait to [`Aio`] when
/// constructing it.  Implementors have to override at least one of
/// [`AioSource::register`] and [`AioSource::register_borrowed`]: the default
/// `register_borrowed` forwards to `register`, and the default `register`
/// panics.
pub trait AioSource {
    /// Registers this AIO event source with Tokio's reactor, identifying the
    /// kqueue by its raw file descriptor.
    ///
    /// # Safety
    ///
    /// This method is memory-safe, but it is not I/O safe.  Should the file
    /// referenced by `kq` get dropped, this source may end up notifying the
    /// wrong file.
    ///
    /// # Panics
    ///
    /// The default implementation always panics.
    #[deprecated(since = "1.52.0", note = "use register_borrowed instead")]
    fn register(&mut self, _kq: RawFd, _token: usize) {
        // Implementors that only provide `register_borrowed` never override
        // this method; the default body merely lets them compile.
        unimplemented!("Use AioSource::register_borrowed instead")
    }

    /// Registers this AIO event source with Tokio's reactor, identifying the
    /// kqueue by a borrowed file descriptor.
    fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) {
        // Backwards compatibility: implementors that predate 1.52.0 only
        // supplied the raw-fd based `register`, so fall back to it.
        let raw_kq = kq.as_raw_fd();
        #[allow(deprecated)]
        self.register(raw_kq, token)
    }

    /// Deregisters this AIO event source with Tokio's reactor.
    fn deregister(&mut self);
}
46
/// Newtype around the user's [`AioSource`].
///
/// The rest of the crate works in terms of `mio::event::Source`; this wrapper
/// is the type that trait gets implemented on.
struct MioSource<T>(T);
50
51impl<T: AioSource> Source for MioSource<T> {
52    fn register(
53        &mut self,
54        registry: &Registry,
55        token: Token,
56        interests: mio::Interest,
57    ) -> io::Result<()> {
58        assert!(interests.is_aio() || interests.is_lio());
59        self.0
60            .register_borrowed(registry.as_fd(), usize::from(token));
61        Ok(())
62    }
63
64    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
65        self.0.deregister();
66        Ok(())
67    }
68
69    fn reregister(
70        &mut self,
71        registry: &Registry,
72        token: Token,
73        interests: mio::Interest,
74    ) -> io::Result<()> {
75        assert!(interests.is_aio() || interests.is_lio());
76        self.0
77            .register_borrowed(registry.as_fd(), usize::from(token));
78        Ok(())
79    }
80}
81
82/// Associates a POSIX AIO control block with the reactor that drives it.
83///
84/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
85/// by the reactor.
86///
87/// The wrapped source may be accessed through the `Aio` via the `Deref` and
88/// `DerefMut` traits.
89///
90/// ## Clearing readiness
91///
92/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
93/// Source is not completely ready and must return to the Pending state,
94/// [`Aio::clear_ready`] may be used.  This can be useful with
95/// [`lio_listio`], which may generate a kevent when only a portion of the
96/// operations have completed.
97///
98/// ## Platforms
99///
100/// Only FreeBSD implements POSIX AIO with kqueue notification, so
101/// `Aio` is only available for that operating system.
102///
103/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
104// Note: Unlike every other kqueue event source, POSIX AIO registers events not
105// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
106// aio_write, etc.  It needs the kqueue's file descriptor to do that.  So
107// AsyncFd can't be used for POSIX AIO.
108//
109// Note that Aio doesn't implement Drop.  There's no need.  Unlike other
110// kqueue sources, simply dropping the object effectively deregisters it.
111pub struct Aio<E> {
112    io: MioSource<E>,
113    registration: Registration,
114}
115
116// ===== impl Aio =====
117
118impl<E: AioSource> Aio<E> {
119    /// Creates a new `Aio` suitable for use with POSIX AIO functions.
120    ///
121    /// It will be associated with the default reactor.  The runtime is usually
122    /// set implicitly when this function is called from a future driven by a
123    /// Tokio runtime, otherwise runtime can be set explicitly with
124    /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
125    pub fn new_for_aio(io: E) -> io::Result<Self> {
126        Self::new_with_interest(io, Interest::AIO)
127    }
128
129    /// Creates a new `Aio` suitable for use with [`lio_listio`].
130    ///
131    /// It will be associated with the default reactor.  The runtime is usually
132    /// set implicitly when this function is called from a future driven by a
133    /// Tokio runtime, otherwise runtime can be set explicitly with
134    /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
135    ///
136    /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
137    pub fn new_for_lio(io: E) -> io::Result<Self> {
138        Self::new_with_interest(io, Interest::LIO)
139    }
140
141    fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
142        let mut io = MioSource(io);
143        let handle = scheduler::Handle::current();
144        let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
145        Ok(Self { io, registration })
146    }
147
148    /// Indicates to Tokio that the source is no longer ready.  The internal
149    /// readiness flag will be cleared, and tokio will wait for the next
150    /// edge-triggered readiness notification from the OS.
151    ///
152    /// It is critical that this method not be called unless your code
153    /// _actually observes_ that the source is _not_ ready.  The OS must
154    /// deliver a subsequent notification, or this source will block
155    /// forever.  It is equally critical that you `do` call this method if you
156    /// resubmit the same structure to the kernel and poll it again.
157    ///
158    /// This method is not very useful with AIO readiness, since each `aiocb`
159    /// structure is typically only used once.  It's main use with
160    /// [`lio_listio`], which will sometimes send notification when only a
161    /// portion of its elements are complete.  In that case, the caller must
162    /// call `clear_ready` before resubmitting it.
163    ///
164    /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
165    pub fn clear_ready(&self, ev: AioEvent) {
166        self.registration.clear_readiness(ev.0)
167    }
168
169    /// Destroy the [`Aio`] and return its inner source.
170    pub fn into_inner(self) -> E {
171        self.io.0
172    }
173
174    /// Polls for readiness.  Either AIO or LIO counts.
175    ///
176    /// This method returns:
177    ///  * `Poll::Pending` if the underlying operation is not complete, whether
178    ///     or not it completed successfully.  This will be true if the OS is
179    ///     still processing it, or if it has not yet been submitted to the OS.
180    ///  * `Poll::Ready(Ok(_))` if the underlying operation is complete.
181    ///  * `Poll::Ready(Err(_))` if the reactor has been shutdown.  This does
182    ///     _not_ indicate that the underlying operation encountered an error.
183    ///
184    /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
185    /// is scheduled to receive a wakeup when the underlying operation
186    /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
187    /// `Context` passed to the most recent call is scheduled to receive a wakeup.
188    pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
189        let ev = ready!(self.registration.poll_read_ready(cx))?;
190        Poll::Ready(Ok(AioEvent(ev)))
191    }
192}
193
194impl<E: AioSource> Deref for Aio<E> {
195    type Target = E;
196
197    fn deref(&self) -> &E {
198        &self.io.0
199    }
200}
201
202impl<E: AioSource> DerefMut for Aio<E> {
203    fn deref_mut(&mut self) -> &mut E {
204        &mut self.io.0
205    }
206}
207
208impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.debug_struct("Aio").field("io", &self.io.0).finish()
211    }
212}
213
214/// Opaque data returned by [`Aio::poll_ready`].
215///
216/// It can be fed back to [`Aio::clear_ready`].
217#[derive(Debug)]
218pub struct AioEvent(ReadyEvent);