Friday, December 8, 2023

How Quickwit Rust Actor Framework tackles the challenges of efficient indexing

Ferris & co coordinating a choreography.

At Quickwit, we are building the most cost-efficient search engine for logs and traces. Such an engine typically ingests massive amounts of data while serving a comparatively low number of search queries. Under this workload, most of your CPU is spent on indexing, making our indexing pipeline a crucial component.

Like most search engines, such as Elasticsearch or Solr, Quickwit builds its inverted index in small batches.
The indexing pipeline’s role is to receive a continuous stream of documents, cut it into batches of typically 30 seconds, and perform a sequence of operations to produce a new piece of the index, which we call a split.

pipeline-simplified.png

This component is full of fascinating engineering challenges:

  • It should be efficient. In practice, this means the whole pipeline should be carefully streamlined to make sure we do not waste CPU while waiting for another resource.
  • The pipeline should be robust and have a clear and simple way to handle errors.
  • The code needs to be easy to test, navigate, and maintain.

In this article, we delve into how we ended up adopting an actor-model-based solution for our indexing pipeline and discuss some of the distinctive features of our implementation. Paul Masurel, one of our co-founders, also gave a talk on this topic at FOSDEM23.

First, let’s crack open the indexing pipeline box and briefly describe what it does. The following is a summary of the steps involved in the indexing pipeline.
Each step is annotated with the resource it consumes.

  • Parse and transform documents. (CPU)
  • Index JSON documents in an in-memory index. (CPU)
  • Trigger the end of the split according to commit_timeout_secs or split_num_docs_target. This step is what breaks the stream into batches. (CPU, Disk IO)
  • Compress and serialize the in-memory split to the disk. (CPU, Disk IO)
  • Stage the split: we need to record the split before starting the upload so that we can clean up on failure. (Wait for Metastore)
  • Upload the split to the target storage (Local FS, Object storage). (Disk IO or Network)
  • Publish the split to make it available for search queries. (Wait for Metastore)

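As you can see, a document needs to go through a bunch of steps before it can be searchable. A naive implementation would run all of these steps sequentially.

// Parse and index documents until the split is complete. (CPU)
let active_split = ingest_enough_docs_to_make_split();
// Compress and serialize the split to disk. (CPU, Disk IO)
let serialized_split = save_split(active_split, local_working_directory);
// Record the split before uploading so that failures can be cleaned up. (Metastore)
let split_metadata = serialized_split.metadata();
stage_split(&split_metadata, metastore);
// Upload the split to the target storage. (Disk IO or Network)
upload_split(serialized_split, storage);
// Make the split available for search queries. (Metastore)
publish_split(split_metadata.split_id, metastore);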

However, we saw that these steps are operating on different resources: CPU, disk, network, and sometimes we are simply waiting for another service’s response. Simply running these steps one after the other is a waste of resources. For instance, our CPU would be idle while we are writing our split to disk. We need to run these steps concurrently in order to get the most out of the available computing resources. We can achieve this by dedicating workers to each step and streamlining our pipeline.
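To make the idea of one worker per step concrete, here is a minimal sketch of a two-stage pipeline. It is not Quickwit code: it uses standard-library threads and channels, and the function name `run_pipeline` as well as the toy "index" and "upload" steps are assumptions made for illustration only.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Runs a toy two-stage pipeline: one worker "indexes" documents while
/// another "uploads" the results, so both stages can make progress at once.
pub fn run_pipeline(docs: Vec<String>) -> Vec<String> {
    let (index_tx, index_rx) = channel::<String>();
    let (upload_tx, upload_rx) = channel::<String>();

    // Stage 1: stand-in for the CPU-bound indexing step.
    let indexer = thread::spawn(move || {
        for doc in index_rx {
            if upload_tx.send(doc.to_uppercase()).is_err() {
                break; // The next stage is gone, so stop early.
            }
        }
        // `upload_tx` is dropped here, which closes the next stage's queue.
    });

    // Stage 2: stand-in for the IO-bound upload step.
    let uploader = thread::spawn(move || {
        upload_rx
            .into_iter()
            .map(|split| format!("uploaded:{split}"))
            .collect::<Vec<String>>()
    });

    for doc in docs {
        index_tx.send(doc).expect("indexer stopped unexpectedly");
    }
    // Closing the first queue lets both workers terminate in turn.
    drop(index_tx);

    indexer.join().expect("indexer panicked");
    uploader.join().expect("uploader panicked")
}
```

Each stage owns the receiving end of its queue and only talks to the next stage through a sender, which is what lets the stages overlap in time. The queues in this sketch are unbounded.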

Now, if a worker cannot keep up with the incoming tasks, it will accumulate them in its work queue, and eventually crash the system. This calls for some backpressure mechanism where we set a maximum number of items that can be held in a worker’s backlog. When that maximum is reached, all workers attempting to send a job to it will have to wait, thus slowing down and allowing the lagging worker to catch up.

In Rust, bounded mpsc (multiple-producer-single-consumer) channels are a natural solution for this task queue. Here is what such a worker might look like:

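use std::io;

use tokio::sync::mpsc::{channel, Sender};

struct Split {}

// Placeholder for the actual upload logic.
async fn process_upload(_split: Split) -> io::Result<()> {
    Ok(())
}

fn start_worker() -> io::Result<Sender<Split>> {
    // The queue holds at most 3 pending splits; senders wait when it is full.
    let (tx, mut rx) = channel::<Split>(3);
    tokio::task::spawn(async move {
        // Keep processing splits until every sender has been dropped.
        while let Some(split) = rx.recv().await {
            process_upload(split).await?;
        }
        io::Result::Ok(())
    });
    Ok(tx)
}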
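The backpressure itself can be observed in isolation. The sketch below swaps Tokio for the standard library's `sync_channel` so that it has no dependencies; `bounded_queue` and `fill_until_full` are hypothetical helper names, not part of Quickwit or Tokio.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates a work queue that holds at most `capacity` pending items.
pub fn bounded_queue(capacity: usize) -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(capacity)
}

/// Enqueues items without blocking and returns how many were accepted
/// before the queue refused one.
pub fn fill_until_full(tx: &SyncSender<u32>, items: &[u32]) -> usize {
    let mut accepted = 0;
    for &item in items {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // A full queue is the backpressure signal: a blocking `send`
            // would make the producer wait here instead of returning.
            Err(TrySendError::Full(_)) => break,
            // The worker is gone, so there is nobody left to send to.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 3, only the first three items are accepted until the worker takes one out of the queue. A producer using the blocking `send` would simply pause at that point, which is the slowdown described above.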

It turns out this worker setup is what is conceptually called an actor.

As framed by Carl Hewitt in 1973, an actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;
  • create a finite number of new actors;
  • designate the behavior to be used for the next message it receives.

Before jumping into the specifics of our actor framework, we must recommend Alice Ryhl’s blog post, Actors with Tokio, which shows how to use actors with Tokio in Rust as a programming pattern, without any framework.

Implementing an actor-based solution requires a lot of wiring to manage the actors themselves. We do not want to mix this with the application code. A few of the actor management chores that need to be taken care of are:

  • Monitor or supervise actors and restart them when they fail.
  • Monitor actors’ jobs and schedule retries if necessary.
  • Manage message scheduling including time and priority aspects.
  • Provide testing and observability utilities (e.g. mocking time, performance metrics).

Actor frameworks can be handy to abstract away these aspects.

We ended up implementing our own framework called quickwit-actors.
But why implement our own framework when there is a plethora of actor frameworks in the Rust ecosystem (Actix, meio, riker, etc.)? We found that most available options except Actix were not actively maintained.

Our main issue with Actix was the lack of message priority. Our indexer, for instance, is in charge of cutting batches every 30 seconds or so. We want that message to be processed as soon as possible after the 30 seconds have elapsed. In Actix and in quickwit-actors, this is done by asking the framework to send a message to our actor after 30 seconds have elapsed. In Actix, that message is enqueued and will only be processed once all of the pending messages have been processed. In Quickwit, this could mean 10 seconds later for this specific actor. As a solution, in our actor framework, each actor has a high-priority and a low-priority queue. Scheduled messages are delivered to the high-priority queue.
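The two-queue idea can be sketched in a few lines. This is a simplified, single-threaded illustration and not the quickwit-actors mailbox; the `PriorityMailbox` type and its method names are assumptions.

```rust
use std::collections::VecDeque;

/// A toy mailbox with a high-priority and a low-priority queue.
pub struct PriorityMailbox<M> {
    high: VecDeque<M>,
    low: VecDeque<M>,
}

impl<M> PriorityMailbox<M> {
    pub fn new() -> Self {
        PriorityMailbox {
            high: VecDeque::new(),
            low: VecDeque::new(),
        }
    }

    /// Regular messages, such as incoming documents.
    pub fn send_low(&mut self, msg: M) {
        self.low.push_back(msg);
    }

    /// Scheduled messages, such as a commit timeout.
    pub fn send_high(&mut self, msg: M) {
        self.high.push_back(msg);
    }

    /// Returns the next message to process: high-priority messages first,
    /// then low-priority ones, each queue in FIFO order.
    pub fn pop_next(&mut self) -> Option<M> {
        self.high.pop_front().or_else(|| self.low.pop_front())
    }
}
```

If two documents are already waiting when a commit message arrives on the high-priority queue, `pop_next` returns the commit first, so the timeout is honored without draining the backlog.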

Implementing our own framework also made it possible to bake in some precious features, such as measuring and reporting backpressure as metrics, and automatically detecting actors that are blocked on some operation.
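One plausible way to turn backpressure into a metric is to time how long a producer spends inside `send`. The sketch below is an assumption about how this could be done with the standard library, not a description of how quickwit-actors does it; `MeteredSender` and `time_in_send` are hypothetical names.

```rust
use std::sync::mpsc::{SendError, SyncSender};
use std::time::{Duration, Instant};

/// Wraps a bounded sender and accumulates the time spent in `send`.
/// When the queue is full, most of that time is spent waiting for room.
pub struct MeteredSender<T> {
    inner: SyncSender<T>,
    time_in_send: Duration,
}

impl<T> MeteredSender<T> {
    pub fn new(inner: SyncSender<T>) -> Self {
        MeteredSender {
            inner,
            time_in_send: Duration::ZERO,
        }
    }

    /// Sends an item, blocking while the queue is full, and records
    /// how long the call took.
    pub fn send(&mut self, item: T) -> Result<(), SendError<T>> {
        let start = Instant::now();
        let result = self.inner.send(item);
        self.time_in_send += start.elapsed();
        result
    }

    /// Total time spent sending so far, suitable for export as a metric.
    pub fn time_in_send(&self) -> Duration {
        self.time_in_send
    }
}
```

A value that keeps growing for a given sender suggests that the actor on the receiving end is the bottleneck of the pipeline.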

Quickwit actor framework, the basics

From a Rust developer’s perspective, you define your actor type and implement the Actor trait, which only requires one method to be implemented. Next, you implement the Handler trait, which is generic over the message type it should handle:

use std::time::Duration;

use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Universe};

#[derive(Debug)]
struct HealthCheckMessage;

// The inner counter tracks how many health checks have run so far.
#[derive(Debug, Default)]
struct HealthActor(usize);

#[async_trait]
impl Actor for HealthActor {
    type ObservableState = usize;

    fn observable_state(&self) -> Self::ObservableState {
        self.0
    }

    // Runs once when the actor starts: trigger the first health check.
    async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        ctx.send_self_message(HealthCheckMessage).await?;
        Ok(())
    }
}

#[async_trait]
impl Handler<HealthCheckMessage> for HealthActor {
    type Reply = ();

    async fn handle(
        &mut self,
        _health_check_msg: HealthCheckMessage,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        // `check_health_status` stands for a call to the imaginary service.
        check_health_status().await?;
        self.0 += 1;
        // Schedule the next check two seconds from now.
        ctx.schedule_self_msg(Duration::from_secs(2), HealthCheckMessage).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let universe = Universe::new();
    let health_checker = HealthActor::default();
    let (_health_checker_mailbox, health_checker_handle) = universe
        .spawn_actor(health_checker)
        .spawn();
    // Wait for the actor to exit.
    health_checker_handle.join().await;
}

The above is an actor that checks the health status of an imaginary service every two seconds and maintains a state that counts the number of checks it has performed since its inception. This actor sends the HealthCheckMessage to itself.

  • First in the initialize method of the Actor trait: the method that is initially run when an actor starts.
  • Then in the handle method of the Handler trait, but this time with a two-second delay. This creates an infinite loop of HealthCheckMessage handling.

In main, we create a Universe for our actors to live in. When an actor is spawned, we get back two things:

  • A type-safe Mailbox that can be used to send messages to the corresponding actor.
  • A handle that can be used to monitor as well as wait, quit, kill, pause, and resume the actor.

All Quickwit actors are async by default. To deal with CPU-intensive actors, we simply schedule them on a different runtime. This trick is also used by InfluxDB, as explained in this blog post. Typically, RuntimeType::Blocking.get_runtime_handle() is used to request that an actor be scheduled as a synchronous actor.

Because our actors can be running in different runtimes, we had to introduce the concept of Universe to isolate groups of actors within the same process. For instance, each unit test typically starts by instantiating its own universe. A universe holds:

  • An ActorRegistry that keeps track of actors instantiated in this universe.
  • A universe’s KillSwitch to shut down all of the actors in this universe.
  • A SchedulerClient to handle and mock time.
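To give an intuition for the kill switch, here is a deliberately simplified stand-in built on a shared atomic flag. It is an assumption about the general shape of such a mechanism, not the quickwit-actors implementation, and `SharedKillSwitch` is a hypothetical name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// A flag shared by every actor of a universe. Cloning it yields another
/// handle to the same underlying flag.
#[derive(Clone, Default)]
pub struct SharedKillSwitch {
    killed: Arc<AtomicBool>,
}

impl SharedKillSwitch {
    /// Flips the switch for every holder of a clone.
    pub fn kill(&self) {
        self.killed.store(true, Ordering::SeqCst);
    }

    /// Actors would check this between messages and exit once it is true.
    pub fn is_dead(&self) -> bool {
        self.killed.load(Ordering::SeqCst)
    }
}
```

Because each clone points to the same flag, a single call to `kill` is visible to all actors holding a copy, which is what makes it possible to shut down a whole universe at once.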

Another feature of the universe that’s handy when testing is the method Universe::with_accelerated_time(). It provides a universe that accelerates time whenever no actor has any message to process.
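The principle behind accelerated time can be illustrated with a virtual clock that jumps straight to the next scheduled event when there is nothing else to do. The following is a toy model written for this explanation; `VirtualScheduler` and its methods are hypothetical and unrelated to the actual SchedulerClient API.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Duration;

/// A toy scheduler driven by a virtual clock instead of the wall clock.
pub struct VirtualScheduler {
    now: Duration,
    // Min-heap of (deadline, message), ordered by deadline.
    timeline: BinaryHeap<Reverse<(Duration, &'static str)>>,
}

impl VirtualScheduler {
    pub fn new() -> Self {
        VirtualScheduler {
            now: Duration::ZERO,
            timeline: BinaryHeap::new(),
        }
    }

    /// Current virtual time, measured from the creation of the scheduler.
    pub fn now(&self) -> Duration {
        self.now
    }

    /// Schedules `message` to be delivered `delay` after the current time.
    pub fn schedule(&mut self, delay: Duration, message: &'static str) {
        self.timeline.push(Reverse((self.now + delay, message)));
    }

    /// To be called when no actor has any message left to process:
    /// jumps the clock to the earliest deadline and returns its message.
    pub fn advance_to_next(&mut self) -> Option<&'static str> {
        let Reverse((deadline, message)) = self.timeline.pop()?;
        self.now = deadline;
        Some(message)
    }
}
```

A test that schedules a commit 30 seconds ahead does not have to sleep for 30 seconds: as soon as the actors are idle, the clock moves to the deadline and the message is delivered.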

schedule-compression.png

Using the actor framework in Quickwit

Concretely, here is how we use the actor framework within the Quickwit indexer service: when JSON documents arrive in Quickwit via the source actor, they are parsed by the doc-processor and buffered in an in-memory split (indexer actor). When the commit_timeout_secs has expired or the split_num_docs_target is reached, a new active split is created to keep receiving ingested documents, while the previous split is finalized in the subsequent actors before being marked as searchable by the publisher. You can also distinguish the indexing pipeline (purple group), which indexes documents, from the merge pipeline (green group), which merges splits that are not yet mature.
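The rule that decides when to cut a split can be written down as a small predicate. This is a sketch of the condition as described above, not the indexer's actual code; the `CommitPolicy` and `ActiveSplit` types are hypothetical, and only the names commit_timeout_secs and split_num_docs_target come from Quickwit's configuration.

```rust
use std::time::Duration;

/// The two thresholds that end a split.
pub struct CommitPolicy {
    pub commit_timeout: Duration,
    pub split_num_docs_target: u64,
}

/// What we assume is known about the split currently being filled.
pub struct ActiveSplit {
    pub num_docs: u64,
    pub age: Duration,
}

impl CommitPolicy {
    /// Builds a policy from the two configuration values.
    pub fn new(commit_timeout_secs: u64, split_num_docs_target: u64) -> Self {
        CommitPolicy {
            commit_timeout: Duration::from_secs(commit_timeout_secs),
            split_num_docs_target,
        }
    }

    /// A split is cut as soon as either threshold is reached.
    pub fn should_commit(&self, split: &ActiveSplit) -> bool {
        split.age >= self.commit_timeout || split.num_docs >= self.split_num_docs_target
    }
}
```

With a 30-second timeout and a target of 1,000 documents, a split holding 10 documents is cut once it is 30 seconds old, while a split that reaches 1,000 documents is cut immediately, whatever its age.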

pipeline.png

quickwit-actors has proven to be a valuable asset for Quickwit. Though it lacks many features and documentation, it provides us the flexibility to mold it into what we need.

We recently released the framework under the MIT License. We don’t really expect it to become a popular crate, but we hope the MIT License will make it easy for people to experiment with it and reuse whatever pieces of code they see fit.
