Skip to content

Commit 983aedd

Browse files
committed
feat: Support register/deregister of tables in ListingSchemaProvider incl. func to load a table at a later point
Signed-off-by: Alexander Falk <alexfalk7@gmail.com>
1 parent 64f6369 commit 983aedd

File tree

1 file changed

+44
-18
lines changed
  • crates/core/src/data_catalog/storage

1 file changed

+44
-18
lines changed

crates/core/src/data_catalog/storage/mod.rs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ use futures::TryStreamExt;
1313
use object_store::ObjectStore;
1414

1515
use crate::errors::DeltaResult;
16-
use crate::open_table_with_storage_options;
17-
use crate::storage::*;
1816
use crate::table::builder::ensure_table_uri;
17+
use crate::DeltaTableBuilder;
18+
use crate::{storage::*, DeltaTable};
1919

2020
const DELTA_LOG_FOLDER: &str = "_delta_log";
2121

@@ -36,7 +36,7 @@ pub struct ListingSchemaProvider {
3636
/// Underlying object store
3737
store: Arc<dyn ObjectStore>,
3838
/// A map of table names to a fully qualified storage location
39-
tables: DashMap<String, String>,
39+
tables: DashMap<String, Arc<dyn TableProvider>>,
4040
/// Options used to create underlying object stores
4141
storage_options: StorageOptions,
4242
}
@@ -73,17 +73,42 @@ impl ListingSchemaProvider {
7373
parent = p;
7474
}
7575
}
76+
7677
for table in tables.into_iter() {
7778
let table_name = normalize_table_name(table)?;
7879
let table_path = table
7980
.to_str()
8081
.ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))?
8182
.to_string();
8283
if !self.table_exist(&table_name) {
83-
let table_url = format!("{}/{table_path}", self.authority);
84-
self.tables.insert(table_name.to_string(), table_url);
84+
let table_url = format!("{}/{}", self.authority, table_path);
85+
let Ok(delta_table) = DeltaTableBuilder::from_uri(table_url)
86+
.with_storage_options(self.storage_options.0.clone())
87+
.build()
88+
else {
89+
continue;
90+
};
91+
let _ = self.register_table(table_name, Arc::new(delta_table));
92+
}
93+
}
94+
Ok(())
95+
}
96+
97+
/// Tables are registered as references but are not yet initialized. To initialize the delta
98+
/// table, the `load()` function must be called on the delta table. This function does that
99+
/// for the named table and ensures the DashMap is updated with the loaded table
100+
pub async fn load_table(&self, table_name: &str) -> datafusion::common::Result<()> {
101+
if let Some(mut table) = self.tables.get_mut(&table_name.to_string()) {
102+
if let Some(delta_table) = table.value().as_any().downcast_ref::<DeltaTable>() {
103+
// If the table has not been loaded yet, load a clone of it and overwrite the map entry in place
104+
if delta_table.state.is_none() {
105+
let mut delta_table = delta_table.clone();
106+
delta_table.load().await?;
107+
*table = Arc::from(delta_table);
108+
}
85109
}
86110
}
111+
87112
Ok(())
88113
}
89114
}
@@ -112,31 +137,31 @@ impl SchemaProvider for ListingSchemaProvider {
112137
}
113138

114139
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
115-
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
140+
let Some(provider) = self.tables.get(name).map(|t| t.clone()) else {
116141
return Ok(None);
117142
};
118-
let provider =
119-
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
120-
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
143+
Ok(Some(provider))
121144
}
122145

123146
fn register_table(
124147
&self,
125-
_name: String,
126-
_table: Arc<dyn TableProvider>,
148+
name: String,
149+
table: Arc<dyn TableProvider>,
127150
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
128-
Err(DataFusionError::Execution(
129-
"schema provider does not support registering tables".to_owned(),
130-
))
151+
if !self.table_exist(name.as_str()) {
152+
self.tables.insert(name, table.clone());
153+
}
154+
Ok(Some(table))
131155
}
132156

133157
fn deregister_table(
134158
&self,
135-
_name: &str,
159+
name: &str,
136160
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
137-
Err(DataFusionError::Execution(
138-
"schema provider does not support deregistering tables".to_owned(),
139-
))
161+
if let Some(table) = self.tables.remove(name) {
162+
return Ok(Some(table.1));
163+
}
164+
Ok(None)
140165
}
141166

142167
fn table_exist(&self, name: &str) -> bool {
@@ -177,6 +202,7 @@ mod tests {
177202
async fn test_query_table() {
178203
let schema = Arc::new(ListingSchemaProvider::try_new("../test/tests/data/", None).unwrap());
179204
schema.refresh().await.unwrap();
205+
schema.load_table("simple_table").await.unwrap();
180206

181207
let ctx = SessionContext::new();
182208
let catalog = Arc::new(MemoryCatalogProvider::default());

0 commit comments

Comments
 (0)