# Hornet

### Motivation

Stress testing is the process of deliberately putting a system under intense load to test its stability. It can validate that a system withstands the expected load, determine its limits, and check its error-handling capabilities.

Recently, I wanted to stress test the project I’m working on. I checked the existing load/stress testing libraries in the Elixir community and found a couple of abandoned projects and one relatively well-maintained one: Chaperon.

My scenario was to stress test the system at a constant rate of operations per second (o/s) over many hours (12-24). The `chaperon` library provides this feature with its SpreadAsync module. According to its docs, it does exactly what I wanted: “Action that calls a function with a given rate over a given interval of time (ms)”. But after checking its source code, I found a flaw: it spawns a new process for each execution. For example, to run a function at 3000 o/s for 20 hours, `chaperon` starts 3000 * 60 * 60 * 20 = 216_000_000 processes. If you run multiple stress tests, or a single test with a high o/s rate, the VM simply crashes. If it does not crash, RAM usage grows over time and may eventually exhaust your machine’s memory (I think this happens because `chaperon` stores the result of each execution).

I decided to create a new library which would handle long-running stress tests more gracefully, spawning the optimal number of processes. I called this library `Hornet`. In this post, I’ll describe its design.

### High-level design

Let’s examine the basic entities of Hornet:

1. Worker - periodically executes a given function. A single stress test usually runs several workers.
2. RateCounter - periodically calculates the current rate (o/s).
3. Scheduler - periodically checks whether the current rate matches the required rate and increases the number of workers if it does not.

#### Worker

Hornet accepts three required parameters:

1. `rate` - the required rate (o/s)
2. `func` - the anonymous function that has to be executed
3. `id` - this unique id is used for internal process names

Initially, each process executes the given function every `process_period` ms (the default is 100 ms). The Scheduler calculates the initial number of processes as follows:

• `process_rate` = the rate (o/s) of a single process = 1_000 ms / `process_period`
• if `rate` / `process_rate` <= 1, a single process can maintain the required rate, so Hornet starts one process with a period of 1_000 / `rate` ms
• if `rate` / `process_rate` > 1, a single process cannot maintain the required rate, so Hornet starts `rate` / `process_rate` workers, each with a period of `process_period` ms
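To make the rules above concrete, here is a minimal sketch that applies them to a couple of rates. The module and function names (`HornetSketch.Plan.initial/2`) are hypothetical and not part of Hornet’s API; the rounding mirrors the `calculate_workers_number` excerpt shown later in this post.

```elixir
defmodule HornetSketch.Plan do
  # Hypothetical helper mirroring the rules above; not Hornet's public API.
  # Returns {period_ms, workers_count} for a required rate (o/s).
  def initial(rate, process_period \\ 100) do
    # o/s a single process achieves when it runs every `process_period` ms
    process_rate = 1_000 / process_period

    if rate / process_rate <= 1 do
      # one process is enough: slow it down so it runs exactly `rate` times per second
      {round(1_000 / rate), 1}
    else
      # split the load across several processes that keep the default period
      {process_period, round(rate / process_rate)}
    end
  end
end

HornetSketch.Plan.initial(5)    # => {200, 1}
HornetSketch.Plan.initial(3000) # => {100, 300}
```

For the 3000 o/s, 20-hour scenario from the motivation, this gives 300 long-lived processes instead of one new process per execution.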

#### RateCounter

`RateCounter` keeps track of the current rate using a `counter`. Every worker increments this counter after executing `func`. Every `rate_period` ms, `RateCounter` calculates the current rate as `counter` * 1_000 / `rate_period` (o/s) and resets the counter.
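As a quick check of the formula, here is a worked example with an illustrative `rate_period` of 500 ms (a value picked for the example, not necessarily Hornet’s default):

$$
\text{current rate} = \frac{\text{counter} \times 1000}{\text{rate\_period}} = \frac{1500 \times 1000}{500} = 3000\ \text{o/s}
$$

The factor of 1000 converts the millisecond window into seconds.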

#### Scheduler

It often happens that the given function takes longer to execute than `process_period`, for example when it performs heavy calculations. In that case, the current rate falls below the required one, so the Scheduler stops the running workers, increases `process_period` (which raises the number of workers needed for the same rate), and starts new workers.
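The sketch below models why a longer period helps. It assumes, as in the Worker excerpt later in this post, that a worker schedules its next run only after `func` returns, so one cycle lasts roughly `d + interval` ms, where `d` is the execution time of `func`. It ignores scheduling overhead, and all names (`HornetSketch.Adjust`, `achieved_rate/3`, `find_period/5`) are hypothetical.

```elixir
defmodule HornetSketch.Adjust do
  # Hypothetical model for illustration only; not Hornet's implementation.

  # {interval_ms, workers} for a required rate, same rules as the initial calculation
  def plan(rate, period) do
    process_rate = 1_000 / period

    if rate / process_rate <= 1,
      do: {round(1_000 / rate), 1},
      else: {period, round(rate / process_rate)}
  end

  # Rate actually achieved when `func` takes `d` ms. Assumption: a worker
  # schedules its next run after `func` returns, so a cycle lasts `d + interval` ms.
  def achieved_rate(rate, period, d) do
    {interval, workers} = plan(rate, period)
    workers * 1_000 / (d + interval)
  end

  # Increase the period by `step` until the achieved rate is within
  # `error_rate` (a fraction, e.g. 0.1 for 10%) of the required rate.
  def find_period(rate, period, step, d, error_rate \\ 0.1) do
    if abs(achieved_rate(rate, period, d) - rate) < rate * error_rate do
      period
    else
      find_period(rate, period + step, step, d, error_rate)
    end
  end
end

HornetSketch.Adjust.achieved_rate(1_000, 100, 50)    # ~666.7 o/s with 100 workers
HornetSketch.Adjust.find_period(1_000, 100, 100, 50) # => 500 (500 workers, ~909 o/s)
```

In this model, a function that takes 50 ms caps 100 workers at about 667 o/s; raising the period to 500 ms (and the worker count to 500) brings the achieved rate within a 10% tolerance of 1000 o/s.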

### Implementation details

To better describe the implementation details, let’s examine the code fragments I find important for each entity mentioned above. I added comments to the lines that need explanation.

#### Worker

`Worker`’s job is to periodically execute the given function and to increment the counter in `RateCounter` after each execution.

```elixir
defmodule Hornet.Worker do
  @moduledoc false

  use GenServer

  alias Hornet.RateCounter

  def init(params) do
    # ...

    # during initialization, the first execution is scheduled
    Process.send_after(self(), :run_and_schedule, interval)

    # ...
  end

  def handle_info(:run_and_schedule, state) do
    execute_and_schedule(state)

    {:noreply, state}
  end

  defp execute_and_schedule(state) do
    execute(state)

    # we schedule the next execution
    Process.send_after(self(), :run_and_schedule, state.interval)
  end

  defp execute(state) do
    # we execute the anonymous function
    state.func.()
    # and increment `counter` in `RateCounter` so the current rate can be calculated
    :ok = RateCounter.inc(state.rate_counter)
  end
end
```

Sidenote: I’m using `Process.send_after/3` to schedule executions instead of `:timer.send_interval/2` because, when I experimented with a large number of workers (millions), starting that many associated timers froze the VM.

#### RateCounter

`RateCounter` is pretty simple. It stores a counter (`count`) that workers increment (`:inc`) and periodically recalculates the current rate (`:calculate_rate`).

```elixir
defmodule Hornet.RateCounter do
  use GenServer

  def init(params) do
    # ...

    # we schedule the periodic rate calculation
    {:ok, timer} = :timer.send_interval(interval, :calculate_rate)

    # ...
  end

  def handle_info(:calculate_rate, state) do
    # rate (o/s) = count * 1_000 / interval, where interval is in ms
    rate = state.count * 1000 / state.interval
    # and the counter is reset
    new_state = %{rate: rate, count: 0, timer: state.timer, interval: state.interval}

    {:noreply, new_state}
  end

  def handle_cast(:inc, state) do
    # the counter is incremented by workers
    new_state = %{state | count: state.count + 1}

    {:noreply, new_state}
  end
end
```
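The excerpt above omits the public functions that the other modules call (`RateCounter.inc/1` in `Worker` and `RateCounter.rate/1` in `Scheduler`). Here is a minimal self-contained sketch of how they could look. The module name, `start_link/1`, and the state layout are assumptions made for this example, not Hornet’s actual code.

```elixir
defmodule HornetSketch.RateCounter do
  # Hypothetical, self-contained version of the RateCounter excerpt.
  # Only inc/1 and rate/1 appear in the post; start_link/1 and the state
  # layout are assumptions made for this sketch.
  use GenServer

  def start_link(interval_ms), do: GenServer.start_link(__MODULE__, interval_ms)

  # called by workers after every execution (asynchronous, returns :ok)
  def inc(pid), do: GenServer.cast(pid, :inc)

  # called by the scheduler to read the last calculated rate (o/s)
  def rate(pid), do: GenServer.call(pid, :rate)

  @impl true
  def init(interval) do
    # :timer.send_interval/2 sends :calculate_rate to this process every `interval` ms
    {:ok, timer} = :timer.send_interval(interval, :calculate_rate)
    {:ok, %{rate: 0, count: 0, timer: timer, interval: interval}}
  end

  @impl true
  def handle_info(:calculate_rate, state) do
    rate = state.count * 1_000 / state.interval
    {:noreply, %{state | rate: rate, count: 0}}
  end

  @impl true
  def handle_cast(:inc, state), do: {:noreply, %{state | count: state.count + 1}}

  @impl true
  def handle_call(:rate, _from, state), do: {:reply, state.rate, state}
end
```

With a 1_000 ms interval, for example, 500 `inc/1` calls within one window make `rate/1` return 500.0 after the next calculation. Using a cast for `inc/1` keeps workers from blocking on the counter.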

#### Scheduler

The most complex logic is in `Scheduler`. Its responsibilities include:

• Starting all processes (workers, counter, supervisors)
• Periodically checking the current rate and starting new processes if it differs from the expected rate by more than the allowed error

Workers are started under a separate supervisor, `Hornet.Worker.WorkerSupervisor`. Both `RateCounter` and `Hornet.Worker.WorkerSupervisor` are supervised by the dynamic supervisor `Hornet.DynamicSupervisor`, so we can easily stop the current workers and start new ones.
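The benefit of this layout is that all workers can be replaced in one call: terminating the worker supervisor stops every worker under it. Here is a minimal sketch of that idea using only standard library supervisors, with plain `Agent` processes standing in for workers. The module and function names are hypothetical; this is not Hornet’s code.

```elixir
defmodule HornetSketch.Restart do
  # Hypothetical illustration of the supervision layout; Agents stand in for workers.

  # Starts a Supervisor holding `count` placeholder workers as a child of `dyn_sup`.
  def start_workers(dyn_sup, count) do
    children =
      for i <- 1..count do
        %{id: {:worker, i}, start: {Agent, :start_link, [fn -> i end]}}
      end

    DynamicSupervisor.start_child(dyn_sup, %{
      id: :worker_supervisor,
      start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]},
      type: :supervisor
    })
  end

  # Stops the whole group of workers at once and starts a new group in its place.
  def replace_workers(dyn_sup, old_worker_sup, new_count) do
    :ok = DynamicSupervisor.terminate_child(dyn_sup, old_worker_sup)
    start_workers(dyn_sup, new_count)
  end
end

{:ok, dyn_sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, worker_sup} = HornetSketch.Restart.start_workers(dyn_sup, 3)
{:ok, new_worker_sup} = HornetSketch.Restart.replace_workers(dyn_sup, worker_sup, 5)
Supervisor.count_children(new_worker_sup).workers # => 5
```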

```elixir
defmodule Hornet.Scheduler do
  use GenServer

  alias Hornet.RateCounter

  def init(params) do
    # ...

    # Hornet starts workers under a DynamicSupervisor, so they can be easily replaced
    {:ok, supervisor} = Hornet.DynamicSupervisor.start_link()

    # here we start RateCounter
    {:ok, rate_counter} =
      DynamicSupervisor.start_child(supervisor, %{
        id: RateCounter
        # ... (remaining child spec entries omitted)
      })

    # the implementation of `start_workers/4` is listed below
    {pid, workers_count} = start_workers(supervisor, worker_params, rate_counter, period)

    # ...
  end

  # ...

  # ... (inside the callback that periodically checks the rate)

    cond do
      # if the current rate is correct, we do nothing
      correct_rate?(state) ->
        # ...

      # if the expected rate can not be reached, we adjust workers
      true ->
        adjust_workers(state)
    end

  defp adjust_workers(state) do
    # stop the current workers
    :ok = DynamicSupervisor.terminate_child(state.supervisor, state.worker_supervisor)
    # we increase the execution period for a single process
    new_period = state.period + state.period_step
    # and we start new workers
    {pid, workers_count} =
      start_workers(state.supervisor, state.params, state.rate_counter, new_period)

    new_state = %{
      state
      | worker_supervisor: pid,
        current_workers_count: workers_count,
        period: new_period
    }

    {:noreply, new_state}
  end

  defp correct_rate?(state) do
    # fetch the current rate from RateCounter
    current_rate = RateCounter.rate(state.rate_counter)
    expected_rate = state.params[:rate]
    # we allow an error margin; by default it is 10% (0.1) of the expected rate
    error_rate = expected_rate * state.error_rate

    abs(current_rate - expected_rate) < error_rate
  end

  defp start_workers(supervisor, params, rate_counter, period) do
    # ...

    {interval, initial_workers_number} = calculate_workers_number(rate, period)

    # ...

    {:ok, pid} =
      DynamicSupervisor.start_child(supervisor, %{
        id: :worker_supervisor,
        type: :supervisor
        # ... (remaining child spec entries omitted)
      })

    {pid, initial_workers_number}
  end

  defp calculate_workers_number(rate, period) do
    # o/s a single process achieves with this period (`process_rate` above)
    tps = 1_000 / period

    if rate / tps <= 1 do
      # a single process is enough; slow it down to match the rate
      period = round(1000 / rate)

      {period, 1}
    else
      # spread the rate across several processes running every `period` ms
      workers = round(rate / tps)

      {period, workers}
    end
  end
end
```

### Conclusion

I hope the library will be useful for stress testing. The library is available on GitHub: https://github.com/ayrat555/hornet.
