Skip to content

Commit 73d0327

Browse files
committed
[Community] init Iceberg support
1 parent 40c3525 commit 73d0327

File tree

13 files changed

+543
-25
lines changed

13 files changed

+543
-25
lines changed

additional_tests/historical_backend_tests/.env.template

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,14 @@ CLICKHOUSE_HOST=
22
CLICKHOUSE_PORT=
33
CLICKHOUSE_USERNAME=
44
CLICKHOUSE_PASSWORD=
5+
6+
ICEBERG_CATALOG_NAMESPACE=
7+
ICEBERG_CATALOG_WAREHOUSE=
8+
ICEBERG_CATALOG_NAME=
9+
ICEBERG_CATALOG_URI=
10+
ICEBERG_CATALOG_TOKEN=
11+
ICEBERG_S3_ACCESS_KEY=
12+
ICEBERG_S3_SECRET_KEY=
13+
ICEBERG_S3_REGION=
14+
ICEBERG_S3_ENDPOINT=
15+
CREATE_ICEBERG_DB_IF_MISSING=false

additional_tests/historical_backend_tests/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,9 @@ def _load_historical_backend_creds_env_variables_if_necessary():
3737
async def clickhouse_client():
3838
async with octobot.community.history_backend_client(octobot.enums.CommunityHistoricalBackendType.Clickhouse) as client:
3939
yield client
40+
41+
42+
@pytest_asyncio.fixture
43+
async def iceberg_client():
44+
async with octobot.community.history_backend_client(octobot.enums.CommunityHistoricalBackendType.Iceberg) as client:
45+
yield client

additional_tests/historical_backend_tests/test_clickhouse_candles.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import pytest
1919

2020
from additional_tests.historical_backend_tests import clickhouse_client
21-
import octobot.community.history_backend.clickhouse_historical_backend_client as clickhouse_historical_backend_client
21+
import octobot.community.history_backend.util as history_backend_util
2222

2323
import octobot_commons.enums as commons_enums
2424

@@ -27,9 +27,21 @@
2727
pytestmark = pytest.mark.asyncio
2828

2929

30+
EXCHANGE = "binance"
31+
SYMBOL = "BTC/USDT"
32+
SHORT_TIME_FRAME = commons_enums.TimeFrames.FIFTEEN_MINUTES
33+
34+
3035
async def test_fetch_candles_history_range(clickhouse_client):
36+
# unknown candles
37+
min_time, max_time = await clickhouse_client.fetch_candles_history_range(
38+
EXCHANGE+"plop", SYMBOL, SHORT_TIME_FRAME
39+
)
40+
assert min_time == max_time == 0
41+
42+
# known candles
3143
min_time, max_time = await clickhouse_client.fetch_candles_history_range(
32-
"binance", "BTC/USDT", commons_enums.TimeFrames.FOUR_HOURS
44+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME
3345
)
3446
assert 0 < min_time < max_time < time.time()
3547

@@ -38,12 +50,12 @@ async def test_fetch_candles_history(clickhouse_client):
3850
start_time = 1718785679
3951
end_time = 1721377495
4052
candles_count = math.floor((end_time - start_time) / (
41-
commons_enums.TimeFramesMinutes[commons_enums.TimeFrames.FIFTEEN_MINUTES] * 60
53+
commons_enums.TimeFramesMinutes[SHORT_TIME_FRAME] * 60
4254
))
4355
# requires multiple fetches
4456
assert candles_count == 2879
4557
candles = await clickhouse_client.fetch_candles_history(
46-
"binance", "BTC/USDT", commons_enums.TimeFrames.FIFTEEN_MINUTES, start_time, end_time
58+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
4759
)
4860
assert sorted(candles, key=lambda c: c[0]) == candles
4961
fetched_count = candles_count + 1
@@ -66,11 +78,11 @@ async def test_deduplicate(clickhouse_client):
6678
start_time = 1718785679
6779
end_time = 1721377495
6880
candles = await clickhouse_client.fetch_candles_history(
69-
"binance", "BTC/USDT", commons_enums.TimeFrames.FIFTEEN_MINUTES, start_time, end_time
81+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
7082
)
7183
duplicated = candles + candles
7284
assert len(duplicated) == len(candles) * 2
7385
assert sorted(candles, key=lambda c: c[0]) == candles
74-
deduplicated = clickhouse_historical_backend_client._deduplicate(duplicated, 0)
86+
deduplicated = history_backend_util.deduplicate(duplicated, 0)
7587
# deduplicated and still sorted
7688
assert deduplicated == candles
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# This file is part of OctoBot (https://github.com/Drakkar-Software/OctoBot)
2+
# Copyright (c) 2025 Drakkar-Software, All rights reserved.
3+
#
4+
# OctoBot is free software; you can redistribute it and/or
5+
# modify it under the terms of the GNU General Public License
6+
# as published by the Free Software Foundation; either
7+
# version 3.0 of the License, or (at your option) any later version.
8+
#
9+
# OctoBot is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12+
# General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public
15+
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
16+
import math
17+
import time
18+
import mock
19+
import pytest
20+
import asyncio
21+
import pyiceberg.table
22+
23+
from additional_tests.historical_backend_tests import iceberg_client
24+
import octobot.community.history_backend.iceberg_historical_backend_client as iceberg_historical_backend_client
25+
import octobot.community.history_backend.util as history_backend_util
26+
27+
import octobot_commons.enums as commons_enums
28+
import octobot_commons.constants as commons_constants
29+
30+
31+
# All test coroutines will be treated as marked.
32+
pytestmark = pytest.mark.asyncio
33+
34+
35+
EXCHANGE = "binance"
36+
SYMBOL = "BTC/USDC"
37+
SHORT_TIME_FRAME = commons_enums.TimeFrames.FIFTEEN_MINUTES
38+
39+
40+
async def test_fetch_candles_history_range(iceberg_client):
41+
# unknown candles
42+
min_time, max_time = await iceberg_client.fetch_candles_history_range(
43+
EXCHANGE+"plop", SYMBOL, SHORT_TIME_FRAME
44+
)
45+
assert min_time == max_time == 0
46+
47+
# known candles
48+
min_time, max_time = await iceberg_client.fetch_candles_history_range(
49+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME
50+
)
51+
assert 0 < min_time < max_time < time.time()
52+
53+
54+
async def test_fetch_candles_history(iceberg_client):
55+
start_time = 1718785679
56+
end_time = 1721377495
57+
candles_count = math.floor((end_time - start_time) / (
58+
commons_enums.TimeFramesMinutes[SHORT_TIME_FRAME] * 60
59+
))
60+
# requires multiple fetches
61+
assert candles_count == 2879
62+
candles = await iceberg_client.fetch_candles_history(
63+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
64+
)
65+
assert sorted(candles, key=lambda c: c[0]) == candles
66+
fetched_count = candles_count + 1
67+
assert len(candles) == fetched_count
68+
# will fail if parsed time is not UTC
69+
assert candles[0][commons_enums.PriceIndexes.IND_PRICE_TIME.value] == 1718785800
70+
assert (
71+
candles[0][commons_enums.PriceIndexes.IND_PRICE_TIME.value]
72+
!= candles[0][commons_enums.PriceIndexes.IND_PRICE_OPEN.value]
73+
!= candles[0][commons_enums.PriceIndexes.IND_PRICE_HIGH.value]
74+
!= candles[0][commons_enums.PriceIndexes.IND_PRICE_LOW.value]
75+
!= candles[0][commons_enums.PriceIndexes.IND_PRICE_CLOSE.value]
76+
!= candles[0][commons_enums.PriceIndexes.IND_PRICE_VOL.value]
77+
)
78+
# candles are unique
79+
assert len(set(c[0] for c in candles)) == fetched_count
80+
81+
82+
async def test_fetch_candles_history_asynchronousness(iceberg_client):
83+
start_time = 1718785679
84+
end_time_1 = 1721377495
85+
end_time_2 = 1721377495 + 2 * commons_constants.DAYS_TO_SECONDS
86+
end_time_3 = 1721377495 + 23 * commons_constants.DAYS_TO_SECONDS
87+
88+
scan_call_times = []
89+
_to_arrow_call_times = []
90+
_to_arrow_return_times = []
91+
92+
def _get_or_create_table(*args, **kwargs):
93+
table = original_get_or_create_table(*args, **kwargs)
94+
original_scan = table.scan
95+
96+
97+
def _scan(*args, **kwargs):
98+
scan_call_times.append(time.time())
99+
scan_result = original_scan(*args, **kwargs)
100+
original_to_arrow = scan_result.to_arrow
101+
102+
def _to_arrow(*args, **kwargs):
103+
_to_arrow_call_times.append(time.time())
104+
try:
105+
return original_to_arrow(*args, **kwargs)
106+
finally:
107+
_to_arrow_return_times.append(time.time())
108+
109+
scan_result.to_arrow = _to_arrow
110+
return scan_result
111+
112+
table.scan = mock.Mock(side_effect=_scan)
113+
return table
114+
115+
original_get_or_create_table = iceberg_client.get_or_create_table
116+
with (
117+
mock.patch.object(iceberg_client, "get_or_create_table", mock.Mock(side_effect=_get_or_create_table)) as get_or_create_table_mock
118+
):
119+
candles_1, candles_2, candles_3 = await asyncio.gather(
120+
iceberg_client.fetch_candles_history(
121+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_1
122+
),
123+
iceberg_client.fetch_candles_history(
124+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_2
125+
),
126+
iceberg_client.fetch_candles_history(
127+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_3
128+
),
129+
)
130+
assert get_or_create_table_mock.call_count == 3
131+
assert len(scan_call_times) == 3
132+
assert len(_to_arrow_call_times) == 3
133+
assert len(_to_arrow_return_times) == 3
134+
135+
assert scan_call_times[0] <= scan_call_times[1] <= scan_call_times[2]
136+
assert _to_arrow_call_times[0] <= _to_arrow_call_times[1] <= _to_arrow_call_times[2]
137+
assert _to_arrow_return_times[0] < _to_arrow_return_times[1] < _to_arrow_return_times[2]
138+
139+
# all to_arrow calls have been performed before the first to_arrow return,
140+
# which means they are running concurrently in this async context
141+
assert max(_to_arrow_call_times) < min(_to_arrow_return_times)
142+
143+
assert len(candles_1) > 2000
144+
assert len(candles_2) > len(candles_1)
145+
assert len(candles_3) > len(candles_2)
146+
147+
148+
async def test_deduplicate(iceberg_client):
149+
start_time = 1718785679
150+
end_time = 1721377495
151+
candles = await iceberg_client.fetch_candles_history(
152+
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
153+
)
154+
duplicated = candles + candles
155+
assert len(duplicated) == len(candles) * 2
156+
assert sorted(candles, key=lambda c: c[0]) == candles
157+
deduplicated = history_backend_util.deduplicate(duplicated, 0)
158+
# deduplicated and still sorted
159+
assert deduplicated == candles

octobot/community/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
history_backend_client,
9898
HistoricalBackendClient,
9999
ClickhouseHistoricalBackendClient,
100+
IcebergHistoricalBackendClient,
100101
)
101102

102103
__all__ = [
@@ -152,4 +153,5 @@
152153
"history_backend_client",
153154
"HistoricalBackendClient",
154155
"ClickhouseHistoricalBackendClient",
156+
"IcebergHistoricalBackendClient",
155157
]

octobot/community/history_backend/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,14 @@
2929
ClickhouseHistoricalBackendClient,
3030
)
3131

32+
from octobot.community.history_backend import iceberg_historical_backend_client
33+
from octobot.community.history_backend.iceberg_historical_backend_client import (
34+
IcebergHistoricalBackendClient,
35+
)
36+
3237
__all__ = [
3338
"history_backend_client",
3439
"HistoricalBackendClient",
3540
"ClickhouseHistoricalBackendClient",
41+
"IcebergHistoricalBackendClient",
3642
]

octobot/community/history_backend/clickhouse_historical_backend_client.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import octobot_commons.enums as commons_enums
2323
import octobot.constants as constants
2424
import octobot.community.history_backend.historical_backend_client as historical_backend_client
25+
import octobot.community.history_backend.util as history_backend_util
2526

2627

2728
class ClickhouseHistoricalBackendClient(historical_backend_client.HistoricalBackendClient):
@@ -69,7 +70,7 @@ async def fetch_candles_history(
6970
[time_frame.value, exchange, symbol, first_open_time, last_open_time],
7071
)
7172
formatted = self._format_ohlcvs(result.result_rows)
72-
return _deduplicate(formatted, 0)
73+
return history_backend_util.deduplicate(formatted, 0)
7374

7475
async def fetch_candles_history_range(
7576
self,
@@ -89,8 +90,8 @@ async def fetch_candles_history_range(
8990
[time_frame.value, exchange, symbol],
9091
)
9192
return (
92-
_get_utc_timestamp_from_datetime(result.result_rows[0][0]),
93-
_get_utc_timestamp_from_datetime(result.result_rows[0][1])
93+
history_backend_util.get_utc_timestamp_from_datetime(result.result_rows[0][0]),
94+
history_backend_util.get_utc_timestamp_from_datetime(result.result_rows[0][1])
9495
)
9596

9697
async def insert_candles_history(self, rows: list, column_names: list) -> None:
@@ -111,7 +112,7 @@ def _format_ohlcvs(ohlcvs: typing.Iterable) -> list[list[float]]:
111112
# IND_PRICE_VOL = 5
112113
return [
113114
[
114-
int(_get_utc_timestamp_from_datetime(ohlcv[0])),
115+
int(history_backend_util.get_utc_timestamp_from_datetime(ohlcv[0])),
115116
ohlcv[1],
116117
ohlcv[2],
117118
ohlcv[3],
@@ -124,17 +125,3 @@ def _format_ohlcvs(ohlcvs: typing.Iterable) -> list[list[float]]:
124125
@staticmethod
125126
def get_formatted_time(timestamp: float) -> datetime:
126127
return datetime.fromtimestamp(timestamp, tz=timezone.utc)
127-
128-
def _get_utc_timestamp_from_datetime(dt: datetime) -> float:
129-
"""
130-
Convert a datetime to a timestamp in UTC
131-
WARNING: usable here as we know this DB stores time in UTC only
132-
"""
133-
return dt.replace(tzinfo=timezone.utc).timestamp()
134-
135-
136-
def _deduplicate(elements, key) -> list:
137-
# from https://stackoverflow.com/questions/480214/how-do-i-remove-duplicates-from-a-list-while-preserving-order
138-
seen = set()
139-
seen_add = seen.add
140-
return [x for x in elements if not (x[key] in seen or seen_add(x[key]))]

octobot/community/history_backend/history_backend_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import contextlib
1717

1818
import octobot.community.history_backend.clickhouse_historical_backend_client as clickhouse_historical_backend_client
19+
import octobot.community.history_backend.iceberg_historical_backend_client as iceberg_historical_backend_client
1920
import octobot.enums
2021

2122

@@ -38,6 +39,8 @@ def _create_client(
3839
async with history_backend_client(backend_type) as client:
3940
await client.xxxx()
4041
"""
42+
if backend_type is octobot.enums.CommunityHistoricalBackendType.Iceberg:
43+
return iceberg_historical_backend_client.IcebergHistoricalBackendClient()
4144
if backend_type is octobot.enums.CommunityHistoricalBackendType.Clickhouse:
4245
return clickhouse_historical_backend_client.ClickhouseHistoricalBackendClient()
4346
raise NotImplementedError(f"Unsupported historical backend type: {backend_type}")

0 commit comments

Comments
 (0)