From c6da324c59e2617613a128e2baa5c82831456d45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=B7=D0=B0=D0=BB=D0=B8=D1=8F=20=D0=A1=D0=BC=D0=B0?= =?UTF-8?q?=D1=80=D0=B0=D0=B3=D0=B4=D0=BE=D0=B2=D0=B0?= Date: Sun, 9 Mar 2025 19:39:20 +0500 Subject: [PATCH] Added methods to specify R/W flags for I/O on ```tokio_uring::fs::File``` --- src/fs/file.rs | 542 ++++++++++++++++++++++++++++++++++++++++-- src/io/mod.rs | 2 +- src/io/read.rs | 3 +- src/io/read_fixed.rs | 2 + src/io/readv.rs | 2 + src/io/socket.rs | 10 +- src/io/write.rs | 3 +- src/io/write_fixed.rs | 2 + src/io/writev.rs | 4 +- src/io/writev_all.rs | 5 +- 10 files changed, 551 insertions(+), 24 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..acce18a3 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -10,6 +10,8 @@ use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::path::Path; +pub type RwFlags = io_uring::types::RwFlags; + /// A reference to an open file on the filesystem. /// /// An instance of a `File` can be read and/or written depending on what options @@ -178,12 +180,121 @@ impl File { /// ``` pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { // Submit the read operation - let op = Op::read_at(&self.fd, buf, pos).unwrap(); + let op = Op::read_at(&self.fd, buf, pos, 0).unwrap(); + op.await + } + + /// Read some bytes at the specified offset from the file into the specified + /// buffer, returning how many bytes were read, and allows the caller to + /// specify input/output operation flags + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// as an argument. + /// + /// If the method returns [`Ok(n)`], then the read was successful. A nonzero + /// `n` value indicates that the buffer has been filled with `n` bytes of + /// data from the file. If `n` is `0`, then one of the following happened: + /// + /// 1. The specified offset is the end of the file. + /// 2. The buffer specified was 0 bytes in length. + /// + /// It is not an error if the returned value `n` is smaller than the buffer + /// size, even when the file contains enough data to fill the buffer. + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. The buffer is returned on error. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::fs::File; + /// + /// fn main() -> Result<(), Box> { + /// tokio_uring::start(async { + /// let f = File::open("foo.txt").await?; + /// let buffer = vec![0; 10]; + /// + /// // Read up to 10 bytes + /// let (res, buffer) = f.read_at(buffer, 0).await; + /// let n = res?; + /// + /// println!("The bytes: {:?}", &buffer[..n]); + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// }) + /// } + /// ``` + pub async fn read_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> crate::BufResult { + // Submit the read operation + let op = Op::read_at(&self.fd, buf, pos, flags).unwrap(); op.await } /// Read some bytes at the specified offset from the file into the specified - /// array of buffers, returning how many bytes were read. + /// array of buffers, returning how many bytes were read, and allows the caller to + /// specify input/output operation flags. + /// + /// # Return + /// + /// The method returns the operation result and the same array of buffers + /// passed as an argument. + /// + /// If the method returns [`Ok(n)`], then the read was successful. A nonzero + /// `n` value indicates that the buffers have been filled with `n` bytes of + /// data from the file. If `n` is `0`, then one of the following happened: + /// + /// 1. The specified offset is the end of the file. + /// 2. The buffers specified were 0 bytes in length. + /// + /// It is not an error if the returned value `n` is smaller than the buffer + /// size, even when the file contains enough data to fill the buffer. + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. The buffer is returned on error. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::fs::File; + /// + /// fn main() -> Result<(), Box> { + /// tokio_uring::start(async { + /// let f = File::open("foo.txt").await?; + /// let buffers = vec![Vec::::with_capacity(10), Vec::::with_capacity(10)]; + /// + /// // Read up to 20 bytes + /// let (res, buffer) = f.readv_at(buffers, 0).await; + /// let n = res?; + /// + /// println!("Read {} bytes", n); + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// }) + /// } + /// ``` + pub async fn readv_at_with_flags( + &self, + bufs: Vec, + pos: u64, + flags: RwFlags, + ) -> crate::BufResult> { + // Submit the read operation + let op = Op::readv_at(&self.fd, bufs, pos, flags).unwrap(); + op.await + } + + /// Read some bytes at the specified offset from the file into the specified + /// array of buffers, returning how many bytes were read /// /// # Return /// @@ -233,7 +344,7 @@ impl File { pos: u64, ) -> crate::BufResult> { // Submit the read operation - let op = Op::readv_at(&self.fd, bufs, pos).unwrap(); + let op = Op::readv_at(&self.fd, bufs, pos, 0).unwrap(); op.await } @@ -289,7 +400,67 @@ impl File { buf: Vec, pos: u64, ) -> crate::BufResult> { - let op = Op::writev_at(&self.fd, buf, pos).unwrap(); + let op = Op::writev_at(&self.fd, buf, pos, 0).unwrap(); + op.await + } + + /// Write data from buffers into this file at the specified offset, + /// returning how many bytes were written. + /// + /// This function will attempt to write the entire contents of `bufs`, but + /// the entire write may not succeed, or the write may also generate an + /// error. The bytes will be written starting at the specified offset. + /// + /// This functions allows the caller to specify I/O operation flags, + /// described in ```man preadv2(2)```. + /// + /// # Return + /// + /// The method returns the operation result and the same array of buffers passed + /// in as an argument. A return value of `0` typically means that the + /// underlying file is no longer able to accept bytes and will likely not be + /// able to in the future as well, or that the buffer provided is empty. + /// + /// # Errors + /// + /// Each call to `write` may generate an I/O error indicating that the + /// operation could not be completed. If an error is returned then no bytes + /// in the buffer were written to this writer. + /// + /// It is **not** considered an error if the entire buffer could not be + /// written to this writer. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::fs::File; + /// + /// fn main() -> Result<(), Box> { + /// tokio_uring::start(async { + /// let file = File::create("foo.txt").await?; + /// + /// // Writes some prefix of the byte string, not necessarily all of it. + /// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()]; + /// let (res, _) = file.writev_at(bufs, 0).await; + /// let n = res?; + /// + /// println!("wrote {} bytes", n); + /// + /// // Close the file + /// file.close().await?; + /// Ok(()) + /// }) + /// } + /// ``` + /// + /// [`Ok(n)`]: Ok + pub async fn writev_at_with_flags( + &self, + buf: Vec, + pos: u64, + flags: RwFlags, + ) -> crate::BufResult> { + let op = Op::writev_at(&self.fd, buf, pos, flags).unwrap(); op.await } @@ -342,7 +513,61 @@ impl File { buf: Vec, pos: Option, // Use None for files that can't seek ) -> crate::BufResult> { - let op = crate::io::writev_at_all(&self.fd, buf, pos); + let op = crate::io::writev_at_all(&self.fd, buf, pos, 0); + op.await + } + + /// Like `writev_at_with_flags` but will call the `io_uring` `writev` operation multiple times if + /// necessary. + /// + /// Parameter `pos` is an `Option` to allow this function to be used for both files that + /// are seekable and those that are not. The caller is responsible for knowing this. + /// + /// When `None` is supplied, the offset passed to the `io_uring` call will always be zero, even + /// if multiple writev calls are necessary; only the iovec information would be adjusted + /// between calls. A Unix pipe would fall into this category. + /// + /// When `Some(n)` is suppied, the offset passed to the writev call will be incremented by the + /// progress of prior writev calls. A file system's regular file would fall into this category. + /// + /// If the caller passes `Some(n)` for a file that is not seekable, the `io_uring` `writev` + /// operation will return an error once n is not zero. + /// + /// If the caller passes `None`, when the file *is* seekable, when multiple `writev` calls are + /// required to complete the writing of all the bytes, the bytes at position 0 of the file will + /// have been overwritten one or more times with incorrect data. This is true just as if the + /// caller had invoked seperate write calls to a file, all with position 0, when in fact the + /// file was seekable. + /// + /// Performance considerations: + /// + /// The user may want to check that this function is necessary in their use case or performs + /// better than a series of write_all operations would. There is overhead either way and it is + /// not clear which should be faster or give better throughput. + /// + /// This function causes the temporary allocation of a Vec one time to hold the array of iovec + /// that is passed to the kernel. The same array is used for any subsequent calls to get all + /// the bytes written. Whereas individual calls to write_all do not require the Vec to be + /// allocated, they do each incur the normal overhead of setting up the submission and + /// completion structures and going through the future poll mechanism. + /// + /// TODO decide, would a separate `writev_all` function for `file` that did not take a `pos` + /// make things less ambiguous? + /// + /// TODO more complete documentation here. + /// TODO define writev_all functions for net/unix/stream, net/tcp/stream, io/socket. + /// TODO remove usize from result, to be consistent with other write_all_vectored functions. + /// TODO find a way to test this with some stress to the file so the writev calls don't all + /// succeed on their first try. + /// TODO consider replacing the current `write_all` and `write_all_at` functions with a similar + /// mechanism so all the write-all logic is in one place, in the io/write_all.rs file. + pub async fn writev_at_all_with_flags( + &self, + buf: Vec, + pos: Option, // Use None for files that can't seek + flags: RwFlags, + ) -> crate::BufResult> { + let op = crate::io::writev_at_all(&self.fd, buf, pos, flags); op.await } @@ -397,7 +622,62 @@ impl File { T: BoundedBufMut, { let orig_bounds = buf.bounds(); - let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos).await; + let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos, 0).await; + (res, T::from_buf_bounds(buf, orig_bounds)) + } + + /// Read the exact number of bytes required to fill `buf` at the specified + /// offset from the file, and allows the caller to specify I/O operation flags. + /// + /// This function reads as many as bytes as necessary to completely fill the + /// specified buffer `buf`. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// as an argument. + /// + /// If the method returns [`Ok(())`], then the read was successful. + /// + /// # Errors + /// + /// If this function encounters an "end of file" before completely filling + /// the buffer, it returns an error of the kind [`ErrorKind::UnexpectedEof`]. + /// The buffer is returned on error. + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. The buffer is returned on error. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::fs::File; + /// + /// fn main() -> Result<(), Box> { + /// tokio_uring::start(async { + /// let f = File::open("foo.txt").await?; + /// let buffer = Vec::with_capacity(10); + /// + /// // Read up to 10 bytes + /// let (res, buffer) = f.read_exact_at(buffer, 0).await; + /// res?; + /// + /// println!("The bytes: {:?}", buffer); + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// }) + /// } + /// ``` + /// + /// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof + pub async fn read_exact_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> crate::BufResult<(), T> + where + T: BoundedBufMut, + { + let orig_bounds = buf.bounds(); + let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos, flags).await; (res, T::from_buf_bounds(buf, orig_bounds)) } @@ -405,6 +685,7 @@ impl File { &self, mut buf: Slice, mut pos: u64, + flags: RwFlags, ) -> crate::BufResult<(), T> { if pos.checked_add(buf.bytes_total() as u64).is_none() { return ( @@ -417,7 +698,7 @@ impl File { } while buf.bytes_total() != 0 { - let (res, slice) = self.read_at(buf, pos).await; + let (res, slice) = self.read_at_with_flags(buf, pos, flags).await; match res { Ok(0) => { return ( @@ -489,7 +770,57 @@ impl File { T: BoundedBufMut, { // Submit the read operation - let op = Op::read_fixed_at(&self.fd, buf, pos).unwrap(); + let op = Op::read_fixed_at(&self.fd, buf, pos, 0).unwrap(); + op.await + } + + /// Like [`read_at`], but using a pre-mapped buffer + /// registered with [`FixedBufRegistry`] and allows the caller + /// to specify I/O operation flags. + /// + /// [`read_at`]: Self::read_at + /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry + /// + /// # Errors + /// + /// In addition to errors that can be reported by `read_at`, + /// this operation fails if the buffer is not registered in the + /// current `tokio-uring` runtime. + /// + /// # Examples + /// + /// ```no_run + ///# fn main() -> Result<(), Box> { + /// use tokio_uring::fs::File; + /// use tokio_uring::buf::fixed::FixedBufRegistry; + /// use tokio_uring::buf::BoundedBuf; + /// use std::iter; + /// + /// tokio_uring::start(async { + /// let registry = FixedBufRegistry::new(iter::repeat(vec![0; 10]).take(10)); + /// registry.register()?; + /// + /// let f = File::open("foo.txt").await?; + /// let buffer = registry.check_out(2).unwrap(); + /// + /// // Read up to 10 bytes + /// let (res, buffer) = f.read_fixed_at(buffer, 0).await; + /// let n = res?; + /// + /// println!("The bytes: {:?}", &buffer[..n]); + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// }) + ///# } + /// ``` + pub async fn read_fixed_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> crate::BufResult + where + T: BoundedBufMut, + { + // Submit the read operation + let op = Op::read_fixed_at(&self.fd, buf, pos, flags).unwrap(); op.await } @@ -540,7 +871,58 @@ impl File { /// /// [`Ok(n)`]: Ok pub fn write_at(&self, buf: T, pos: u64) -> UnsubmittedWrite { - UnsubmittedOneshot::write_at(&self.fd, buf, pos) + UnsubmittedOneshot::write_at(&self.fd, buf, pos, 0) + } + + /// Write a buffer into this file at the specified offset, returning how + /// many bytes were written, and allows the caller to specify the I/O + /// operation flags. + /// + /// This function will attempt to write the entire contents of `buf`, but + /// the entire write may not succeed, or the write may also generate an + /// error. The bytes will be written starting at the specified offset. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// in as an argument. A return value of `0` typically means that the + /// underlying file is no longer able to accept bytes and will likely not be + /// able to in the future as well, or that the buffer provided is empty. + /// + /// # Errors + /// + /// Each call to `write` may generate an I/O error indicating that the + /// operation could not be completed. If an error is returned then no bytes + /// in the buffer were written to this writer. + /// + /// It is **not** considered an error if the entire buffer could not be + /// written to this writer. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::fs::File; + /// + /// fn main() -> Result<(), Box> { + /// tokio_uring::start(async { + /// let file = File::create("foo.txt").await?; + /// + /// // Writes some prefix of the byte string, not necessarily all of it. + /// let (res, _) = file.write_at(&b"some bytes"[..], 0).submit().await; + /// let n = res?; + /// + /// println!("wrote {} bytes", n); + /// + /// // Close the file + /// file.close().await?; + /// Ok(()) + /// }) + /// } + /// ``` + /// + /// [`Ok(n)`]: Ok + pub fn write_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> UnsubmittedWrite { + UnsubmittedOneshot::write_at(&self.fd, buf, pos, flags) } /// Attempts to write an entire buffer into this file at the specified offset. @@ -589,7 +971,58 @@ impl File { T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos).await; + let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos, 0).await; + (res, T::from_buf_bounds(buf, orig_bounds)) + } + + /// Attempts to write an entire buffer into this file at the specified offset, + /// but allows the caller to specify I/O operation flags. + /// + /// This method will continuously call [`write_at`] until there is no more data + /// to be written or an error is returned. + /// This method will not return until the entire buffer has been successfully + /// written or an error occurs. + /// + /// If the buffer contains no data, this will never call [`write_at`]. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// in as an argument. + /// + /// # Errors + /// + /// This function will return the first error that [`write_at`] returns. + /// + /// # Examples + /// + /// ```no_run + /// use tokio_uring::fs::File; + /// + /// fn main() -> Result<(), Box> { + /// tokio_uring::start(async { + /// let file = File::create("foo.txt").await?; + /// + /// // Writes some prefix of the byte string, not necessarily all of it. + /// let (res, _) = file.write_all_at(&b"some bytes"[..], 0).await; + /// res?; + /// + /// println!("wrote all bytes"); + /// + /// // Close the file + /// file.close().await?; + /// Ok(()) + /// }) + /// } + /// ``` + /// + /// [`write_at`]: File::write_at + pub async fn write_all_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> crate::BufResult<(), T> + where + T: BoundedBuf, + { + let orig_bounds = buf.bounds(); + let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos, flags).await; (res, T::from_buf_bounds(buf, orig_bounds)) } @@ -597,6 +1030,7 @@ impl File { &self, mut buf: Slice, mut pos: u64, + flags: RwFlags, ) -> crate::BufResult<(), T> { if pos.checked_add(buf.bytes_init() as u64).is_none() { return ( @@ -609,7 +1043,7 @@ impl File { } while buf.bytes_init() != 0 { - let (res, slice) = self.write_at(buf, pos).submit().await; + let (res, slice) = self.write_at_with_flags(buf, pos, flags).submit().await; match res { Ok(0) => { return ( @@ -681,7 +1115,57 @@ impl File { where T: BoundedBuf, { - let op = Op::write_fixed_at(&self.fd, buf, pos).unwrap(); + let op = Op::write_fixed_at(&self.fd, buf, pos, 0).unwrap(); + op.await + } + + /// Like [`write_at`], but using a pre-mapped buffer + /// registered with [`FixedBufRegistry`] and allows specifying + /// an input-output operation flags (see ```man preadv2(2)```) + /// + /// [`write_at`]: Self::write_at + /// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry + /// + /// # Errors + /// + /// In addition to errors that can be reported by `write_at`, + /// this operation fails if the buffer is not registered in the + /// current `tokio-uring` runtime. + /// + /// # Examples + /// + /// ```no_run + ///# fn main() -> Result<(), Box> { + /// use tokio_uring::fs::File; + /// use tokio_uring::buf::fixed::FixedBufRegistry; + /// use tokio_uring::buf::BoundedBuf; + /// + /// tokio_uring::start(async { + /// let registry = FixedBufRegistry::new([b"some bytes".to_vec()]); + /// registry.register()?; + /// + /// let file = File::create("foo.txt").await?; + /// + /// let buffer = registry.check_out(0).unwrap(); + /// + /// // Writes some prefix of the buffer content, + /// // not necessarily all of it. + /// let (res, _) = file.write_fixed_at(buffer, 0).await; + /// let n = res?; + /// + /// println!("wrote {} bytes", n); + /// + /// // Close the file + /// file.close().await?; + /// Ok(()) + /// }) + ///# } + /// ``` + pub async fn write_fixed_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> crate::BufResult + where + T: BoundedBuf, + { + let op = Op::write_fixed_at(&self.fd, buf, pos, flags).unwrap(); op.await } @@ -709,7 +1193,36 @@ impl File { T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await; + let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos, 0).await; + (res, T::from_buf_bounds(buf, orig_bounds)) + } + + /// Attempts to write an entire buffer into this file at the specified offset. + /// Also, allows specifying input/output operation flags. + /// + /// This method will continuously call [`write_fixed_at`] until there is no more data + /// to be written or an error is returned. + /// This method will not return until the entire buffer has been successfully + /// written or an error occurs. + /// + /// If the buffer contains no data, this will never call [`write_fixed_at`]. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// in as an argument. + /// + /// # Errors + /// + /// This function will return the first error that [`write_fixed_at`] returns. + /// + /// [`write_fixed_at`]: Self::write_fixed_at + pub async fn write_fixed_all_at_with_flags(&self, buf: T, pos: u64, flags: RwFlags) -> crate::BufResult<(), T> + where + T: BoundedBuf, + { + let orig_bounds = buf.bounds(); + let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos, flags).await; (res, T::from_buf_bounds(buf, orig_bounds)) } @@ -717,6 +1230,7 @@ impl File { &self, mut buf: Slice, mut pos: u64, + flags: RwFlags ) -> crate::BufResult<(), FixedBuf> { if pos.checked_add(buf.bytes_init() as u64).is_none() { return ( @@ -729,7 +1243,7 @@ impl File { } while buf.bytes_init() != 0 { - let (res, slice) = self.write_fixed_at(buf, pos).await; + let (res, slice) = self.write_fixed_at_with_flags(buf, pos, flags).await; match res { Ok(0) => { return ( diff --git a/src/io/mod.rs b/src/io/mod.rs index 1afcef22..dc859282 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -57,4 +57,4 @@ mod write_fixed; mod writev; mod writev_all; -pub(crate) use writev_all::writev_at_all; +pub(crate) use writev_all::writev_at_all; \ No newline at end of file diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..4e548b31 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -17,7 +17,7 @@ pub(crate) struct Read { } impl Op> { - pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { + pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64, flags: io_uring::types::RwFlags) -> io::Result>> { use io_uring::{opcode, types}; CONTEXT.with(|x| { @@ -32,6 +32,7 @@ impl Op> { let len = read.buf.bytes_total(); opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) .offset(offset as _) + .rw_flags(flags) .build() }, ) diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index 3cb96cdb..e1620f16 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -25,6 +25,7 @@ where fd: &SharedFd, buf: T, offset: u64, + flags: io_uring::types::RwFlags ) -> io::Result>> { use io_uring::{opcode, types}; @@ -41,6 +42,7 @@ where let buf_index = read_fixed.buf.get_buf().buf_index(); opcode::ReadFixed::new(types::Fd(fd.raw_fd()), ptr, len as _, buf_index) .offset(offset as _) + .rw_flags(flags) .build() }, ) diff --git a/src/io/readv.rs b/src/io/readv.rs index ff71dc79..7aaeb92f 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -24,6 +24,7 @@ impl Op> { fd: &SharedFd, mut bufs: Vec, offset: u64, + flags: io_uring::types::RwFlags, ) -> io::Result>> { use io_uring::{opcode, types}; @@ -51,6 +52,7 @@ impl Op> { read.iovs.len() as u32, ) .offset(offset as _) + .rw_flags(flags) .build() }, ) diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..557e9d47 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -44,7 +44,7 @@ impl Socket { } pub(crate) fn write(&self, buf: T) -> UnsubmittedWrite { - UnsubmittedOneshot::write_at(&self.fd, buf, 0) + UnsubmittedOneshot::write_at(&self.fd, buf, 0, 0) } pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { @@ -85,7 +85,7 @@ impl Socket { where T: BoundedBuf, { - let op = Op::write_fixed_at(&self.fd, buf, 0).unwrap(); + let op = Op::write_fixed_at(&self.fd, buf, 0, 0).unwrap(); op.await } @@ -130,7 +130,7 @@ impl Socket { } pub async fn writev(&self, buf: Vec) -> crate::BufResult> { - let op = Op::writev_at(&self.fd, buf, 0).unwrap(); + let op = Op::writev_at(&self.fd, buf, 0, 0).unwrap(); op.await } @@ -169,7 +169,7 @@ impl Socket { } pub(crate) async fn read(&self, buf: T) -> crate::BufResult { - let op = Op::read_at(&self.fd, buf, 0).unwrap(); + let op = Op::read_at(&self.fd, buf, 0, 0).unwrap(); op.await } @@ -177,7 +177,7 @@ impl Socket { where T: BoundedBufMut, { - let op = Op::read_fixed_at(&self.fd, buf, 0).unwrap(); + let op = Op::read_fixed_at(&self.fd, buf, 0, 0).unwrap(); op.await } diff --git a/src/io/write.rs b/src/io/write.rs index 6c607f75..324ca4b5 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -36,7 +36,7 @@ impl OneshotOutputTransform for WriteTransform { } impl UnsubmittedWrite { - pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> Self { + pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64, flags: io_uring::types::RwFlags) -> Self { use io_uring::{opcode, types}; // Get raw buffer info @@ -53,6 +53,7 @@ impl UnsubmittedWrite { }, opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _) .offset(offset as _) + .rw_flags(flags) .build(), ) } diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index 1d2c3e38..4dcf0c8c 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -24,6 +24,7 @@ where fd: &SharedFd, buf: T, offset: u64, + flags: io_uring::types::RwFlags, ) -> io::Result>> { use io_uring::{opcode, types}; @@ -40,6 +41,7 @@ where let buf_index = write_fixed.buf.get_buf().buf_index(); opcode::WriteFixed::new(types::Fd(fd.raw_fd()), ptr, len as _, buf_index) .offset(offset as _) + .rw_flags(flags) .build() }, ) diff --git a/src/io/writev.rs b/src/io/writev.rs index 86236ebc..12af8a32 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -16,11 +16,12 @@ pub(crate) struct Writev { iovs: Vec, } -impl Op> { +impl Op> { pub(crate) fn writev_at( fd: &SharedFd, mut bufs: Vec, offset: u64, + flags: io_uring::types::RwFlags, ) -> io::Result>> { use io_uring::{opcode, types}; @@ -47,6 +48,7 @@ impl Op> { write.iovs.len() as u32, ) .offset(offset as _) + .rw_flags(flags) .build() }, ) diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index ef5b9d40..8f571f3d 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -17,6 +17,7 @@ pub(crate) async fn writev_at_all( fd: &SharedFd, mut bufs: Vec, offset: Option, + flags: io_uring::types::RwFlags, ) -> crate::BufResult> { // TODO decide if the function should return immediately if all the buffer lengths // were to sum to zero. That would save an allocation and one call into writev. @@ -50,7 +51,7 @@ pub(crate) async fn writev_at_all( }; // Call the Op that is internal to this module. - let op = Op::writev_at_all2(fd, bufs, iovs, iovs_ptr, iovs_len, o).unwrap(); + let op = Op::writev_at_all2(fd, bufs, iovs, iovs_ptr, iovs_len, o, flags).unwrap(); let res; (res, fd, bufs, iovs) = op.await; @@ -125,6 +126,7 @@ impl Op> { iovs_ptr: *const iovec, iovs_len: u32, offset: u64, + flags: io_uring::types::RwFlags, ) -> io::Result>> { use io_uring::{opcode, types}; @@ -135,6 +137,7 @@ impl Op> { |write| { opcode::Writev::new(types::Fd(write.fd.raw_fd()), iovs_ptr, iovs_len) .offset(offset as _) + .rw_flags(flags) .build() }, )