Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions additional_tests/historical_backend_tests/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,15 @@ CLICKHOUSE_HOST=
CLICKHOUSE_PORT=
CLICKHOUSE_USERNAME=
CLICKHOUSE_PASSWORD=

ICEBERG_CATALOG_NAMESPACE=
ICEBERG_CATALOG_WAREHOUSE=
ICEBERG_CATALOG_NAME=
ICEBERG_OHLCV_HISTORY_TABLE=
ICEBERG_CATALOG_URI=
ICEBERG_CATALOG_TOKEN=
ICEBERG_S3_ACCESS_KEY=
ICEBERG_S3_SECRET_KEY=
ICEBERG_S3_REGION=
ICEBERG_S3_ENDPOINT=
CREATE_ICEBERG_DB_IF_MISSING=false
6 changes: 6 additions & 0 deletions additional_tests/historical_backend_tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ def _load_historical_backend_creds_env_variables_if_necessary():
async def clickhouse_client():
async with octobot.community.history_backend_client(octobot.enums.CommunityHistoricalBackendType.Clickhouse) as client:
yield client


@pytest_asyncio.fixture
async def iceberg_client():
async with octobot.community.history_backend_client(octobot.enums.CommunityHistoricalBackendType.Iceberg) as client:
yield client
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import pytest

from additional_tests.historical_backend_tests import clickhouse_client
import octobot.community.history_backend.clickhouse_historical_backend_client as clickhouse_historical_backend_client
import octobot.community.history_backend.util as history_backend_util

import octobot_commons.enums as commons_enums

Expand All @@ -27,9 +27,21 @@
pytestmark = pytest.mark.asyncio


EXCHANGE = "binance"
SYMBOL = "BTC/USDT"
SHORT_TIME_FRAME = commons_enums.TimeFrames.FIFTEEN_MINUTES


async def test_fetch_candles_history_range(clickhouse_client):
# unknown candles
min_time, max_time = await clickhouse_client.fetch_candles_history_range(
EXCHANGE+"plop", SYMBOL, SHORT_TIME_FRAME
)
assert min_time == max_time == 0

# known candles
min_time, max_time = await clickhouse_client.fetch_candles_history_range(
"binance", "BTC/USDT", commons_enums.TimeFrames.FOUR_HOURS
EXCHANGE, SYMBOL, SHORT_TIME_FRAME
)
assert 0 < min_time < max_time < time.time()

Expand All @@ -38,12 +50,12 @@ async def test_fetch_candles_history(clickhouse_client):
start_time = 1718785679
end_time = 1721377495
candles_count = math.floor((end_time - start_time) / (
commons_enums.TimeFramesMinutes[commons_enums.TimeFrames.FIFTEEN_MINUTES] * 60
commons_enums.TimeFramesMinutes[SHORT_TIME_FRAME] * 60
))
# requires multiple fetches
assert candles_count == 2879
candles = await clickhouse_client.fetch_candles_history(
"binance", "BTC/USDT", commons_enums.TimeFrames.FIFTEEN_MINUTES, start_time, end_time
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
)
assert sorted(candles, key=lambda c: c[0]) == candles
fetched_count = candles_count + 1
Expand All @@ -66,11 +78,11 @@ async def test_deduplicate(clickhouse_client):
start_time = 1718785679
end_time = 1721377495
candles = await clickhouse_client.fetch_candles_history(
"binance", "BTC/USDT", commons_enums.TimeFrames.FIFTEEN_MINUTES, start_time, end_time
EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
)
duplicated = candles + candles
assert len(duplicated) == len(candles) * 2
assert sorted(candles, key=lambda c: c[0]) == candles
deduplicated = clickhouse_historical_backend_client._deduplicate(duplicated, 0)
deduplicated = history_backend_util.deduplicate(duplicated, 0)
# deduplicated and still sorted
assert deduplicated == candles
159 changes: 159 additions & 0 deletions additional_tests/historical_backend_tests/test_iceberg_candles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# This file is part of OctoBot (https://github.com/Drakkar-Software/OctoBot)
# Copyright (c) 2025 Drakkar-Software, All rights reserved.
#
# OctoBot is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# OctoBot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import math
import time
import mock
import pytest
import asyncio
import pyiceberg.table

from additional_tests.historical_backend_tests import iceberg_client
import octobot.community.history_backend.iceberg_historical_backend_client as iceberg_historical_backend_client
import octobot.community.history_backend.util as history_backend_util

import octobot_commons.enums as commons_enums
import octobot_commons.constants as commons_constants


# All test coroutines will be treated as marked.
pytestmark = pytest.mark.asyncio


EXCHANGE = "binance"
SYMBOL = "BTC/USDC"
SHORT_TIME_FRAME = commons_enums.TimeFrames.FIFTEEN_MINUTES


async def test_fetch_candles_history_range(iceberg_client):
    """An unknown exchange yields an empty (0, 0) range; a known one a bounded range."""
    # a made-up exchange name has no candles: both bounds collapse to 0
    first_candle_time, last_candle_time = await iceberg_client.fetch_candles_history_range(
        EXCHANGE + "plop", SYMBOL, SHORT_TIME_FRAME
    )
    assert first_candle_time == 0
    assert last_candle_time == 0

    # a real exchange has candles: range is non-empty, ordered and in the past
    first_candle_time, last_candle_time = await iceberg_client.fetch_candles_history_range(
        EXCHANGE, SYMBOL, SHORT_TIME_FRAME
    )
    assert 0 < first_candle_time
    assert first_candle_time < last_candle_time
    assert last_candle_time < time.time()


async def test_fetch_candles_history(iceberg_client):
    """Fetch a candle window spanning multiple pages and validate the result.

    Checks that the returned candles are sorted by open time, have the
    expected count for the time frame, start at the expected UTC-aligned
    open time, and contain unique, non-degenerate OHLCV values.
    """
    start_time = 1718785679
    end_time = 1721377495
    # expected number of full SHORT_TIME_FRAME intervals in [start_time, end_time]
    candles_count = math.floor((end_time - start_time) / (
        commons_enums.TimeFramesMinutes[SHORT_TIME_FRAME] * 60
    ))
    # requires multiple fetches
    assert candles_count == 2879
    candles = await iceberg_client.fetch_candles_history(
        EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
    )
    # candles are returned sorted by open time
    assert sorted(candles, key=lambda c: c[0]) == candles
    # both window boundaries are included, hence the +1
    fetched_count = candles_count + 1
    assert len(candles) == fetched_count
    # will fail if parsed time is not UTC
    assert candles[0][commons_enums.PriceIndexes.IND_PRICE_TIME.value] == 1718785800
    # first candle's time/open/high/low/close/volume are pairwise distinct:
    # guards against a column-mapping bug duplicating one value everywhere
    assert (
        candles[0][commons_enums.PriceIndexes.IND_PRICE_TIME.value]
        != candles[0][commons_enums.PriceIndexes.IND_PRICE_OPEN.value]
        != candles[0][commons_enums.PriceIndexes.IND_PRICE_HIGH.value]
        != candles[0][commons_enums.PriceIndexes.IND_PRICE_LOW.value]
        != candles[0][commons_enums.PriceIndexes.IND_PRICE_CLOSE.value]
        != candles[0][commons_enums.PriceIndexes.IND_PRICE_VOL.value]
    )
    # candles are unique
    assert len(set(c[0] for c in candles)) == fetched_count


async def test_fetch_candles_history_asynchronousness(iceberg_client):
    """Check that three concurrent history fetches overlap their blocking work.

    Wraps the client's table access so that every `scan()` and every
    `scan(...).to_arrow()` call records a timestamp, then fetches three
    windows of increasing size via asyncio.gather and asserts that all
    to_arrow calls start before the first one returns — i.e. the fetches
    run concurrently instead of serially.
    """
    start_time = 1718785679
    # three end times of increasing distance so the three fetches take
    # increasing amounts of work (asserted via candle counts at the end)
    end_time_1 = 1721377495
    end_time_2 = 1721377495 + 2 * commons_constants.DAYS_TO_SECONDS
    end_time_3 = 1721377495 + 23 * commons_constants.DAYS_TO_SECONDS

    # wall-clock timestamps recorded by the instrumented wrappers below
    scan_call_times = []
    _to_arrow_call_times = []
    _to_arrow_return_times = []

    def _get_or_create_table(*args, **kwargs):
        # delegate to the real implementation, then instrument the returned
        # table's scan() so its results are also instrumented.
        # NOTE: original_get_or_create_table is bound after this definition
        # (late binding) — safe because this wrapper only runs inside the
        # mock.patch.object context below.
        table = original_get_or_create_table(*args, **kwargs)
        original_scan = table.scan


        def _scan(*args, **kwargs):
            # record when the scan is requested, then wrap the scan result's
            # to_arrow to time its call and return moments
            scan_call_times.append(time.time())
            scan_result = original_scan(*args, **kwargs)
            original_to_arrow = scan_result.to_arrow

            def _to_arrow(*args, **kwargs):
                _to_arrow_call_times.append(time.time())
                try:
                    return original_to_arrow(*args, **kwargs)
                finally:
                    # finally: record return time even if to_arrow raises
                    _to_arrow_return_times.append(time.time())

            scan_result.to_arrow = _to_arrow
            return scan_result

        table.scan = mock.Mock(side_effect=_scan)
        return table

    original_get_or_create_table = iceberg_client.get_or_create_table
    with (
        mock.patch.object(iceberg_client, "get_or_create_table", mock.Mock(side_effect=_get_or_create_table)) as get_or_create_table_mock
    ):
        # run the three fetches concurrently on the same event loop
        candles_1, candles_2, candles_3 = await asyncio.gather(
            iceberg_client.fetch_candles_history(
                EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_1
            ),
            iceberg_client.fetch_candles_history(
                EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_2
            ),
            iceberg_client.fetch_candles_history(
                EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_3
            ),
        )
        # one table access / scan / to_arrow per fetch
        assert get_or_create_table_mock.call_count == 3
        assert len(scan_call_times) == 3
        assert len(_to_arrow_call_times) == 3
        assert len(_to_arrow_return_times) == 3

        # calls happen in gather order; returns are strictly ordered because the
        # windows (and thus the amount of fetched data) strictly increase
        assert scan_call_times[0] <= scan_call_times[1] <= scan_call_times[2]
        assert _to_arrow_call_times[0] <= _to_arrow_call_times[1] <= _to_arrow_call_times[2]
        assert _to_arrow_return_times[0] < _to_arrow_return_times[1] < _to_arrow_return_times[2]

        # all to_arrow calls have been performed before the first to_arrow return,
        # which means they are running concurrently in this async context
        assert max(_to_arrow_call_times) < min(_to_arrow_return_times)

        # larger windows yield more candles — sanity check the three results differ
        assert len(candles_1) > 2000
        assert len(candles_2) > len(candles_1)
        assert len(candles_3) > len(candles_2)


async def test_deduplicate(iceberg_client):
    """deduplicate() drops repeated candles while preserving the sorted order."""
    window_start = 1718785679
    window_end = 1721377495
    candles = await iceberg_client.fetch_candles_history(
        EXCHANGE, SYMBOL, SHORT_TIME_FRAME, window_start, window_end
    )
    # doubling the list creates one duplicate per candle
    doubled = candles + candles
    assert len(doubled) == 2 * len(candles)
    # fetched candles are already sorted by open time (index 0)
    assert candles == sorted(candles, key=lambda candle: candle[0])
    # deduplicating on open time restores the original, still-sorted list
    deduplicated = history_backend_util.deduplicate(doubled, 0)
    assert deduplicated == candles
2 changes: 2 additions & 0 deletions octobot/community/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
history_backend_client,
HistoricalBackendClient,
ClickhouseHistoricalBackendClient,
IcebergHistoricalBackendClient,
)

__all__ = [
Expand Down Expand Up @@ -152,4 +153,5 @@
"history_backend_client",
"HistoricalBackendClient",
"ClickhouseHistoricalBackendClient",
"IcebergHistoricalBackendClient",
]
6 changes: 6 additions & 0 deletions octobot/community/history_backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@
ClickhouseHistoricalBackendClient,
)

from octobot.community.history_backend import iceberg_historical_backend_client
from octobot.community.history_backend.iceberg_historical_backend_client import (
IcebergHistoricalBackendClient,
)

__all__ = [
"history_backend_client",
"HistoricalBackendClient",
"ClickhouseHistoricalBackendClient",
"IcebergHistoricalBackendClient",
]
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import octobot_commons.enums as commons_enums
import octobot.constants as constants
import octobot.community.history_backend.historical_backend_client as historical_backend_client
import octobot.community.history_backend.util as history_backend_util


class ClickhouseHistoricalBackendClient(historical_backend_client.HistoricalBackendClient):
Expand Down Expand Up @@ -69,7 +70,7 @@ async def fetch_candles_history(
[time_frame.value, exchange, symbol, first_open_time, last_open_time],
)
formatted = self._format_ohlcvs(result.result_rows)
return _deduplicate(formatted, 0)
return history_backend_util.deduplicate(formatted, 0)

async def fetch_candles_history_range(
self,
Expand All @@ -89,8 +90,8 @@ async def fetch_candles_history_range(
[time_frame.value, exchange, symbol],
)
return (
_get_utc_timestamp_from_datetime(result.result_rows[0][0]),
_get_utc_timestamp_from_datetime(result.result_rows[0][1])
history_backend_util.get_utc_timestamp_from_datetime(result.result_rows[0][0]),
history_backend_util.get_utc_timestamp_from_datetime(result.result_rows[0][1])
)

async def insert_candles_history(self, rows: list, column_names: list) -> None:
Expand All @@ -111,7 +112,7 @@ def _format_ohlcvs(ohlcvs: typing.Iterable) -> list[list[float]]:
# IND_PRICE_VOL = 5
return [
[
int(_get_utc_timestamp_from_datetime(ohlcv[0])),
int(history_backend_util.get_utc_timestamp_from_datetime(ohlcv[0])),
ohlcv[1],
ohlcv[2],
ohlcv[3],
Expand All @@ -124,17 +125,3 @@ def _format_ohlcvs(ohlcvs: typing.Iterable) -> list[list[float]]:
@staticmethod
def get_formatted_time(timestamp: float) -> datetime:
return datetime.fromtimestamp(timestamp, tz=timezone.utc)

def _get_utc_timestamp_from_datetime(dt: datetime) -> float:
"""
Convert a datetime to a timestamp in UTC
WARNING: usable here as we know this DB stores time in UTC only
"""
return dt.replace(tzinfo=timezone.utc).timestamp()


def _deduplicate(elements, key) -> list:
# from https://stackoverflow.com/questions/480214/how-do-i-remove-duplicates-from-a-list-while-preserving-order
seen = set()
seen_add = seen.add
return [x for x in elements if not (x[key] in seen or seen_add(x[key]))]
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import contextlib

import octobot.community.history_backend.clickhouse_historical_backend_client as clickhouse_historical_backend_client
import octobot.community.history_backend.iceberg_historical_backend_client as iceberg_historical_backend_client
import octobot.enums


Expand All @@ -38,6 +39,8 @@ def _create_client(
async with history_backend_client(backend_type) as client:
await client.xxxx()
"""
if backend_type is octobot.enums.CommunityHistoricalBackendType.Iceberg:
return iceberg_historical_backend_client.IcebergHistoricalBackendClient()
if backend_type is octobot.enums.CommunityHistoricalBackendType.Clickhouse:
return clickhouse_historical_backend_client.ClickhouseHistoricalBackendClient()
raise NotImplementedError(f"Unsupported historical backend type: {backend_type}")
Loading