Skip to content

Commit c8c304a

Browse files
committed
fix: allow writing to DeltaTable objects across Python threads
FREEZE! Looks like we were _really_ close to full multi-threaded support before, and just needed to sprinkle a little macro magic on the pyclass definition. See [pyo3 docs](https://pyo3.rs/v0.23.0/class/thread-safety) Fixes #3594 Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 765e129 commit c8c304a

File tree

3 files changed

+11
-8
lines changed

3 files changed

+11
-8
lines changed

python/deltalake/writer/_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def try_get_table_and_table_uri(
3333
table_uri = str(table_or_uri)
3434
else:
3535
table = table_or_uri
36-
table_uri = table._table.table_uri()
36+
table_uri = table.table_uri
3737

3838
return (table, table_uri)
3939

python/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ enum PartitionFilterValue {
100100
Multiple(Vec<PyBackedStr>),
101101
}
102102

103-
#[pyclass(module = "deltalake._internal")]
103+
#[pyclass(module = "deltalake._internal", frozen)]
104104
struct RawDeltaTable {
105105
/// The internal reference to the table is guarded by a Mutex to allow for re-using the same
106106
/// [DeltaTable] instance across multiple Python threads
@@ -109,7 +109,7 @@ struct RawDeltaTable {
109109
_config: FsConfig,
110110
}
111111

112-
#[pyclass]
112+
#[pyclass(frozen)]
113113
struct RawDeltaTableMetaData {
114114
#[pyo3(get)]
115115
id: String,
@@ -1645,7 +1645,7 @@ impl RawDeltaTable {
16451645
#[allow(clippy::too_many_arguments)]
16461646
#[pyo3(signature = (data, batch_schema, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
16471647
fn write(
1648-
&mut self,
1648+
&self,
16491649
py: Python,
16501650
data: PyRecordBatchReader,
16511651
batch_schema: PyArrowSchema,

python/tests/test_threaded.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,16 @@ def test_multithreaded_write_using_table(tmp_path: pathlib.Path):
5959

6060
dt = DeltaTable(tmp_path)
6161

62-
with pytest.raises(RuntimeError, match="borrowed"):
63-
with ThreadPoolExecutor() as exe:
64-
list(exe.map(lambda _: write_deltalake(dt, table, mode="append"), range(5)))
62+
with ThreadPoolExecutor() as exe:
63+
list(
64+
exe.map(
65+
lambda i: write_deltalake(dt, pl.DataFrame({"a": [i]}), mode="append"),
66+
range(5),
67+
)
68+
)
6569

6670

6771
@pytest.mark.polars
68-
@pytest.mark.xfail(reason="Can fail because of already borrowed")
6972
def test_multithreaded_write_using_path(tmp_path: pathlib.Path):
7073
import polars as pl
7174

0 commit comments

Comments
 (0)