From 89f6eb3a0661a22f34742e68667bcf9645dbc3fc Mon Sep 17 00:00:00 2001 From: Daniel Kukula Date: Sat, 15 Feb 2025 09:01:56 +0100 Subject: [PATCH 1/3] use atom keys --- lib/one_brc/measurements_processor.ex | 1 + lib/one_brc/versions/version_9.ex | 183 ++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 lib/one_brc/versions/version_9.ex diff --git a/lib/one_brc/measurements_processor.ex b/lib/one_brc/measurements_processor.ex index 08e9014..531f4cc 100644 --- a/lib/one_brc/measurements_processor.ex +++ b/lib/one_brc/measurements_processor.ex @@ -30,6 +30,7 @@ defmodule OneBRC.MeasurementsProcessor do "6" -> OneBRC.MeasurementsProcessor.Version6.process(count) "7" -> OneBRC.MeasurementsProcessor.Version7.process(count) "8" -> OneBRC.MeasurementsProcessor.Version8.process(count) + "9" -> OneBRC.MeasurementsProcessor.Version9.process(count) _ -> raise "Unknown version" end end diff --git a/lib/one_brc/versions/version_9.ex b/lib/one_brc/versions/version_9.ex new file mode 100644 index 0000000..f2126fb --- /dev/null +++ b/lib/one_brc/versions/version_9.ex @@ -0,0 +1,183 @@ +defmodule OneBRC.MeasurementsProcessor.Version9.Worker do + def run(parent_pid) do + send(parent_pid, {:give_work, self()}) + + receive do + {:do_work, chunk} -> + process_chunk(chunk) + run(parent_pid) + + :result -> + send(parent_pid, {:result, :erlang.get()}) + end + end + + defp process_chunk(bin) do + process_chunk_lines(bin) + end + + defp process_chunk_lines(<<>>) do + :ok + end + + defp process_chunk_lines(bin) do + parse_weather_station(bin, bin, 0) + end + + defp parse_weather_station(bin, <>, count) do + <> = bin + parse_temp(temp_bin, String.to_atom(key)) + end + + defp parse_weather_station(bin, <<_c, rest::binary>>, count) do + parse_weather_station(bin, rest, count + 1) + end + + defp parse_weather_station(_bin, <<>>, _count) do + :ok + end + + # ex: 4.5 + defp parse_temp(<>, key) do + parse_temp(rest, key, -1) + end + + defp parse_temp(rest, key) do + parse_temp(rest, key, 1) + end + + defp parse_temp(<<_::4, d1::4, ?., _::4, d2::4, "\n", rest::binary>>, key, mult) do + temp = mult * (d1 * 10 + d2) + process_row(key, temp) + process_chunk_lines(rest) + end + + # ex: 45.3 + defp parse_temp(<<_::4, d1::4, _::4, d2::4, ?., _::4, d3::4, "\n", rest::binary>>, key, mult) do + temp = mult * (d1 * 100 + d2 * 10 + d3) + process_row(key, temp) + process_chunk_lines(rest) + end + + defp process_row(key, val) do + existing_record = :erlang.get(key) + + case existing_record do + :undefined -> + :erlang.put(key, {1, val, val, val}) + + {count, sum, min, max} -> + :erlang.put(key, {count + 1, sum + val, min(min, val), max(max, val)}) + end + end +end + +defmodule OneBRC.MeasurementsProcessor.Version9 do + @moduledoc """ + diff from version 7: + 1. removed :binary.split in process_chunk, using recursive parsing with pattern matching instead. + + Performance: Processes 10 million rows in approx 300ms + """ + import OneBRC.MeasurementsProcessor + alias OneBRC.MeasurementsProcessor.Version9.Worker + + require Logger + + def process(count) do + t1 = System.monotonic_time(:millisecond) + file_path = measurements_file(count) + worker_count = System.schedulers_online() + parent = self() + + wpids = + Enum.map(1..worker_count, fn _ -> + spawn_link(fn -> + Worker.run(parent) + end) + end) + + {:ok, file} = :prim_file.open(file_path, [:raw, :binary, :read]) + :ok = read_and_process(file) + :prim_file.close(file) + + results = + wpids + |> Enum.map(fn wpid -> + send(wpid, :result) + + receive do + {:result, result} -> result + end + end) + + t2 = System.monotonic_time(:millisecond) + + result = + for outer <- results, {key, {count_1, sum_1, min_1, max_1}} <- outer, reduce: %{} do + %{^key => {count_2, sum_2, min_2, max_2}} = acc -> + Map.put(acc, key, { + count_1 + count_2, + sum_1 + sum_2, + min(min_1, min_2), + max(max_1, max_2) + }) + + acc -> + Map.put(acc, key, {count_1, sum_1, min_1, max_1}) + end + |> Enum.sort_by(fn {key, _} -> key end, :desc) + + t3 = System.monotonic_time(:millisecond) + + result_txt = + result + |> Enum.reduce([], fn {key, {count, sum, min, max}}, acc -> + [ + to_string(key), + ?;, + to_string(min / 10.0), + ?;, + to_string(round_to_single_decimal(sum / count / 10.0)), + ?;, + to_string(max / 10.0), + ?\n | acc + ] + end) + + t4 = System.monotonic_time(:millisecond) + + Logger.info("Processing data, stage 1 (processing) took: #{t2 - t1} ms") + Logger.info("Processing data, stage 2 (aggregating) took: #{t3 - t2} ms") + Logger.info("Processing data, stage 3 (sorting & txt creation) took: #{t4 - t3} ms") + + result_txt + end + + defp read_and_process(file) do + chunk_size = 1024 * 1024 * 1 + + case :prim_file.read(file, chunk_size) do + :eof -> + :ok + + {:ok, data} -> + data = + case :prim_file.read_line(file) do + {:ok, line} -> <> + :eof -> data + end + + receive do + {:give_work, worker_pid} -> + send(worker_pid, {:do_work, data}) + end + + read_and_process(file) + end + end + + defp round_to_single_decimal(number) do + round(number * 10) / 10.0 + end +end From 4c50a2d2bba85a7ac68e453a4a82bfc732bd2162 Mon Sep 17 00:00:00 2001 From: Daniel Kukula Date: Sat, 15 Feb 2025 20:50:20 +0100 Subject: [PATCH 2/3] split to separate versions --- lib/one_brc/measurements_processor.ex | 2 + lib/one_brc/versions/version_10.ex | 194 ++++++++++++++++++++++++++ lib/one_brc/versions/version_11.ex | 192 +++++++++++++++++++++++++ lib/one_brc/versions/version_9.ex | 84 +++++------ 4 files changed, 433 insertions(+), 39 deletions(-) create mode 100644 lib/one_brc/versions/version_10.ex create mode 100644 lib/one_brc/versions/version_11.ex diff --git a/lib/one_brc/measurements_processor.ex b/lib/one_brc/measurements_processor.ex index 531f4cc..b21c3bd 100644 --- a/lib/one_brc/measurements_processor.ex +++ b/lib/one_brc/measurements_processor.ex @@ -31,6 +31,8 @@ defmodule OneBRC.MeasurementsProcessor do "7" -> OneBRC.MeasurementsProcessor.Version7.process(count) "8" -> OneBRC.MeasurementsProcessor.Version8.process(count) "9" -> OneBRC.MeasurementsProcessor.Version9.process(count) + "10" -> OneBRC.MeasurementsProcessor.Version10.process(count) + "11" -> OneBRC.MeasurementsProcessor.Version11.process(count) _ -> raise "Unknown version" end end diff --git a/lib/one_brc/versions/version_10.ex b/lib/one_brc/versions/version_10.ex new file mode 100644 index 0000000..b209c6c --- /dev/null +++ b/lib/one_brc/versions/version_10.ex @@ -0,0 +1,194 @@ +defmodule OneBRC.MeasurementsProcessor.Version10.Worker do + @compile inline: [ + parse_weather_station: 3, + parse_temp: 2, + process_row: 2 + ] + def run(parent_pid) do + send(parent_pid, {:give_work, self()}) + + receive do + {:do_work, chunk} -> + process_chunk(chunk) + run(parent_pid) + + :result -> + send(parent_pid, {:result, :erlang.get()}) + end + end + + defp process_chunk(bin) do + process_chunk_lines(bin) + end + + defp process_chunk_lines(<<>>) do + :ok + end + + defp process_chunk_lines(bin) do + parse_weather_station(bin, bin, 0) + end + + defp parse_weather_station(bin, <<";", _rest::binary>>, count) do + <> = bin + parse_temp(temp_bin, String.to_atom(key)) + end + + defp parse_weather_station(bin, <<_c, rest::binary>>, count) do + parse_weather_station(bin, rest, count + 1) + end + + defp parse_weather_station(_bin, <<>>, _count) do + :ok + end + + # ex: -4.5 + defp parse_temp(<>, key) do + temp = -(char_to_num(d1) * 10 + char_to_num(d2)) + process_row(key, temp) + process_chunk_lines(rest) + end + + # ex: 4.5 + defp parse_temp(<>, key) do + temp = char_to_num(d1) * 10 + char_to_num(d2) + process_row(key, temp) + process_chunk_lines(rest) + end + + # ex: -45.3 + defp parse_temp(<>, key) do + temp = -(char_to_num(d1) * 100 + char_to_num(d2) * 10 + char_to_num(d3)) + process_row(key, temp) + process_chunk_lines(rest) + end + + # ex: 45.3 + defp parse_temp(<>, key) do + temp = char_to_num(d1) * 100 + char_to_num(d2) * 10 + char_to_num(d3) + process_row(key, temp) + process_chunk_lines(rest) + end + + defp process_row(key, val) do + existing_record = :erlang.get(key) + + case existing_record do + :undefined -> + :erlang.put(key, {1, val, val, val}) + + {count, sum, min, max} -> + :erlang.put(key, {count + 1, sum + val, min(min, val), max(max, val)}) + end + end + + defp char_to_num(c) do + c - ?0 + end +end + +defmodule OneBRC.MeasurementsProcessor.Version10 do + @moduledoc """ + diff from version 9: + 1. Inline parsing functions + """ + import OneBRC.MeasurementsProcessor + alias OneBRC.MeasurementsProcessor.Version10.Worker + + require Logger + + def process(count) do + t1 = System.monotonic_time(:millisecond) + file_path = measurements_file(count) + worker_count = System.schedulers_online() + parent = self() + + wpids = + Enum.map(1..worker_count, fn _ -> + spawn_link(fn -> + Worker.run(parent) + end) + end) + + {:ok, file} = :prim_file.open(file_path, [:raw, :binary, :read]) + :ok = read_and_process(file) + :prim_file.close(file) + + results = + wpids + |> Enum.map(fn wpid -> + send(wpid, :result) + + receive do + {:result, result} -> result + end + end) + + t2 = System.monotonic_time(:millisecond) + + result = + results + |> List.flatten() + |> Enum.reduce(%{}, fn {key, {count_1, sum_1, min_1, max_1}}, acc -> + case Map.fetch(acc, key) do + :error -> + Map.put(acc, key, {count_1, sum_1, min_1, max_1}) + + {:ok, {count_2, sum_2, min_2, max_2}} -> + Map.put(acc, key, { + count_1 + count_2, + sum_1 + sum_2, + min(min_1, min_2), + max(max_1, max_2) + }) + end + end) + |> Enum.map(fn {key, {count, sum, min, max}} -> + {key, {min / 10.0, round_to_single_decimal(sum / count / 10.0), max / 10.0}} + end) + |> Enum.sort_by(fn {key, _} -> key end) + + t3 = System.monotonic_time(:millisecond) + + result_txt = + result + |> Enum.reduce("", fn {key, {min, mean, max}}, acc -> + acc <> "#{key};#{min};#{mean};#{max}\n" + end) + + t4 = System.monotonic_time(:millisecond) + + Logger.info("Processing data, stage 1 (processing) took: #{t2 - t1} ms") + Logger.info("Processing data, stage 2 (aggregating) took: #{t3 - t2} ms") + Logger.info("Processing data, stage 3 (sorting & txt creation) took: #{t4 - t3} ms") + + result_txt + end + + defp read_and_process(file) do + chunk_size = 1024 * 1024 * 1 + + case :prim_file.read(file, chunk_size) do + :eof -> + :ok + + {:ok, data} -> + data = + case :prim_file.read_line(file) do + {:ok, line} -> <> + :eof -> data + end + + receive do + {:give_work, worker_pid} -> + send(worker_pid, {:do_work, data}) + end + + read_and_process(file) + end + end + + defp round_to_single_decimal(number) do + round(number * 10) / 10.0 + end +end diff --git a/lib/one_brc/versions/version_11.ex b/lib/one_brc/versions/version_11.ex new file mode 100644 index 0000000..3733c05 --- /dev/null +++ b/lib/one_brc/versions/version_11.ex @@ -0,0 +1,192 @@ +defmodule OneBRC.MeasurementsProcessor.Version11.Worker do + @compile inline: [ + parse_weather_station: 3, + parse_temp: 2, + parse_temp: 3, + process_row: 3 + ] + def run(parent_pid) do + send(parent_pid, {:give_work, self()}) + + receive do + {:do_work, chunk} -> + process_chunk(chunk) + run(parent_pid) + + :result -> + send(parent_pid, {:result, :erlang.get()}) + end + end + + defp process_chunk(bin) do + process_chunk_lines(bin) + end + + defp process_chunk_lines(<>) do + process_chunk_lines(rest) + end + + defp process_chunk_lines(<<>>) do + :ok + end + + defp process_chunk_lines(bin) do + parse_weather_station(bin, bin, 0) + end + + defp parse_weather_station(bin, <>, count) do + <> = bin + parse_temp(temp_bin, String.to_atom(key)) + end + + defp parse_weather_station(bin, <<_c, rest::binary>>, count) do + parse_weather_station(bin, rest, count + 1) + end + + defp parse_weather_station(_bin, <<>>, _count) do + :ok + end + + # ex: 4.5 + defp parse_temp(<>, key) do + parse_temp(rest, key, -1) + end + + defp parse_temp(rest, key) do + parse_temp(rest, key, 1) + end + + defp parse_temp(<<_::4, d1::4, ?., _::4, d2::4, rest::binary>>, key, mult) do + temp = mult * (d1 * 10 + d2) + existing_record = :erlang.get(key) + process_row(key, existing_record, temp) + process_chunk_lines(rest) + end + + # ex: 45.3 + defp parse_temp(<<_::4, d1::4, _::4, d2::4, ?., _::4, d3::4, rest::binary>>, key, mult) do + temp = mult * (d1 * 100 + d2 * 10 + d3) + existing_record = :erlang.get(key) + process_row(key, existing_record, temp) + process_chunk_lines(rest) + end + + defp process_row(key, :undefined, val) do + :erlang.put(key, {1, val, val, val}) + end + + defp process_row(key, {count, sum, min, max}, val) do + :erlang.put(key, {count + 1, sum + val, min(min, val), max(max, val)}) + end +end + +defmodule OneBRC.MeasurementsProcessor.Version11 do + @moduledoc """ + diff from version 10: + 1. reduce function heads + 2. refactor processing + + Performance: Processes 10 million rows in approx 300ms + """ + import OneBRC.MeasurementsProcessor + alias OneBRC.MeasurementsProcessor.Version11.Worker + + require Logger + + def process(count) do + t1 = System.monotonic_time(:millisecond) + file_path = measurements_file(count) + worker_count = System.schedulers_online() + parent = self() + + wpids = + Enum.map(1..worker_count, fn _ -> + spawn_link(fn -> + Worker.run(parent) + end) + end) + + {:ok, file} = :prim_file.open(file_path, [:raw, :binary, :read]) + :ok = read_and_process(file) + :prim_file.close(file) + + results = + wpids + |> Enum.map(fn wpid -> + send(wpid, :result) + + receive do + {:result, result} -> result + end + end) + + t2 = System.monotonic_time(:millisecond) + + result = + for outer <- results, {key, {count_1, sum_1, min_1, max_1}} <- outer, reduce: %{} do + %{^key => {count_2, sum_2, min_2, max_2}} = acc -> + Map.put(acc, key, { + count_1 + count_2, + sum_1 + sum_2, + min(min_1, min_2), + max(max_1, max_2) + }) + + acc -> + Map.put(acc, key, {count_1, sum_1, min_1, max_1}) + end + |> Enum.sort_by(fn {key, _} -> key end, :desc) + + t3 = System.monotonic_time(:millisecond) + + result_txt = + result + |> Enum.reduce([], fn {key, {count, sum, min, max}}, acc -> + [ + Atom.to_string(key), + ?;, + Float.to_string(min / 10.0), + ?;, + Float.to_string(round_to_single_decimal(sum / count / 10.0)), + ?;, + Float.to_string(max / 10.0), + ?\n | acc + ] + end) + + t4 = System.monotonic_time(:millisecond) + + Logger.info("Processing data, stage 1 (processing) took: #{t2 - t1} ms") + Logger.info("Processing data, stage 2 (aggregating) took: #{t3 - t2} ms") + Logger.info("Processing data, stage 3 (sorting & txt creation) took: #{t4 - t3} ms") + + result_txt + end + + defp read_and_process(file) do + chunk_size = 1024 * 1024 * 1 + + case :prim_file.read(file, chunk_size) do + :eof -> + :ok + + {:ok, data} -> + data = + case :prim_file.read_line(file) do + {:ok, line} -> <> + :eof -> data + end + + receive do + {:give_work, worker_pid} -> + send(worker_pid, {:do_work, data}) + end + + read_and_process(file) + end + end + + defp round_to_single_decimal(number) do + round(number * 10) / 10.0 + end +end diff --git a/lib/one_brc/versions/version_9.ex b/lib/one_brc/versions/version_9.ex index f2126fb..d799513 100644 --- a/lib/one_brc/versions/version_9.ex +++ b/lib/one_brc/versions/version_9.ex @@ -24,8 +24,8 @@ defmodule OneBRC.MeasurementsProcessor.Version9.Worker do parse_weather_station(bin, bin, 0) end - defp parse_weather_station(bin, <>, count) do - <> = bin + defp parse_weather_station(bin, <<";", _rest::binary>>, count) do + <> = bin parse_temp(temp_bin, String.to_atom(key)) end @@ -37,24 +37,30 @@ defmodule OneBRC.MeasurementsProcessor.Version9.Worker do :ok end - # ex: 4.5 - defp parse_temp(<>, key) do - parse_temp(rest, key, -1) + # ex: -4.5 + defp parse_temp(<>, key) do + temp = -(char_to_num(d1) * 10 + char_to_num(d2)) + process_row(key, temp) + process_chunk_lines(rest) end - defp parse_temp(rest, key) do - parse_temp(rest, key, 1) + # ex: 4.5 + defp parse_temp(<>, key) do + temp = char_to_num(d1) * 10 + char_to_num(d2) + process_row(key, temp) + process_chunk_lines(rest) end - defp parse_temp(<<_::4, d1::4, ?., _::4, d2::4, "\n", rest::binary>>, key, mult) do - temp = mult * (d1 * 10 + d2) + # ex: -45.3 + defp parse_temp(<>, key) do + temp = -(char_to_num(d1) * 100 + char_to_num(d2) * 10 + char_to_num(d3)) process_row(key, temp) process_chunk_lines(rest) end # ex: 45.3 - defp parse_temp(<<_::4, d1::4, _::4, d2::4, ?., _::4, d3::4, "\n", rest::binary>>, key, mult) do - temp = mult * (d1 * 100 + d2 * 10 + d3) + defp parse_temp(<>, key) do + temp = char_to_num(d1) * 100 + char_to_num(d2) * 10 + char_to_num(d3) process_row(key, temp) process_chunk_lines(rest) end @@ -70,14 +76,16 @@ defmodule OneBRC.MeasurementsProcessor.Version9.Worker do :erlang.put(key, {count + 1, sum + val, min(min, val), max(max, val)}) end end + + defp char_to_num(c) do + c - ?0 + end end defmodule OneBRC.MeasurementsProcessor.Version9 do @moduledoc """ - diff from version 7: - 1. removed :binary.split in process_chunk, using recursive parsing with pattern matching instead. - - Performance: Processes 10 million rows in approx 300ms + diff from version 8: + 1. Use String.to_atom for cities """ import OneBRC.MeasurementsProcessor alias OneBRC.MeasurementsProcessor.Version9.Worker @@ -114,35 +122,33 @@ defmodule OneBRC.MeasurementsProcessor.Version9 do t2 = System.monotonic_time(:millisecond) result = - for outer <- results, {key, {count_1, sum_1, min_1, max_1}} <- outer, reduce: %{} do - %{^key => {count_2, sum_2, min_2, max_2}} = acc -> - Map.put(acc, key, { - count_1 + count_2, - sum_1 + sum_2, - min(min_1, min_2), - max(max_1, max_2) - }) - - acc -> - Map.put(acc, key, {count_1, sum_1, min_1, max_1}) - end - |> Enum.sort_by(fn {key, _} -> key end, :desc) + results + |> List.flatten() + |> Enum.reduce(%{}, fn {key, {count_1, sum_1, min_1, max_1}}, acc -> + case Map.fetch(acc, key) do + :error -> + Map.put(acc, key, {count_1, sum_1, min_1, max_1}) + + {:ok, {count_2, sum_2, min_2, max_2}} -> + Map.put(acc, key, { + count_1 + count_2, + sum_1 + sum_2, + min(min_1, min_2), + max(max_1, max_2) + }) + end + end) + |> Enum.map(fn {key, {count, sum, min, max}} -> + {key, {min / 10.0, round_to_single_decimal(sum / count / 10.0), max / 10.0}} + end) + |> Enum.sort_by(fn {key, _} -> key end) t3 = System.monotonic_time(:millisecond) result_txt = result - |> Enum.reduce([], fn {key, {count, sum, min, max}}, acc -> - [ - to_string(key), - ?;, - to_string(min / 10.0), - ?;, - to_string(round_to_single_decimal(sum / count / 10.0)), - ?;, - to_string(max / 10.0), - ?\n | acc - ] + |> Enum.reduce("", fn {key, {min, mean, max}}, acc -> + acc <> "#{key};#{min};#{mean};#{max}\n" end) t4 = System.monotonic_time(:millisecond) From 31a842c5754661159753ab173dae936f7b7d22b1 Mon Sep 17 00:00:00 2001 From: Daniel Kukula Date: Sun, 29 Jun 2025 08:08:08 +0200 Subject: [PATCH 3/3] v12 reduce stacktrace depth --- lib/one_brc/measurements_processor.ex | 1 + lib/one_brc/versions/version_12.ex | 192 ++++++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 lib/one_brc/versions/version_12.ex diff --git a/lib/one_brc/measurements_processor.ex b/lib/one_brc/measurements_processor.ex index b21c3bd..2714e51 100644 --- a/lib/one_brc/measurements_processor.ex +++ b/lib/one_brc/measurements_processor.ex @@ -33,6 +33,7 @@ defmodule OneBRC.MeasurementsProcessor do "9" -> OneBRC.MeasurementsProcessor.Version9.process(count) "10" -> OneBRC.MeasurementsProcessor.Version10.process(count) "11" -> OneBRC.MeasurementsProcessor.Version11.process(count) + "12" -> OneBRC.MeasurementsProcessor.Version12.process(count) _ -> raise "Unknown version" end end diff --git a/lib/one_brc/versions/version_12.ex b/lib/one_brc/versions/version_12.ex new file mode 100644 index 0000000..30f379e --- /dev/null +++ b/lib/one_brc/versions/version_12.ex @@ -0,0 +1,192 @@ +defmodule OneBRC.MeasurementsProcessor.Version12.Worker do + @compile inline: [ + parse_weather_station: 3, + parse_temp: 2, + parse_temp: 3, + process_row: 3 + ] + def run(parent_pid) do + :erlang.system_flag(:backtrace_depth, 0) + send(parent_pid, {:give_work, self()}) + + receive do + {:do_work, chunk} -> + process_chunk(chunk) + run(parent_pid) + + :result -> + send(parent_pid, {:result, :erlang.get()}) + end + end + + defp process_chunk(bin) do + process_chunk_lines(bin) + end + + defp process_chunk_lines(<>) do + process_chunk_lines(rest) + end + + defp process_chunk_lines(<<>>) do + :ok + end + + defp process_chunk_lines(bin) do + parse_weather_station(bin, bin, 0) + end + + defp parse_weather_station(bin, <>, count) do + <> = bin + parse_temp(temp_bin, String.to_atom(key)) + end + + defp parse_weather_station(bin, <<_c, rest::binary>>, count) do + parse_weather_station(bin, rest, count + 1) + end + + defp parse_weather_station(_bin, <<>>, _count) do + :ok + end + + # ex: 4.5 + defp parse_temp(<>, key) do + parse_temp(rest, key, -1) + end + + defp parse_temp(rest, key) do + parse_temp(rest, key, 1) + end + + defp parse_temp(<<_::4, d1::4, ?., _::4, d2::4, rest::binary>>, key, mult) do + temp = mult * (d1 * 10 + d2) + existing_record = :erlang.get(key) + process_row(key, existing_record, temp) + process_chunk_lines(rest) + end + + # ex: 45.3 + defp parse_temp(<<_::4, d1::4, _::4, d2::4, ?., _::4, d3::4, rest::binary>>, key, mult) do + temp = mult * (d1 * 100 + d2 * 10 + d3) + existing_record = :erlang.get(key) + process_row(key, existing_record, temp) + process_chunk_lines(rest) + end + + defp process_row(key, :undefined, val) do + :erlang.put(key, {1, val, val, val}) + end + + defp process_row(key, {count, sum, min, max}, val) do + :erlang.put(key, {count + 1, sum + val, min(min, val), max(max, val)}) + end +end + +defmodule OneBRC.MeasurementsProcessor.Version12 do + @moduledoc """ + diff from version 11: + 1. backtrace_depth set to 0 + + Performance: Processes 10 million rows in approx 300ms + """ + import OneBRC.MeasurementsProcessor + alias OneBRC.MeasurementsProcessor.Version12.Worker + + require Logger + + def process(count) do + t1 = System.monotonic_time(:millisecond) + file_path = measurements_file(count) + worker_count = System.schedulers_online() + parent = self() + + wpids = + Enum.map(1..worker_count, fn _ -> + spawn_link(fn -> + Worker.run(parent) + end) + end) + + {:ok, file} = :prim_file.open(file_path, [:raw, :binary, :read]) + :ok = read_and_process(file) + :prim_file.close(file) + + results = + wpids + |> Enum.map(fn wpid -> + send(wpid, :result) + + receive do + {:result, result} -> result + end + end) + + t2 = System.monotonic_time(:millisecond) + + result = + for outer <- results, {key, {count_1, sum_1, min_1, max_1}} <- outer, reduce: %{} do + %{^key => {count_2, sum_2, min_2, max_2}} = acc -> + Map.put(acc, key, { + count_1 + count_2, + sum_1 + sum_2, + min(min_1, min_2), + max(max_1, max_2) + }) + + acc -> + Map.put(acc, key, {count_1, sum_1, min_1, max_1}) + end + |> Enum.sort_by(fn {key, _} -> key end, :desc) + + t3 = System.monotonic_time(:millisecond) + + result_txt = + result + |> Enum.reduce([], fn {key, {count, sum, min, max}}, acc -> + [ + Atom.to_string(key), + ?;, + Float.to_string(min / 10.0), + ?;, + Float.to_string(round_to_single_decimal(sum / count / 10.0)), + ?;, + Float.to_string(max / 10.0), + ?\n | acc + ] + end) + + t4 = System.monotonic_time(:millisecond) + + Logger.info("Processing data, stage 1 (processing) took: #{t2 - t1} ms") + Logger.info("Processing data, stage 2 (aggregating) took: #{t3 - t2} ms") + Logger.info("Processing data, stage 3 (sorting & txt creation) took: #{t4 - t3} ms") + + result_txt + end + + defp read_and_process(file) do + chunk_size = 1024 * 1024 * 1 + + case :prim_file.read(file, chunk_size) do + :eof -> + :ok + + {:ok, data} -> + data = + case :prim_file.read_line(file) do + {:ok, line} -> <> + :eof -> data + end + + receive do + {:give_work, worker_pid} -> + send(worker_pid, {:do_work, data}) + end + + read_and_process(file) + end + end + + defp round_to_single_decimal(number) do + round(number * 10) / 10.0 + end +end