More than a year ago I released a telegram rss/atom/json feed reader bot in Rust called El Monitorro. The bot does extensive background processing to synchronize feeds and deliver updates to users.

In this post, I’ll describe my initial approach for background processing and why I created Fang - background processing library for Rust.

Sidenote: from this point, I’ll use only synchronization as my main example.

Initial approach

My initial implementation was simple, it can even be called naive. I executed every synchronization job in the tokio task in the same thread. This solution worked like a charm when the bot didn’t have a lot of users and feeds to synchronize. Every minute It was able to process ~ 200 feeds. I even wrote a post about it. It’s exactly one year old. :)

But as the bot became more popular (~ 1000 feeds), I noticed one weird issue. Some feeds were synchronizing normally, but other feeds (especially newly added) weren’t synchronizing at all. I think it’s related to a couple of problems:

• All feeds weren’t able to synchronize in the given period (1 minute)
• There is no order in a way the tokio library executes tokio tasks

I was able to mitigate this issue by increasing the synchronization period and I did it a couple of time already because the number of feeds keeps increasing.

I wasn’t satisfied for a couple of reasons:

• My main reason for using Rust for this bot was to get instant updates
• I don’t have any control over tokio tasks and I don’t have a way to inspect the total number of tasks, the current tasks that being executed etc

So I decided to create a simple background processing framework that solved all issues I had with tokio tasks

Fang

Compared with my initial approach, the approach used in Fang is not as naive, but still, it’s simple:

• Tasks are saved to the Postgres database
• Fang starts the specified number of workers. Each worker runs in a separate thread
• Each worker executes not processed tasks
• If there are no tasks left in the DB, a worker sleeps for the given number of seconds
• If any worker fails during task execution, it will be restarted

The only thing that has to be kept in mind in this approach is every task should be fetched by a single worker. Otherwise, the same task will be processed several times. It was achieved using for update skip locked in the select query:

match fang_tasks::table
.limit(1)
.for_update()
.skip_locked()
{
Ok(record) => Some(record),
_ => None,
}


Now let’s see how to use fang.

1. Every job should implement fang::Runnable trait which is used by fang to execute it.
    use fang::Error;
use fang::Runnable;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Job {
pub number: u16,
}

#[typetag::serde]
impl Runnable for Job {
fn run(&self) -> Result<(), Error> {
println!("the number is {}", self.number);

Ok(())
}
}

1. Use WorkerPool to start workers. It accepts two parameters - the number of workers and the prefix for the worker thread name.
use fang::WorkerPool;

WorkerPool::new(10, "sync".to_string()).start();


That’s it. The usage is pretty straightforward.

Future features

• Workers for specific types of tasks. Currently, each worker executes all types of tasks

I already prepared a PR for it. And it’s already used in my bot. I’ll release it soon.

• Configurable DB records retention. Currently, fang doesn’t remove tasks from the DB.

Again, the pr is ready. I’ll try to release it soon. This feature is used in my bot.

One thing I noticed is without removing finished tasks, the processing becomes slower over time. I think it’s related to the performance of vps that I’m using. I use the cheapest vps from ovh and the number of tasks after a couple of hour of running the bot is over a couple of hundred thousands. Even though I added db indexes, queries become slower.

• Retries
• Extendable/new backends