From 6996d26a251995137b36910821ec466b6209cf1e Mon Sep 17 00:00:00 2001
From: Guillaume De Saint Martin
Date: Tue, 30 Sep 2025 23:43:10 +0200
Subject: [PATCH] [Community] init Iceberg support

---
 .../historical_backend_tests/.env.template | 12 +
 .../historical_backend_tests/__init__.py | 6 +
 .../test_clickhouse_candles.py | 24 +-
 .../test_iceberg_candles.py | 159 +++++++++
 octobot/community/__init__.py | 2 +
 octobot/community/history_backend/__init__.py | 6 +
 .../clickhouse_historical_backend_client.py | 23 +-
 .../history_backend_factory.py | 3 +
 .../iceberg_historical_backend_client.py | 310 ++++++++++++++++++
 octobot/community/history_backend/util.py | 16 +
 octobot/constants.py | 12 +
 octobot/enums.py | 3 +-
 requirements.txt | 2 +
 13 files changed, 553 insertions(+), 25 deletions(-)
 create mode 100644 additional_tests/historical_backend_tests/test_iceberg_candles.py
 create mode 100644 octobot/community/history_backend/iceberg_historical_backend_client.py
 create mode 100644 octobot/community/history_backend/util.py

diff --git a/additional_tests/historical_backend_tests/.env.template b/additional_tests/historical_backend_tests/.env.template
index ffdf1d897..b008b15dd 100644
--- a/additional_tests/historical_backend_tests/.env.template
+++ b/additional_tests/historical_backend_tests/.env.template
@@ -2,3 +2,15 @@ CLICKHOUSE_HOST=
 CLICKHOUSE_PORT=
 CLICKHOUSE_USERNAME=
 CLICKHOUSE_PASSWORD=
+
+ICEBERG_CATALOG_NAMESPACE=
+ICEBERG_CATALOG_WAREHOUSE=
+ICEBERG_CATALOG_NAME=
+ICEBERG_OHLCV_HISTORY_TABLE=
+ICEBERG_CATALOG_URI=
+ICEBERG_CATALOG_TOKEN=
+ICEBERG_S3_ACCESS_KEY=
+ICEBERG_S3_SECRET_KEY=
+ICEBERG_S3_REGION=
+ICEBERG_S3_ENDPOINT=
+CREATE_ICEBERG_DB_IF_MISSING=false
diff --git a/additional_tests/historical_backend_tests/__init__.py b/additional_tests/historical_backend_tests/__init__.py
index 59eb9141c..e0a94a67c 100644
--- a/additional_tests/historical_backend_tests/__init__.py
+++ b/additional_tests/historical_backend_tests/__init__.py
@@ -37,3 +37,9 @@ def _load_historical_backend_creds_env_variables_if_necessary():
 async def clickhouse_client():
     async with octobot.community.history_backend_client(octobot.enums.CommunityHistoricalBackendType.Clickhouse) as client:
         yield client
+
+
+@pytest_asyncio.fixture
+async def iceberg_client():
+    async with octobot.community.history_backend_client(octobot.enums.CommunityHistoricalBackendType.Iceberg) as client:
+        yield client
diff --git a/additional_tests/historical_backend_tests/test_clickhouse_candles.py b/additional_tests/historical_backend_tests/test_clickhouse_candles.py
index 6473ecc88..8c485a9a4 100644
--- a/additional_tests/historical_backend_tests/test_clickhouse_candles.py
+++ b/additional_tests/historical_backend_tests/test_clickhouse_candles.py
@@ -18,7 +18,7 @@ import pytest
 
 from additional_tests.historical_backend_tests import clickhouse_client
-import octobot.community.history_backend.clickhouse_historical_backend_client as clickhouse_historical_backend_client
+import octobot.community.history_backend.util as history_backend_util
 
 import octobot_commons.enums as commons_enums
 
@@ -27,9 +27,21 @@
 pytestmark = pytest.mark.asyncio
 
 
+EXCHANGE = "binance"
+SYMBOL = "BTC/USDT"
+SHORT_TIME_FRAME = commons_enums.TimeFrames.FIFTEEN_MINUTES
+
+
 async def test_fetch_candles_history_range(clickhouse_client):
+    # unknown candles
+    min_time, max_time = await clickhouse_client.fetch_candles_history_range(
+        EXCHANGE+"plop", SYMBOL, SHORT_TIME_FRAME
+    )
+    assert min_time == max_time == 0
+
+    # known candles
     min_time, max_time = await clickhouse_client.fetch_candles_history_range(
-        "binance", "BTC/USDT", commons_enums.TimeFrames.FOUR_HOURS
+        EXCHANGE, SYMBOL, SHORT_TIME_FRAME
     )
     assert 0 < min_time < max_time < time.time()
 
@@ -38,12 +50,12 @@ async def test_fetch_candles_history(clickhouse_client):
     start_time = 1718785679
     end_time = 1721377495
     candles_count = math.floor((end_time - start_time) / (
-        commons_enums.TimeFramesMinutes[commons_enums.TimeFrames.FIFTEEN_MINUTES] * 60
+        commons_enums.TimeFramesMinutes[SHORT_TIME_FRAME] * 60
     ))
     # requires multiple fetches
     assert candles_count == 2879
     candles = await clickhouse_client.fetch_candles_history(
-        "binance", "BTC/USDT", commons_enums.TimeFrames.FIFTEEN_MINUTES, start_time, end_time
+        EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
     )
     assert sorted(candles, key=lambda c: c[0]) == candles
     fetched_count = candles_count + 1
@@ -66,11 +78,11 @@ async def test_deduplicate(clickhouse_client):
     start_time = 1718785679
     end_time = 1721377495
     candles = await clickhouse_client.fetch_candles_history(
-        "binance", "BTC/USDT", commons_enums.TimeFrames.FIFTEEN_MINUTES, start_time, end_time
+        EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
     )
     duplicated = candles + candles
     assert len(duplicated) == len(candles) * 2
     assert sorted(candles, key=lambda c: c[0]) == candles
-    deduplicated = clickhouse_historical_backend_client._deduplicate(duplicated, 0)
+    deduplicated = history_backend_util.deduplicate(duplicated, 0)
     # deduplicated and still sorted
     assert deduplicated == candles
diff --git a/additional_tests/historical_backend_tests/test_iceberg_candles.py b/additional_tests/historical_backend_tests/test_iceberg_candles.py
new file mode 100644
index 000000000..3f2bc537d
--- /dev/null
+++ b/additional_tests/historical_backend_tests/test_iceberg_candles.py
@@ -0,0 +1,159 @@
+# This file is part of OctoBot (https://github.com/Drakkar-Software/OctoBot)
+# Copyright (c) 2025 Drakkar-Software, All rights reserved.
+#
+# OctoBot is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either
+# version 3.0 of the License, or (at your option) any later version.
+#
+# OctoBot is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
+import math
+import time
+import mock
+import pytest
+import asyncio
+import pyiceberg.table
+
+from additional_tests.historical_backend_tests import iceberg_client
+import octobot.community.history_backend.iceberg_historical_backend_client as iceberg_historical_backend_client
+import octobot.community.history_backend.util as history_backend_util
+
+import octobot_commons.enums as commons_enums
+import octobot_commons.constants as commons_constants
+
+
+# All test coroutines will be treated as marked.
+pytestmark = pytest.mark.asyncio
+
+
+EXCHANGE = "binance"
+SYMBOL = "BTC/USDC"
+SHORT_TIME_FRAME = commons_enums.TimeFrames.FIFTEEN_MINUTES
+
+
+async def test_fetch_candles_history_range(iceberg_client):
+    # unknown candles
+    min_time, max_time = await iceberg_client.fetch_candles_history_range(
+        EXCHANGE+"plop", SYMBOL, SHORT_TIME_FRAME
+    )
+    assert min_time == max_time == 0
+
+    # known candles
+    min_time, max_time = await iceberg_client.fetch_candles_history_range(
+        EXCHANGE, SYMBOL, SHORT_TIME_FRAME
+    )
+    assert 0 < min_time < max_time < time.time()
+
+
+async def test_fetch_candles_history(iceberg_client):
+    start_time = 1718785679
+    end_time = 1721377495
+    candles_count = math.floor((end_time - start_time) / (
+        commons_enums.TimeFramesMinutes[SHORT_TIME_FRAME] * 60
+    ))
+    # requires multiple fetches
+    assert candles_count == 2879
+    candles = await iceberg_client.fetch_candles_history(
+        EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
+    )
+    assert sorted(candles, key=lambda c: c[0]) == candles
+    fetched_count = candles_count + 1
+    assert len(candles) == fetched_count
+    # will fail if parsed time is not UTC
+    assert candles[0][commons_enums.PriceIndexes.IND_PRICE_TIME.value] == 1718785800
+    assert (
+        candles[0][commons_enums.PriceIndexes.IND_PRICE_TIME.value]
+        != candles[0][commons_enums.PriceIndexes.IND_PRICE_OPEN.value]
+        != candles[0][commons_enums.PriceIndexes.IND_PRICE_HIGH.value]
+        != candles[0][commons_enums.PriceIndexes.IND_PRICE_LOW.value]
+        != candles[0][commons_enums.PriceIndexes.IND_PRICE_CLOSE.value]
+        != candles[0][commons_enums.PriceIndexes.IND_PRICE_VOL.value]
+    )
+    # candles are unique
+    assert len(set(c[0] for c in candles)) == fetched_count
+
+
+async def test_fetch_candles_history_asynchronousness(iceberg_client):
+    start_time = 1718785679
+    end_time_1 = 1721377495
+    end_time_2 = 1721377495 + 2 * commons_constants.DAYS_TO_SECONDS
+    end_time_3 = 1721377495 + 23 * commons_constants.DAYS_TO_SECONDS
+
+    scan_call_times = []
+    _to_arrow_call_times = []
+    _to_arrow_return_times = []
+
+    def _get_or_create_table(*args, **kwargs):
+        table = original_get_or_create_table(*args, **kwargs)
+        original_scan = table.scan
+
+        def _scan(*args, **kwargs):
+            scan_call_times.append(time.time())
+            scan_result = original_scan(*args, **kwargs)
+            original_to_arrow = scan_result.to_arrow
+
+            def _to_arrow(*args, **kwargs):
+                _to_arrow_call_times.append(time.time())
+                try:
+                    return original_to_arrow(*args, **kwargs)
+                finally:
+                    _to_arrow_return_times.append(time.time())
+
+            scan_result.to_arrow = _to_arrow
+            return scan_result
+
+        table.scan = mock.Mock(side_effect=_scan)
+        return table
+
+    original_get_or_create_table = iceberg_client.get_or_create_table
+    with (
+        mock.patch.object(iceberg_client, "get_or_create_table", mock.Mock(side_effect=_get_or_create_table)) as get_or_create_table_mock
+    ):
+        candles_1, candles_2, candles_3 = await asyncio.gather(
+            iceberg_client.fetch_candles_history(
+                EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_1
+            ),
+            iceberg_client.fetch_candles_history(
+                EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_2
+            ),
+            iceberg_client.fetch_candles_history(
+                EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time_3
+            ),
+        )
+    assert get_or_create_table_mock.call_count == 3
+    assert len(scan_call_times) == 3
+    assert len(_to_arrow_call_times) == 3
+    assert len(_to_arrow_return_times) == 3
+
+    assert scan_call_times[0] <= scan_call_times[1] <= scan_call_times[2]
+    assert _to_arrow_call_times[0] <= _to_arrow_call_times[1] <= _to_arrow_call_times[2]
+    assert _to_arrow_return_times[0] < _to_arrow_return_times[1] < _to_arrow_return_times[2]
+
+    # all to_arrow calls have been performed before the first to_arrow return,
+    # which means they are running concurrently in this async context
+    assert max(_to_arrow_call_times) < min(_to_arrow_return_times)
+
+    assert len(candles_1) > 2000
+    assert len(candles_2) > len(candles_1)
+    assert len(candles_3) > len(candles_2)
+
+
+async def test_deduplicate(iceberg_client):
+    start_time = 1718785679
+    end_time = 1721377495
+    candles = await iceberg_client.fetch_candles_history(
+        EXCHANGE, SYMBOL, SHORT_TIME_FRAME, start_time, end_time
+    )
+    duplicated = candles + candles
+    assert len(duplicated) == len(candles) * 2
+    assert sorted(candles, key=lambda c: c[0]) == candles
+    deduplicated = history_backend_util.deduplicate(duplicated, 0)
+    # deduplicated and still sorted
+    assert deduplicated == candles
diff --git a/octobot/community/__init__.py b/octobot/community/__init__.py
index 31a5945f9..783796b8d 100644
--- a/octobot/community/__init__.py
+++ b/octobot/community/__init__.py
@@ -97,6 +97,7 @@
     history_backend_client,
     HistoricalBackendClient,
     ClickhouseHistoricalBackendClient,
+    IcebergHistoricalBackendClient,
 )
 
 __all__ = [
@@ -152,4 +153,5 @@
     "history_backend_client",
     "HistoricalBackendClient",
     "ClickhouseHistoricalBackendClient",
+    "IcebergHistoricalBackendClient",
 ]
diff --git a/octobot/community/history_backend/__init__.py b/octobot/community/history_backend/__init__.py
index fe51fed09..b5b3c51b4 100644
--- a/octobot/community/history_backend/__init__.py
+++ b/octobot/community/history_backend/__init__.py
@@ -29,8 +29,14 @@
     ClickhouseHistoricalBackendClient,
 )
 
+from octobot.community.history_backend import iceberg_historical_backend_client
+from octobot.community.history_backend.iceberg_historical_backend_client import (
+    IcebergHistoricalBackendClient,
+)
+
 __all__ = [
     "history_backend_client",
     "HistoricalBackendClient",
     "ClickhouseHistoricalBackendClient",
+    "IcebergHistoricalBackendClient",
 ]
diff --git a/octobot/community/history_backend/clickhouse_historical_backend_client.py b/octobot/community/history_backend/clickhouse_historical_backend_client.py
index 75cc91795..319c803da 100644
--- a/octobot/community/history_backend/clickhouse_historical_backend_client.py
+++ b/octobot/community/history_backend/clickhouse_historical_backend_client.py
@@ -22,6 +22,7 @@
 import octobot_commons.enums as commons_enums
 import octobot.constants as constants
 import octobot.community.history_backend.historical_backend_client as historical_backend_client
+import octobot.community.history_backend.util as history_backend_util
 
 
 class ClickhouseHistoricalBackendClient(historical_backend_client.HistoricalBackendClient):
@@ -69,7 +70,7 @@ async def fetch_candles_history(
             [time_frame.value, exchange, symbol, first_open_time, last_open_time],
         )
         formatted = self._format_ohlcvs(result.result_rows)
-        return _deduplicate(formatted, 0)
+        return history_backend_util.deduplicate(formatted, 0)
 
     async def fetch_candles_history_range(
         self,
@@ -89,8 +90,8 @@ async def fetch_candles_history_range(
             [time_frame.value, exchange, symbol],
         )
         return (
-            _get_utc_timestamp_from_datetime(result.result_rows[0][0]),
-            _get_utc_timestamp_from_datetime(result.result_rows[0][1])
+            history_backend_util.get_utc_timestamp_from_datetime(result.result_rows[0][0]),
+            history_backend_util.get_utc_timestamp_from_datetime(result.result_rows[0][1])
         )
 
     async def insert_candles_history(self, rows: list, column_names: list) -> None:
@@ -111,7 +112,7 @@ def _format_ohlcvs(ohlcvs: typing.Iterable) -> list[list[float]]:
         # IND_PRICE_VOL = 5
         return [
             [
-                int(_get_utc_timestamp_from_datetime(ohlcv[0])),
+                int(history_backend_util.get_utc_timestamp_from_datetime(ohlcv[0])),
                 ohlcv[1],
                 ohlcv[2],
                 ohlcv[3],
@@ -124,17 +125,3 @@ def _format_ohlcvs(ohlcvs: typing.Iterable) -> list[list[float]]:
     @staticmethod
     def get_formatted_time(timestamp: float) -> datetime:
         return datetime.fromtimestamp(timestamp, tz=timezone.utc)
-
-def _get_utc_timestamp_from_datetime(dt: datetime) -> float:
-    """
-    Convert a datetime to a timestamp in UTC
-    WARNING: usable here as we know this DB stores time in UTC only
-    """
-    return dt.replace(tzinfo=timezone.utc).timestamp()
-
-
-def _deduplicate(elements, key) -> list:
-    # from https://stackoverflow.com/questions/480214/how-do-i-remove-duplicates-from-a-list-while-preserving-order
-    seen = set()
-    seen_add = seen.add
-    return [x for x in elements if not (x[key] in seen or seen_add(x[key]))]
diff --git a/octobot/community/history_backend/history_backend_factory.py b/octobot/community/history_backend/history_backend_factory.py
index 6effe4327..e65ca3e2d 100644
--- a/octobot/community/history_backend/history_backend_factory.py
+++ b/octobot/community/history_backend/history_backend_factory.py
@@ -16,6 +16,7 @@
 import contextlib
 
 import octobot.community.history_backend.clickhouse_historical_backend_client as clickhouse_historical_backend_client
+import octobot.community.history_backend.iceberg_historical_backend_client as iceberg_historical_backend_client
 import octobot.enums
 
 
@@ -38,6 +39,8 @@ def _create_client(
         async with history_backend_client(backend_type) as client:
             await client.xxxx()
     """
+    if backend_type is octobot.enums.CommunityHistoricalBackendType.Iceberg:
+        return iceberg_historical_backend_client.IcebergHistoricalBackendClient()
     if backend_type is octobot.enums.CommunityHistoricalBackendType.Clickhouse:
         return clickhouse_historical_backend_client.ClickhouseHistoricalBackendClient()
     raise NotImplementedError(f"Unsupported historical backend type: {backend_type}")
diff --git a/octobot/community/history_backend/iceberg_historical_backend_client.py b/octobot/community/history_backend/iceberg_historical_backend_client.py
new file mode 100644
index 000000000..88e61286c
--- /dev/null
+++ b/octobot/community/history_backend/iceberg_historical_backend_client.py
@@ -0,0 +1,310 @@
+# This file is part of OctoBot (https://github.com/Drakkar-Software/OctoBot)
+# Copyright (c) 2025 Drakkar-Software, All rights reserved.
+#
+# OctoBot is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either
+# version 3.0 of the License, or (at your option) any later version.
+#
+# OctoBot is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
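+"""
+Iceberg-backed implementation of HistoricalBackendClient.
+
+Editor's summary of the module below: all pyiceberg calls are blocking IO, so
+each public coroutine delegates to a _sync_* method running in a dedicated
+ThreadPoolExecutor (see _run_in_executor). This lets several candle fetches
+progress concurrently within a single event loop, as exercised by
+test_fetch_candles_history_asynchronousness.
+"""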
+import typing
+import enum
+import datetime
+import os
+import asyncio
+import concurrent.futures
+
+import pyarrow
+import pyiceberg.catalog
+import pyiceberg.schema
+import pyiceberg.types
+import pyiceberg.exceptions
+import pyiceberg.expressions
+import pyiceberg.table
+import pyiceberg.table.sorting
+
+import octobot_commons.logging as commons_logging
+import octobot_commons.enums as commons_enums
+import octobot.constants as constants
+import octobot.community.history_backend.historical_backend_client as historical_backend_client
+import octobot.community.history_backend.util as history_backend_util
+
+
+class TableNames(enum.Enum):
+    OHLCV_HISTORY = constants.ICEBERG_OHLCV_HISTORY_TABLE
+
+
+# pyiceberg itself uses a thread pool, so each "master" thread will create child
+# threads: limit the number of "master" threads
+_MAX_EXECUTOR_WORKERS = min(4, (os.cpu_count() or 1))  # use a max of 4 workers
+
+
+class IcebergHistoricalBackendClient(historical_backend_client.HistoricalBackendClient):
+
+    def __init__(self):
+        self.namespace: typing.Optional[str] = None
+        self.catalog: pyiceberg.catalog.Catalog = None  # type: ignore
+        self._executor: typing.Optional[concurrent.futures.ThreadPoolExecutor] = None  # only available when client is open
+
+    async def open(self):
+        try:
+            self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=_MAX_EXECUTOR_WORKERS)
+            self.catalog = self._load_catalog()
+        except Exception as err:
+            message = f"Error when connecting to Iceberg server, {err.__class__.__name__}: {err}"
+            self.get_logger().exception(err, True, message)
+            raise err.__class__(message) from err
+
+    async def close(self):
+        if self._executor is not None:
+            self._executor.shutdown()
+            self._executor = None
+
+    async def _run_in_executor(self, func, *args):
+        if self._executor is None:
+            raise ValueError(f"{self.__class__.__name__} is not open")
+        # run blocking IO code in a dedicated thread pool executor that is shut down on close
+        # to avoid leaving threads alive when the client is closed
+        return await asyncio.get_running_loop().run_in_executor(self._executor, func, *args)
+
+    async def fetch_candles_history(
+        self,
+        exchange: str,
+        symbol: str,
+        time_frame: commons_enums.TimeFrames,
+        first_open_time: float,
+        last_open_time: float
+    ) -> list[list[float]]:
+        return await self._run_in_executor(
+            self._sync_fetch_candles_history,
+            exchange, symbol, time_frame, first_open_time, last_open_time
+        )
+
+    def _sync_fetch_candles_history(
+        self,
+        exchange: str,
+        symbol: str,
+        time_frame: commons_enums.TimeFrames,
+        first_open_time: float,
+        last_open_time: float
+    ) -> list[list[float]]:
+        table = self.get_or_create_table(TableNames.OHLCV_HISTORY)
+        filter = pyiceberg.expressions.And(
+            pyiceberg.expressions.EqualTo("time_frame", time_frame.value),
+            pyiceberg.expressions.EqualTo("exchange_internal_name", exchange),
+            pyiceberg.expressions.EqualTo("symbol", symbol),
+            pyiceberg.expressions.GreaterThanOrEqual("timestamp", self._get_formatted_time(first_open_time)),
+            pyiceberg.expressions.LessThanOrEqual("timestamp", self._get_formatted_time(last_open_time))
+        )
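+        # editor's note: pyiceberg uses this row filter to prune data files from
+        # table metadata before reading anything; the timestamp bounds are passed
+        # as UTC ISO strings built by _get_formatted_time() (assumption: the table
+        # stores naive UTC timestamps, as the schemas below enforce)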
+        result = table.scan(
+            row_filter=filter,
+            selected_fields=("timestamp", "open", "high", "low", "close", "volume"),
+            case_sensitive=True,
+        )
+
+        return self._format_ohlcvs(result.to_arrow())
+
+    async def fetch_candles_history_range(
+        self,
+        exchange: str,
+        symbol: str,
+        time_frame: commons_enums.TimeFrames
+    ) -> tuple[float, float]:
+        return await self._run_in_executor(
+            self._sync_fetch_candles_history_range,
+            exchange, symbol, time_frame
+        )
+
+    def _sync_fetch_candles_history_range(
+        self,
+        exchange: str,
+        symbol: str,
+        time_frame: commons_enums.TimeFrames
+    ) -> tuple[float, float]:
+        table = self.get_or_create_table(TableNames.OHLCV_HISTORY)
+        filter = pyiceberg.expressions.And(
+            pyiceberg.expressions.EqualTo("time_frame", time_frame.value),
+            pyiceberg.expressions.EqualTo("exchange_internal_name", exchange),
+            pyiceberg.expressions.EqualTo("symbol", symbol),
+        )
+        result = table.scan(
+            row_filter=filter,
+            selected_fields=("timestamp", ),
+            case_sensitive=True,
+        )
+        res = result.to_arrow()
+        # impossible to select the min and max timestamp from the query:
+        # select all timestamps and parse the min and max instead
+        batches = list(res.to_batches(max_chunksize=10))
+        if batches:
+            min_ts = batches[0].to_pydict()['timestamp'][0]
+            max_ts = batches[-1].to_pydict()['timestamp'][-1]
+            return (
+                history_backend_util.get_utc_timestamp_from_datetime(min_ts),
+                history_backend_util.get_utc_timestamp_from_datetime(max_ts)
+            )
+        return 0, 0
+
+    async def insert_candles_history(self, rows: list, column_names: list) -> None:
+        await self._run_in_executor(
+            self._sync_insert_candles_history,
+            rows, column_names
+        )
+
+    def _sync_insert_candles_history(self, rows: list, column_names: list) -> None:
+        if not rows:
+            return
+        schema = self._pyarrow_get_ohlcv_schema()
+        table = self.get_or_create_table(TableNames.OHLCV_HISTORY)
+        timestamp_fields = [name for name in column_names if isinstance(schema.field(name).type, pyarrow.TimestampType)]
+        pa_arrays = [
+            pyarrow.array([
+                self._get_formatted_time(row[i]) if name in timestamp_fields else row[i]
+                for row in rows
+            ])
+            for i, name in enumerate(column_names)
+        ]
+        pa_table = pyarrow.Table.from_arrays(pa_arrays, schema=schema)
+        # warning: try not to insert duplicate candles, even though duplicates are removed at read time anyway
+        table.append(pa_table)
+        # note: the alternative upsert syntax could prevent duplicates but is really slow and silently crashes the process
+        # when used with a few thousand rows
+        # table.upsert(pa_table, join_cols=["timestamp", "exchange_internal_name", "symbol", "time_frame"])
+        self.get_logger().info(
+            f"Successfully inserted {len(rows)} rows into "
+            f"{TableNames.OHLCV_HISTORY.value} for {pa_table['exchange_internal_name'][0]}:{pa_table['symbol'][0]}:{pa_table['time_frame'][0]}"
+        )
+
+    @staticmethod
+    def _get_formatted_time(timestamp: float) -> str:
+        return datetime.datetime.fromtimestamp(
+            timestamp, tz=datetime.timezone.utc
+        ).isoformat(sep='T').replace("+00:00", "")
+
+    @staticmethod
+    def _format_ohlcvs(ohlcvs_table: pyarrow.Table) -> list[list[float]]:
+        # uses PriceIndexes order
+        # IND_PRICE_TIME = 0
+        # IND_PRICE_OPEN = 1
+        # IND_PRICE_HIGH = 2
+        # IND_PRICE_LOW = 3
+        # IND_PRICE_CLOSE = 4
+        # IND_PRICE_VOL = 5
+        ohlcvs = [
+            # convert table into list of candles
+            [history_backend_util.get_utc_timestamp_from_datetime(t), o, h, l, c, v]
+            for batch in ohlcvs_table.to_batches()
+            if (batch_dict := batch.to_pydict())
+            for t, o, h, l, c, v in zip(
+                batch_dict['timestamp'], batch_dict['open'], batch_dict['high'], batch_dict['low'], batch_dict['close'], batch_dict['volume']
+            )
+        ]
+        # ensure no duplicates as they can happen due to no uniqueness constraint
+        ohlcvs = history_backend_util.deduplicate(ohlcvs, 0)
+        return ohlcvs
+
+    def _load_catalog(self) -> pyiceberg.catalog.Catalog:
+        self.namespace = constants.ICEBERG_CATALOG_NAMESPACE
+        catalog = pyiceberg.catalog.load_catalog(constants.ICEBERG_CATALOG_NAME, **self._get_catalog_properties())
+        if constants.CREATE_ICEBERG_DB_IF_MISSING:
+            self._ensure_namespace(catalog)
+        self.get_logger().info(f"PyIceberg catalog '{constants.ICEBERG_CATALOG_NAME}' initialized successfully")
+        return catalog
+
+    def get_or_create_table(self, table_name: TableNames) -> pyiceberg.table.Table:
+        try:
+            return self.catalog.load_table(f"{self.namespace}.{table_name.value}")
+        except pyiceberg.exceptions.NoSuchTableError:
+            table_by_ns = {ns: self.catalog.list_tables(ns) for ns in self.catalog.list_namespaces()}
+            self.get_logger().info(
+                f"Table {table_name.value} does not exist in {self.namespace} namespace. Tables by namespace: {table_by_ns}"
+            )
+            if constants.CREATE_ICEBERG_DB_IF_MISSING:
+                self.catalog.create_table(
+                    identifier=f"{self.namespace}.{table_name.value}",
+                    schema=self._get_schema_for_table(table_name),
+                    sort_order=pyiceberg.table.sorting.SortOrder(
+                        pyiceberg.table.sorting.SortField(
+                            source_id=1,
+                            direction=pyiceberg.table.sorting.SortDirection.ASC,
+                        )
+                    )
+                )
+                self.get_logger().info(f"Table {table_name.value} created successfully")
+                return self.catalog.load_table(f"{self.namespace}.{table_name.value}")
+            raise
+
+    def _ensure_namespace(self, catalog: pyiceberg.catalog.Catalog):
+        # Check if namespace exists, create it if not
+        namespaces = [ns[0] for ns in catalog.list_namespaces()]
+        if self.namespace not in namespaces:
+            catalog.create_namespace(self.namespace)
+            self.get_logger().info(f"Namespace {self.namespace} created")
+
+    @staticmethod
+    def _get_catalog_properties() -> dict:
+        catalog_properties = {
+            "type": "rest",
+            "uri": constants.ICEBERG_CATALOG_URI,
+            "warehouse": constants.ICEBERG_CATALOG_WAREHOUSE,
+            "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
+            "s3.force-virtual-addressing": False,
+        }
+        if constants.ICEBERG_CATALOG_TOKEN:
+            catalog_properties["token"] = constants.ICEBERG_CATALOG_TOKEN
+        if constants.ICEBERG_S3_ACCESS_KEY and constants.ICEBERG_S3_SECRET_KEY:
+            catalog_properties.update({
+                "s3.access-key-id": constants.ICEBERG_S3_ACCESS_KEY,
+                "s3.secret-access-key": constants.ICEBERG_S3_SECRET_KEY,
+                "s3.region": constants.ICEBERG_S3_REGION,
+            })
+
+        if constants.ICEBERG_S3_ENDPOINT:
+            catalog_properties["s3.endpoint"] = constants.ICEBERG_S3_ENDPOINT
+        return catalog_properties
+
+    @staticmethod
+    def _get_schema_for_table(table_name: TableNames) -> typing.Union[pyarrow.Schema, pyiceberg.schema.Schema]:
+        if table_name is TableNames.OHLCV_HISTORY:
+            return IcebergHistoricalBackendClient._get_ohlcv_schema()
Available schemas: {list(TableNames)}") + + @staticmethod + def _get_ohlcv_schema() -> pyiceberg.schema.Schema: + """Schema for OHLCV data""" + return pyiceberg.schema.Schema( + pyiceberg.types.NestedField(1, "timestamp", pyiceberg.types.TimestampType(), required=True), + pyiceberg.types.NestedField(2, "exchange_internal_name", pyiceberg.types.StringType(), required=True), + pyiceberg.types.NestedField(3, "symbol", pyiceberg.types.StringType(), required=True), + pyiceberg.types.NestedField(4, "time_frame", pyiceberg.types.StringType(), required=True), + pyiceberg.types.NestedField(5, "open", pyiceberg.types.DoubleType(), required=True), + pyiceberg.types.NestedField(6, "high", pyiceberg.types.DoubleType(), required=True), + pyiceberg.types.NestedField(7, "low", pyiceberg.types.DoubleType(), required=True), + pyiceberg.types.NestedField(8, "close", pyiceberg.types.DoubleType(), required=True), + pyiceberg.types.NestedField(9, "volume", pyiceberg.types.DoubleType(), required=True), + pyiceberg.types.NestedField(10, "updated_at", pyiceberg.types.TimestampType(), required=False, initial_default=None), + ) + @staticmethod + def _pyarrow_get_ohlcv_schema() -> pyarrow.schema: + """Schema for OHLCV data""" + return pyarrow.schema([ + pyarrow.field("timestamp", pyarrow.timestamp("us"), False), # Adjust precision as needed (e.g., "ms" for milliseconds) + pyarrow.field("exchange_internal_name", pyarrow.string(), False), + pyarrow.field("symbol", pyarrow.string(), False), + pyarrow.field("time_frame", pyarrow.string(), False), + pyarrow.field("open", pyarrow.float64(), False), + pyarrow.field("high", pyarrow.float64(), False), + pyarrow.field("low", pyarrow.float64(), False), + pyarrow.field("close", pyarrow.float64(), False), + pyarrow.field("volume", pyarrow.float64(), False), + pyarrow.field("updated_at", pyarrow.timestamp("us"), True), + ]) + + @classmethod + def get_logger(cls): + return commons_logging.get_logger(cls.__name__) diff --git a/octobot/community/history_backend/util.py b/octobot/community/history_backend/util.py new file mode 100644 index 000000000..52801215f --- /dev/null +++ b/octobot/community/history_backend/util.py @@ -0,0 +1,16 @@ +import datetime + + +def get_utc_timestamp_from_datetime(dt: datetime.datetime) -> float: + """ + Convert a datetime to a timestamp in UTC + WARNING: usable here as we know this DB stores time in UTC only + """ + return dt.replace(tzinfo=datetime.timezone.utc).timestamp() + + +def deduplicate(elements, key) -> list: + # from https://stackoverflow.com/questions/480214/how-do-i-remove-duplicates-from-a-list-while-preserving-order + seen = set() + seen_add = seen.add + return [x for x in elements if not (x[key] in seen or seen_add(x[key]))] diff --git a/octobot/constants.py b/octobot/constants.py index 372bc1105..3a3f35c3c 100644 --- a/octobot/constants.py +++ b/octobot/constants.py @@ -94,6 +94,18 @@ CLICKHOUSE_USERNAME = os.getenv("CLICKHOUSE_USERNAME") CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD") +ICEBERG_CATALOG_NAMESPACE = os.getenv("ICEBERG_CATALOG_NAMESPACE") +ICEBERG_CATALOG_NAME = os.getenv("ICEBERG_CATALOG_NAME") +ICEBERG_CATALOG_WAREHOUSE = os.getenv("ICEBERG_CATALOG_WAREHOUSE") +ICEBERG_OHLCV_HISTORY_TABLE = os.getenv("ICEBERG_OHLCV_HISTORY_TABLE") +ICEBERG_CATALOG_URI = os.getenv("ICEBERG_CATALOG_URI") +ICEBERG_CATALOG_TOKEN = os.getenv("ICEBERG_CATALOG_TOKEN") +ICEBERG_S3_ACCESS_KEY = os.getenv("ICEBERG_S3_ACCESS_KEY") +ICEBERG_S3_SECRET_KEY = os.getenv("ICEBERG_S3_SECRET_KEY") +ICEBERG_S3_REGION = os.getenv("ICEBERG_S3_REGION") 
+ICEBERG_S3_ENDPOINT = os.getenv("ICEBERG_S3_ENDPOINT")
+CREATE_ICEBERG_DB_IF_MISSING = os_util.parse_boolean_environment_var("CREATE_ICEBERG_DB_IF_MISSING", "false")
+
 OCTOBOT_MARKET_MAKING_URL = os.getenv("OCTOBOT_MARKET_MAKING_URL", "https://market-making.octobot.cloud")
 
 ERROR_TRACKER_DSN = os.getenv("ERROR_TRACKER_DSN")
diff --git a/octobot/enums.py b/octobot/enums.py
index 93e6b8d9a..eddac3877 100644
--- a/octobot/enums.py
+++ b/octobot/enums.py
@@ -24,7 +24,8 @@ class CommunityFeedType(enum.Enum):
 
 class CommunityHistoricalBackendType(enum.Enum):
     Clickhouse = "Clickhouse"
-    DEFAULT = Clickhouse
+    Iceberg = "Iceberg"
+    DEFAULT = Iceberg
 
 
 class CommunityEnvironments(enum.Enum):
diff --git a/requirements.txt b/requirements.txt
index 844066325..bfbb04ff1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,6 +22,8 @@ websockets==15.0.1 # used by supabase, a recent version is required, see https:/
 gmqtt==0.7.0
 pgpy==0.6.0
 clickhouse-connect==0.8.18
+pyiceberg==0.10.0
+pyarrow==21.0.0
 
 # Error tracking
 sentry-sdk==2.35.0 # always make sure sentry_aiohttp_transport.py keep working
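
Usage sketch (reviewer note, not part of the patch): with the ICEBERG_* variables
from .env.template exported and the catalog reachable, the new backend is obtained
through the existing history_backend_client() factory and is now the DEFAULT
backend type. A minimal example, mirroring the test fixtures:

    import asyncio

    import octobot.community as community
    import octobot.enums as octobot_enums
    import octobot_commons.enums as commons_enums

    async def fetch_sample_candles():
        # the factory is an async context manager: it opens the client
        # (catalog + thread pool executor) on entry and closes it on exit
        async with community.history_backend_client(
            octobot_enums.CommunityHistoricalBackendType.Iceberg
        ) as client:
            # returned rows follow commons_enums.PriceIndexes ordering:
            # [timestamp, open, high, low, close, volume]
            return await client.fetch_candles_history(
                "binance", "BTC/USDC", commons_enums.TimeFrames.FIFTEEN_MINUTES,
                1718785679, 1721377495,
            )

    candles = asyncio.run(fetch_sample_candles())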