From 7e251b43d41b4f44ff597a02ea1d0f759c42989e Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 20 May 2024 15:09:45 +0200 Subject: [PATCH] Revert "Stream documents" --- Cargo.lock | 38 +++--- index-scheduler/src/batch.rs | 10 +- index-scheduler/src/lib.rs | 14 +-- meilisearch-auth/src/store.rs | 2 +- meilisearch-types/src/error.rs | 1 + meilisearch/Cargo.toml | 1 - meilisearch/src/routes/indexes/documents.rs | 116 +++++------------- meilisearch/src/routes/mod.rs | 28 ++--- meilitool/src/main.rs | 8 +- milli/Cargo.toml | 4 +- milli/fuzz/.gitignore | 3 - milli/src/error.rs | 3 + milli/src/index.rs | 7 +- milli/src/update/facet/mod.rs | 2 +- milli/src/update/index_documents/mod.rs | 2 +- .../src/update/index_documents/typed_chunk.rs | 3 +- 16 files changed, 91 insertions(+), 151 deletions(-) delete mode 100644 milli/fuzz/.gitignore diff --git a/Cargo.lock b/Cargo.lock index d9e96b029..937fce64a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,9 +378,9 @@ dependencies = [ [[package]] name = "arroy" -version = "0.3.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73897699bf04bac935c0b120990d2a511e91e563e0f9769f9c8bb983d98dfbc9" +checksum = "efddeb1e7c32a551cc07ef4c3e181e3cd5478fdaf4f0bd799983171c1f6efe57" dependencies = [ "bytemuck", "byteorder", @@ -1536,9 +1536,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] name = "doxygen-rs" -version = "0.4.2" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9" +checksum = "bff670ea0c9bbb8414e7efa6e23ebde2b8f520a7eef78273a3918cf1903e7505" dependencies = [ "phf", ] @@ -2262,11 +2262,12 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "heed" -version = "0.20.1" +version = "0.20.0-alpha.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f7acb9683d7c7068aa46d47557bfa4e35a277964b350d9504a87b03610163fd" +checksum = "9648a50991c86df7d00c56c268c27754fcf4c80be2ba57fc4a00dc928c6fe934" dependencies = [ "bitflags 2.5.0", + "bytemuck", "byteorder", "heed-traits", "heed-types", @@ -2280,15 +2281,15 @@ dependencies = [ [[package]] name = "heed-traits" -version = "0.20.0" +version = "0.20.0-alpha.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff" +checksum = "5ab0b7d9cde969ad36dde692e487dc89d97f7168bf6a7bd3b894ad4bf7278298" [[package]] name = "heed-types" -version = "0.20.0" +version = "0.20.0-alpha.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb0d6ba3700c9a57e83c013693e3eddb68a6d9b6781cacafc62a0d992e8ddb3" +checksum = "f0cb3567a7363f28b597bf6e9897b9466397951dd0e52df2c8196dd8a71af44a" dependencies = [ "bincode", "byteorder", @@ -3188,13 +3189,14 @@ checksum = "f9d642685b028806386b2b6e75685faadd3eb65a85fff7df711ce18446a422da" [[package]] name = "lmdb-master-sys" -version = "0.2.0" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc9048db3a58c0732d7236abc4909058f9d2708cfb6d7d047eb895fddec6419a" +checksum = "629c123f5321b48fa4f8f4d3b868165b748d9ba79c7103fb58e3a94f736bcedd" dependencies = [ "cc", "doxygen-rs", "libc", + "pkg-config", ] [[package]] @@ -3346,7 +3348,6 @@ dependencies = [ "rayon", "regex", "reqwest", - "roaring", "rustls 0.21.12", "rustls-pemfile", "segment", @@ -4415,6 +4416,12 @@ dependencies = [ 
"winreg", ] +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + [[package]] name = "ring" version = "0.17.8" @@ -4432,12 +4439,13 @@ dependencies = [ [[package]] name = "roaring" -version = "0.10.3" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf" +checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873" dependencies = [ "bytemuck", "byteorder", + "retain_mut", "serde", ] diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 582497c15..bc9823a01 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -785,12 +785,10 @@ impl IndexScheduler { let dst = temp_snapshot_dir.path().join("auth"); fs::create_dir_all(&dst)?; // TODO We can't use the open_auth_store_env function here but we should - let auth = unsafe { - milli::heed::EnvOpenOptions::new() - .map_size(1024 * 1024 * 1024) // 1 GiB - .max_dbs(2) - .open(&self.auth_path) - }?; + let auth = milli::heed::EnvOpenOptions::new() + .map_size(1024 * 1024 * 1024) // 1 GiB + .max_dbs(2) + .open(&self.auth_path)?; auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; // 5. Copy and tarball the flat snapshot diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index dd2b296f6..5704f5354 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -453,12 +453,10 @@ impl IndexScheduler { ) }; - let env = unsafe { - heed::EnvOpenOptions::new() - .max_dbs(11) - .map_size(budget.task_db_size) - .open(options.tasks_path) - }?; + let env = heed::EnvOpenOptions::new() + .max_dbs(11) + .map_size(budget.task_db_size) + .open(options.tasks_path)?; let features = features::FeatureData::new(&env, options.instance_features)?; @@ -587,9 +585,9 @@ impl IndexScheduler { } fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool { - if let Ok(env) = unsafe { + if let Ok(env) = heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path) - } { + { env.prepare_for_closing().wait(); true } else { diff --git a/meilisearch-auth/src/store.rs b/meilisearch-auth/src/store.rs index ef992e836..1eebd3fe9 100644 --- a/meilisearch-auth/src/store.rs +++ b/meilisearch-auth/src/store.rs @@ -49,7 +49,7 @@ pub fn open_auth_store_env(path: &Path) -> milli::heed::Result let mut options = EnvOpenOptions::new(); options.map_size(AUTH_STORE_SIZE); // 1GB options.max_dbs(2); - unsafe { options.open(path) } + options.open(path) } impl HeedAuthStore { diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index 158dfae92..eea012331 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -423,6 +423,7 @@ impl ErrorCode for HeedError { HeedError::Mdb(_) | HeedError::Encoding(_) | HeedError::Decoding(_) + | HeedError::InvalidDatabaseTyping | HeedError::DatabaseClosing | HeedError::BadOpenOptions { .. 
} => Code::Internal, } diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml index 612c6731b..ed62c5f48 100644 --- a/meilisearch/Cargo.toml +++ b/meilisearch/Cargo.toml @@ -108,7 +108,6 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] } tracing-trace = { version = "0.1.0", path = "../tracing-trace" } tracing-actix-web = "0.7.9" build-info = { version = "1.7.0", path = "../build-info" } -roaring = "0.10.3" [dev-dependencies] actix-rt = "2.9.0" diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 7c9b4b761..43fab1dae 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -1,14 +1,12 @@ -use std::io::{ErrorKind, Write}; +use std::io::ErrorKind; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; use bstr::ByteSlice as _; -use bytes::Bytes; use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::Deserr; use futures::StreamExt; -use futures_util::Stream; use index_scheduler::{IndexScheduler, TaskId}; use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError}; @@ -24,9 +22,7 @@ use meilisearch_types::tasks::KindWithContent; use meilisearch_types::{milli, Document, Index}; use mime::Mime; use once_cell::sync::Lazy; -use roaring::RoaringBitmap; -use serde::ser::SerializeSeq; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use serde_json::Value; use tempfile::tempfile; use tokio::fs::File; @@ -234,34 +230,6 @@ pub async fn get_documents( documents_by_query(&index_scheduler, index_uid, query) } -pub struct Writer2Streamer { - sender: tokio::sync::mpsc::Sender>, -} - -impl Write for Writer2Streamer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -pub fn stream( - data: impl Serialize + Send + 'static, -) -> impl Stream> { - let (sender, receiver) = tokio::sync::mpsc::channel::>(1); - - tokio::task::spawn_blocking(move || { - serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data) - }); - futures_util::stream::unfold(receiver, |mut receiver| async { - receiver.recv().await.map(|value| (value, receiver)) - }) -} - fn documents_by_query( index_scheduler: &IndexScheduler, index_uid: web::Path, @@ -271,13 +239,12 @@ fn documents_by_query( let BrowseQuery { offset, limit, fields, filter } = query; let index = index_scheduler.index(&index_uid)?; - let documents = retrieve_documents(index, offset, limit, filter, fields)?; + let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?; - let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents); + let ret = PaginationView::new(offset, limit, total as usize, documents); debug!(returns = ?ret, "Get documents"); - - Ok(HttpResponse::Ok().streaming(stream(ret))) + Ok(HttpResponse::Ok().json(ret)) } #[derive(Deserialize, Debug, Deserr)] @@ -623,47 +590,14 @@ fn some_documents<'a, 't: 'a>( })) } -pub struct DocumentsStreamer { - attributes_to_retrieve: Option>, - documents: RoaringBitmap, - rtxn: RoTxn<'static>, - index: Index, - pub total_documents: u64, -} - -impl Serialize for DocumentsStreamer { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut seq = 
serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap(); - - let documents = some_documents(&self.index, &self.rtxn, self.documents.iter()).unwrap(); - for document in documents { - let document = document.unwrap(); - let document = match self.attributes_to_retrieve { - Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values( - &document, - attributes_to_retrieve.iter().map(|s| s.as_ref()), - ), - None => document, - }; - - seq.serialize_element(&document)?; - } - seq.end() - } -} - -fn retrieve_documents( - index: Index, +fn retrieve_documents>( + index: &Index, offset: usize, limit: usize, filter: Option, - attributes_to_retrieve: Option>, -) -> Result { - let rtxn = index.static_read_txn()?; - + attributes_to_retrieve: Option>, +) -> Result<(u64, Vec), ResponseError> { + let rtxn = index.read_txn()?; let filter = &filter; let filter = if let Some(filter) = filter { parse_filter(filter) @@ -673,7 +607,7 @@ fn retrieve_documents( }; let candidates = if let Some(filter) = filter { - filter.evaluate(&rtxn, &index).map_err(|err| match err { + filter.evaluate(&rtxn, index).map_err(|err| match err { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter) } @@ -683,13 +617,27 @@ fn retrieve_documents( index.documents_ids(&rtxn)? }; - Ok(DocumentsStreamer { - total_documents: candidates.len(), - attributes_to_retrieve, - documents: candidates.into_iter().skip(offset).take(limit).collect(), - rtxn, - index, - }) + let (it, number_of_documents) = { + let number_of_documents = candidates.len(); + ( + some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?, + number_of_documents, + ) + }; + + let documents: Result, ResponseError> = it + .map(|document| { + Ok(match &attributes_to_retrieve { + Some(attributes_to_retrieve) => permissive_json_pointer::select_values( + &document?, + attributes_to_retrieve.iter().map(|s| s.as_ref()), + ), + None => document?, + }) + }) + .collect(); + + Ok((number_of_documents, documents?)) } fn retrieve_document>( diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index a7e84d19c..c25aeee70 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -1,5 +1,4 @@ use std::collections::BTreeMap; -use std::fmt; use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; @@ -125,31 +124,20 @@ pub struct Pagination { pub limit: usize, } -#[derive(Clone, Serialize)] -pub struct PaginationView { - pub results: T, +#[derive(Debug, Clone, Serialize)] +pub struct PaginationView { + pub results: Vec, pub offset: usize, pub limit: usize, pub total: usize, } -impl fmt::Debug for PaginationView { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PaginationView") - .field("offset", &self.offset) - .field("limit", &self.limit) - .field("total", &self.total) - .field("results", &"[...]") - .finish() - } -} - impl Pagination { /// Given the full data to paginate, returns the selected section. pub fn auto_paginate_sized( self, content: impl IntoIterator + ExactSizeIterator, - ) -> PaginationView> + ) -> PaginationView where T: Serialize, { @@ -163,7 +151,7 @@ impl Pagination { self, total: usize, content: impl IntoIterator, - ) -> PaginationView> + ) -> PaginationView where T: Serialize, { @@ -173,7 +161,7 @@ impl Pagination { /// Given the data already paginated + the total number of elements, it stores /// everything in a [PaginationResult]. 
- pub fn format_with(self, total: usize, results: Vec) -> PaginationView> + pub fn format_with(self, total: usize, results: Vec) -> PaginationView where T: Serialize, { @@ -181,8 +169,8 @@ impl Pagination { } } -impl PaginationView { - pub fn new(offset: usize, limit: usize, total: usize, results: T) -> Self { +impl PaginationView { + pub fn new(offset: usize, limit: usize, total: usize, results: Vec) -> Self { Self { offset, limit, results, total } } } diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs index 06c4890a5..bfcbfdd6d 100644 --- a/meilitool/src/main.rs +++ b/meilitool/src/main.rs @@ -80,7 +80,9 @@ fn main() -> anyhow::Result<()> { /// Clears the task queue located at `db_path`. fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> { let path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&path) } + let env = EnvOpenOptions::new() + .max_dbs(100) + .open(&path) .with_context(|| format!("While trying to open {:?}", path.display()))?; eprintln!("Deleting tasks from the database..."); @@ -191,7 +193,9 @@ fn export_a_dump( FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?; let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } + let env = EnvOpenOptions::new() + .max_dbs(100) + .open(&index_scheduler_path) .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; eprintln!("Dumping the keys..."); diff --git a/milli/Cargo.toml b/milli/Cargo.toml index c5dddd0fd..7d903178b 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -30,7 +30,7 @@ grenad = { version = "0.4.6", default-features = false, features = [ "rayon", "tempfile", ] } -heed = { version = "0.20.1", default-features = false, features = [ +heed = { version = "0.20.0-alpha.9", default-features = false, features = [ "serde-json", "serde-bincode", "read-txn-no-tls", @@ -82,7 +82,7 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", ] } tiktoken-rs = "0.5.8" liquid = "0.26.4" -arroy = "0.3.1" +arroy = "0.2.0" rand = "0.8.5" tracing = "0.1.40" ureq = { version = "2.9.7", features = ["json"] } diff --git a/milli/fuzz/.gitignore b/milli/fuzz/.gitignore deleted file mode 100644 index a0925114d..000000000 --- a/milli/fuzz/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -target -corpus -artifacts diff --git a/milli/src/error.rs b/milli/src/error.rs index 6db0dcac1..009781fcf 100644 --- a/milli/src/error.rs +++ b/milli/src/error.rs @@ -48,6 +48,8 @@ pub enum InternalError { GrenadInvalidFormatVersion, #[error("Invalid merge while processing {process}")] IndexingMergingKeys { process: &'static str }, + #[error("{}", HeedError::InvalidDatabaseTyping)] + InvalidDatabaseTyping, #[error(transparent)] RayonThreadPool(#[from] ThreadPoolBuildError), #[error(transparent)] @@ -427,6 +429,7 @@ impl From for Error { // TODO use the encoding HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })), HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })), + HeedError::InvalidDatabaseTyping => InternalError(InvalidDatabaseTyping), HeedError::DatabaseClosing => InternalError(DatabaseClosing), HeedError::BadOpenOptions { .. 
} => UserError(InvalidLmdbOpenOptions), } diff --git a/milli/src/index.rs b/milli/src/index.rs index 739a7f202..42b9cb111 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -184,7 +184,7 @@ impl Index { options.max_dbs(25); - let env = unsafe { options.open(path) }?; + let env = options.open(path)?; let mut wtxn = env.write_txn()?; let main = env.database_options().name(MAIN).create(&mut wtxn)?; let word_docids = env.create_database(&mut wtxn, Some(WORD_DOCIDS))?; @@ -294,11 +294,6 @@ impl Index { self.env.read_txn() } - /// Create a static read transaction to be able to read the index without keeping a reference to it. - pub fn static_read_txn(&self) -> heed::Result> { - self.env.clone().static_read_txn() - } - /// Returns the canonicalized path where the heed `Env` of this `Index` lives. pub fn path(&self) -> &Path { self.env.path() diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index 42994551f..0af64c4c5 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -379,7 +379,7 @@ pub(crate) mod test_helpers { let mut options = heed::EnvOpenOptions::new(); let options = options.map_size(4096 * 4 * 1000 * 100); let tempdir = tempfile::TempDir::new().unwrap(); - let env = unsafe { options.open(tempdir.path()) }.unwrap(); + let env = options.open(tempdir.path()).unwrap(); let mut wtxn = env.write_txn().unwrap(); let content = env.create_database(&mut wtxn, None).unwrap(); wtxn.commit().unwrap(); diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 4d2fac7cb..936ce1efc 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -556,7 +556,7 @@ where let writer_index = (embedder_index as u16) << 8; for k in 0..=u8::MAX { let writer = - arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension); + arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension)?; if writer.is_empty(wtxn)? { break; } diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index e0de2d5a1..6aad290e5 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -661,7 +661,7 @@ pub(crate) fn write_typed_chunk_into_index( )?; let writer_index = (embedder_index as u16) << 8; // FIXME: allow customizing distance - let writers: Vec<_> = (0..=u8::MAX) + let writers: std::result::Result, _> = (0..=u8::MAX) .map(|k| { arroy::Writer::new( index.vector_arroy, @@ -670,6 +670,7 @@ pub(crate) fn write_typed_chunk_into_index( ) }) .collect(); + let writers = writers?; // remove vectors for docids we want them removed let merger = remove_vectors_builder.build();
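
Note on the code removed above: the "Stream documents" path serialized the GET /documents response lazily, writing JSON from a blocking task into a channel that actix-web consumed as a byte stream, instead of collecting the page of documents into a Vec and calling .json(). The sketch below reconstructs that pattern for reference only; it is a minimal, self-contained approximation, not the exact Meilisearch code. The generic type parameters on Sender, channel and Stream do not survive in the hunk above, so the channel item type Result<Bytes, std::io::Error> is an assumption here, and the main() driver exists purely to demonstrate usage outside an actix handler.

// Minimal sketch of the reverted streaming-serialization pattern.
// Assumed dependencies: tokio (features "rt-multi-thread", "macros", "sync"),
// bytes, futures-util, serde, serde_json. The channel item type is assumed.
use std::io::Write;

use bytes::Bytes;
use futures_util::Stream;
use serde::Serialize;

/// Adapts a blocking `std::io::Write` into frames sent over an async channel.
struct Writer2Streamer {
    sender: tokio::sync::mpsc::Sender<Result<Bytes, std::io::Error>>,
}

impl Write for Writer2Streamer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Forward each serialized chunk to the async side; block until it is taken.
        self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Serializes `data` on the blocking thread pool and exposes the JSON output as
/// a byte stream (the kind of value `HttpResponse::Ok().streaming(...)` accepts).
fn stream(
    data: impl Serialize + Send + 'static,
) -> impl Stream<Item = Result<Bytes, std::io::Error>> {
    let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, std::io::Error>>(1);

    // `serde_json::to_writer` is blocking, so it runs off the async executor; the
    // capacity-1 channel provides backpressure: serialization only advances as
    // fast as the consumer drains the stream.
    tokio::task::spawn_blocking(move || {
        serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
    });

    futures_util::stream::unfold(receiver, |mut receiver| async {
        receiver.recv().await.map(|value| (value, receiver))
    })
}

#[tokio::main]
async fn main() {
    use futures_util::StreamExt;

    // Usage example: the collected chunks concatenate to the full JSON document.
    let chunks: Vec<_> = stream(vec![1u32, 2, 3]).collect().await;
    let bytes: Vec<u8> = chunks.into_iter().flat_map(|c| c.unwrap().to_vec()).collect();
    assert_eq!(String::from_utf8(bytes).unwrap(), "[1,2,3]");
}

The revert trades this incremental body for the simpler buffered PaginationView plus HttpResponse::Ok().json(ret), at the cost of materializing the whole page of documents in memory; it also no longer needs a 'static read transaction, which is why Index::static_read_txn is removed in the milli/src/index.rs hunk above.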