2 changes: 2 additions & 0 deletions src/directory/mod.rs
@@ -9,6 +9,7 @@ mod file_watcher;
pub mod footer;
mod managed_directory;
mod ram_directory;
mod nrt_directory;
mod watch_event_router;

/// Errors specific to the directory module.
@@ -48,6 +49,7 @@ pub use memmap2::Advice;
pub use self::managed_directory::ManagedDirectory;
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;
pub use self::nrt_directory::NrtDirectory;

/// Write object for Directory.
///
156 changes: 156 additions & 0 deletions src/directory/nrt_directory.rs
@@ -0,0 +1,156 @@
use std::collections::HashSet;
use std::fmt;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use crate::core::META_FILEPATH;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::{
    Directory, DirectoryLock, FileHandle, FileSlice, Lock, TerminatingWrite, WatchCallback,
    WatchHandle, WritePtr,
};
use crate::directory::RamDirectory;

/// A `Directory` that overlays a `RamDirectory` on top of a base `Directory`.
///
/// - Writes (`open_write` and `atomic_write`) go to the in-memory overlay.
/// - Reads first check the overlay, then fall back to the base directory.
/// - `sync_directory()` persists overlay files that do not yet exist in the base directory.
#[derive(Clone)]
pub struct NrtDirectory {
    base: Box<dyn Directory>,
    overlay: RamDirectory,
    /// Tracks files written into the overlay to decide what to persist on sync.
    overlay_paths: Arc<RwLock<HashSet<PathBuf>>>,
}

impl NrtDirectory {
    /// Wraps a base directory with an NRT overlay.
    pub fn wrap(base: Box<dyn Directory>) -> NrtDirectory {
        NrtDirectory {
            base,
            overlay: RamDirectory::default(),
            overlay_paths: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    /// Persist overlay files into the base directory if missing there.
    fn persist_overlay_into_base(&self) -> crate::Result<()> {
        let snapshot_paths: Vec<PathBuf> = {
            let guard = self.overlay_paths.read().unwrap();
            guard.iter().cloned().collect()
        };
        // First copy all non-meta files. `meta.json` must be written last, atomically.
        for path in snapshot_paths {
Review thread on this line:

Collaborator: ok, but if we update the meta.json file before a segment file, and this function hits an io::Error in the middle, won't the index be corrupted?

Author: I've moved the meta publication to the end of sync_directory() and removed the early base write in atomic_write. We now snapshot overlay paths, persist all data files first, and atomically write meta.json last to avoid partial states. Build is green; please have another look.

            if path == *META_FILEPATH {
                continue;
            }
            // Skip if the base already has the file.
            if self.base.exists(&path).unwrap_or(false) {
                continue;
            }
            // Read the bytes from the overlay.
            let file_slice: FileSlice = match self.overlay.open_read(&path) {
                Ok(slice) => slice,
                Err(OpenReadError::FileDoesNotExist(_)) => continue, // was removed meanwhile
                Err(e) => return Err(e.into()),
            };
            let bytes = file_slice
                .read_bytes()
                .map_err(|io_err| OpenReadError::IoError {
                    io_error: Arc::new(io_err),
                    filepath: path.clone(),
                })?;
            // Write to the base directory.
            let mut dest_wrt: WritePtr = self.base.open_write(&path)?;
            dest_wrt.write_all(bytes.as_slice())?;
            dest_wrt.terminate()?;
        }
        // Then, if present, write `meta.json` atomically to the base directory.
        if self.overlay.exists(&*META_FILEPATH).unwrap_or(false) {
            // Read meta from the overlay into a buffer, then write it to the base atomically.
            if let Ok(meta_bytes) = self.overlay.atomic_read(&*META_FILEPATH) {
                self.base.atomic_write(&*META_FILEPATH, &meta_bytes)?;
            }
        }
        Ok(())
    }
}

impl fmt::Debug for NrtDirectory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "NrtDirectory")
    }
}

impl Directory for NrtDirectory {
    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
        if self.overlay.exists(path).unwrap_or(false) {
            return self.overlay.get_file_handle(path);
        }
        self.base.get_file_handle(path)
    }

    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
        if self.overlay.exists(path).unwrap_or(false) {
            return self.overlay.open_read(path);
        }
        self.base.open_read(path)
    }

    fn delete(&self, path: &Path) -> Result<(), DeleteError> {
        let _ = self.overlay.delete(path); // best-effort
        self.base.delete(path)
    }

    fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
        if self.overlay.exists(path).unwrap_or(false) {
            return Ok(true);
        }
        self.base.exists(path)
    }

    fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
        {
            let mut guard = self.overlay_paths.write().unwrap();
            guard.insert(path.to_path_buf());
        }
        self.overlay.open_write(path)
    }

    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        if self.overlay.exists(path).unwrap_or(false) {
            return self.overlay.atomic_read(path);
        }
        self.base.atomic_read(path)
    }

    fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        {
            let mut guard = self.overlay_paths.write().unwrap();
            guard.insert(path.to_path_buf());
        }
        // Always write to the overlay first. We do not write meta.json to the base here,
        // to ensure meta is published only after all files are persisted in sync_directory().
        self.overlay.atomic_write(path, data)?;
        Ok(())
    }

    fn acquire_lock(
        &self,
        lock: &Lock,
    ) -> Result<DirectoryLock, crate::directory::error::LockError> {
        self.base.acquire_lock(lock)
    }

    fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
        // Watch meta.json changes on the base directory.
        self.base.watch(watch_callback)
    }

    fn sync_directory(&self) -> io::Result<()> {
        // Persist the overlay into the base, then sync the base directory.
        if let Err(err) = self.persist_overlay_into_base() {
            return Err(io::Error::new(io::ErrorKind::Other, format!("{err}")));
        }
        self.base.sync_directory()
    }
}
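
A minimal usage sketch of the overlay, not taken from the diff: it assumes the crate is consumed as `tantivy`, that `./nrt-index` is a hypothetical path created just for the example, and that `Index::open_or_create` accepts the wrapped directory like any other `Directory` implementation. The intent it illustrates is the one described above: fresh writes stay in the RAM overlay, and `sync_directory()` copies data files into the base before publishing `meta.json`.

use tantivy::directory::{MmapDirectory, NrtDirectory};
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, IndexWriter};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let schema = schema_builder.build();

    // Hypothetical on-disk location for the base directory.
    std::fs::create_dir_all("./nrt-index")?;
    let base = MmapDirectory::open("./nrt-index")?;

    // Fresh writes stay in the RAM overlay; reads fall through to the base.
    let dir = NrtDirectory::wrap(Box::new(base));
    let index = Index::open_or_create(dir, schema)?;

    let mut writer: IndexWriter = index.writer(50_000_000)?;
    writer.add_document(doc!(title => "hello"))?;
    // On commit, sync_directory() copies overlay data files into the base
    // and writes meta.json last, atomically.
    writer.commit()?;
    Ok(())
}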


42 changes: 42 additions & 0 deletions src/indexer/index_writer.rs
@@ -349,6 +349,41 @@ impl<D: Document> IndexWriter<D> {
        &self.index
    }

    /// Creates a near-real-time `Searcher` reflecting the latest `soft_commit()` state.
    ///
    /// This builds a `Searcher` directly from the `IndexWriter`'s in-memory committed segments,
    /// without relying on `meta.json` having been persisted.
    pub fn nrt_searcher(&self) -> crate::Result<crate::Searcher> {
        use crate::core::searcher::{SearcherGeneration, SearcherInner};
        use crate::directory::{Directory as _, META_LOCK};
        use crate::store::DOCSTORE_CACHE_CAPACITY;
        use crate::Inventory;

        // Prevent GC from removing files while we open readers.
        let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;

        // Load the current in-memory committed segment metas.
        let metas = self.segment_updater().committed_segment_metas();
        let mut segment_readers = Vec::with_capacity(metas.len());
        for meta in metas {
            let segment = self.index.segment(meta);
            let seg_reader = SegmentReader::open(&segment)?;
            segment_readers.push(seg_reader);
        }

        // Track the searcher generation using a fresh Inventory.
        let inventory: Inventory<SearcherGeneration> = Inventory::default();
        let generation =
            inventory.track(SearcherGeneration::from_segment_readers(&segment_readers, 0));
        let searcher_inner = SearcherInner::new(
            self.index.schema(),
            self.index.clone(),
            segment_readers,
            generation,
            DOCSTORE_CACHE_CAPACITY,
        )?;
        Ok(Arc::new(searcher_inner).into())
    }

    /// If there are some merging threads, blocks until they all finish their work and
    /// then drop the `IndexWriter`.
    pub fn wait_merging_threads(mut self) -> crate::Result<()> {
@@ -665,6 +700,13 @@ impl<D: Document> IndexWriter<D> {
        self.prepare_commit()?.commit()
    }

    /// Soft-commits the current changes: publishes them to the in-memory committed set so
    /// they become visible to a near-real-time reader, without persisting `meta.json`.
    pub fn soft_commit(&mut self) -> crate::Result<Opstamp> {
        let prepared = self.prepare_commit()?;
        prepared.soft_commit()
    }

    pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
        &self.segment_updater
    }
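
A hedged sketch of the intended caller-side flow, not part of the diff: it assumes an index with a text field `title`, the default `TantivyDocument`, and that `soft_commit()` / `nrt_searcher()` behave as their doc comments above describe (the new segment becomes searchable in memory, `meta.json` is left alone).

use tantivy::collector::Count;
use tantivy::query::TermQuery;
use tantivy::schema::{IndexRecordOption, Schema, TEXT};
use tantivy::{doc, Index, IndexWriter, Term};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer: IndexWriter = index.writer(50_000_000)?;
    writer.add_document(doc!(title => "near real time search"))?;

    // Publish the pending segment to the in-memory committed set; meta.json is untouched.
    writer.soft_commit()?;

    // Build a searcher straight from the writer's in-memory segment metas.
    let searcher = writer.nrt_searcher()?;
    let query = TermQuery::new(
        Term::from_field_text(title, "near"),
        IndexRecordOption::Basic,
    );
    // Expected: the freshly soft-committed document is already visible.
    assert_eq!(searcher.search(&query, &Count)?, 1);
    Ok(())
}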
9 changes: 9 additions & 0 deletions src/indexer/prepared_commit.rs
@@ -39,6 +39,15 @@ impl<'a, D: Document> PreparedCommit<'a, D> {
        self.commit_future().wait()
    }

    /// Soft-commit: publishes segments in memory for near-real-time search without persisting
    /// `meta.json`. This does not trigger GC or a directory sync.
    pub fn soft_commit(self) -> crate::Result<Opstamp> {
        self.index_writer
            .segment_updater()
            .schedule_soft_commit(self.opstamp, self.payload)
            .wait()
    }

    /// Proceeds to commit.
    ///
    /// Unfortunately, contrary to what `PrepareCommit` may suggests,
23 changes: 23 additions & 0 deletions src/indexer/segment_updater.rs
@@ -454,6 +454,29 @@ impl SegmentUpdater {
        })
    }

    /// Performs a soft commit: publishes the current uncommitted segments as committed in the
    /// in-memory state, but does not write `meta.json`.
    pub(crate) fn schedule_soft_commit(
        &self,
        opstamp: Opstamp,
        _payload: Option<String>,
    ) -> FutureResult<Opstamp> {
        let segment_updater: SegmentUpdater = self.clone();
        self.schedule_task(move || {
            let segment_entries = segment_updater.purge_deletes(opstamp)?;
            // Move them to committed, in memory only.
            segment_updater.segment_manager.commit(segment_entries);
            // Do not persist meta.json. Keep the active_index_meta opstamp as-is.
            segment_updater.consider_merge_options();
            Ok(opstamp)
        })
    }

    /// Returns the current list of committed segment metas from the in-memory state.
    pub(crate) fn committed_segment_metas(&self) -> Vec<SegmentMeta> {
        self.segment_manager.committed_segment_metas()
    }

    fn store_meta(&self, index_meta: &IndexMeta) {
        *self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
    }
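
To make the split between the in-memory commit path and the persisted `meta.json` concrete, here is a hedged sketch of the expected observable behavior, not part of the diff. It assumes a regular reader keeps reflecting the on-disk `meta.json` (which `schedule_soft_commit` never writes), while `nrt_searcher()` reads `committed_segment_metas()` from the in-memory state.

use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, IndexWriter};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer: IndexWriter = index.writer(50_000_000)?;

    // A regular reader tracks the persisted meta.json.
    let reader = index.reader()?;

    writer.add_document(doc!(title => "only in memory"))?;
    writer.soft_commit()?; // schedule_soft_commit(): committed in memory, meta.json untouched

    // Expected: the meta-based searcher still sees nothing...
    assert_eq!(reader.searcher().num_docs(), 0);
    // ...while the NRT searcher sees the in-memory committed segment.
    assert_eq!(writer.nrt_searcher()?.num_docs(), 1);
    Ok(())
}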