Hornet

Motivation

Stress testing is a process of deliberately putting a system under intense load to test its stability. Stress testing can validate that a system can withstand expected stress, determine its limit and check its error handling capabilities.

Recently I wanted to stress test the project I’m working on. I checked existing load/stress testing libraries in the Elixir community. I found a couple of abandoned projects and one relatively well-maintained one - Chaperon.

The scenario I wanted was to stress test my system at a constant operations-per-second (o/s) rate over multiple (12-24) hours. The Chaperon library provides this feature with the SpreadAsync module. From its docs, it does exactly what I want: “Action that calls a function with a given rate over a given interval of time (ms)”. But after checking its source code, I found a flaw: it schedules a new process for each execution. For example, if you want to run a function at a rate of 3000 o/s over 20 hours, Chaperon will start 3000 * 60 * 60 * 20 = 216_000_000 processes. If you run multiple stress tests or a single test with a high o/s rate, the VM just fails. If it does not fail, RAM usage grows over time and may eventually exhaust the memory on your machine (I think this happens because Chaperon stores the result of each execution).

I decided to create a new library which would handle long-running stress tests more gracefully, spawning the optimal number of processes. I called this library Hornet. In this post, I’ll describe its design.

High-level design

Let’s examine the basic entities of Hornet:

  1. Worker - it periodically executes a given function. Usually, there are several workers during a single stress test.
  2. RateCounter - it periodically calculates the current rate of o/s.
  3. Scheduler - it periodically checks if the current rate is equal to the required rate. If it is not, it increases the number of workers.

Worker

Hornet accepts three required parameters:

  1. rate (o/s)
  2. func - the anonymous function that has to be executed
  3. id - this unique id is used for internal process names

Initially, the period of execution for a process is process_period (the default value is 100 ms), i.e. it executes the given function every process_period ms. The Scheduler calculates the starting number of processes using the following logic:

  • process_rate = rate (o/s) for a single process = 1_000 ms / process_period
  • if rate / process_rate <= 1, a single process can execute the given function while maintaining the required rate, so Hornet starts a single process with a period of 1_000 / rate
  • if rate / process_rate > 1, a single process cannot maintain the required rate, so Hornet starts rate / process_rate workers with a period of process_period
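
As a worked example of this calculation, here is a minimal sketch. It compares the required rate with what a single process can deliver, in the same way as the calculate_workers_number fragment later in the post. The module name WorkerMath and the function plan/2 are hypothetical and are not part of Hornet's API.

```elixir
defmodule WorkerMath do
  # Hypothetical helper, not part of Hornet's API.
  # Returns {period_ms, workers} for a target rate (o/s) and a starting process_period (ms).
  def plan(rate, process_period) do
    # How many executions per second a single process achieves with this period.
    process_rate = 1_000 / process_period

    if rate / process_rate <= 1 do
      # One process is enough: stretch its period to match the target rate.
      {round(1_000 / rate), 1}
    else
      # One process is too slow: split the load across several workers.
      {process_period, round(rate / process_rate)}
    end
  end
end

IO.inspect(WorkerMath.plan(5, 100))     # {200, 1}
IO.inspect(WorkerMath.plan(3_000, 100)) # {100, 300}
```

With the default process_period of 100 ms, a rate of 5 o/s needs one process firing every 200 ms, while 3000 o/s needs 300 workers firing every 100 ms.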

RateCounter

RateCounter keeps track of the current rate by storing a counter. Every worker increments this counter after executing func. Every rate_period ms, RateCounter calculates the current rate as counter * 1_000 / rate_period (the factor of 1_000 converts milliseconds to seconds) and resets the counter.
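
To make the units concrete, here is a small sketch of the rate arithmetic. Because rate_period is in milliseconds, the count is multiplied by 1_000 to get operations per second, as the RateCounter listing later in the post does. RateMath is a hypothetical name used only for illustration.

```elixir
defmodule RateMath do
  # Hypothetical helper: converts a count collected over `rate_period` ms
  # into operations per second.
  def rate(count, rate_period), do: count * 1_000 / rate_period
end

# 250 executions counted over a 5-second window.
IO.inspect(RateMath.rate(250, 5_000)) # 50.0
```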

Scheduler

Executing the given function may take longer than process_period, for example when the function performs heavy calculations. In that case the workers cannot keep up with the required rate, so the Scheduler stops the running workers, increases process_period, and starts new workers.
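
The effect of an adjustment can be sketched numerically: giving each worker a longer period means more workers are needed to hold the same overall rate. The values below (3000 o/s, periods of 100, 200 and 300 ms) are assumptions chosen for illustration, and AdjustMath is a hypothetical name.

```elixir
defmodule AdjustMath do
  # Hypothetical helper: number of workers needed when each worker
  # fires once per `period` ms and the target is `rate` o/s.
  def workers(rate, period), do: round(rate / (1_000 / period))
end

# Each adjustment lengthens the period, so the worker count grows.
for period <- [100, 200, 300] do
  IO.inspect({period, AdjustMath.workers(3_000, period)})
end
```

Each worker then has more time to finish a slow function before its next execution is due.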

Implementation details

To better describe the implementation details, let’s examine code fragments I find important for each entity mentioned above. I added comments to the lines that need explanation.

Worker

The Worker’s job is to periodically execute the given function and to increment the counter in the RateCounter after each execution.

defmodule Hornet.Worker do
  @moduledoc false

  use GenServer

  def init(params) do
    ...

    Process.send_after(self(), :run_and_schedule, interval) # during initialization, the first execution is scheduled

    ...
  end

  def handle_info(:run_and_schedule, state) do
    execute_and_schedule(state)

    {:noreply, state}
  end

  defp execute_and_schedule(state) do
    execute(state)

    Process.send_after(self(), :run_and_schedule, state.interval) # we schedule the next execution
  end

  defp execute(state) do
    state.func.() # we execute the anonymous function
    :ok = RateCounter.inc(state.rate_counter) # and increase `counter` in `RateCounter` to calculate the current rate
  end
end

Sidenote: I’m using Process.send_after/3 to schedule execution instead of :timer.send_interval/2 because, when I experimented with a large number of workers (millions), starting that many associated timers just froze the VM.
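
The self-rescheduling pattern can be shown in isolation. The sketch below is a minimal GenServer that re-arms a single Process.send_after/3 timer each time it handles a tick. Ticker and its functions are hypothetical names for illustration and are not part of Hornet.

```elixir
defmodule Ticker do
  # Hypothetical module for illustration; not part of Hornet.
  use GenServer

  def start_link(interval), do: GenServer.start_link(__MODULE__, interval)
  def ticks(pid), do: GenServer.call(pid, :ticks)

  @impl true
  def init(interval) do
    # Schedule the first tick; only one timer exists per process at a time.
    Process.send_after(self(), :tick, interval)
    {:ok, %{interval: interval, ticks: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Re-arm the timer while handling the current tick.
    Process.send_after(self(), :tick, state.interval)
    {:noreply, %{state | ticks: state.ticks + 1}}
  end

  @impl true
  def handle_call(:ticks, _from, state), do: {:reply, state.ticks, state}
end

{:ok, ticker} = Ticker.start_link(10)
Process.sleep(100)
IO.inspect(Ticker.ticks(ticker) > 0)
```

Because each tick schedules the next one, every process has at most one pending timer, and the timer goes away when the process stops.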

RateCounter

RateCounter is pretty simple. It stores counter (count) which is incremented by workers (:inc) and re-calculates the current rate periodically (:calculate_rate).

defmodule Hornet.RateCounter do
  use GenServer

  def init(params) do
    ...

    {:ok, timer} = :timer.send_interval(interval, :calculate_rate) # we schedule rate calculation

    ...
  end

  def handle_info(:calculate_rate, state) do
    rate = state.count * 1000 / state.interval # rate (o/s) = count * 1000 / interval, because interval is in ms
    new_state = %{rate: rate, count: 0, timer: state.timer, interval: state.interval} # and `counter` is reset

    {:noreply, new_state}
  end

  def handle_cast(:inc, state) do
    new_state = %{state | count: state.count + 1} # counter is incremented by workers

    {:noreply, new_state}
  end
end
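
The listing above omits the client functions. The other fragments in this post call RateCounter.inc/1 and RateCounter.rate/1, so a self-contained counter along the same lines might look like the sketch below. The module name CounterSketch, the initial state and the bodies of inc/1 and rate/1 are assumptions, not Hornet's actual code.

```elixir
defmodule CounterSketch do
  # Hypothetical stand-in for a rate counter; implementation details are assumptions.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Asynchronous: workers do not wait for the counter to process the increment.
  def inc(pid), do: GenServer.cast(pid, :inc)

  # Synchronous: returns the most recently calculated rate.
  def rate(pid), do: GenServer.call(pid, :rate)

  @impl true
  def init(opts) do
    interval = Keyword.fetch!(opts, :interval)
    {:ok, _timer} = :timer.send_interval(interval, :calculate_rate)
    {:ok, %{rate: 0, count: 0, interval: interval}}
  end

  @impl true
  def handle_cast(:inc, state), do: {:noreply, %{state | count: state.count + 1}}

  @impl true
  def handle_call(:rate, _from, state), do: {:reply, state.rate, state}

  @impl true
  def handle_info(:calculate_rate, state) do
    # interval is in ms, so multiply by 1_000 to get operations per second.
    rate = state.count * 1_000 / state.interval
    {:noreply, %{state | rate: rate, count: 0}}
  end
end

{:ok, counter} = CounterSketch.start_link(interval: 200)
for _ <- 1..10, do: CounterSketch.inc(counter)
Process.sleep(300)
IO.inspect(CounterSketch.rate(counter))
```

Using a cast for inc/1 keeps workers from blocking on the counter, at the cost of the counter's mailbox growing if it falls behind.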

Scheduler

The most complex logic is in Scheduler. Its responsibilities include:

  • Starting all processes (workers, counter, supervisors)
  • Periodically checking the current rate and starting new processes if the current rate is not equal to the expected rate

Workers are started under a separate supervisor called Hornet.Worker.WorkerSupervisor. RateCounter and Hornet.Worker.WorkerSupervisor are supervised by the dynamic supervisor Hornet.DynamicSupervisor, so we can easily stop the current workers and start new ones.
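
The stop-and-replace idea can be tried out with the standard library's DynamicSupervisor. In this sketch an Agent stands in for the worker supervisor, and ReplaceSketch is a hypothetical name. It only illustrates the mechanism and is not Hornet's code.

```elixir
defmodule ReplaceSketch do
  # Hypothetical demo; an Agent stands in for the worker supervisor.
  def run do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
    {:ok, old} = DynamicSupervisor.start_child(sup, {Agent, fn -> :old end})

    # Replace the child: terminate the old one, then start a new one.
    :ok = DynamicSupervisor.terminate_child(sup, old)
    {:ok, new} = DynamicSupervisor.start_child(sup, {Agent, fn -> :new end})

    # {old child alive?, state of the new child, number of active children}
    {Process.alive?(old), Agent.get(new, & &1), DynamicSupervisor.count_children(sup).active}
  end
end

IO.inspect(ReplaceSketch.run()) # {false, :new, 1}
```

Terminating a supervisor child also stops everything under it, which is why replacing a whole group of workers takes a single call.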

defmodule Hornet.Scheduler do
  use GenServer

  def init(params) do

    ...

    {:ok, supervisor} = HornetDynamicSupervisor.start_link() # Hornet starts workers under DynamicSupervisor, so they can be easily replaced

    {:ok, rate_counter} =
      DynamicSupervisor.start_child(supervisor, %{
        id: RateCounter,
        start: {RateCounter, :start_link, [[interval: rate_period]]}
      }) # here we start RateCounter

    {pid, workers_count} = start_workers(supervisor, worker_params, rate_counter, period) # the implementation of `start_workers` is listed below

    {:ok, timer} = :timer.send_interval(adjust_period, :adjust_workers) # we periodically adjust the number of workers

    ...

  end

  ...

  def handle_info(:adjust_workers, state) do
    cond do
      correct_rate?(state) -> # if the current rate is correct, we do nothing
        {:noreply, state}

      ...

      true ->
        adjust_workers(state) # if the expected rate cannot be reached, we adjust workers
    end
  end

  defp adjust_workers(state) do
    :ok = DynamicSupervisor.terminate_child(state.supervisor, state.worker_supervisor) # stop the current workers
    new_period = state.period + state.period_step # we increase the execution period for a single process
    {pid, workers_count} =
      start_workers(state.supervisor, state.params, state.rate_counter, new_period) # and we start new workers

    new_state = %{
      state
      | worker_supervisor: pid,
        current_workers_count: workers_count,
        period: new_period
    }

    {:noreply, new_state}
  end

  defp correct_rate?(state) do
    current_rate = RateCounter.rate(state.rate_counter) # fetch the current rate from RateCounter
    expected_rate = state.params[:rate]
    error_rate = expected_rate * state.error_rate # we allow an error margin; by default it is 10 % (0.1)

    if current_rate > expected_rate do
      current_rate - expected_rate < error_rate
    else
      expected_rate - current_rate < error_rate
    end
  end

  defp start_workers(supervisor, params, rate_counter, period) do
    ...


    {interval, initial_workers_number} = calculate_workers_number(rate, period)

    ...

    {:ok, pid} =
      DynamicSupervisor.start_child(supervisor, %{
        id: :worker_supervisor,
        start: {WorkerSupervisor, :start_link, [params]},
        type: :supervisor
      })

    {pid, initial_workers_number}
  end

  defp calculate_workers_number(rate, period) do
    tps = 1_000 / period

    if rate / tps <= 1 do
      period = round(1000 / rate)

      {period, 1}
    else
      workers = round(rate / tps)

      {period, workers}
    end
  end
end
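
The tolerance check in correct_rate? is easy to try with plain numbers. The sketch below expresses the same comparison as a pure function using abs/1. RateCheck and within_tolerance?/3 are hypothetical names, and the 0.1 default follows the 10 % mentioned in the listing's comment.

```elixir
defmodule RateCheck do
  # Hypothetical helper mirroring the tolerance check; 0.1 means 10 %.
  def within_tolerance?(current_rate, expected_rate, error_rate \\ 0.1) do
    abs(current_rate - expected_rate) < expected_rate * error_rate
  end
end

# With a target of 3000 o/s the allowed deviation is 300 o/s.
IO.inspect(RateCheck.within_tolerance?(2_800, 3_000)) # true
IO.inspect(RateCheck.within_tolerance?(2_600, 3_000)) # false
```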

Conclusion

I hope the library will be useful for stress testing. The library is available on GitHub - https://github.com/ayrat555/hornet.
