diff --git a/NNNN-swift-system-io-uring.md b/NNNN-swift-system-io-uring.md new file mode 100644 index 00000000..9b0eb1dc --- /dev/null +++ b/NNNN-swift-system-io-uring.md @@ -0,0 +1,434 @@ +# IORing, a Swift System API for io_uring + +* Proposal: [SE-NNNN](NNNN-filename.md) +* Authors: [Lucy Satheesan](https://github.com/oxy), [David Smith](https://github.com/Catfish-Man/) +* Review Manager: TBD +* Status: **Awaiting implementation** +* Implementation: [apple/swift-system#208](https://github.com/apple/swift-system/pull/208) + +## Introduction + +`io_uring` is Linux's solution to asynchronous and batched syscalls, with a particular focus on IO. We propose a low-level Swift API for it in Swift System that could either be used directly by projects with unusual needs, or via intermediaries like Swift NIO, to address scalability and thread pool starvation issues. + +## Motivation + +Up until recently, the overwhelmingly dominant file IO syscalls on major Unix platforms have been synchronous, e.g. `read(2)`. This design is very simple and proved sufficient for many uses for decades, but is less than ideal for Swift's needs in a few major ways: + +1. Requiring an entire OS thread for each concurrent operation imposes significant memory overhead +2. Requiring a separate syscall for each operation imposes significant CPU/time overhead to switch into and out of kernel mode repeatedly. This has been exacerbated in recent years by mitigations for the Spectre family of security exploits increasing the cost of syscalls. +3. Swift's N:M coroutine-on-thread-pool concurrency model assumes that threads will not be blocked. Each thread waiting for a syscall means a CPU core being left idle. In practice systems like NIO that deal in highly concurrent IO have had to work around this by providing their own thread pools. 
+ +Non-file IO (network, pipes, etc…) has been in a somewhat better place with `epoll` and `kqueue` for asynchronously waiting for readability, but syscall overhead remains a significant issue for highly scalable systems. + +With the introduction of `io_uring` in 2019, Linux now has the kernel level tools to address these three problems directly. However, `io_uring` is quite complex and maps poorly into Swift. We expect that by providing a Swift interface to it, we can enable Swift on Linux servers to scale better and be more efficient than it has been in the past. + +## Proposed solution + +`struct IORing: ~Copyable` provides facilities for + +* Registering and unregistering resources (files and buffers), an `io_uring` specific variation on Unix file descriptors that improves their efficiency +* Registering and unregistering eventfds, which allow asynchronous waiting for completions +* Enqueueing IO requests +* Dequeueing IO completions + +`class IOResource` represents, via its two typealiases `IORingFileSlot` and `IORingBuffer`, registered file descriptors and buffers. Ideally we'd express the lifetimes of these as being dependent on the lifetime of the ring, but so far that's proven intractable, so we use a reference type. We expect that the up-front overhead of this should be negligible for larger operations, and smaller or one-shot operations can use non-registered buffers and file descriptors. + +`struct IORequest: ~Copyable` represents an IO operation that can be enqueued for the kernel to execute. It supports a wide variety of operations matching traditional unix file and socket operations. + +IORequest operations are expressed as overloaded static methods on `IORequest`, e.g. `openat` is spelled + +```swift + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + into slot: IORingFileSlot, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? 
= nil, + context: UInt64 = 0 + ) -> IORequest + + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + context: UInt64 = 0 + ) -> IORequest +``` + +which allows clients to decide whether they want to open the file into a slot on the ring, or have it return a file descriptor via a completion. Similarly, read operations have overloads for "use a buffer from the ring" or "read into this `UnsafeMutableBufferPointer`" + +Multiple `IORequests` can be enqueued on a single `IORing` using the `prepare(…)` family of methods, and then submitted together using `submitPreparedRequests`, allowing for things like "open this file, read its contents, and then close it" to be a single syscall. Conveniences are provided for preparing and submitting requests in one call. + +Since IO operations can execute in parallel or out of order by default, linked chains of operations can be established with `prepare(linkedRequests:…)` and related methods. Separate chains can still execute in parallel, and if an operation early in the chain fails, all subsequent operations will deliver cancellation errors as their completion. + +Already-completed results can be retrieved from the ring using `tryConsumeCompletion`, which never waits but may return nil, or `blockingConsumeCompletion(timeout:)`, which synchronously waits (up to an optional timeout) until an operation completes. There's also a bulk version of `blockingConsumeCompletion`, which may reduce the number of syscalls issued. It takes a closure which will be called repeatedly as completions are available (see Future Directions for potential improvements to this API). 
+ +Since neither polling nor synchronously waiting is optimal in many cases, `IORing` also exposes the ability to register an eventfd (see `eventfd(2)`), which will become readable when completions are available on the ring. This can then be monitored asynchronously with `epoll`, `kqueue`, or, for clients that link libdispatch, `DispatchSource`. + +`struct IOCompletion: ~Copyable` represents the result of an IO operation and provides: + +* Flags indicating various operation-specific metadata about the now-completed syscall +* The context associated with the operation when it was enqueued, as an `UnsafeRawPointer` or a `UInt64` +* The result of the operation, as an `Int32` with operation-specific meaning +* The error, if one occurred + +Unfortunately, the underlying kernel API makes it relatively difficult to determine which `IORequest` led to a given `IOCompletion`, so it's expected that users will need to create this association themselves via the context parameter. + +`IORingError` represents failure of an operation. + +`IORing.Features` describes the supported features of the underlying kernel `io_uring` implementation, which can be used to degrade gracefully when running on older systems. 
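One way a client might build the request-to-completion association through the context parameter is a small token table. The sketch below is illustrative only: `PendingOperations`, `register`, and `resolve` are hypothetical names that are not part of the proposed API, and the completion's `context` value is simulated with a plain `UInt64` so that the example is self-contained.

```swift
// Hypothetical helper, NOT part of the proposed API: hands out unique
// UInt64 tokens to pass as `context:` and maps them back on completion.
struct PendingOperations<Payload> {
    // 0 is the default `context` value in the proposed API, so start at 1
    private var nextToken: UInt64 = 1
    private var pending: [UInt64: Payload] = [:]

    /// Stores `payload` and returns the token to pass as `context:`.
    mutating func register(_ payload: Payload) -> UInt64 {
        let token = nextToken
        nextToken &+= 1
        pending[token] = payload
        return token
    }

    /// Looks up and removes the payload for a completion's context value.
    mutating func resolve(context: UInt64) -> Payload? {
        pending.removeValue(forKey: context)
    }

    /// Number of operations still waiting for a completion.
    var count: Int { pending.count }
}

var operations = PendingOperations<String>()
let openToken = operations.register("open config.json")
let readToken = operations.register("read config.json")

// Completions can arrive in any order; simulate the read finishing first.
let first = operations.resolve(context: readToken)
let second = operations.resolve(context: openToken)
let unknown = operations.resolve(context: 999)
```

In real use the token would be passed as the `context:` argument when building each `IORequest`, and `resolve(context:)` would be called with the `context` of each consumed `IOCompletion`. A dictionary keeps the sketch simple; clients that need to avoid hashing could store a pointer in the context instead.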
+ +## Detailed design + +```swift +public class IOResource { } +public typealias IORingFileSlot = IOResource +public typealias IORingBuffer = IOResource + +extension IORingBuffer { + public var unsafeBuffer: UnsafeMutableRawBufferPointer +} + +// IORing is intentionally not Sendable, to avoid internal locking overhead +public struct IORing: ~Copyable { + + public init(queueDepth: UInt32) throws(IORingError) + + public mutating func registerEventFD(_ descriptor: FileDescriptor) throws(IORingError) + public mutating func unregisterEventFD(_ descriptor: FileDescriptor) throws(IORingError) + + // An IORing.RegisteredResources is a view into the buffers or files registered with the ring, if any + public struct RegisteredResources: RandomAccessCollection { + public subscript(position: Int) -> IOResource + public subscript(position: UInt16) -> IOResource // This is useful because io_uring likes to use UInt16s as indexes + } + + public mutating func registerFileSlots(count: Int) throws(IORingError) -> RegisteredResources + + public func unregisterFiles() + + public var registeredFileSlots: RegisteredResources + + public mutating func registerBuffers( + _ buffers: some Collection + ) throws(IORingError) -> RegisteredResources + + public mutating func registerBuffers( + _ buffers: UnsafeMutableRawBufferPointer... + ) throws(IORingError) -> RegisteredResources + + public func unregisterBuffers() + + public var registeredBuffers: RegisteredResources + + public func prepare(requests: IORequest...) + public func prepare(linkedRequests: IORequest...) + + public func submitPreparedRequests(timeout: Duration? = nil) throws(IORingError) + public func submit(requests: IORequest..., timeout: Duration? = nil) throws(IORingError) + public func submit(linkedRequests: IORequest..., timeout: Duration? = nil) throws(IORingError) + + public func submitPreparedRequests() throws(IORingError) + public func submitPreparedRequestsAndWait(timeout: Duration? 
= nil) throws(IORingError) + + public func submitPreparedRequestsAndConsumeCompletions<E: Error>( + minimumCount: UInt32 = 1, + timeout: Duration? = nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws(E) -> Void + ) throws(E) + + public func blockingConsumeCompletion( + timeout: Duration? = nil + ) throws(IORingError) -> IOCompletion + + public func blockingConsumeCompletions<E: Error>( + minimumCount: UInt32 = 1, + timeout: Duration? = nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws(E) -> Void + ) throws(E) + + public func tryConsumeCompletion() -> IOCompletion? + + public struct Features { + //IORING_FEAT_SINGLE_MMAP is handled internally + public var nonDroppingCompletions: Bool //IORING_FEAT_NODROP + public var stableSubmissions: Bool //IORING_FEAT_SUBMIT_STABLE + public var currentFilePosition: Bool //IORING_FEAT_RW_CUR_POS + public var assumingTaskCredentials: Bool //IORING_FEAT_CUR_PERSONALITY + public var fastPolling: Bool //IORING_FEAT_FAST_POLL + public var epoll32BitFlags: Bool //IORING_FEAT_POLL_32BITS + public var pollNonFixedFiles: Bool //IORING_FEAT_SQPOLL_NONFIXED + public var extendedArguments: Bool //IORING_FEAT_EXT_ARG + public var nativeWorkers: Bool //IORING_FEAT_NATIVE_WORKERS + public var resourceTags: Bool //IORING_FEAT_RSRC_TAGS + public var allowsSkippingSuccessfulCompletions: Bool //IORING_FEAT_CQE_SKIP + public var improvedLinkedFiles: Bool //IORING_FEAT_LINKED_FILE + public var registerRegisteredRings: Bool //IORING_FEAT_REG_REG_RING + public var minimumTimeout: Bool //IORING_FEAT_MIN_TIMEOUT + public var bundledSendReceive: Bool //IORING_FEAT_RECVSEND_BUNDLE + } + public static var supportedFeatures: Features +} + +public struct IORequest: ~Copyable { + public static func nop(context: UInt64 = 0) -> IORequest + + // overloads for each combination of registered vs unregistered buffer/descriptor + // Read + public static func reading( + _ file: IORingFileSlot, + into buffer: IORingBuffer, + at offset: UInt64 = 0, + 
context: UInt64 = 0 + ) -> IORequest + + public static func reading( + _ file: FileDescriptor, + into buffer: IORingBuffer, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + public static func reading( + _ file: IORingFileSlot, + into buffer: UnsafeMutableRawBufferPointer, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + public static func reading( + _ file: FileDescriptor, + into buffer: UnsafeMutableRawBufferPointer, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + // Write + public static func writing( + _ buffer: IORingBuffer, + into file: IORingFileSlot, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + public static func writing( + _ buffer: IORingBuffer, + into file: FileDescriptor, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + public static func writing( + _ buffer: UnsafeMutableRawBufferPointer, + into file: IORingFileSlot, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + public static func writing( + _ buffer: UnsafeMutableRawBufferPointer, + into file: FileDescriptor, + at offset: UInt64 = 0, + context: UInt64 = 0 + ) -> IORequest + + // Close + public static func closing( + _ file: FileDescriptor, + context: UInt64 = 0 + ) -> IORequest + + public static func closing( + _ file: IORingFileSlot, + context: UInt64 = 0 + ) -> IORequest + + // Open At + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + into slot: IORingFileSlot, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + context: UInt64 = 0 + ) -> IORequest + + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? 
= nil, + context: UInt64 = 0 + ) -> IORequest + + public static func unlinking( + _ path: FilePath, + in directory: FileDescriptor, + context: UInt64 = 0 + ) -> IORequest + + // Other operations follow in the same pattern +} + +public struct IOCompletion: ~Copyable { + + public struct Flags: OptionSet, Hashable, Codable { + public let rawValue: UInt32 + + public init(rawValue: UInt32) + + public static let moreCompletions: Flags + public static let socketNotEmpty: Flags + public static let isNotificationEvent: Flags + } + + //These are both the same value, but having both eliminates some ugly casts in client code + public var context: UInt64 + public var contextPointer: UnsafeRawPointer + + public var result: Int32 + + public var error: IORingError? // Convenience wrapper over `result` + + public var flags: Flags +} + +public struct IORingError: Error, Equatable { + static var missingRequiredFeatures: IORingError + static var operationCanceled: IORingError + static var timedOut: IORingError + static var resourceRegistrationFailed: IORingError + // Other error values to be filled out as the set of supported operations expands in the future + static func unknown(errorCode: Int) -> IORingError +} + +``` + +## Usage Examples + +### Blocking + +```swift +var ring = try IORing(queueDepth: 2) // `var` because registering resources is mutating + +//Make space on the ring for our file (this is optional, but improves performance with repeated use) +let file = try ring.registerFileSlots(count: 1)[0] + +var statInfo = Glibc.stat() // System doesn't have an abstraction for stat() right now +// Build our requests to open the file and find out how big it is +ring.prepare(linkedRequests: + .opening(path, + in: parentDirectory, + into: file, + mode: mode, + options: openOptions, + permissions: nil + ), + .readingMetadataOf(file, + into: &statInfo + ) +) +//batch submit 2 syscalls in 1! 
+try ring.submitPreparedRequestsAndConsumeCompletions(minimumCount: 2) { (completion: consuming IOCompletion?, error, done) in + if let error { + throw error //or other error handling as desired + } +} + +// We could register our buffer with the ring too, but we're only using it once +let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: Int(statInfo.st_size), alignment: 1) + +// Build our requests to read the file and close it +ring.prepare(linkedRequests: + .reading(file, + into: buffer + ), + .closing(file) +) + +//batch submit 2 syscalls in 1! +try ring.submitPreparedRequestsAndConsumeCompletions(minimumCount: 2) { (completion: consuming IOCompletion?, error, done) in + if let error { + throw error //or other error handling as desired + } +} + +processBuffer(buffer) +``` + +### Using libdispatch to wait for the read asynchronously + +```swift +//Initial setup as above up through creating buffer, omitted for brevity + +//Make the read request with a context so we can get the buffer out of it in the completion handler +… +.reading(file, into: buffer, context: UInt64(UInt(bitPattern: buffer.baseAddress!))) +… + +// Make an eventfd and register it with the ring +let eventFD = eventfd(0, 0) +try ring.registerEventFD(FileDescriptor(rawValue: eventFD)) + +// Make a read source to monitor the eventfd for readability +let readabilityMonitor = DispatchSource.makeReadSource(fileDescriptor: eventFD) +readabilityMonitor.setEventHandler { + guard let completion = try? ring.blockingConsumeCompletion() else { return } + if let error = completion.error { + //handle failure to read the file + } + processBuffer(completion.contextPointer) +} +readabilityMonitor.activate() + +try ring.submitPreparedRequests() //note, not "AndConsumeCompletions" this time +``` + +## Source compatibility + +This is an all-new API in Swift System, so it has no backwards-compatibility implications. Of note, though, this API is only available on Linux. 
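Because the API exists only on Linux, and ring creation can still fail at runtime there, cross-platform clients will probably want to gate its use and keep a fallback path. The following is a minimal sketch of that pattern under stated assumptions: `IOBackend`, `chooseBackend`, and `RingUnavailable` are hypothetical names, and the `makeRing` closure merely stands in for something like `try IORing(queueDepth: 32)` so that the example compiles without the proposed API.

```swift
// Hypothetical sketch: none of these names are part of the proposed API.
enum IOBackend: Equatable {
    case ioUring
    case threadPool
}

/// Picks a backend. `makeRing` stands in for ring creation (for example a
/// call such as `try IORing(queueDepth: 32)`) and throws if io_uring cannot
/// be used on this system.
func chooseBackend(makeRing: () throws -> Void) -> IOBackend {
    #if os(Linux)
    do {
        try makeRing()
        return .ioUring
    } catch {
        // The API is compiled in, but the kernel refused to create a ring
        return .threadPool
    }
    #else
    // The API does not exist on this platform
    return .threadPool
    #endif
}

struct RingUnavailable: Error {}

// Simulate a system where ring creation fails: every platform falls back.
let fallback = chooseBackend { throw RingUnavailable() }
```

The compile-time check handles platforms where the type is absent, while the `do`/`catch` handles Linux systems where ring setup is rejected at runtime.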
+ +## ABI compatibility + +Swift on Linux does not have a stable ABI, and we will likely take advantage of this to evolve IORing as compiler support improves, as described in Future Directions. + +## Implications on adoption + +This feature is intrinsically linked to Linux kernel support, so it constrains the deployment target of anything that adopts it to newer kernels. Exactly which features of the evolving `io_uring` syscall surface area we need is under consideration. + +## Future directions + +* While most Swift users on Darwin are not limited by IO scalability issues, the thread pool considerations still make introducing something similar to this appealing if and when the relevant OS support is available. We should attempt to the best of our ability to not design this in a way that's gratuitously incompatible with non-Linux OSs, although Swift System does not attempt to have an API that's identical on all platforms. +* The set of syscalls covered by `io_uring` has grown significantly and is still growing. We should leave room for supporting additional operations in the future. +* Once same-element requirements and pack counts as integer generic arguments are supported by the compiler, we should consider adding something along the lines of the following to allow preparing, submitting, and waiting for an entire set of operations at once: + +``` +func submitLinkedRequestsAndWait<each Request>( + _ requests: repeat each Request +) where Request == IORequest + -> InlineArray<(repeat each Request).count, IOCompletion> +``` +* Once mutable borrows are supported, we should consider replacing the closure-taking bulk completion APIs (e.g. `blockingConsumeCompletions(…)`) with ones that return a sequence of completions instead +* We should consider making more types noncopyable as compiler support improves +* liburing has a "peek next completion" operation that doesn't consume it, and then a "mark consumed" operation. 
We may want to add something similar +* liburing has support for operations allocating their own buffers and returning them via the completion, we may want to support this +* We may want to provide API for asynchronously waiting, rather than just exposing the eventfd to let people roll their own async waits. Doing this really well has *considerable* implications for the concurrency runtime though. +* We should almost certainly expose API for more of the configuration options in `io_uring_setup` +* The API for feature probing is functional but not especially nice. Finding a better way to present that concept would be desirable. + +## Alternatives considered + +* We could use a NIO-style separate thread pool, but we believe `io_uring` is likely a better option for scalability. We may still want to provide a thread-pool backed version as an option, because many Linux systems currently disable `io_uring` due to security concerns. +* We could multiplex all IO onto a single actor as `AsyncBytes` currently does, but this has a number of downsides that make it entirely unsuitable to server usage. Most notably, it eliminates IO parallelism entirely. +* Using POSIX AIO instead of or as well as io_uring would greatly increase our ability to support older kernels and other Unix systems, but it has well-documented performance and usability issues that have prevented its adoption elsewhere, and apply just as much to Swift. +* Earlier versions of this proposal had higher level "managed" abstractions over IORing. These have been removed due to lack of interest from clients, but could be added back later if needed. +* I considered making any or all of `IORingError`, `IOCompletion`, and `IORequest` nested struct declarations inside `IORing`. The main reason I haven't done so is I was a little concerned about the ambiguity of having a type called `Error`. I'd be particularly interested in feedback on this choice. 
+ +## Acknowledgments + +The NIO team, in particular Cory Benfield and Franz Busch, have provided invaluable feedback and direction on this project. diff --git a/Package.swift b/Package.swift index b71ade66..14b040e7 100644 --- a/Package.swift +++ b/Package.swift @@ -48,4 +48,4 @@ let package = Package( dependencies: ["SystemPackage"], cSettings: cSettings, swiftSettings: swiftSettings), - ]) + ]) \ No newline at end of file diff --git a/Sources/CSystem/include/CSystemLinux.h b/Sources/CSystem/include/CSystemLinux.h index b172d658..6489c4f3 100644 --- a/Sources/CSystem/include/CSystemLinux.h +++ b/Sources/CSystem/include/CSystemLinux.h @@ -21,5 +21,6 @@ #include #include #include +#include "io_uring.h" #endif diff --git a/Sources/CSystem/include/io_uring.h b/Sources/CSystem/include/io_uring.h new file mode 100644 index 00000000..5cab757c --- /dev/null +++ b/Sources/CSystem/include/io_uring.h @@ -0,0 +1,61 @@ +#include +#include +#include + +#include +#include + +#ifndef SWIFT_IORING_C_WRAPPER +#define SWIFT_IORING_C_WRAPPER + +#ifdef __alpha__ +/* + * alpha is the only exception, all other architectures + * have common numbers for new system calls. 
+ */ +# ifndef __NR_io_uring_setup +# define __NR_io_uring_setup 535 +# endif +# ifndef __NR_io_uring_enter +# define __NR_io_uring_enter 536 +# endif +# ifndef __NR_io_uring_register +# define __NR_io_uring_register 537 +# endif +#else /* !__alpha__ */ +# ifndef __NR_io_uring_setup +# define __NR_io_uring_setup 425 +# endif +# ifndef __NR_io_uring_enter +# define __NR_io_uring_enter 426 +# endif +# ifndef __NR_io_uring_register +# define __NR_io_uring_register 427 +# endif +#endif + +static inline int io_uring_register(int fd, unsigned int opcode, void *arg, + unsigned int nr_args) +{ + return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args); +} + +static inline int io_uring_setup(unsigned int entries, struct io_uring_params *p) +{ + return syscall(__NR_io_uring_setup, entries, p); +} + +static inline int io_uring_enter2(int fd, unsigned int to_submit, unsigned int min_complete, + unsigned int flags, void *args, size_t sz) +{ + return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, + flags, args, sz); +} + +static inline int io_uring_enter(int fd, unsigned int to_submit, unsigned int min_complete, + unsigned int flags, sigset_t *sig) +{ + return io_uring_enter2(fd, to_submit, min_complete, flags, sig, _NSIG / 8); +} + +#endif diff --git a/Sources/CSystem/shims.c b/Sources/CSystem/shims.c deleted file mode 100644 index f492a2ae..00000000 --- a/Sources/CSystem/shims.c +++ /dev/null @@ -1,18 +0,0 @@ -/* - This source file is part of the Swift System open source project - - Copyright (c) 2020 Apple Inc. 
and the Swift System project authors - Licensed under Apache License v2.0 with Runtime Library Exception - - See https://swift.org/LICENSE.txt for license information -*/ - -#ifdef __linux__ - -#include - -#endif - -#if defined(_WIN32) -#include -#endif diff --git a/Sources/System/IOCompletion.swift b/Sources/System/IOCompletion.swift new file mode 100644 index 00000000..ee81797e --- /dev/null +++ b/Sources/System/IOCompletion.swift @@ -0,0 +1,56 @@ +@_implementationOnly import CSystem + +public struct IOCompletion: ~Copyable { + let rawValue: io_uring_cqe +} + +extension IOCompletion { + public struct Flags: OptionSet, Hashable, Codable { + public let rawValue: UInt32 + + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let allocatedBuffer = Flags(rawValue: 1 << 0) + public static let moreCompletions = Flags(rawValue: 1 << 1) + public static let socketNotEmpty = Flags(rawValue: 1 << 2) + public static let isNotificationEvent = Flags(rawValue: 1 << 3) + } +} + +extension IOCompletion { + public var userData: UInt64 { //TODO: naming? + get { + rawValue.user_data + } + } + + public var userPointer: UnsafeRawPointer? { + get { + UnsafeRawPointer(bitPattern: UInt(rawValue.user_data)) + } + } + + public var result: Int32 { + get { + rawValue.res + } + } + + public var flags: IOCompletion.Flags { + get { + Flags(rawValue: rawValue.flags & 0x0000FFFF) + } + } + + public var bufferIndex: UInt16? 
{ + get { + if self.flags.contains(.allocatedBuffer) { + return UInt16(rawValue.flags >> 16) + } else { + return nil + } + } + } +} diff --git a/Sources/System/IORequest.swift b/Sources/System/IORequest.swift new file mode 100644 index 00000000..1eed6edf --- /dev/null +++ b/Sources/System/IORequest.swift @@ -0,0 +1,383 @@ +@_implementationOnly import struct CSystem.io_uring_sqe + +@usableFromInline +internal enum IORequestCore { + case nop // nothing here + case openat( + atDirectory: FileDescriptor, + path: FilePath, + FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + userData: UInt64 = 0 + ) + case openatSlot( + atDirectory: FileDescriptor, + path: FilePath, + FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + intoSlot: IORingFileSlot, + userData: UInt64 = 0 + ) + case read( + file: FileDescriptor, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case readUnregistered( + file: FileDescriptor, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case readSlot( + file: IORingFileSlot, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case readUnregisteredSlot( + file: IORingFileSlot, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case write( + file: FileDescriptor, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case writeUnregistered( + file: FileDescriptor, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case writeSlot( + file: IORingFileSlot, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case writeUnregisteredSlot( + file: IORingFileSlot, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case close( + FileDescriptor, + 
userData: UInt64 = 0 + ) + case closeSlot( + IORingFileSlot, + userData: UInt64 = 0 + ) + case unlinkAt( + atDirectory: FileDescriptor, + path: FilePath, + userData: UInt64 = 0 + ) +} + +@inline(__always) +internal func makeRawRequest_readWrite_registered( + file: FileDescriptor, + buffer: IORingBuffer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.fileDescriptor = file + request.buffer = buffer.unsafeBuffer + request.rawValue.buf_index = UInt16(exactly: buffer.index)! + request.offset = offset + request.rawValue.user_data = userData + return request +} + +@inline(__always) +internal func makeRawRequest_readWrite_registered_slot( + file: IORingFileSlot, + buffer: IORingBuffer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.rawValue.fd = Int32(exactly: file.index)! + request.flags = .fixedFile + request.buffer = buffer.unsafeBuffer + request.rawValue.buf_index = UInt16(exactly: buffer.index)! + request.offset = offset + request.rawValue.user_data = userData + return request +} + +@inline(__always) +internal func makeRawRequest_readWrite_unregistered( + file: FileDescriptor, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.fileDescriptor = file + request.buffer = buffer + request.offset = offset + request.rawValue.user_data = userData + return request +} + +@inline(__always) +internal func makeRawRequest_readWrite_unregistered_slot( + file: IORingFileSlot, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.rawValue.fd = Int32(exactly: file.index)! 
+ request.flags = .fixedFile + request.buffer = buffer + request.offset = offset + request.rawValue.user_data = userData + return request +} + +public struct IORequest { + @usableFromInline var core: IORequestCore + + @inlinable internal consuming func extractCore() -> IORequestCore { + return core + } +} + +extension IORequest { + public static func nop(userData: UInt64 = 0) -> IORequest { + IORequest(core: .nop) + } + + public static func reading( + _ file: IORingFileSlot, + into buffer: IORingBuffer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .readSlot(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func reading( + _ file: FileDescriptor, + into buffer: IORingBuffer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .read(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func reading( + _ file: IORingFileSlot, + into buffer: UnsafeMutableRawBufferPointer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .readUnregisteredSlot( + file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func reading( + _ file: FileDescriptor, + into buffer: UnsafeMutableRawBufferPointer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .readUnregistered(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func writing( + _ buffer: IORingBuffer, + into file: IORingFileSlot, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .writeSlot(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func writing( + _ buffer: IORingBuffer, + into file: FileDescriptor, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .write(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public 
static func writing( + _ buffer: UnsafeMutableRawBufferPointer, + into file: IORingFileSlot, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .writeUnregisteredSlot( + file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func writing( + _ buffer: UnsafeMutableRawBufferPointer, + into file: FileDescriptor, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .writeUnregistered(file: file, buffer: buffer, offset: offset, userData: userData) + ) + } + + public static func closing( + _ file: FileDescriptor, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .close(file, userData: userData)) + } + + public static func closing( + _ file: IORingFileSlot, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .closeSlot(file, userData: userData)) + } + + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + into slot: IORingFileSlot, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .openatSlot( + atDirectory: directory, path: path, mode, options: options, + permissions: permissions, intoSlot: slot, userData: userData)) + } + + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? 
= nil, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .openat( + atDirectory: directory, path: path, mode, options: options, + permissions: permissions, userData: userData + )) + } + + public static func unlinking( + _ path: FilePath, + in directory: FileDescriptor, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .unlinkAt(atDirectory: directory, path: path, userData: userData)) + } + + @inline(__always) + public consuming func makeRawRequest() -> RawIORequest { + var request = RawIORequest() + switch extractCore() { + case .nop: + request.operation = .nop + case .openatSlot( + let atDirectory, let path, let mode, let options, let permissions, let fileSlot, + let userData): + // TODO: use rawValue less + request.operation = .openAt + request.fileDescriptor = atDirectory + request.rawValue.addr = UInt64( + UInt( + bitPattern: path.withPlatformString { ptr in + ptr //this is unsavory, but we keep it alive by storing path alongside it in the request + })) + request.rawValue.open_flags = UInt32(bitPattern: options.rawValue | mode.rawValue) + request.rawValue.len = permissions?.rawValue ?? 0 + request.rawValue.file_index = UInt32(fileSlot.index + 1) + request.path = path + request.rawValue.user_data = userData + case .openat( + let atDirectory, let path, let mode, let options, let permissions, let userData): + request.operation = .openAt + request.fileDescriptor = atDirectory + request.rawValue.addr = UInt64( + UInt( + bitPattern: path.withPlatformString { ptr in + ptr //this is unsavory, but we keep it alive by storing path alongside it in the request + })) + request.rawValue.open_flags = UInt32(bitPattern: options.rawValue | mode.rawValue) + request.rawValue.len = permissions?.rawValue ?? 
0 + request.path = path + request.rawValue.user_data = userData + case .write(let file, let buffer, let offset, let userData): + request.operation = .writeFixed + return makeRawRequest_readWrite_registered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .writeSlot(let file, let buffer, let offset, let userData): + request.operation = .writeFixed + return makeRawRequest_readWrite_registered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .writeUnregistered(let file, let buffer, let offset, let userData): + request.operation = .write + return makeRawRequest_readWrite_unregistered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .writeUnregisteredSlot(let file, let buffer, let offset, let userData): + request.operation = .write + return makeRawRequest_readWrite_unregistered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .read(let file, let buffer, let offset, let userData): + request.operation = .readFixed + return makeRawRequest_readWrite_registered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .readSlot(let file, let buffer, let offset, let userData): + request.operation = .readFixed + return makeRawRequest_readWrite_registered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .readUnregistered(let file, let buffer, let offset, let userData): + request.operation = .read + return makeRawRequest_readWrite_unregistered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .readUnregisteredSlot(let file, let buffer, let offset, let userData): + request.operation = .read + return makeRawRequest_readWrite_unregistered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .close(let file, let userData): + 
request.operation = .close + request.fileDescriptor = file + request.rawValue.user_data = userData + case .closeSlot(let file, let userData): + request.operation = .close + request.rawValue.file_index = UInt32(file.index + 1) + request.rawValue.user_data = userData + case .unlinkAt(let atDirectory, let path, let userData): + request.operation = .unlinkAt + request.fileDescriptor = atDirectory + request.rawValue.addr = UInt64( + UInt( + bitPattern: path.withPlatformString { ptr in + ptr //this is unsavory, but we keep it alive by storing path alongside it in the request + }) + ) + request.path = path + request.rawValue.user_data = userData + } + return request + } +} diff --git a/Sources/System/IORing.swift b/Sources/System/IORing.swift new file mode 100644 index 00000000..79840c1c --- /dev/null +++ b/Sources/System/IORing.swift @@ -0,0 +1,657 @@ +@_implementationOnly import CSystem +import Glibc // needed for mmap +import Synchronization + +@_implementationOnly import struct CSystem.io_uring_sqe + +// XXX: this *really* shouldn't be here. oh well. +extension UnsafeMutableRawPointer { + func advanced(by offset: UInt32) -> UnsafeMutableRawPointer { + return advanced(by: Int(offset)) + } +} + +extension UnsafeMutableRawBufferPointer { + func to_iovec() -> iovec { + iovec(iov_base: baseAddress, iov_len: count) + } +} + +// all pointers in this struct reference kernel-visible memory +@usableFromInline struct SQRing: ~Copyable { + let kernelHead: UnsafePointer> + let kernelTail: UnsafePointer> + var userTail: UInt32 + + // from liburing: the kernel should never change these + // might change in the future with resizable rings? 
+ let ringMask: UInt32 + // let ringEntries: UInt32 - absorbed into array.count + + // ring flags bitfield + // currently used by the kernel only in SQPOLL mode to indicate + // when the polling thread needs to be woken up + let flags: UnsafePointer<Atomic<UInt32>> + + // ring array + // maps indexes between the actual ring and the submissionQueueEntries list, + // allowing the latter to be used as a kind of freelist with enough work? + // currently, just 1:1 mapping (0..<n) + let array: UnsafeMutableBufferPointer<UInt32> +} + +struct CQRing: ~Copyable { + let kernelHead: UnsafePointer<Atomic<UInt32>> + let kernelTail: UnsafePointer<Atomic<UInt32>> + + // TODO: determine if this is actually used + var userHead: UInt32 + + let ringMask: UInt32 + + let cqes: UnsafeBufferPointer<io_uring_cqe> +} + +public struct IOResource<T> { + public typealias Resource = T + @usableFromInline let resource: T + @usableFromInline let index: Int + + internal init( + resource: T, + index: Int + ) { + self.resource = resource + self.index = index + } +} + +public typealias IORingFileSlot = IOResource<UInt32> +public typealias IORingBuffer = IOResource<iovec> + +extension IORingFileSlot { + public var unsafeFileSlot: Int { + return index + } +} +extension IORingBuffer { + public var unsafeBuffer: UnsafeMutableRawBufferPointer { + return .init(start: resource.iov_base, count: resource.iov_len) + } +} + +@inline(__always) +internal func _writeRequest( + _ request: __owned RawIORequest, ring: inout SQRing, + submissionQueueEntries: UnsafeMutableBufferPointer<io_uring_sqe> +) + -> Bool +{ + let entry = _blockingGetSubmissionEntry( + ring: &ring, submissionQueueEntries: submissionQueueEntries) + entry.pointee = request.rawValue + return true +} + +@inline(__always) +internal func _blockingGetSubmissionEntry( + ring: inout SQRing, submissionQueueEntries: UnsafeMutableBufferPointer<io_uring_sqe> +) -> UnsafeMutablePointer< + io_uring_sqe +> { + while true { + if let entry = _getSubmissionEntry( + ring: &ring, + submissionQueueEntries: submissionQueueEntries + ) { + return entry + } + // TODO: actually block here instead of spinning + } + +} + 
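For reviewers, the head/tail/mask arithmetic that `SQRing` and `CQRing` depend on can be looked at in isolation. The following is a minimal, self-contained sketch and not code from this PR: `RingIndexSketch`, `reserveSlot`, and `consumeSlot` are hypothetical names, and it assumes the entry count is a power of two so that `entries - 1` can serve as the index mask. It computes occupancy with wrapping subtraction (`&-`), since a plain `-` on `UInt32` traps in Swift once the tail has wrapped past the head.

```swift
// Illustrative sketch only; not part of this PR.
// Assumes `entries` is a power of two, so that `entries - 1` works as an index mask.
struct RingIndexSketch {
    let entries: UInt32
    var head: UInt32 = 0  // advanced by the consumer
    var tail: UInt32 = 0  // advanced by the producer

    var mask: UInt32 { entries &- 1 }

    // Wrapping subtraction keeps the count correct after tail wraps past UInt32.max.
    var unconsumed: UInt32 { tail &- head }

    // Returns the slot index to write into, or nil when the ring is full.
    mutating func reserveSlot() -> Int? {
        guard unconsumed < entries else { return nil }
        let slot = Int(tail & mask)
        tail = tail &+ 1
        return slot
    }

    // Returns the slot index to read from, or nil when the ring is empty.
    mutating func consumeSlot() -> Int? {
        guard head != tail else { return nil }
        let slot = Int(head & mask)
        head = head &+ 1
        return slot
    }
}
```

Starting both counters just below `UInt32.max` is a convenient way to exercise the wrap-around path without pushing billions of entries through the ring.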
+//TODO: omitting signal mask for now +//Tell the kernel that we've submitted requests and/or are waiting for completions +internal func _enter( + ringDescriptor: Int32, + numEvents: UInt32, + minCompletions: UInt32, + flags: UInt32 +) throws -> Int32 { + // Ring always needs enter right now; + // TODO: support SQPOLL here + while true { + let ret = io_uring_enter(ringDescriptor, numEvents, minCompletions, flags, nil) + // error handling: + // EAGAIN / EINTR (try again), + // EBADF / EBADFD / EOPNOTSUPP / ENXIO + // (failure in ring lifetime management, fatal), + // EINVAL (bad constant flag?, fatal), + // EFAULT (bad address for argument from library, fatal) + if ret == -EAGAIN || ret == -EINTR { + //TODO: should we wait a bit on AGAIN? + continue + } else if ret < 0 { + fatalError( + "fatal error in submitting requests: " + Errno(rawValue: -ret).debugDescription + ) + } else { + return ret + } + } +} + +internal func _submitRequests(ring: borrowing SQRing, ringDescriptor: Int32) throws { + let flushedEvents = _flushQueue(ring: ring) + _ = try _enter( + ringDescriptor: ringDescriptor, numEvents: flushedEvents, minCompletions: 0, flags: 0) +} + +internal func _getUnconsumedSubmissionCount(ring: borrowing SQRing) -> UInt32 { + return ring.userTail - ring.kernelHead.pointee.load(ordering: .acquiring) +} + +internal func _getUnconsumedCompletionCount(ring: borrowing CQRing) -> UInt32 { + return ring.kernelTail.pointee.load(ordering: .acquiring) + - ring.kernelHead.pointee.load(ordering: .acquiring) +} + +//TODO: pretty sure this is supposed to do more than it does +internal func _flushQueue(ring: borrowing SQRing) -> UInt32 { + ring.kernelTail.pointee.store( + ring.userTail, ordering: .releasing + ) + return _getUnconsumedSubmissionCount(ring: ring) +} + +@inline(__always) +internal func _getSubmissionEntry( + ring: inout SQRing, submissionQueueEntries: UnsafeMutableBufferPointer +) -> UnsafeMutablePointer< + io_uring_sqe +>? 
{ + let next = ring.userTail &+ 1 //this is expected to wrap + + // FEAT: smp load when SQPOLL in use (not in MVP) + let kernelHead = ring.kernelHead.pointee.load(ordering: .acquiring) + + // FEAT: 128-bit event support (not in MVP) + if next - kernelHead <= ring.array.count { + // let sqe = &sq->sqes[(sq->sqe_tail & sq->ring_mask) << shift]; + let sqeIndex = Int( + ring.userTail & ring.ringMask + ) + + let sqe = submissionQueueEntries + .baseAddress.unsafelyUnwrapped + .advanced(by: sqeIndex) + + ring.userTail = next + return sqe + } + return nil +} + +public struct IORing: ~Copyable { + let ringFlags: UInt32 + let ringDescriptor: Int32 + + @usableFromInline var submissionRing: SQRing + // FEAT: set this eventually + let submissionPolling: Bool = false + + let completionRing: CQRing + + let submissionQueueEntries: UnsafeMutableBufferPointer + + // kept around for unmap / cleanup + let ringSize: Int + let ringPtr: UnsafeMutableRawPointer + + var _registeredFiles: [UInt32]? + var _registeredBuffers: [iovec]? 
+ + public init(queueDepth: UInt32) throws { + var params = io_uring_params() + + ringDescriptor = withUnsafeMutablePointer(to: &params) { + return io_uring_setup(queueDepth, $0) + } + + if params.features & IORING_FEAT_SINGLE_MMAP == 0 + || params.features & IORING_FEAT_NODROP == 0 + { + close(ringDescriptor) + // TODO: error handling + throw IORingError.missingRequiredFeatures + } + + if ringDescriptor < 0 { + // TODO: error handling + } + + let submitRingSize = + params.sq_off.array + + params.sq_entries * UInt32(MemoryLayout<UInt32>.size) + + let completionRingSize = + params.cq_off.cqes + + params.cq_entries * UInt32(MemoryLayout<io_uring_cqe>.size) + + ringSize = Int(max(submitRingSize, completionRingSize)) + + ringPtr = mmap( + /* addr: */ nil, + /* len: */ ringSize, + /* prot: */ PROT_READ | PROT_WRITE, + /* flags: */ MAP_SHARED | MAP_POPULATE, + /* fd: */ ringDescriptor, + /* offset: */ __off_t(IORING_OFF_SQ_RING) + ) + + if ringPtr == MAP_FAILED { + perror("mmap") + // TODO: error handling + fatalError("mmap failed in ring setup") + } + + let submissionRing = SQRing( + kernelHead: UnsafePointer<Atomic<UInt32>>( + ringPtr.advanced(by: params.sq_off.head) + .assumingMemoryBound(to: Atomic<UInt32>.self) + ), + kernelTail: UnsafePointer<Atomic<UInt32>>( + ringPtr.advanced(by: params.sq_off.tail) + .assumingMemoryBound(to: Atomic<UInt32>.self) + ), + userTail: 0, // no requests yet + ringMask: ringPtr.advanced(by: params.sq_off.ring_mask) + .assumingMemoryBound(to: UInt32.self).pointee, + flags: UnsafePointer<Atomic<UInt32>>( + ringPtr.advanced(by: params.sq_off.flags) + .assumingMemoryBound(to: Atomic<UInt32>.self) + ), + array: UnsafeMutableBufferPointer( + start: ringPtr.advanced(by: params.sq_off.array) + .assumingMemoryBound(to: UInt32.self), + count: Int( + ringPtr.advanced(by: params.sq_off.ring_entries) + .assumingMemoryBound(to: UInt32.self).pointee) + ) + ) + + // fill submission ring array with 1:1 map to underlying SQEs + for i in 0..<submissionRing.array.count { + submissionRing.array[i] = UInt32(i) + } + + let sqes = mmap( + /* addr: */ nil, + /* len: */ Int(params.sq_entries) * MemoryLayout<io_uring_sqe>.size, + /* prot: */ PROT_READ | PROT_WRITE, + /* flags: */ MAP_SHARED | MAP_POPULATE, + /* fd: */ 
ringDescriptor, + /* offset: */ __off_t(IORING_OFF_SQES) + ) + + if sqes == MAP_FAILED { + perror("mmap") + // TODO: error handling + fatalError("sqe mmap failed in ring setup") + } + + submissionQueueEntries = UnsafeMutableBufferPointer( + start: sqes!.assumingMemoryBound(to: io_uring_sqe.self), + count: Int(params.sq_entries) + ) + + let completionRing = CQRing( + kernelHead: UnsafePointer>( + ringPtr.advanced(by: params.cq_off.head) + .assumingMemoryBound(to: Atomic.self) + ), + kernelTail: UnsafePointer>( + ringPtr.advanced(by: params.cq_off.tail) + .assumingMemoryBound(to: Atomic.self) + ), + userHead: 0, // no completions yet + ringMask: ringPtr.advanced(by: params.cq_off.ring_mask) + .assumingMemoryBound(to: UInt32.self).pointee, + cqes: UnsafeBufferPointer( + start: ringPtr.advanced(by: params.cq_off.cqes) + .assumingMemoryBound(to: io_uring_cqe.self), + count: Int( + ringPtr.advanced(by: params.cq_off.ring_entries) + .assumingMemoryBound(to: UInt32.self).pointee) + ) + ) + + self.submissionRing = submissionRing + self.completionRing = completionRing + + self.ringFlags = params.flags + } + + private func _blockingConsumeCompletionGuts( + minimumCount: UInt32, + maximumCount: UInt32, + extraArgs: UnsafeMutablePointer? 
= nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws -> Void + ) rethrows { + var count = 0 + while let completion = _tryConsumeCompletion(ring: completionRing) { + count += 1 + if completion.result < 0 { + try consumer(nil, IORingError(completionResult: completion.result), false) + } else { + try consumer(completion, nil, false) + } + if count == maximumCount { + try consumer(nil, nil, true) + return + } + } + + if count < minimumCount { + while count < minimumCount { + var sz = 0 + if extraArgs != nil { + sz = MemoryLayout.size + } + let res = io_uring_enter2( + ringDescriptor, + 0, + minimumCount, + IORING_ENTER_GETEVENTS, + extraArgs, + sz + ) + // error handling: + // EAGAIN / EINTR (try again), + // EBADF / EBADFD / EOPNOTSUPP / ENXIO + // (failure in ring lifetime management, fatal), + // EINVAL (bad constant flag?, fatal), + // EFAULT (bad address for argument from library, fatal) + // EBUSY (not enough space for events; implies events filled + // by kernel between kernelTail load and now) + if res >= 0 || res == -EBUSY { + break + } else if res == -EAGAIN || res == -EINTR { + continue + } + fatalError( + "fatal error in receiving requests: " + + Errno(rawValue: -res).debugDescription + ) + } + var count = 0 + while let completion = _tryConsumeCompletion(ring: completionRing) { + count += 1 + if completion.result < 0 { + try consumer(nil, IORingError(completionResult: completion.result), false) + } else { + try consumer(completion, nil, false) + } + if count == maximumCount { + break + } + } + try consumer(nil, nil, true) + } + } + + internal func _blockingConsumeOneCompletion( + extraArgs: UnsafeMutablePointer? = nil + ) throws -> IOCompletion { + var result: IOCompletion? 
= nil + try _blockingConsumeCompletionGuts(minimumCount: 1, maximumCount: 1, extraArgs: extraArgs) { + (completion: consuming IOCompletion?, error, done) in + if let error { + throw error + } + if let completion { + result = consume completion + } + } + return result.take()! + } + + public func blockingConsumeCompletion( + timeout: Duration? = nil + ) throws -> IOCompletion { + if let timeout { + var ts = __kernel_timespec( + tv_sec: timeout.components.seconds, + tv_nsec: timeout.components.attoseconds / 1_000_000_000 + ) + return try withUnsafePointer(to: &ts) { tsPtr in + var args = io_uring_getevents_arg( + sigmask: 0, + sigmask_sz: 0, + pad: 0, + ts: UInt64(UInt(bitPattern: tsPtr)) + ) + return try _blockingConsumeOneCompletion(extraArgs: &args) + } + } else { + return try _blockingConsumeOneCompletion() + } + } + + public func blockingConsumeCompletions( + minimumCount: UInt32 = 1, + timeout: Duration? = nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws -> Void + ) throws { + if let timeout { + var ts = __kernel_timespec( + tv_sec: timeout.components.seconds, + tv_nsec: timeout.components.attoseconds / 1_000_000_000 + ) + return try withUnsafePointer(to: &ts) { tsPtr in + var args = io_uring_getevents_arg( + sigmask: 0, + sigmask_sz: 0, + pad: 0, + ts: UInt64(UInt(bitPattern: tsPtr)) + ) + try _blockingConsumeCompletionGuts( + minimumCount: minimumCount, maximumCount: UInt32.max, extraArgs: &args, + consumer: consumer) + } + } else { + try _blockingConsumeCompletionGuts( + minimumCount: minimumCount, maximumCount: UInt32.max, consumer: consumer) + } + } + + // public func peekNextCompletion() -> IOCompletion { + + // } + + public func tryConsumeCompletion() -> IOCompletion? { + return _tryConsumeCompletion(ring: completionRing) + } + + func _tryConsumeCompletion(ring: borrowing CQRing) -> IOCompletion? 
{ + let tail = ring.kernelTail.pointee.load(ordering: .acquiring) + let head = ring.kernelHead.pointee.load(ordering: .acquiring) + + if tail != head { + // 32 byte copy - oh well + let res = ring.cqes[Int(head & ring.ringMask)] + ring.kernelHead.pointee.store(head &+ 1, ordering: .releasing) + return IOCompletion(rawValue: res) + } + + return nil + } + + internal func handleRegistrationResult(_ result: Int32) throws { + //TODO: error handling + } + + public mutating func registerEventFD(_ descriptor: FileDescriptor) throws { + var rawfd = descriptor.rawValue + let result = withUnsafePointer(to: &rawfd) { fdptr in + return io_uring_register( + ringDescriptor, + IORING_REGISTER_EVENTFD, + UnsafeMutableRawPointer(mutating: fdptr), + 1 + ) + } + try handleRegistrationResult(result) + } + + public mutating func unregisterEventFD() throws { + let result = io_uring_register( + ringDescriptor, + IORING_UNREGISTER_EVENTFD, + nil, + 0 + ) + try handleRegistrationResult(result) + } + + public mutating func registerFileSlots(count: Int) -> RegisteredResources< + IORingFileSlot.Resource + > { + precondition(_registeredFiles == nil) + precondition(count < UInt32.max) + let files = [UInt32](repeating: UInt32.max, count: count) + + let regResult = files.withUnsafeBufferPointer { bPtr in + io_uring_register( + self.ringDescriptor, + IORING_REGISTER_FILES, + UnsafeMutableRawPointer(mutating: bPtr.baseAddress!), + UInt32(truncatingIfNeeded: count) + ) + } + + // TODO: error handling + _registeredFiles = files + return registeredFileSlots + } + + public func unregisterFiles() { + fatalError("failed to unregister files") + } + + public var registeredFileSlots: RegisteredResources { + RegisteredResources(resources: _registeredFiles ?? 
[]) + } + + public mutating func registerBuffers(_ buffers: some Collection) + -> RegisteredResources + { + precondition(buffers.count < UInt32.max) + precondition(_registeredBuffers == nil) + //TODO: check if io_uring has preconditions it needs for the buffers (e.g. alignment) + let iovecs = buffers.map { $0.to_iovec() } + let regResult = iovecs.withUnsafeBufferPointer { bPtr in + io_uring_register( + self.ringDescriptor, + IORING_REGISTER_BUFFERS, + UnsafeMutableRawPointer(mutating: bPtr.baseAddress!), + UInt32(truncatingIfNeeded: buffers.count) + ) + } + + // TODO: error handling + _registeredBuffers = iovecs + return registeredBuffers + } + + public mutating func registerBuffers(_ buffers: UnsafeMutableRawBufferPointer...) + -> RegisteredResources + { + registerBuffers(buffers) + } + + public struct RegisteredResources: RandomAccessCollection { + let resources: [T] + + public var startIndex: Int { 0 } + public var endIndex: Int { resources.endIndex } + init(resources: [T]) { + self.resources = resources + } + public subscript(position: Int) -> IOResource { + IOResource(resource: resources[position], index: position) + } + public subscript(position: UInt16) -> IOResource { + IOResource(resource: resources[Int(position)], index: Int(position)) + } + } + + public var registeredBuffers: RegisteredResources { + RegisteredResources(resources: _registeredBuffers ?? []) + } + + public func unregisterBuffers() { + fatalError("failed to unregister buffers: TODO") + } + + public func submitPreparedRequests() throws { + try _submitRequests(ring: submissionRing, ringDescriptor: ringDescriptor) + } + + public func submitPreparedRequestsAndConsumeCompletions( + minimumCount: UInt32 = 1, + timeout: Duration? 
= nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws -> Void + ) throws { + //TODO: optimize this to one uring_enter + try submitPreparedRequests() + try blockingConsumeCompletions( + minimumCount: minimumCount, + timeout: timeout, + consumer: consumer + ) + } + + public mutating func prepare(request: __owned IORequest) -> Bool { + var raw: RawIORequest? = request.makeRawRequest() + return _writeRequest( + raw.take()!, ring: &submissionRing, submissionQueueEntries: submissionQueueEntries) + } + + mutating func prepare(linkedRequests: some BidirectionalCollection) { + guard linkedRequests.count > 0 else { + return + } + let last = linkedRequests.last! + for req in linkedRequests.dropLast() { + var raw = req.makeRawRequest() + raw.linkToNextRequest() + _writeRequest( + raw, ring: &submissionRing, submissionQueueEntries: submissionQueueEntries) + } + _writeRequest( + last.makeRawRequest(), ring: &submissionRing, + submissionQueueEntries: submissionQueueEntries) + } + + //@inlinable //TODO: make sure the array allocation gets optimized out... + public mutating func prepare(linkedRequests: IORequest...) { + prepare(linkedRequests: linkedRequests) + } + + public mutating func submit(linkedRequests: IORequest...) 
throws { + prepare(linkedRequests: linkedRequests) + try submitPreparedRequests() + } + + deinit { + munmap(ringPtr, ringSize) + munmap( + UnsafeMutableRawPointer(submissionQueueEntries.baseAddress!), + submissionQueueEntries.count * MemoryLayout.size + ) + close(ringDescriptor) + } +} diff --git a/Sources/System/IORingError.swift b/Sources/System/IORingError.swift new file mode 100644 index 00000000..fbd70bce --- /dev/null +++ b/Sources/System/IORingError.swift @@ -0,0 +1,10 @@ +//TODO: make this not an enum +public enum IORingError: Error, Equatable { + case missingRequiredFeatures + case operationCanceled + case unknown(errorCode: Int) + + internal init(completionResult: Int32) { + self = .unknown(errorCode: Int(completionResult)) //TODO, flesh this out + } +} diff --git a/Sources/System/Internals/WindowsSyscallAdapters.swift b/Sources/System/Internals/WindowsSyscallAdapters.swift index d56d33e4..706881ec 100644 --- a/Sources/System/Internals/WindowsSyscallAdapters.swift +++ b/Sources/System/Internals/WindowsSyscallAdapters.swift @@ -187,7 +187,7 @@ internal func pwrite( internal func pipe( _ fds: UnsafeMutablePointer, bytesReserved: UInt32 = 4096 ) -> CInt { -  return _pipe(fds, bytesReserved, _O_BINARY | _O_NOINHERIT); + return _pipe(fds, bytesReserved, _O_BINARY | _O_NOINHERIT); } @inline(__always) diff --git a/Sources/System/RawIORequest.swift b/Sources/System/RawIORequest.swift new file mode 100644 index 00000000..50c97c61 --- /dev/null +++ b/Sources/System/RawIORequest.swift @@ -0,0 +1,188 @@ +// TODO: investigate @usableFromInline / @_implementationOnly dichotomy +@_implementationOnly import CSystem +@_implementationOnly import struct CSystem.io_uring_sqe + +//TODO: make this internal +public struct RawIORequest: ~Copyable { + var rawValue: io_uring_sqe + var path: FilePath? 
//buffer owner for the path pointer that the sqe may have + + public init() { + self.rawValue = io_uring_sqe() + } +} + +extension RawIORequest { + enum Operation: UInt8 { + case nop = 0 + case readv = 1 + case writev = 2 + case fsync = 3 + case readFixed = 4 + case writeFixed = 5 + case pollAdd = 6 + case pollRemove = 7 + case syncFileRange = 8 + case sendMessage = 9 + case receiveMessage = 10 + // ... + case link_timeout = 15 + // ... + case openAt = 18 + case close = 19 + case filesUpdate = 20 + case statx = 21 + case read = 22 + case write = 23 + // ... + case openAt2 = 28 + // ... + case unlinkAt = 36 + } + + public struct Flags: OptionSet, Hashable, Codable { + public let rawValue: UInt8 + + public init(rawValue: UInt8) { + self.rawValue = rawValue + } + + public static let fixedFile = Flags(rawValue: 1 << 0) + public static let drainQueue = Flags(rawValue: 1 << 1) + public static let linkRequest = Flags(rawValue: 1 << 2) + public static let hardlinkRequest = Flags(rawValue: 1 << 3) + public static let asynchronous = Flags(rawValue: 1 << 4) + public static let selectBuffer = Flags(rawValue: 1 << 5) + public static let skipSuccess = Flags(rawValue: 1 << 6) + } + + var operation: Operation { + get { Operation(rawValue: rawValue.opcode)! } + set { rawValue.opcode = newValue.rawValue } + } + + public var flags: Flags { + get { Flags(rawValue: rawValue.flags) } + set { rawValue.flags = newValue.rawValue } + } + + public mutating func linkToNextRequest() { + flags = Flags(rawValue: flags.rawValue | Flags.linkRequest.rawValue) + } + + public var fileDescriptor: FileDescriptor { + get { FileDescriptor(rawValue: rawValue.fd) } + set { rawValue.fd = newValue.rawValue } + } + + public var offset: UInt64? 
{ + get { + if (rawValue.off == UInt64.max) { + return nil + } else { + return rawValue.off + } + } + set { + if let val = newValue { + rawValue.off = val + } else { + rawValue.off = UInt64.max + } + } + } + + public var buffer: UnsafeMutableRawBufferPointer { + get { + let ptr = UnsafeMutableRawPointer(bitPattern: UInt(exactly: rawValue.addr)!) + return UnsafeMutableRawBufferPointer(start: ptr, count: Int(rawValue.len)) + } + + set { + // TODO: cleanup? + rawValue.addr = UInt64(Int(bitPattern: newValue.baseAddress!)) + rawValue.len = UInt32(exactly: newValue.count)! + } + } + + public enum RequestFlags { + case readWriteFlags(ReadWriteFlags) + // case fsyncFlags(FsyncFlags?) + // poll_events + // poll32_events + // sync_range_flags + // msg_flags + case timeoutFlags(TimeOutFlags) + // accept_flags + // cancel_flags + case openFlags(FileDescriptor.OpenOptions) + // statx_flags + // fadvise_advice + // splice_flags + } + + public struct ReadWriteFlags: OptionSet, Hashable, Codable { + public var rawValue: UInt32 + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let highPriority = ReadWriteFlags(rawValue: 1 << 0) + + // sync with only data integrity + public static let dataSync = ReadWriteFlags(rawValue: 1 << 1) + + // sync with full data + file integrity + public static let fileSync = ReadWriteFlags(rawValue: 1 << 2) + + // return -EAGAIN if operation blocks + public static let noWait = ReadWriteFlags(rawValue: 1 << 3) + + // append to end of the file + public static let append = ReadWriteFlags(rawValue: 1 << 4) + } + + public struct TimeOutFlags: OptionSet, Hashable, Codable { + public var rawValue: UInt32 + + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let relativeTime: RawIORequest.TimeOutFlags = TimeOutFlags(rawValue: 0) + public static let absoluteTime: RawIORequest.TimeOutFlags = TimeOutFlags(rawValue: 1 << 0) + } +} + +extension RawIORequest { + static func nop() -> RawIORequest { + var 
req: RawIORequest = RawIORequest() + req.operation = .nop + return req + } + + //TODO: typed errors + static func withTimeoutRequest( + linkedTo opEntry: UnsafeMutablePointer, + in timeoutEntry: UnsafeMutablePointer, + duration: Duration, + flags: TimeOutFlags, + work: () throws -> R) rethrows -> R { + + opEntry.pointee.flags |= Flags.linkRequest.rawValue + opEntry.pointee.off = 1 + var ts = __kernel_timespec( + tv_sec: duration.components.seconds, + tv_nsec: duration.components.attoseconds / 1_000_000_000 + ) + return try withUnsafePointer(to: &ts) { tsPtr in + var req: RawIORequest = RawIORequest() + req.operation = .link_timeout + req.rawValue.timeout_flags = flags.rawValue + req.rawValue.len = 1 + req.rawValue.addr = UInt64(UInt(bitPattern: tsPtr)) + timeoutEntry.pointee = req.rawValue + return try work() + } + } +} \ No newline at end of file diff --git a/Tests/SystemTests/IORequestTests.swift b/Tests/SystemTests/IORequestTests.swift new file mode 100644 index 00000000..4aaf7543 --- /dev/null +++ b/Tests/SystemTests/IORequestTests.swift @@ -0,0 +1,63 @@ +import XCTest + +#if SYSTEM_PACKAGE +@testable import SystemPackage +#else +import System +#endif + +func requestBytes(_ request: consuming RawIORequest) -> [UInt8] { + return withUnsafePointer(to: request) { + let requestBuf = UnsafeBufferPointer(start: $0, count: 1) + let rawBytes = UnsafeRawBufferPointer(requestBuf) + return .init(rawBytes) + } +} + +// This test suite compares various IORequests bit-for-bit to IORequests +// that were generated with liburing or manually written out, +// which are known to work correctly. +final class IORequestTests: XCTestCase { + func testNop() { + let req = IORequest.nop().makeRawRequest() + let sourceBytes = requestBytes(req) + // convenient property of nop: it's all zeros! + // for some unknown reason, liburing sets the fd field to -1. + // we're not trying to be bug-compatible with it, so 0 *should* work. 
+ XCTAssertEqual(sourceBytes, .init(repeating: 0, count: 64)) + } + + func testOpenatFixedFile() throws { + let pathPtr = UnsafePointer(bitPattern: 0x414141410badf00d)! + let fileSlot: IORingFileSlot = IORingFileSlot(resource: UInt32.max, index: 0) + let req = IORequest.opening(FilePath(platformString: pathPtr), + in: FileDescriptor(rawValue: -100), + into: fileSlot, + mode: .readOnly, + options: [], + permissions: nil + ) + + let expectedRequest: [UInt8] = { + var bin = [UInt8].init(repeating: 0, count: 64) + bin[0] = 0x12 // opcode for the request + // bin[1] = 0 - no request flags + // bin[2...3] = 0 - padding + bin[4...7] = [0x9c, 0xff, 0xff, 0xff] // -100 in UInt32 - dirfd + // bin[8...15] = 0 - zeroes + withUnsafeBytes(of: pathPtr) { + // path pointer + bin[16...23] = ArraySlice($0) + } + // bin[24...43] = 0 - zeroes + withUnsafeBytes(of: UInt32(fileSlot.index + 1)) { + // file index + 1 - yes, unfortunately + bin[44...47] = ArraySlice($0) + } + return bin + }() + + let actualRequest = requestBytes(req.makeRawRequest()) + XCTAssertEqual(expectedRequest, actualRequest) + } +} diff --git a/Tests/SystemTests/IORingTests.swift b/Tests/SystemTests/IORingTests.swift new file mode 100644 index 00000000..306516be --- /dev/null +++ b/Tests/SystemTests/IORingTests.swift @@ -0,0 +1,20 @@ +import XCTest + +#if SYSTEM_PACKAGE +import SystemPackage +#else +import System +#endif + +final class IORingTests: XCTestCase { + func testInit() throws { + _ = try IORing(queueDepth: 32) + } + + func testNop() throws { + var ring = try IORing(queueDepth: 32) + try ring.submit(linkedRequests: IORequest.nop()) + let completion = try ring.blockingConsumeCompletion() + XCTAssertEqual(completion.result, 0) + } +}
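The timeout paths in this diff (`blockingConsumeCompletion(timeout:)`, `blockingConsumeCompletions`, and `withTimeoutRequest`) all build a `__kernel_timespec` by dividing `Duration.components.attoseconds` by 1_000_000_000. As a quick check of that conversion, here is a small portable sketch. `TimespecSketch` and `timespecSketch(from:)` are hypothetical names used only for illustration, standing in for the kernel struct so that the snippet builds without the Linux headers.

```swift
// Hypothetical stand-in for __kernel_timespec so the sketch builds anywhere.
struct TimespecSketch: Equatable {
    var seconds: Int64
    var nanoseconds: Int64
}

// One nanosecond is 10^9 attoseconds, so the integer division below
// truncates any sub-nanosecond remainder.
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
func timespecSketch(from duration: Duration) -> TimespecSketch {
    let (seconds, attoseconds) = duration.components
    return TimespecSketch(seconds: seconds, nanoseconds: attoseconds / 1_000_000_000)
}
```

For example, `timespecSketch(from: .milliseconds(1500))` yields 1 second and 500_000_000 nanoseconds, which matches the field layout the diff passes to the kernel.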