
Commit d8df681

🌒 load_postgres.exs

1 parent ce86b8e

1 file changed: +180 −0

scripts/load_postgres.exs

Mix.install([
  # Database and Ecto
  {:ecto_sql, "~> 3.10"},
  {:postgrex, ">= 0.0.0"}
])

# Configuration constants
defmodule LoadPostgres do
  @moduledoc false
  @target_accumulated_messages 1000
  @batch_size 100
  @total_throughput 5000
  @duration_seconds 60
  @concurrency 10
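  # Scenario implied by these constants: hold the table near
  # @target_accumulated_messages rows while driving roughly
  # @total_throughput row operations per second for @duration_seconds
  # seconds across @concurrency concurrent workers.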

  # Generate random string for names
  defp random_string(length \\ 10) do
    length
    |> :crypto.strong_rand_bytes()
    |> Base.encode16()
    |> binary_part(0, length)
  end
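  # random_string(4) hex-encodes 4 random bytes and keeps the first 4
  # characters, e.g. "A3F9" (output differs on every call).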

  # Insert new rows with random names
  defp insert_batch(conn, batch_size) do
    names = for _ <- 1..batch_size, do: random_string()

    values = Enum.map_join(names, ", ", fn name -> "(DEFAULT, '#{name}', 0)" end)

    query = "INSERT INTO load_test_table_1 (id, name, idx) VALUES #{values}"
    Postgrex.query!(conn, query, [])

    batch_size
  end
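  # insert_batch/2 generates a statement of the shape:
  #   INSERT INTO load_test_table_1 (id, name, idx)
  #     VALUES (DEFAULT, 'A1B2C3D4E5', 0), (DEFAULT, '0F9E8D7C6B', 0), ...
  # Interpolating the names is safe here because they are hex characters
  # only; parameterized queries would be preferable for untrusted input.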

  # Update existing rows by incrementing idx
  defp update_batch(conn, batch_size) do
    query = """
    UPDATE load_test_table_1
    SET idx = idx + 1
    WHERE id IN (
      SELECT id FROM load_test_table_1
      ORDER BY RANDOM()
      LIMIT #{batch_size}
    )
    """

    result = Postgrex.query!(conn, query, [])
    result.num_rows
  end

  # Delete rows to maintain target count
  defp delete_batch(conn, batch_size) do
    query = """
    DELETE FROM load_test_table_1
    WHERE id IN (
      SELECT id FROM load_test_table_1
      ORDER BY RANDOM()
      LIMIT #{batch_size}
    )
    """

    result = Postgrex.query!(conn, query, [])
    result.num_rows
  end
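  # Both update_batch/2 and delete_batch/2 pick victim rows via
  # ORDER BY RANDOM(), which scans the whole table; acceptable here
  # since the table is held near @target_accumulated_messages rows.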

  # Get current row count
  defp get_row_count(conn) do
    result = Postgrex.query!(conn, "SELECT COUNT(*) FROM load_test_table_1", [])
    result.rows |> List.first() |> List.first()
  end

  # Main load testing function
  def run_load_test(conn) do
    IO.puts("Starting load test...")
    IO.puts("Target accumulated messages: #{@target_accumulated_messages}")
    IO.puts("Batch size: #{@batch_size}")
    IO.puts("Total throughput: #{@total_throughput}")
    IO.puts("Concurrency: #{@concurrency}")

    # Start by truncating the table
    IO.puts("\nTruncating table...")
    Postgrex.query!(conn, "TRUNCATE load_test_table_1 RESTART IDENTITY", [])

    # Phase 1: Insert until we reach target accumulated messages
    IO.puts("\nPhase 1: Inserting messages to reach target...")
    insert_until_target(conn)

    # Phase 2: Maintain target count while processing total throughput
    IO.puts("\nPhase 2: Processing total throughput while maintaining target count...")
    process_throughput(conn)

    IO.puts("\nLoad test completed!")
  end

  defp insert_until_target(conn) do
    current_count = get_row_count(conn)

    if current_count < @target_accumulated_messages do
      needed_messages = @target_accumulated_messages - current_count
      batches_needed = ceil(needed_messages / @batch_size)

      IO.puts("Need #{batches_needed} batches to reach target...")

      for batch_num <- 1..batches_needed do
        inserted = insert_batch(conn, @batch_size)
        current_count = get_row_count(conn)
        IO.puts("Batch #{batch_num}: Inserted #{inserted} rows. Total: #{current_count}")
      end
    end
  end

  defp process_throughput(conn) do
    operations_per_second = @total_throughput / @batch_size
    total_operations = round(@duration_seconds * operations_per_second)
    ms_per_operation = round(1000 / operations_per_second * @concurrency)
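
    # With the defaults: 5000 / 100 = 50 batch operations per second,
    # 60 s * 50 = 3000 operations in total, and each of the 10 workers
    # sleeps round(1000 / 50 * 10) = 200 ms after its batch, so the
    # aggregate rate stays near 50 operations (5000 rows) per second.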

    # Process operations with concurrency
    1..total_operations
    |> Stream.map(fn _ ->
      case :rand.uniform() do
        x when x <= 0.6 -> :update
        x when x <= 0.9 -> :insert
        _ -> :delete
      end
    end)
    |> Task.async_stream(
      fn operation ->
        case operation do
          :insert ->
            insert_batch(conn, @batch_size)

          :update ->
            update_batch(conn, @batch_size)

          :delete ->
            delete_batch(conn, @batch_size)
        end

        Process.sleep(ms_per_operation)
      end,
      max_concurrency: @concurrency,
      ordered: false,
      timeout: 30_000
    )
    |> Stream.map(fn {:ok, result} -> result end)
    |> Stream.chunk_every(ceil(operations_per_second * @concurrency))
    |> Enum.each(fn _chunk ->
      current_count = get_row_count(conn)
      IO.puts("Processed batch. Current row count: #{current_count}")
    end)

    # Keep connection alive for a moment to see final results
    Process.sleep(1000)
    final_count = get_row_count(conn)
    IO.puts("\nFinal row count: #{final_count}")
  end
end

{:ok, conn} =
  Postgrex.start_link(
    database: "dune",
    username: "postgres",
    password: "postgres",
    hostname: "localhost",
    port: 5432
  )
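
# Note: this is a single connection, so the concurrent tasks serialize
# their queries through it; a pool_size option (e.g. pool_size: 10)
# would let them reach the database in parallel.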

# Set up the load test table with the expected schema
Postgrex.query!(
  conn,
  "CREATE TABLE IF NOT EXISTS load_test_table_1 (id SERIAL PRIMARY KEY, name VARCHAR(255), idx INTEGER DEFAULT 0)",
  []
)

# Run the load test
LoadPostgres.run_load_test(conn)

# Close the connection
GenServer.stop(conn)
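
To run the script against a local PostgreSQL (assuming Elixir 1.12+ for Mix.install, and a reachable "dune" database accepting the postgres/postgres credentials above):

  elixir scripts/load_postgres.exs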
