The State of Rust Client-Side Middleware
Introduction
When writing a non-trivial web service, we sooner or later discover operations we'd like to perform on requests and/or responses independently of their destinations. For instance, we may want to authenticate all incoming requests. We may want to set a certain header on all outgoing responses. We may want to assign a unique "request ID" to requests on the way in, and transfer it to the corresponding response on the way out. And so forth. Such operations are nowadays colloquially referred to as "middleware" (the term had a very different meaning c. 2000).
In the Rust ecosystem, the canonical library for middleware is tower. tower
provides a general set of abstractions for building middleware on top of any request/response-style service. In fact, their core abstraction is actually named Service. A tower
Service
is anything that takes a request of known type & asynchronously returns a response of a known type. They also define a Layer abstraction that allows application developers to compose middleware & build a "stack" of operations on top of their core service.
For example, here's such a stack for a service I'm building right now:
Router::new() .route("/healthcheck", get(healthcheck)) // more routes... // requests // | // v // +--------- SetRequestIdLayer ---------+ // | +------- OTEL layer -------+ | // | | +----- TraceLayer -----+ | | // | | | +--- PropagateRequestIdLayer ---+ | | | // | | | | | | | | // | | | | handler | | | | // | | | | | | | | // | | | +--- PropagateRequestIdLayer ---+ | | | // | | +----- TraceLayer -----+ | | // | +------- OTEL Layer -------+ | // +--------- SetRequestIdLayer ---------+ // | // v // responses .layer(PropagateRequestIdLayer::new( HeaderName::from_static("x-request-id"), )) .layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().include_headers(true)) .on_response(DefaultOnResponse::new().include_headers(true)), ) .layer(axum::middleware::from_fn_with_state( state.clone(), otel_middleware, )) .layer(SetRequestIdLayer::new( HeaderName::from_static("x-request-id"), RequestIdGenerator::default(), )) .with_state(state)
This is all fairly standard stuff; what I only learned recently, and what occasioned this post, is that the same problem exists on the client side: while building a non-trivial HTTP client, I discovered operations I wanted to perform on requests and/or responses independently of the request targets or bodies. I wanted to respect rate-limits for certain sites, for instance. I wanted to retry failed requests. I wanted to set certain headers on all requests that went out through my client. And so forth.
Regrettably, the the situation on the client-side is not so tidy as on the server-side. The problem is that tower-http, which is the tower
crate with middleware that is specific to HTTP, chose to use "the http and http-body crates as the HTTP abstractions." That's not a problem in itself; http
and http-body
are fine libraries. The issue is that the canonical HTTP client in Rust is Reqwest. Reqwest
, while also a fine library, doesn't use http
… at all. It uses its own abstractions for pretty-much everything the http
crate provides. Which means that all the nice middleware provided by tower-http
can't be used with a Reqwest
client. This has been long-recognized as an issue, but as far as I know there's no work ongoing within the Reqwest project to remedy this.
I've found a few attempts to improve this situation. reqwest-middleware creates its own, client-side middleware abstractions in the style of tower
, but implemented in terms of reqwest
. I've used it, and it does the job, but… it's not tower
, meaning that any tower
functionality it wants to provide has to be re-implemented.
There's a new crate, tower-reqwest that "provides adapters to use [the] reqwest
client with tower_http
layers." I spent some time working with this crate, as well. The author has re-implemented the reqwest
Client API on top of tower
Service
, which I'm not sure I want to sign-up for.
But still: at it's heart, tower-reqwest
has an interesting idea: reqwest::Client implements Service
; what if we could just write a tower
service designed to wrap a Reqwest
client and that did nothing but translate between Reqwest
requests & responses and http
requests & responses? Then we could stack all the tower
& tower-http
middleware on top of that we want.
This rest of this post is, as usual, a meandering account of how that worked-out. The TL;DR; is that it did work-out, and I'm now using it, but only after a serious refresher in Rust futures, pinning, & structural projection. If you just want the solution, you should probably hop over to the Github project accompanying this post, since from here on out I'm going to be attempting to show how I got there, with a few detours into topics that came up along the way.
In particular, I'd like to demonstrate a style of coding that I've been using recently. I picked it up from the functional world (from Idris in particular), wherein I begin by sketching the "shape" of a term, and work interactively with the language to complete it. In Idris, one can include "holes" in one's terms, and ask the compiler, for each hole, "what do I know at this point?" and "what is the type of the sub-term that goes here?" I've increasingly been using Emacs' eglot as an LSP client for rust-analyzer, along with the todo!() & unwrap() macros to work in a similar way in Rust. For all I know, everyone in the Rust community already codes this way, and I'm just figuring that out now: still, I learned it from Idris (and Rocq, neƩ Coq) and find it interesting that Rust supports it as well.
The First Attempt
So: we want to write a service (in the tower
sense of the word) that wraps another service that "speaks reqwest":
// This is our tower Service. It wraps an inner service S which must take reqwest Requests: struct ReqwestService<S> where S: tower::Service<reqwest::Request>, { inner: S, }
We're going to need an error type. I like Snafu:
#[derive(Debug, Snafu)] pub enum Error { } type Result<T> = std::result::Result<T, Error>; type StdResult<T, E> = std::result::Result<T, E>;
To "play" in the tower
ecosystem, we need to implement the Service trait:
pub trait Service<Request> { type Response; type Error; type Future: Future<Output = Result<Self::Response, Self::Error>>; // Required methods fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; fn call(&mut self, req: Request) -> Self::Future; }
It is here that we need to declare the type of requests that our service accepts. The entire point of this service is to compose with tower
& tower-http
layers above it, so that has to be http::Request<B>, but what should B
, the type which we'll use to model request bodies, be? The hell with it: this is a first draft, so let's just set it to String for now:
impl<S> tower::Service<http::Request<String>> for ReqwestService<S> where S: tower::Service<reqwest::Request> //...
and if we take the same attiude towards our responses, we already know two out of three of our associated types:
type Response = http::Response<String>; // We need to declare our error type; may as well use our own. type Error = Error; // The future type... well, not sure, yet: type Future = todo!();
Now we have two methods to implement: poll_ready()
and call()
. Let's skip the former for now, since the action's going to be in the latter. In order to make a "call" to our service, we need to sequence three operations:
- a synchronous, fallible operation converting an incoming
http::Request
into areqwest::Request
- an async, fallible operation sending that
reqwest::Request
through our inner service; note that when this resolves, we only have the response head - meaning that we have a third, async, fallible operation that consists of collecting the response body and converting it to our outgoing response type (i.e.
Self::Response
)
Huh. This seems a bit challenging at first blush: if the first operation fails, we need to somehow build a future that immediately resolves to that error. If it succeeds, we can at least name the future we'll be dealing with: it's just S::Future
. But if that succeeds, we can't even name the final future we'll need to await.
Well, whatever happens, we know what we want our future to resolve to:
Result<http::Response<String>>
Rust gives us a way to express such a type:
dyn Future<Output = Result<http::Response<String>>>
Such a thing will have to live indefinitely, so let's put it on the heap. And, as we'll discuss more precisely below, futures don't like to be moved. So let's set our Future
associated type to:
type Future = Pin<Box<dyn Future<Output = Result<http::Response<String>>>>>;
Alright, that at least gives us the signature and the initial shape of call()
:
fn call(&mut self, req: http::Request<String>) -> Self::Future { match reqwest::Request::try_from(req) { Ok(req) => todo!(), Err(err) => todo!(), } }
Let's handle the failure case first, because that's easier. I have to return a future that resolves to Self::Future::Output
. But… this shouldn't be hard; I know the result: err
. Fortunately, the futures crate provides just the thing: err() takes an error type and returns a future that immediately resolves to that thing. The only catch is that here, err: reqwest::Error
, and we've promised a result containing our own error type. Let's create an error variant for this failure mode and use it:
#[derive(Debug, Snafu)] pub enum Error { #[snafu(display( "Failed to convert the incoming response body to an http response body: {source}" ))] ResponseBody { source: reqwest::Error }, } // ... fn call(&mut self, req: http::Request<String>) -> Self::Future { match reqwest::Request::try_from(req) { Ok(req) => todo!(), Err(err) => Box::pin(err(ReqwestBodySnafu.into_error(err))), } }
Cool. On to the success case. We've now got a reqwest
Request to give to our inner service, a fallible operation, and if it succeeds we then want to perform a second, async, fallible operation (collecting the response body and converting it to an http
Response). futures
again has us covered: they define an extension trait for fallible async operations, TryFutureExt, that defines a method that does exactly what we want: and_then(). So we can write:
#[derive(Debug, Snafu)] pub enum Error { //... #[snafu(display( "Failed to convert the incoming response body to an http response body: {source}" ))] ResponseBody { source: reqwest::Error }, } // ... fn call(&mut self, req: http::Request<String>) -> Self::Future { match reqwest::Request::try_from(req) { Ok(req) => Box::pin(self .inner .call(req) // Recall that `call()` returns a a `Result` that we'll need to handle. Just unwrap it // for now .unwrap() .and_then(|rsp: S::Response| todo!())), Err(err) => Box::pin(err(ReqwestBodySnafu.into_error(err))), } }
Inside our second future, we'll need to collect the response body and translate it to an http::Response<String>
instance. Again, we'll just unwrap()
our intermediate Results
for now:
fn call(&mut self, req: http::Request<String>) -> Self::Future { match reqwest::Request::try_from(req) { Ok(req) => self .inner .call(req) .unwrap() .and_then(|rsp: S::Response| async move { let status = rsp.status(); // We should copy over the headers, as well. Leave it for now. http::Response::builder().status(status).body(rsp.text().await.unwrap()).unwrap() }), Err(err) => Box::pin(err(ReqwestBodySnafu.into_error(err))), } }
This looks good, but in the event, I wound-up imposing a few more requirements on S
in order to get this to compile. First, we demand that S
return a response of reqwest::Response
. If we don't do this, we'll have no way to convert from S's responses into our responses (since the only information we'll have is the name of the associated type S::Response
; that could be a string, the unit type, or some type we don't even know about).
We also have to constrain it to use reqwest::Error
as its error type, in order to get the invocation of TryFutureExt::and_then
to type-check.
Finally, and since we've type-erased the future our service returns, we've required S
's future to have 'static lifetime. This is because we return a Box<dyn Future...>
; by default, the trait object will have a 'static lifetime (see here). We can explicitly name the lifetime instead, but it wasn't obvious to me what that should be.
My first implementation can be found in the file first.rs in this post's associated project, but here's the Service
implementation:
impl<S> tower::Service<http::Request<String>> for ReqwestService<S> where S: tower::Service<reqwest::Request, Response = reqwest::Response, Error = reqwest::Error>, S::Future: 'static, { // Our response type type Response = http::Response<String>; // We need to declare our error type; may as well use our own. type Error = Error; // The problem with using `TryFutureExt::and_then` is that we can't *name* the future // it returns; instead, we erase the type & just return a trait object. type Future = Pin<Box<dyn Future<Output = Result<http::Response<String>>>>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> { // Skip for now todo!() } fn call(&mut self, req: http::Request<String>) -> Self::Future { // Right off the bat; synchronously, we can try to turn our request into a // `reqwest::Request`-- if that fails, then we're done. match reqwest::Request::try_from(req) { // If we're here, we have a `reqwest::Request`, so we can delegate to our wrapped, "inner" // service... Ok(req) => Box::pin( self.inner .call(req) // which will return a `reqwest::Error` on failure-- map that to this module's // error type... .map_err(|err: reqwest::Error| ReqwestSnafu.into_error(err)) // and then use the `futures` `TryResultExt` trait to sequence a _second_ future // after that call. At this point, we only have the response head. .and_then(|rsp: S::Response| async move { let status = rsp.status(); match rsp.text().await { // We need to translate the `reqwest::Response` into an // `http::Response<String>`; this is just a stub where we'll copy over // the status code & body, but that's it. Ok(text) => http::Response::builder() .status(status) .body(text) .context(TextSnafu), Err(err) => Err(ResponseBodySnafu.into_error(err)), } }), ), // If we're here, we never even send the request out. We nevertheless have to return a // future. The `futures` crate provides a handy implementation that will resolve // immediately to an error value. Err(err) => Box::pin(futures::future::err(ReqwestBodySnafu.into_error(err))), } } }
All the constraints are unfortunate, but this implementation does work; for example:
// Build a tower Service: let mut service = ServiceBuilder::new() // Pick a generic tower service... .layer(RateLimitLayer::new(1, Duration::from_secs(1))) // along with a tower-http service: .layer(SetRequestHeaderLayer::overriding( USER_AGENT, HeaderValue::from_static("reqwest-tower/0.0.1 (+sp1ff@pobox.com)"), )) // and finally, closest to the reqwest Client, install *our* service: .layer(ReqwestServiceLayer) .service(reqwest::Client::new()); // Now put a (n http) request through our service: let req = http::request::Request::builder() .method(http::Method::GET) .uri("https://unwoundstack.com") .body(String::default()) .expect("Failed to build a Request!?"); let rsp: http::Response<String> = service.call(req).await.expect("Failed to send a request"); debug!("Got {}", rsp.status());
My Second Attempt
At this point, we have a solution, but not one that's terribly satisfying. The biggest problem, to my mind, was the requirement that we model the request & response bodies as String
– pretty limiting. In what ways did my first attempt depend on the fact that the message bodies were strings? I used that fact in defining the types associated with my Service
implementation, and in my logic for converting the inner service's responses to http
responses. We'd like to foist this work off onto our caller. The way we express behavior without implementation is typeclass- er, traits. We can define a trait that models the desired behavior, demand that our callers implement that trait, and make our Service
implementation generic over all such implementations.
// Let's wriggle-out of constraining the inner service's response type by factoring-out the logic // for going from whatever our wrapped service returns to our desired response type: #[async_trait] trait FromResponse { // The type of response returned by our wrapped service type InnerResponse; // The type of response body desired by middleware stacked on top of us type ResponseBody; // How to convert from the former to the latter async fn try_into_response( &self, _: Self::InnerResponse, ) -> Result<http::Response<Self::ResponseBody>>; }
Now we need to parameterize our Service by the response translation type…
struct ReqwestService<S, R> where S: tower::Service<reqwest::Request>, // and it's probably a good idea to make sure their associated types match-up right off the bat: R: FromResponse<InnerResponse = S::Response>, { inner: S, from_response: R, }
The first point of improvement is to parameterize the request body type. reqwest::Request
implements TryFrom<http::Request<B>>
when B: Into<Body>
… which is actually pretty constrained. But still, it's an improvement. Notice that we've traded diminished complexity in the call()
implementation for increased work at the trait level. I regard this as a good trade because the latter takes place at compile-time and the former at run-time:
impl<S, ReqBody, R> tower::Service<http::Request<ReqBody>> for ReqwestService<S, R> where ReqBody: Into<reqwest::Body>, // Again, make sure the response that our inner service is returning matches-up with what our // translation trait expects, R: FromResponse<InnerResponse = S::Response>, // Since an `R` instance will be moved into a Future, it can't have any references that would // limit its lifetime. R: FromResponse + Clone + 'static, // Likewise S::Response: 'static, // We now only require that S be a `Service<reqwest::Service>` S: tower::Service<reqwest::Request>, // We can relax the requirement on S::Error to at least implement `StdError`-- since we're // erasing the type, we also need to require static lifetime. S::Error: StdError + 'static, // Finally, and since we've type-erased the future *our* service returns, we need S's future to // have 'static lifetime. S::Future: 'static, { // Our response type (see above) type Response = http::Response<R::ResponseBody>; // We need to declare our error type; may as well use our own. type Error = Error; // The problem with using `TryFutureExt::and_then` is that we can't *name* the future // it returns; instead, we erase the type & just return a trait object. type Future = Pin<Box<dyn Future<Output = Result<http::Response<R::ResponseBody>>>>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> { // Skip for now todo!() } fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future { // Right off the bat; synchronously, we can try to turn our request into a // `reqwest::Request`-- if that fails, then we're done. match reqwest::Request::try_from(req) { // If we're here, we have a `reqwest::Request`, so we can delegate to our wrapped, "inner" // service... Ok(req) => { let from_response = self.from_response.clone(); Box::pin( self.inner .call(req) // which will return a `reqwest::Error` on failure-- map that to this module's // error type... .map_err(|err: S::Error| ReqwestSnafu.into_error(Box::new(err))) // and then use the `futures` `TryResultExt` trait to sequence a _second_ future // after that call. At this point, we only have the response head. .and_then(|rsp: S::Response| async move { from_response.try_into_response(rsp).await }), ) } // If we're here, we never even send the request out. We nevertheless have to return a // future. The `futures` crate provides a handy implementation that will resolve // immediately to an error value. Err(err) => Box::pin(futures::future::err(ReqwestBodySnafu.into_error(err))), } } }
You can find the full implementation in the file second.rs in this post's associated project.
My Third Attempt
We next turn to the 'static
bounds on our inner service's response, and on the future returned by our inner service. These bounds were driven by the fact that we're returning a boxed, type-erased future in our call() implementation (as mentioned here). To be honest, I didn't have a great sense of how limiting that would be in practice, but I didn't care for it, and the bound doesn't generally appear on Service
implementations in the tower-http
crate.
In order to get rid of them, I chose to get more control over the implementation of my future by implementing it myself:
enum State { InitialConversionFailed, InnerPending, ResponseBodyPending, } struct ReqwestServiceFuture<S, R> where S: tower::Service<reqwest::Request>, R: FromResponse<InnerResponse = S::Response>, { from_response: R, state: State, first_err: Option<reqwest::Error>, inner_fut: Option<S::Future>, body_fut: Option<Pin<Box<dyn Future<Output = Result<http::Response<R::ResponseBody>>>>>>, } // We'll instantiate this future with the results of our initial conversion operation impl<S, R> ReqwestServiceFuture<S, R> where S: tower::Service<reqwest::Request>, R: FromResponse<InnerResponse = S::Response>, { pub fn new( from_response: R, res: StdResult<<S as Service<reqwest::Request>>::Future, reqwest::Error>, ) -> ReqwestServiceFuture<S, R> { match res { Ok(fut) => Self { from_response, state: State::InnerPending, first_err: None, inner_fut: Some(fut), body_fut: None, }, Err(err) => Self { from_response, state: State::InitialConversionFailed, first_err: Some(err), inner_fut: None, body_fut: None, }, } } }
Our future will still take the form of a state machine, but now more explicitly, with states corresponding to the three variants of the State
enum. The reader may find the ReqwestServiceFuture
struct a bit odd. For instance, why not this:
enum State<S> where S: tower::Service<reqwest::Request> { InitialConversionFailed(reqwest::Error), InnerPending(S::Future), ResponseBodyPending(Pin<Box<dyn Future<Output = Result<http::Response<R::ResponseBody>>>>>), } struct ReqwestServiceFuture<S, R> where S: tower::Service<reqwest::Request>, R: FromResponse<InnerResponse = S::Response>, { from_response: R, state: State<S>, }
which would seem to represent our states more faithfully? This, in fact, was my first attempt, but I could never figure-out how to match state
(which entailed taking a borrow of some kind), and then mutate it to change states. A look at some of the tower-http
source (here, e.g.) led me to this approach: moving state out into fields in the struct representing our future made it easier to work with them, and wrapping them each in an Option makes it easier for us to move out of them (because Option
, unlike the types of my fields, provides an implementation of Default).
So: a first draft of implementing our future "by hand" might look like:
impl<S, R> Future for ReqwestServiceFuture<S, R> where S: tower::Service<reqwest::Request>, S::Error: StdError + 'static, R: FromResponse<InnerResponse = S::Response> + Clone + 'static, { type Output = Result<http::Response<R::ResponseBody>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.state { State::InitialConversionFailed => { // Initial conversion failed, so we'll resolve right away... just not sure to what: todo!() } State::InnerPending => { // We've kicked-off the call through our "inner" Service-- we need to poll self.inner_fut todo!() } State::ResponseBodyPending => { // We've kicked-off the call to collect our response body-- we need to poll self.body_fut todo!() } } } }
OK… that first match arms looks pretty simple; let's start there. We've failed to convert the incoming http
request into a reqwest
request, so we need to return Poll::Ready with a payload of Err
. Furthermore, we've got the reqwest::Error sitting in our first_err
field. This should be easy. A first attempt could be:
#[derive(Debug, Snafu)] pub enum Error { // ... // Whip-up an Error variant for this particular failure... #[snafu(display( "Failed to convert the incoming http request body to a reqwest request body: {source}" ))] ReqwestBody { source: reqwest::Error }, } impl<S, R> Future for ReqwestServiceFuture<S, R> // ... { // ... fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.state { State::InitialConversionFailed => { Poll::Ready(Err( ReqwestBodySnafu.into_error(std::mem::take(&mut self.first_err).unwrap(/* known good */)) )) }
If we're in state InitialConversionFailed
then we know that self.first_err
is Some
, and that it contains the error produced by the failed conversion attempt. Let's take it out of the future (leaving a None
in its place) and hand it off to the Error
we're constructing. Alas:
error[E0596]: cannot borrow data in dereference of `Pin<&mut ReqwestServiceFuture<S, R>>` as mutable --> post/src/bin/fourth.rs:182:64 | 182 | ReqwestBodySnafu.into_error(std::mem::take(&mut self.first_err).unwrap()) | ^^^^^^^^^^^^^^^^^^^ cannot borrow as mutable | = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut ReqwestServiceFuture<S, R>>
It is at this point we have to address the fact that we don't have, as this method's receiver, a &mut ReqwestServiceFuture
, we have a pin of a &mut ReqwestServiceFuture
.
Pinning
A Google search for "rust pin" turned up page after page of posts on this topic. The standard library docs are quite good. I also recommend fasterthanlime's post on the topic: Pin & Suffering. Herewith I throw my own post on to the pile, with a focus on getting this particular future implemented.
If you're not coming from a background in C/C++, you may be unfamiliar with the idea of moving things around in memory. Briefly, the idea is this: at any given time, each value in your program is located at a particular place in memory. As you assign that value to other variables, pass it into functions, or generally use it in your program, the compiler is allowed, under certain language-specific rules, to move that value to another location in memory. The docs for the pin
module in the standard library have a nice example here.
This is, generally speaking, transparent to the user in higher-level languages. Things can get tricky, however, in the case of self-referential structs: structs which contain a pointer to another field inside themselves. Yes, in Rust, we need to resort to unsafe code to construct such a struct, but having done so I trust that it is clear to the reader that naively moving that struct to another memory location would be disastrous because the pointer would now be pointing to old memory.
As it turns out, this is the case in which we often find ourselves when it comes to futures. When we write something like:
async fn foo() -> Thing
under the hood, the compiler implements a future, and changes your function to something like:
fn foo() -> impl Future<Output = Thing>
where the thing that implements the future, and actually gets returned from foo()
is some un-namable, compiler-generated type, that is in general self-referential. The type is a state machine driven by invocation of its poll()
method, moving through the work that it can do each time, and returning Poll::Pending
when it hits its next await point. As a rule, you need to fix the location of a future before you poll it for the first time, and you can't move it once you've polled it for the first time, again because the compiler-generated types are (in general) self-referential.
The Pin type was introduced to express this. Pin
wraps another type, typically a type that acts like a pointer (i.e. it implements Deref or DerefMut), in such a way as to make it impossible to move the pointee once it's been wrapped (or "pinned", in the parlance). Before we get into that, we need to take note of the fact that most Rust types are perfectly insensitive to being moved. Take an i32
, for instance: the compiler can move it all over the place and it's still the same integer. Rust models this property with the Unpin marker trait. Just as the compiler will mark your type if it's Sync
, or Send
, it will mark it as Unpin
if it can prove that the type can be safely moved.
With that in mind, how can we pin a value? Pin
offers two constructors. The first is new(): you can simply call new on a reference (mutable or not) to your value to get a new Pin
. Here's the example from the method's docs:
let mut val: u8 = 11; let mut pinned: Pin<&mut u8> = Pin::new(&mut val);
The catch here is: you can only call new()
on values whose types are Unpin
! It gets worse: if your type is Unpin
, there's actually nothing stopping you from moving your pinned value; we can do this:
let mut val: u8 = 11; let mut pinned: Pin<&mut u8> = Pin::new(&mut val); let mut val2: u8 = 12; let _ = mem::swap(&mut *pinned, &mut val2);
(Playground link), because Pin
implements DerefMut
when its wrapped type implements it with a target that is itself Unpin
. In other words, pinning a value of a type that is itself Unpin via pin::new()
doesn't do much. The doc's comment string expresses its use well: "Since [the wrapped value] doesn't care about being moved, we can safely create a 'facade' Pin which will allow [the wrapped value] to participate in Pin-bound APIs without checking that pinning guarantees are actually upheld."
Interesting. Alright, on to the second constructor, new_unchecked(). This has the same signature as new()
, but is implemented for all pointee types, and is unsafe. In other words, if you need to pin a value of type that is not Unpin
, you have to use unsafe code, and do it through new_unchecked()
. Now, that's not the end of the world: all it means is that we're moving into the space that C & C++ programmers have comfortably occupied for decades, the one in which the compiler can't prove you're not eliciting undefined behavior and you, the programmer, just have to get it right.
In this case, by creating a Pin
via new_unchecked()
, you the coder are promising that the value is truly pinned. You are "making the guarantee that the value's data will not be moved nor have its storage invalidated until it gets dropped." Making good on this guarantee can be subtle. Recall that Pin
is generally used to wrap a type that implements Deref
and/or DerefMut
: they must not move out of their self
arguments! You also need to be sure that the reference obtained through these traits is not used in a move; " in particular, it must not be possible to obtain a &mut Ptr::Target
and then move out of that reference (using, for example mem::swap()
)".
In light of all that, I should refine my description of Pin
from a "wrapper" to a type that expresses "move-safety"; if I have a Pin
instance, then I know that either the pinned thing doesn't care about being moved, or that whoever gave me that Pin
is guaranteeing that the thing won't move.
Back to My Third Attempt
Alright– back to our problem: how do we move the error out of self.first_err
and into our return value? Well, we have a pinned ReqwestServiceFuture
, and we ultimately want to get at the first_err
field. A good first step might be to jump from that pin to a pin of first_err
. This ought to be feasible, since the enclosing struct being fixed would imply that all of its fields are also fixed. This is known as structural pinning, and is done through the map_unchecked() and map_unchecked_mut()
methods:
let mut first_err: Pin<&mut Option<reqwest::Error>> = unsafe { self.map_unchecked_mut(|this| &mut this.first_err) };
At first blush, this may not seem like much of a step forward, but: reqwest::Error
is Unpin, which means Option<reqwest::Error>
is, too. That means that Pin<Option<reqwest::Error>>
implements DerefMut (as well as Deref
), meaning that in this particular case, we can reach through the pin to the Option
and just take() it (leaving a None
in it's place). Furthermore, since we know that the error is waiting for us, we can just unwrap()
it:
Poll::Ready(Err( ReqwestBodySnafu.into_error(first_err.take().unwrap(/* known good */)) ))
Note that we're not violating the pin contract for first_err
, here– it's Unpin
, so we're allowed to move it. We are, however, clearly violating the ReqwestServiceFuture
invariant that if the state is InitialConversionFailed
, then first_err
will be Some
. Imagine that, after we return via this route, someone were to call poll()
again: we would panic at the unwrap()
. This is not unusual: per the standard library docs: "Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call." The futures crate defines an extension trate, FusedFuture that tracks whether its implementors have completed or not.
So: to recap, that leaves our implementation here:
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.state { State::InitialConversionFailed => { // Initial conversion failed, so we'll resolve right away. Project the pin down to // `first_err`, let mut first_err = unsafe { self.map_unchecked_mut(|this| &mut this.first_err) }; // and reach through that pin to take the error out of the future & return it to our // caller. Poll::Ready(Err( ReqwestBodySnafu.into_error(first_err.take().unwrap(/* known good */)) )) } State::InnerPending => { // We've kicked-off the call through our "inner" Service-- we need to poll self.inner_fut todo!() } State::ResponseBodyPending => { // We've kicked-off the call to collect our response body-- we need to poll self.body_fut todo!() } } }
Alright, let's move on to the InnerPending
case, in which we converted the incoming request successfully and dispatched it through our inner service. We got in return a future of type S::Future
and we'd like to poll it. Now, in order to invoke poll()
on that, we're going to need to pin it (recall, poll()
takes a Pin
). We should be able to do the same thing we did above, structurally pin self.inner_fut
through map_unchecked_mut()
. However, there's an important difference in this case: above, the thing to which we were projecting our pin was itself Unpin
, whereas we have no such guarantee, here (S::Future
will, in fact, generally not be movable!). But that's OK– we have no interest in moving it, we just want to poll it, giving us something like this:
State::InnerPending => { // We've kicked-off the call through our "inner" Service-- we need to poll self.inner_fut. // Project the pin: let inner_fut: Pin<&mut <S as Service<reqwest::Request>>::Future> = unsafe { self.as_mut().map_unchecked_mut(|this| this.inner_fut.as_mut().unwrap(/* known good */) ) }; match inner_fut.poll(cx) { Poll::Ready(Ok(_rsp)) => todo!(), Poll::Ready(Err(_err)) => todo!(), Poll::Pending => Poll::Pending, } }
I've taken a few liberties, here. First, I unwrapped my Option
inside the lambda I passed to map_unchecked_mut()
; there's no significant difference, here– it just makes the code to follow a bit cleaner. Second, I called as_mut() on my receiver before projecting the pin: "This method is useful when doing multiple calls to functions that consume the pinning pointer." map_unchecked_mut()
will consume the Pin
on which it is invoked, and we're going to need it, below. Finally, I avoided a nested match statement by destructuring the result in the Poll::Ready
case.
If our inner future is pending, then there's not much to say: we return Poll::Pending
to our caller. If our inner service returned an error, we need to return Poll::Ready
with an appropriate error value. Let's skip that for now & move on to the interesting case: what happens when our inner future has successfully resolved to a reqwest::Response
? Conceptually, this is straightforward– we need to feed that response into our FromResponse
implementation, receiving another future in return.
One subtlety: we need to poll that future now; if we don't, the runtime likely will never invoke poll()
on us again. This is because futures aren't polled (by any real runtime) in a tight loop– the future is expected to arrange, in a future-specific way, for the runtime to be notified when conditions are such that the future can make progress. Amos' post goes into this nicely. What's salient to our case is: our inner service's future has resolved; as far as the runtime knows, it's done with it. We have a shiny new future that's never been polled, so as far as the runtime is concerned, it may as well not exist. Ergo, even if we return Poll::Pending
right now, the runtime would never be alerted that conditions have changed in such a way as to enable us to make progress, and so it will have no reason to ever poll us again.
Therefore, we need to poll that future– if it resolves, then great: we're done. If it returns Poll::Pending
, then we need to shift to state ResponseBodyPending
and move that pending future into our body_fut
field. We again begin by projecting our pin, except that in this case, we need access to two of our fields:
State::InnerPending => { // We've kicked-off the call through our "inner" Service-- we need to poll self.inner_fut. // Project the pin: let inner_fut: Pin<&mut <S as Service<reqwest::Request>>::Future> = unsafe { self.as_mut().map_unchecked_mut(|this| this.inner_fut.as_mut().unwrap(/* known good */) ) }; match inner_fut.poll(cx) { Poll::Ready(Ok(_rsp)) => { let (mut state, mut body_fut) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.state), Pin::new_unchecked(&mut this.body_fut), ) }; todo!()}, // ... } }
get_unchecked_mut() pulls a mutable reference to the pinned value out of the pin (consuming the pin), with the caveat "You must guarantee that you will never move the data out of the mutable reference you receive when you call this function". Fine: we can't move our future– we've been living with this constraint for some time. Let's now shift state & create our new future:
State::InnerPending => { // We've kicked-off the call through our "inner" Service-- we need to poll self.inner_fut. // Project the pin: let inner_fut: Pin<&mut <S as Service<reqwest::Request>>::Future> = unsafe { self.as_mut().map_unchecked_mut(|this| this.inner_fut.as_mut().unwrap(/* known good */) ) }; match inner_fut.poll(cx) { Poll::Ready(Ok(rsp)) => { let from_response = self.from_response.clone(); let (mut state, mut body_fut) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.state), Pin::new_unchecked(&mut this.body_fut), ) }; *state = State::ResponseBodyPending; *body_fut = Some(from_response.try_into_response(rsp)); todo!()}, // ... } }
This brings up an interesting question: I hope that by now it's clear to the reader that upholding the Pin
contract means that we can't move the thing being pinned once we've created the Pin
(unless the thing is Unpin
, of course). What about modifying the thing in place? Is that permissible?
Well, the module docs again come to our rescue: "[pinning is] the ability to rely on [the] guarantee that the value… will:
- Not be moved out of its memory location
- More generally, remain valid at that same memory location."
and: "The purpose of pinning is not just to prevent a value from being moved, but more generally to be able to rely on the pinned value remaining valid at a specific place in memory." We're free to edit a pinned value, in-place, so long as we keep it valid. We're clearly doing that here.
Finally, let's poll it. We regrettably have to jump through a few hoops to get the types to work-out, but otherwise the solution is fairly straightforward:
State::InnerPending => { // We've kicked-off the call through our "inner" Service-- we need to poll self.inner_fut. // Project the pin: let inner_fut = unsafe { self.as_mut().map_unchecked_mut(|this| this.inner_fut.as_mut().unwrap(/* known good */) ) }; match inner_fut.poll(cx) { Poll::Ready(Ok(_rsp)) => { let from_response = self.from_response.clone(); let (mut state, mut body_fut) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.state), Pin::new_unchecked(&mut this.body_fut), ) }; *state = State::ResponseBodyPending; *body_fut = Some(from_response.try_into_response(rsp)); match Pin::new(body_fut.get_mut().as_mut().unwrap()).poll(cx) { res @ Poll::Ready(_) => res, Poll::Pending => Poll::Pending, } } }
And here is the complete implementation:
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.state { State::InitialConversionFailed => { let mut first_err: Pin<&mut Option<reqwest::Error>> = unsafe { self.map_unchecked_mut(|this| &mut this.first_err) }; Poll::Ready(Err( ReqwestBodySnafu.into_error(first_err.take().unwrap(/* known good */)) )) } State::InnerPending => { let inner_fut = unsafe { self.as_mut() .map_unchecked_mut(|this| this.inner_fut.as_mut().unwrap()) }; match inner_fut.poll(cx) { Poll::Ready(Ok(rsp)) => { let from_response = self.from_response.clone(); let (mut state, mut body_fut) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.state), Pin::new_unchecked(&mut this.body_fut), ) }; *state = State::ResponseBodyPending; *body_fut = Some(from_response.try_into_response(rsp)); match Pin::new(body_fut.get_mut().as_mut().unwrap()).poll(cx) { res @ Poll::Ready(_) => res, Poll::Pending => Poll::Pending, } } Poll::Ready(Err(err)) => { Poll::Ready(Err(ReqwestSnafu.into_error(Box::new(err)))) } Poll::Pending => Poll::Pending, } } State::ResponseBodyPending => { let body_fut = unsafe { self.as_mut() .map_unchecked_mut(|this| this.body_fut.as_mut().unwrap()) }; match body_fut.poll(cx) { res @ Poll::Ready(_) => res, Poll::Pending => Poll::Pending, } } } }
You can find this in the file fourth.rs in this post's associated project.
Conclusion
There's some code duplication in that poll()
implementation, and we still have 'static
lifetime bounds on the Error
and FromResponse
implementations. Also, splitting-up the state among multiple fields of ReqwestServiceFuture
still seems inelegant to me. I haven't mentioned the pin-project crate, both because this post has gotten long enough already, and because it's less useful in this particular situation, where we need to reach through Options
to get at the fields we want to project.
All that said, I'm using the code in actual applications, and the journey, while time-consuming & challenging, has been fruitful. It has, I hope, fixed my understanding of pinning & pin projection once & for all. On a different project this week, I found myself needing to build a Stream implementation on top of a ScyllaDB paged query (i.e. an ansync iterator over a page datastore query). Having worked through all this, the solution seemed obvious & I knocked it out in maybe 30 minutes. I don't know if there was a conscious decision, or whether it grew organically, but there has been an emergent approach to explaining async Rust that focused on its approachability: "it's so simple; just type this, and this, and this, and see? It just works." This approach may appeal to the new async Rust programmer, but it unfortunately leaves them completely unequipped to reason about what's happening when something inevitably doesn't work.
On a different note, again with the caveat that I'm perhaps just now figuring-out something that everyone else has known for a long time, I'm intrigued by the idiom of "cooperative coding": sketch the term, then talk with the LSP server/compiler/whatever to work on filling-in the gaps. Those of us on the path of "making illegal state unrepresentable" are going to be working with increasingly complex type systems; this is an intriguing approach toward helping us manage that complexity.
05/11/25 17:59