Experiments >

Summing bucket

Experiment #7524th March, 2021by Joshua Nussbaum

Continuing on the thread of message rollups, I tried to implement an example in Elixir, but it didn’t work out.

My plan was to use 2 GenServer’s per account, one to count messages per minute, and the other to store a FIFO series of 15 data points (1 data point per minute resolution, so 15 points is 15 minutes worth of data points). My reasoning was that it would allow me to scale the window of the FIFO series based on the speed and acceleration of data.

What I learned is that 2 GenServer’s - each with their own timer - would get out of sync. So I’ll need to refactor to do a single timer to control how the data drains betweens GenServers or it might be easier to have a single GenServer.

Code

defmodule IngestBuffer do
  use GenServer

  require Logger

  @name __MODULE__
  @window 1000

  def start_link(_options) do
    GenServer.start_link(__MODULE__, nil, name: @name)
  end

  @impl true
  def init(_) do
    schedule_drain()

    {:ok, 0}
  end

  @impl true
  def handle_call(:increment, _from, counter) do
    {:reply, :ok, counter + 1}
  end

  @impl true
  def handle_info(:drain, counter) do
    IO.puts("Draining #{counter}")
    Messages.collect(nil, counter)

    schedule_drain()

    {:noreply, 0}
  end

  defp schedule_drain() do
    IO.puts "scheduling drain #{inspect self()}"
    Process.send_after(self(), :drain, @window) |> IO.inspect
  end
end

defmodule SeriesBucket do
  use GenServer

  @name __MODULE__
  @window 1000
  @length 15

  def start_link(_options) do
    GenServer.start_link(__MODULE__, nil, name: @name)
  end

  @impl true
  def init(_) do
    schedule_drain()

    {:ok, []}
  end

  @impl true
  def handle_call({:collect, value}, _from, series) do
    new_state = cond do
      length(series) >= @length ->
        [value|Enum.take(series, @length-1)]
      true ->
        [value|series]
    end

    {:noreply, :ok, new_state}
  end

  @impl true
  def handle_info(:drain, series) do
    IO.puts("Draining #{inspect series}")

    schedule_drain()

    {:noreply, 0}
  end

  defp schedule_drain() do
    IO.puts "scheduling drain #{inspect self()}"
    Process.send_after(self(), :drain, @window) |> IO.inspect
  end
end

defmodule Messages do
  def increment(_account_id) do
    GenServer.call(IngestBuffer, :increment)
  end

  def collect(_account_id, value) do
    GenServer.call(SeriesBucket, {:collect, value})
  end
end

children = [IngestBuffer, SeriesBucket]
Supervisor.start_link(children, strategy: :one_for_one)

:sys.trace(IngestBuffer, true)
IO.inspect(Process.whereis(IngestBuffer))
Messages.increment(1) |> IO.inspect
view all experiments

Stay tuned in

Learn how to add more experimentation to your workflow