Saved by the compiler: Parallelizing a loop with Rust and rayon

Saved by the compiler: Parallelizing a loop with Rust and rayon

·

8 min read

The Rust compiler just saved me from a nasty threading bug. I was working on an open source development tool for Docker apps with lots of microservices, and I decided to parallelize the routine that transformed docker-compose.yml files. This was mostly an excuse to check out the awesome rayon library, but it turned into a great example of what real-world Rust development is like.

The original routine looked something like this:

/// Process our pods, flattening and transforming them using our
/// plugins, and output them to the specified directory.
fn output_helper(&self, op: Operation, export_dir: &Path) -> Result<()> {
    // Output each pod.
    for pod in &self.pods {
        // Don't export pods which aren't enabled.
        if !pod.enabled_in(&self.current_target) {
            continue;
        }

        // Figure out where to put our pod.
        // ...

        // Combine overrides, make it standalone, tweak as needed, and
        // output.
        let mut file = try!(pod.merged_file(&self.current_target));
        try!(file.make_standalone(&self.pods_dir()));
        let ctx = plugins::Context::new(self, pod);
        try!(self.plugins().transform(op, &ctx, &mut file));
        try!(file.write_to_path(out_path));
    }
    Ok(())
}

To convert this to a parallel loop, I started by changing:

for pod in &self.pods {

To:

self.pods.par_iter().map(|pod| -> Result<()> {

Here, Result<()> means "this closure might return an error, or it might return an empty tuple, basically void." (For more information on Rust error handling, check out the Rust book.) But it couldn't be that easy, could it?

Nope. The Rust compiler showed me the following error message. It's long, but I'll translate:

error[E0277]: the trait bound `plugins::PluginTransform + 'static: std::marker::Sync` is not satisfied
   --> src/project.rs:397:30
    |
397 |         self.pods.par_iter().map(|pod| -> Result<()> {
    |                              ^^^ trait `plugins::PluginTransform + 'static: std::marker::Sync` not satisfied
    |
    = note: `plugins::PluginTransform + 'static` cannot be shared between threads safely
    = note: required because it appears within the type `Box<plugins::PluginTransform + 'static>`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<Box<plugins::PluginTransform + 'static>>`
    = note: required because it appears within the type `alloc::raw_vec::RawVec<Box<plugins::PluginTransform + 'static>>`
    = note: required because it appears within the type `std::vec::Vec<Box<plugins::PluginTransform + 'static>>`
    = note: required because it appears within the type `plugins::Manager`
    = note: required because it appears within the type `std::option::Option<plugins::Manager>`
    = note: required because it appears within the type `project::Project`
    = note: required because it appears within the type `&project::Project`
    = note: required because it appears within the type `&&project::Project`
    = note: required because it appears within the type `[closure@src/project.rs:397:34: 424:10 op:&plugins::Operation, self:&&project::Project, export_dir:&&std::path::Path]`

The key bit to take away here is that "Sync is not satisfied" for the trait plugins::PluginTransform. In Rust, Sync is a special trait that tells the compiler that it's safe to share an object between two threads. That whole long list of "notes" afterwards looks intimidating, but it's really just telling me that my PluginTransform objects live inside my plugins::Manager type, which in turn lives inside my Project. The Rust compiler is verbose, but it tries to be helpful.

A bit of investigation reveals that PluginTransform inherits from the Plugin trait:

pub trait PluginTransform: Plugin {

So we can fix this problem by changing the declaration of Plugin from:

pub trait Plugin {

To:

pub trait Plugin: Sync {

This says, "All types implementing Plugin must also allow me to access them from multiple threads." With that fixed, I re-run cargo test and get a similar message:

error[E0277]: the trait bound `plugins::transform::vault::GenerateToken + 'static: std::marker::Sync` is not satisfied
   --> src/plugins/transform/vault.rs:230:6
    |
230 | impl plugins::Plugin for Plugin {
    |      ^^^^^^^^^^^^^^^ trait `plugins::transform::vault::GenerateToken + 'static: std::marker::Sync` not satisfied
    |

This time, I change:

trait GenerateToken: Debug {

To:

trait GenerateToken: Debug + Sync {

This says, "All types which implement GenerateToken must also implement Debug (so I can print them), as well as Sync (so I can share them). We're making progress!

Re-running cargo test, however, reveals the actual bug, and it would have been a nightmare to debug:

error[E0277]: the trait bound `std::rc::Rc<std::cell::RefCell<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>: std::marker::Sync` is not satisfied
   --> src/plugins/transform/vault.rs:124:6
    |
124 | impl GenerateToken for MockVault {
    |      ^^^^^^^^^^^^^ trait `std::rc::Rc<std::cell::RefCell<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>: std::marker::Sync` not satisfied
    |

Uh-oh. In our test harness, we have a type MockVault, which contains an Rc<RefCell<_>>:

type MockVaultCalls = Rc<RefCell<Vec<(String, Vec<String>, VaultDuration)>>>;

/// A fake interface to vault for testing purposes.
#[derive(Debug)]
#[cfg(test)]
struct MockVault {
    /// The tokens we were asked to generate.  We store these in a RefCell
    /// so that we can have "interior" mutability, because we don't want
    /// `generate_token` to be `&mut self` in the general case.
    calls: MockVaultCalls,
}

We use the MockVault to simulate a connection to Hashicorp's Vault, a secure central storage for passwords and other secrets, which issues time-limited credentials. And when we test our Vault code, we use the calls member here to record all the requests that we would have made to Vault.

The type Rc<RefCell<_>> is a hack. The GenerateToken API assumes that our token-generator is a read-only object. This is good, because we want to access it from multiple threads! But in the test code, we need to create some "interior" mutable state. Basically, we ask Rust to replace compile-time borrow checks with run-time borrow checks. (For more details, see the Rust book.) But Rc<RefCell<_>> is a lightweight mechanism designed for single-threaded code.

The fix is to replace Rc<RefCell<_>> with Arc<RwLock<_>>, which is fully thread-safe:

type MockVaultCalls = Arc<RwLock<Vec<(String, Vec<String>, VaultDuration)>>>;

Once we make this change, Rust reminds us to change all the code that accesses calls, too:

error: no method named `borrow_mut` found for type `std::sync::Arc<std::sync::RwLock<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>` in the current scope
   --> src/plugins/transform/vault.rs:132:20
    |
132 |         self.calls.borrow_mut().push((display_name.to_owned(), policies, ttl));
    |                    ^^^^^^^^^^
    |
...
error: no method named `borrow` found for type `std::sync::Arc<std::sync::RwLock<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>` in the current scope
   --> src/plugins/transform/vault.rs:368:23
    |
368 |     let calls = calls.borrow();
    |                       ^^^^^^
    |

Phew. If we hadn't caught that Rc<RefCell<_>>, our test suites might have randomly segfaulted a couple of times a week, and it would have taken us weeks to track it down. I don't even think our current test suites could trigger this bug, but I bet a future version would have been able to, leaving a nasty surprise for us someday. The Rust compiler dug down through layers of data structures and found the one bit that wasn't thread safe.

Wrapping it up

Rust also complains about that fact that we're calling continue from inside a closure:

error[E0267]: `continue` inside of a closure
   --> src/project.rs:402:17
    |
402 |                 continue;
    |                 ^^^^^^^^ cannot break inside of a closure

We can fix this by moving this code:

// Don't export pods which aren't enabled.
if !pod.enabled_in(&self.current_target) {
    continue;
}

...into a filter call on our parallel iterator:

self.pods.par_iter()
    // Don't export pods which aren't enabled.
    .filter(|pod| pod.enabled_in(&self.current_target))
    // Process each pod in parallel.
    .map(|pod| -> Result<()> {

The final problem is that we now have a parallel computation that returns a bunch of Result<()> objects, and we need to boil them down to a single Result<()> like we had before. If we have a bunch of error messages, I'm happy to just pick one. No need to overwhelm the user.

I spoke to Josh Stone, and he helped me come up with the following:

    .map(|pod| -> Result<()> {
        // ...
    }
    // If more than one parallel branch fails, just return one error.
    .reduce_with(|result1, result2| result1.and(result2))
    .unwrap_or(Ok(()))

This says, "Given any two results named result1 and result2, take result1 if it's an error, otherwise take result2." It's basically a short-circuit && operator, but for Result values instead of booleans. I've filed an issue suggesting that rayon should provide those last two lines as a built-in function.

And yes, that's a genuine parallel map-reduce in Rust!

So here's our final loop, using work-stealing parallelism:

fn output_helper(&self, op: Operation, export_dir: &Path) -> Result<()> {
    // Output each pod.  This isn't especially slow (except maybe the
    // Vault plugin), but parallelizing things is easy.
    self.pods.par_iter()

        // Don't export pods which aren't enabled.
        .filter(|pod| pod.enabled_in(&self.current_target))

        // Process each pod in parallel.
        .map(|pod| -> Result<()> {
            // Figure out where to put our pod.
            // ...

            // Combine overrides, make it standalone, tweak as needed, and
            // output.
            let mut file = try!(pod.merged_file(&self.current_target));
            try!(file.make_standalone(&self.pods_dir()));
            let ctx = plugins::Context::new(self, pod);
            try!(self.plugins().transform(op, &ctx, &mut file));
            try!(file.write_to_path(out_path));
            Ok(())
        })

        // If more than one parallel branch fails, just return one error.
        .reduce_with(|result1, result2| result1.and(result2))
        .unwrap_or(Ok(()))
}

This was pretty painless! I have a lot more things to parallelize, of course. But this is what day-to-day Rust development is like: I have to do a bit of extra work to satisfy the compiler (which mostly becomes a reflex). But in turn, the compiler ferrets out all kinds of subtle concurrency errors and generally watches my back. It's an interesting tradeoff, and overall I like it.


The best of AI, right in your inbox