From 8770e07397eeae07624e6a60c23072b6df04e740 Mon Sep 17 00:00:00 2001
From: Irevoire
Date: Wed, 21 Sep 2022 12:01:46 +0200
Subject: [PATCH] I can index documents without meilisearch

---
 Cargo.lock                             |  14 ++
 Cargo.toml                             |   1 +
 document-formats/Cargo.toml            |  14 ++
 document-formats/src/lib.rs            | 155 +++++++++++++++++
 file-store/src/lib.rs                  |   2 +-
 index-scheduler/Cargo.toml             |   2 +
 index-scheduler/src/batch.rs           |   5 +-
 index-scheduler/src/index_mapper.rs    |   1 +
 index-scheduler/src/index_scheduler.rs | 224 +++++--------------------
 index-scheduler/src/lib.rs             |  52 ++++--
 index-scheduler/src/task.rs            |  15 +-
 index/src/error.rs                     |  63 +++++++
 index/src/index.rs                     |  31 ++--
 13 files changed, 357 insertions(+), 222 deletions(-)
 create mode 100644 document-formats/Cargo.toml
 create mode 100644 document-formats/src/lib.rs

diff --git a/Cargo.lock b/Cargo.lock
index e08525829..3a6937ced 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1077,6 +1077,18 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "document-formats"
+version = "0.1.0"
+dependencies = [
+ "csv",
+ "either",
+ "meilisearch-types",
+ "milli 0.33.0",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "downcast"
 version = "0.11.0"
@@ -1784,10 +1796,12 @@ dependencies = [
  "big_s",
  "bincode",
  "csv",
+ "document-formats",
  "file-store",
  "index",
  "insta",
  "log",
+ "meilisearch-types",
  "milli 0.33.0",
  "nelson",
  "roaring 0.9.0",
diff --git a/Cargo.toml b/Cargo.toml
index 28a0e8742..7c989e134 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
     "meilisearch-lib",
     "meilisearch-auth",
     "index-scheduler",
+    "document-formats",
     "index",
     "file-store",
     "permissive-json-pointer",
diff --git a/document-formats/Cargo.toml b/document-formats/Cargo.toml
new file mode 100644
index 000000000..7f923dea4
--- /dev/null
+++ b/document-formats/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "document-formats"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+csv = "1.1.6"
+meilisearch-types = { path = "../meilisearch-types" }
+either = { version = "1.6.1", features = ["serde"] }
+milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" }
+serde_json = { version = "1.0.85", features = ["preserve_order"] }
+serde = { version = "1.0.136", features = ["derive"] }
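The new crate's `src/lib.rs` below exposes three payload readers, `read_csv`, `read_ndjson`, and `read_json`; each takes any `Read` source and any `Write + Seek` sink. A minimal sketch of the intended call shape, assuming an in-memory `Cursor` as the sink (the scheduler test further down uses a persisted update file instead):

    use std::io::Cursor;

    use document_formats::{read_json, DocumentFormatError};

    // Parse a JSON payload into an in-memory obkv batch and report how many
    // documents it contained. `read_csv` and `read_ndjson` have the same shape.
    fn count_json_documents(payload: &[u8]) -> Result<usize, DocumentFormatError> {
        let mut batch = Cursor::new(Vec::new());
        read_json(payload, &mut batch)
    }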
diff --git a/document-formats/src/lib.rs b/document-formats/src/lib.rs
new file mode 100644
index 000000000..ebc98f3fb
--- /dev/null
+++ b/document-formats/src/lib.rs
@@ -0,0 +1,155 @@
+use std::borrow::Borrow;
+use std::fmt::{self, Debug, Display};
+use std::io::{self, BufReader, Read, Seek, Write};
+
+use either::Either;
+use meilisearch_types::error::{Code, ErrorCode};
+use meilisearch_types::internal_error;
+use milli::documents::{DocumentsBatchBuilder, Error};
+use milli::Object;
+use serde::Deserialize;
+
+type Result<T> = std::result::Result<T, DocumentFormatError>;
+
+#[derive(Debug)]
+pub enum PayloadType {
+    Ndjson,
+    Json,
+    Csv,
+}
+
+impl fmt::Display for PayloadType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            PayloadType::Ndjson => f.write_str("ndjson"),
+            PayloadType::Json => f.write_str("json"),
+            PayloadType::Csv => f.write_str("csv"),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum DocumentFormatError {
+    Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
+    MalformedPayload(Error, PayloadType),
+}
+
+impl Display for DocumentFormatError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
+            Self::MalformedPayload(me, b) => match me.borrow() {
+                Error::Json(se) => {
+                    // https://github.com/meilisearch/meilisearch/issues/2107
+                    // The user input may be insanely long. We need to truncate it,
+                    // keeping the first 50 and the last 85 bytes around an ellipsis.
+                    let mut serde_msg = se.to_string();
+                    let ellipsis = "...";
+                    // The threshold must leave room for both the prefix and the
+                    // suffix; anything lower hands `replace_range` a reversed
+                    // range and panics on mid-sized messages.
+                    if serde_msg.len() > 50 + 85 + ellipsis.len() {
+                        serde_msg.replace_range(50..serde_msg.len() - 85, ellipsis);
+                    }
+
+                    write!(
+                        f,
+                        "The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
+                        b, serde_msg
+                    )
+                }
+                _ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
+            },
+        }
+    }
+}
+
+impl std::error::Error for DocumentFormatError {}
+
+impl From<(PayloadType, Error)> for DocumentFormatError {
+    fn from((ty, error): (PayloadType, Error)) -> Self {
+        match error {
+            Error::Io(e) => Self::Internal(Box::new(e)),
+            e => Self::MalformedPayload(e, ty),
+        }
+    }
+}
+
+impl ErrorCode for DocumentFormatError {
+    fn error_code(&self) -> Code {
+        match self {
+            DocumentFormatError::Internal(_) => Code::Internal,
+            DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
+        }
+    }
+}
+
+internal_error!(DocumentFormatError: io::Error);
+
+/// Reads CSV from the input and writes an obkv batch to the writer.
+pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
+    let mut builder = DocumentsBatchBuilder::new(writer);
+
+    let csv = csv::Reader::from_reader(input);
+    builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
+
+    let count = builder.documents_count();
+    let _ = builder
+        .into_inner()
+        .map_err(Into::into)
+        .map_err(DocumentFormatError::Internal)?;
+
+    Ok(count as usize)
+}
+
+/// Reads JSON Lines from the input and writes an obkv batch to the writer.
+pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
+    let mut builder = DocumentsBatchBuilder::new(writer);
+    let reader = BufReader::new(input);
+
+    for result in serde_json::Deserializer::from_reader(reader).into_iter() {
+        let object = result
+            .map_err(Error::Json)
+            .map_err(|e| (PayloadType::Ndjson, e))?;
+        builder
+            .append_json_object(&object)
+            .map_err(Into::into)
+            .map_err(DocumentFormatError::Internal)?;
+    }
+
+    let count = builder.documents_count();
+    let _ = builder
+        .into_inner()
+        .map_err(Into::into)
+        .map_err(DocumentFormatError::Internal)?;
+
+    Ok(count as usize)
+}
+
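+// `read_ndjson` (above) streams: it deserializes and appends one object at a
+// time, failing on the first malformed line. `read_json` (below) instead
+// buffers the entire payload so that a bare object and an array of objects
+// can both be accepted.
+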
+/// Reads JSON from the input and writes an obkv batch to the writer.
+pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
+    let mut builder = DocumentsBatchBuilder::new(writer);
+    let reader = BufReader::new(input);
+
+    #[derive(Deserialize, Debug)]
+    #[serde(transparent)]
+    struct ArrayOrSingleObject {
+        #[serde(with = "either::serde_untagged")]
+        inner: Either<Vec<Object>, Object>,
+    }
+
+    let content: ArrayOrSingleObject = serde_json::from_reader(reader)
+        .map_err(Error::Json)
+        .map_err(|e| (PayloadType::Json, e))?;
+
+    for object in content.inner.map_right(|o| vec![o]).into_inner() {
+        builder
+            .append_json_object(&object)
+            .map_err(Into::into)
+            .map_err(DocumentFormatError::Internal)?;
+    }
+
+    let count = builder.documents_count();
+    let _ = builder
+        .into_inner()
+        .map_err(Into::into)
+        .map_err(DocumentFormatError::Internal)?;
+
+    Ok(count as usize)
+}
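Both accepted JSON shapes funnel through the same `Either<Vec<Object>, Object>` field: `map_right(|o| vec![o])` normalizes a bare object into a one-element vector before appending. A small illustrative test of that behavior (hypothetical, not part of the patch):

    #[test]
    fn json_accepts_object_or_array() {
        use std::io::Cursor;

        // A bare object and its one-element array form should both yield a
        // single document in the batch.
        let object = r#"{"id": 1}"#;
        let array = r#"[{"id": 1}]"#;

        let mut batch = Cursor::new(Vec::new());
        assert_eq!(read_json(object.as_bytes(), &mut batch).unwrap(), 1);

        let mut batch = Cursor::new(Vec::new());
        assert_eq!(read_json(array.as_bytes(), &mut batch).unwrap(), 1);
    }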
diff --git a/file-store/src/lib.rs b/file-store/src/lib.rs
index e983aa115..c125fffed 100644
--- a/file-store/src/lib.rs
+++ b/file-store/src/lib.rs
@@ -40,7 +40,7 @@ pub struct FileStore {
 #[cfg(not(test))]
 impl FileStore {
     pub fn new(path: impl AsRef<Path>) -> Result<FileStore> {
-        let path = path.as_ref().join(UPDATE_FILES_PATH);
+        let path = path.as_ref().to_path_buf();
         std::fs::create_dir_all(&path)?;
         Ok(FileStore { path })
     }
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index dd5d2b5f2..10d152f60 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -13,6 +13,8 @@ file-store = { path = "../file-store" }
 log = "0.4.14"
 milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" }
 index = { path = "../index" }
+meilisearch-types = { path = "../meilisearch-types" }
+document-formats = { path = "../document-formats" }
 roaring = "0.9.0"
 serde = { version = "1.0.136", features = ["derive"] }
 tempfile = "3.3.0"
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index b9ee10a76..902301165 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -459,13 +459,12 @@ impl IndexScheduler {
                         });
                     }
                     Err(error) => {
-                        // TODO: TAMO: find a way to convert all errors to the `Task::Error` type
-                        // task.error = Some(error);
+                        task.error = Some(error.into());
                     }
                 }
             }
 
-            todo!()
+            Ok(tasks)
         }
         Batch::SettingsAndDocumentAddition {
             index_uid,
diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs
index e57c5d00b..4dd0f9093 100644
--- a/index-scheduler/src/index_mapper.rs
+++ b/index-scheduler/src/index_mapper.rs
@@ -54,6 +54,7 @@ impl IndexMapper {
             Ok(index) => index,
             Err(Error::IndexNotFound(_)) => {
                 let uuid = Uuid::new_v4();
+                self.index_mapping.put(wtxn, name, &uuid)?;
                 Index::open(
                     self.base_path.join(uuid.to_string()),
                     name.to_string(),
diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs
index e6f207368..a332397ff 100644
--- a/index-scheduler/src/index_scheduler.rs
+++ b/index-scheduler/src/index_scheduler.rs
@@ -91,11 +91,12 @@ impl IndexScheduler {
         let wake_up = SignalEvent::auto(true);
 
         let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
+        let file_store = FileStore::new(&update_file_path)?;
 
         Ok(Self {
             // by default there is no processing tasks
             processing_tasks: Arc::new(RwLock::new(processing_tasks)),
-            file_store: FileStore::new(update_file_path)?,
+            file_store,
             all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
             status: env.create_database(Some(db_name::STATUS))?,
             kind: env.create_database(Some(db_name::KIND))?,
@@ -274,166 +275,6 @@ impl IndexScheduler {
         Ok(())
     }
 
-    #[cfg(truc)]
-    fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
-        match batch {
-            Batch::One(task) => match &task.kind {
-                KindWithContent::ClearAllDocuments { index_name } => {
-                    self.index(&index_name)?.clear_documents()?;
-                }
-                KindWithContent::RenameIndex {
-                    index_name: _,
-                    new_name,
-                } => {
-                    if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) {
-                        return Err(Error::IndexAlreadyExists(new_name.to_string()));
-                    }
-                    todo!("wait for @guigui insight");
-                }
-                KindWithContent::CreateIndex {
-                    index_name,
-                    primary_key,
-                } => {
-                    if self
-                        .available_index
-                        .get(wtxn, &index_name)?
-                        .unwrap_or(false)
-                    {
-                        return Err(Error::IndexAlreadyExists(index_name.to_string()));
-                    }
-
-                    self.available_index.put(wtxn, &index_name, &true)?;
-                    // TODO: TAMO: give real info to the index
-                    let index = Index::open(
-                        index_name.to_string(),
-                        index_name.to_string(),
-                        100_000_000,
-                        Arc::default(),
-                    )?;
-                    if let Some(primary_key) = primary_key {
-                        index.update_primary_key(primary_key.to_string())?;
-                    }
-                    self.index_map
-                        .write()
-                        .map_err(|_| Error::CorruptedTaskQueue)?
-                        .insert(index_name.to_string(), index.clone());
-                }
-                KindWithContent::DeleteIndex { index_name } => {
-                    if !self.available_index.delete(wtxn, &index_name)? {
-                        return Err(Error::IndexNotFound(index_name.to_string()));
-                    }
-                    if let Some(index) = self
-                        .index_map
-                        .write()
-                        .map_err(|_| Error::CorruptedTaskQueue)?
-                        .remove(index_name)
-                    {
-                        index.delete()?;
-                    } else {
-                        // TODO: TAMO: fix the path
-                        std::fs::remove_file(index_name)?;
-                    }
-                }
-                KindWithContent::SwapIndex { lhs, rhs } => {
-                    if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) {
-                        return Err(Error::IndexNotFound(lhs.to_string()));
-                    }
-                    if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) {
-                        return Err(Error::IndexNotFound(rhs.to_string()));
-                    }
-
-                    let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?;
-                    let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?;
-                    // the bitmap are lazily created and thus may not exists.
-                    if let Some(bitmap) = rhs_bitmap {
-                        self.index_tasks.put(wtxn, lhs, &bitmap)?;
-                    }
-                    if let Some(bitmap) = lhs_bitmap {
-                        self.index_tasks.put(wtxn, rhs, &bitmap)?;
-                    }
-
-                    let mut index_map = self
-                        .index_map
-                        .write()
-                        .map_err(|_| Error::CorruptedTaskQueue)?;
-
-                    let lhs_index = index_map.remove(lhs).unwrap();
-                    let rhs_index = index_map.remove(rhs).unwrap();
-
-                    index_map.insert(lhs.to_string(), rhs_index);
-                    index_map.insert(rhs.to_string(), lhs_index);
-                }
-                _ => unreachable!(),
-            },
-            Batch::Cancel(_) => todo!(),
-            Batch::Snapshot(_) => todo!(),
-            Batch::Dump(_) => todo!(),
-            Batch::Contiguous { tasks, kind } => {
-                // it's safe because you can't batch 0 contiguous tasks.
-                let first_task = &tasks[0];
-                // and the two kind of tasks we batch MUST have ONE index name.
-                let index_name = first_task.indexes().unwrap()[0];
-                let index = self.index(index_name)?;
-
-                match kind {
-                    Kind::DocumentAddition => {
-                        let content_files = tasks.iter().map(|task| match &task.kind {
-                            KindWithContent::DocumentAddition { content_file, .. } => {
-                                content_file.clone()
-                            }
-                            k => unreachable!(
-                                "Internal error, `{:?}` is not supposed to be reachable here",
-                                k.as_kind()
-                            ),
-                        });
-                        let results = index.update_documents(
-                            IndexDocumentsMethod::UpdateDocuments,
-                            None,
-                            self.file_store.clone(),
-                            content_files,
-                        )?;
-
-                        for (task, result) in tasks.iter_mut().zip(results) {
-                            task.finished_at = Some(OffsetDateTime::now_utc());
-                            match result {
-                                Ok(_) => task.status = Status::Succeeded,
-                                Err(_) => task.status = Status::Succeeded,
-                            }
-                        }
-                    }
-                    Kind::DocumentDeletion => {
-                        let ids: Vec<_> = tasks
-                            .iter()
-                            .flat_map(|task| match &task.kind {
-                                KindWithContent::DocumentDeletion { documents_ids, .. } => {
-                                    documents_ids.clone()
-                                }
-                                k => unreachable!(
-                                    "Internal error, `{:?}` is not supposed to be reachable here",
-                                    k.as_kind()
-                                ),
-                            })
-                            .collect();
-
-                        let result = index.delete_documents(&ids);
-
-                        for task in tasks.iter_mut() {
-                            task.finished_at = Some(OffsetDateTime::now_utc());
-                            match result {
-                                Ok(_) => task.status = Status::Succeeded,
-                                Err(_) => task.status = Status::Succeeded,
-                            }
-                        }
-                    }
-                    _ => unreachable!(),
-                }
-            }
-            Batch::Empty => todo!(),
-        }
-
-        Ok(())
-    }
-
     /// Notify the scheduler there is or may be work to do.
     pub fn notify(&self) {
         self.wake_up.signal()
@@ -443,34 +284,15 @@
 #[cfg(test)]
 mod tests {
     use big_s::S;
-    use insta::assert_debug_snapshot;
-    use tempfile::TempDir;
     use uuid::Uuid;
 
-    use crate::assert_smol_debug_snapshot;
+    use crate::{assert_smol_debug_snapshot, tests::index_scheduler};
 
     use super::*;
 
-    fn new() -> IndexScheduler {
-        let dir = TempDir::new().unwrap();
-        IndexScheduler::new(
-            dir.path().join("db_path"),
-            dir.path().join("file_store"),
-            dir.path().join("indexes"),
-            100_000_000,
-            IndexerConfig::default(),
-        )
-        .unwrap()
-    }
-
-    #[test]
-    fn simple_new() {
-        new();
-    }
-
     #[test]
     fn register() {
-        let index_scheduler = new();
+        let (index_scheduler, _) = index_scheduler();
         let kinds = [
             KindWithContent::IndexCreation {
                 index_uid: S("catto"),
         assert_smol_debug_snapshot!(index_tasks, @r###"[("catto", RoaringBitmap<[0, 1, 3]>), ("doggo", RoaringBitmap<[4]>)]"###);
     }
+
+    #[test]
+    fn document_addition() {
+        let (index_scheduler, _dir) = index_scheduler();
+
+        let content = r#"
+        {
+            "id": 1,
+            "doggo": "bob"
+        }"#;
+
+        let (uuid, mut file) = index_scheduler.file_store.new_update().unwrap();
+        document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
+        file.persist().unwrap();
+
+        index_scheduler
+            .register(KindWithContent::DocumentAddition {
+                index_uid: S("doggos"),
+                primary_key: Some(S("id")),
+                content_file: uuid,
+                documents_count: 100,
+                allow_index_creation: true,
+            })
+            .unwrap();
+
+        index_scheduler.tick().unwrap();
+
+        let doggos = index_scheduler.index("doggos").unwrap();
+
+        let rtxn = doggos.read_txn().unwrap();
+        let documents: Vec<_> = doggos
+            .all_documents(&rtxn)
+            .unwrap()
+            .collect::<std::result::Result<Vec<_>, _>>()
+            .unwrap();
+
+        assert_smol_debug_snapshot!(documents, @r###"[{"id": Number(1), "doggo": String("bob")}]"###);
+    }
 }
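`document_addition` spells out the whole manual pipeline that used to require meilisearch's HTTP layer: reserve an update file, fill it with `read_json`, persist it, then register the task and tick the scheduler. A hypothetical helper distilling those steps (same test module assumed; it feeds the real parsed count where the test above hard-codes `documents_count: 100`, and it assumes that field is a `u64`):

    fn enqueue_json(index_scheduler: &IndexScheduler, index_uid: &str, content: &str) -> Uuid {
        let (uuid, mut file) = index_scheduler.file_store.new_update().unwrap();
        let count = document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
        file.persist().unwrap();

        index_scheduler
            .register(KindWithContent::DocumentAddition {
                index_uid: index_uid.to_string(),
                primary_key: None,
                content_file: uuid,
                documents_count: count as u64, // assumed type; the test passes a bare `100`
                allow_index_creation: true,
            })
            .unwrap();

        uuid
    }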
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index ff9ad3470..d90972174 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -18,19 +18,45 @@ pub use task::TaskView as Task;
 
 #[cfg(test)]
 mod tests {
+    use milli::update::IndexerConfig;
+    use tempfile::TempDir;
+
+    use crate::IndexScheduler;
+
     #[macro_export]
     macro_rules! assert_smol_debug_snapshot {
-    ($value:expr, @$snapshot:literal) => {{
-        let value = format!("{:?}", $value);
-        insta::assert_snapshot!(value, stringify!($value), @$snapshot);
-    }};
-    ($name:expr, $value:expr) => {{
-        let value = format!("{:?}", $value);
-        insta::assert_snapshot!(Some($name), value, stringify!($value));
-    }};
-    ($value:expr) => {{
-        let value = format!("{:?}", $value);
-        insta::assert_snapshot!($crate::_macro_support::AutoName, value, stringify!($value));
-    }};
-}
+        ($value:expr, @$snapshot:literal) => {{
+            let value = format!("{:?}", $value);
+            insta::assert_snapshot!(value, stringify!($value), @$snapshot);
+        }};
+        ($name:expr, $value:expr) => {{
+            let value = format!("{:?}", $value);
+            insta::assert_snapshot!(Some($name), value, stringify!($value));
+        }};
+        ($value:expr) => {{
+            let value = format!("{:?}", $value);
+            insta::assert_snapshot!($crate::_macro_support::AutoName, value, stringify!($value));
+        }};
+    }
+
+    pub fn index_scheduler() -> (IndexScheduler, TempDir) {
+        let dir = TempDir::new().unwrap();
+
+        (
+            IndexScheduler::new(
+                dir.path().join("db_path"),
+                dir.path().join("file_store"),
+                dir.path().join("indexes"),
+                1024 * 1024,
+                IndexerConfig::default(),
+            )
+            .unwrap(),
+            dir,
+        )
+    }
+
+    #[test]
+    fn simple_new() {
+        index_scheduler();
+    }
 }
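The helper hands the `TempDir` back to the caller on purpose: dropping it deletes the `db_path`, `file_store`, and `indexes` directories the scheduler just opened, so the caller must keep it alive. The `_dir` binding in `document_addition` does exactly that; `register`'s `let (index_scheduler, _) = ...` pattern, by contrast, drops the guard immediately and appears to rely on the already-opened file handles keeping the data reachable on Unix:

    let (index_scheduler, _dir) = index_scheduler(); // `_dir` holds the TempDir
    // A bare `_` pattern would not bind the TempDir, so it would be dropped
    // here, deleting the directories while the scheduler still points at them.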
diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs
index 6c999dd8a..fb4ac1e08 100644
--- a/index-scheduler/src/task.rs
+++ b/index-scheduler/src/task.rs
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use index::{Settings, Unchecked};
+use meilisearch_types::error::ResponseError;
 use milli::DocumentId;
 use serde::{Deserialize, Serialize, Serializer};
 
@@ -18,16 +19,6 @@ pub enum Status {
     Failed,
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Error {
-    message: String,
-    code: String,
-    #[serde(rename = "type")]
-    kind: String,
-    link: String,
-}
-
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct TaskView {
@@ -38,7 +29,7 @@ pub struct TaskView {
     pub kind: Kind,
 
     pub details: Option<Details>,
-    pub error: Option<Error>,
+    pub error: Option<ResponseError>,
 
     #[serde(serialize_with = "serialize_duration")]
     pub duration: Option<Duration>,
@@ -62,7 +53,7 @@ pub struct Task {
     #[serde(with = "time::serde::rfc3339::option")]
     pub finished_at: Option<OffsetDateTime>,
 
-    pub error: Option<Error>,
+    pub error: Option<ResponseError>,
     pub details: Option<Details>,
 
     pub status: Status,
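Swapping the local `Error` struct for `meilisearch_types::error::ResponseError` is transparent on the wire only if `ResponseError` keeps the camelCase `{message, code, type, link}` shape the deleted struct declared. Assuming it does, a failed task view still serializes along these lines (placeholder values):

    {
      "uid": 0,
      "status": "failed",
      "error": {
        "message": "<human-readable description>",
        "code": "<error_code>",
        "type": "<error_type>",
        "link": "<documentation url>"
      }
    }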
diff --git a/index/src/error.rs b/index/src/error.rs
index 667dfcde3..b2ecfea0f 100644
--- a/index/src/error.rs
+++ b/index/src/error.rs
@@ -1,7 +1,9 @@
 use std::error::Error;
+use std::fmt;
 
 use meilisearch_types::error::{Code, ErrorCode};
 use meilisearch_types::internal_error;
+use milli::UserError;
 use serde_json::Value;
 
 pub type Result<T> = std::result::Result<T, IndexError>;
@@ -27,6 +29,17 @@ internal_error!(
     milli::documents::Error
 );
 
+impl ErrorCode for IndexError {
+    fn error_code(&self) -> Code {
+        match self {
+            IndexError::Internal(_) => Code::Internal,
+            IndexError::DocumentNotFound(_) => Code::DocumentNotFound,
+            IndexError::Facet(e) => e.error_code(),
+            IndexError::Milli(e) => MilliError(e).error_code(),
+        }
+    }
+}
+
 impl From<milli::UserError> for IndexError {
     fn from(error: milli::UserError) -> IndexError {
         IndexError::Milli(error.into())
@@ -46,3 +59,53 @@ impl ErrorCode for FacetError {
         }
     }
 }
+
+#[derive(Debug)]
+pub struct MilliError<'a>(pub &'a milli::Error);
+
+impl Error for MilliError<'_> {}
+
+impl fmt::Display for MilliError<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl ErrorCode for MilliError<'_> {
+    fn error_code(&self) -> Code {
+        match self.0 {
+            milli::Error::InternalError(_) => Code::Internal,
+            milli::Error::IoError(_) => Code::Internal,
+            milli::Error::UserError(ref error) => {
+                match error {
+                    // TODO: wait for spec for new error codes.
+                    UserError::SerdeJson(_)
+                    | UserError::InvalidLmdbOpenOptions
+                    | UserError::DocumentLimitReached
+                    | UserError::AccessingSoftDeletedDocument { .. }
+                    | UserError::UnknownInternalDocumentId { .. } => Code::Internal,
+                    UserError::InvalidStoreFile => Code::InvalidStore,
+                    UserError::NoSpaceLeftOnDevice => Code::NoSpaceLeftOnDevice,
+                    UserError::MaxDatabaseSizeReached => Code::DatabaseSizeLimitReached,
+                    UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
+                    UserError::InvalidFilter(_) => Code::Filter,
+                    UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
+                    UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
+                        Code::InvalidDocumentId
+                    }
+                    UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
+                    UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
+                    UserError::SortRankingRuleMissing => Code::Sort,
+                    UserError::InvalidFacetsDistribution { .. } => Code::BadRequest,
+                    UserError::InvalidSortableAttribute { .. } => Code::Sort,
+                    UserError::CriterionError(_) => Code::InvalidRankingRule,
+                    UserError::InvalidGeoField { .. } => Code::InvalidGeoField,
+                    UserError::SortError(_) => Code::Sort,
+                    UserError::InvalidMinTypoWordLenSetting(_, _) => {
+                        Code::InvalidMinWordLengthForTypo
+                    }
+                }
+            }
+        }
+    }
+}
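`MilliError` is a borrowing newtype: the `index` crate cannot implement `ErrorCode` directly on the foreign `milli::Error`, so the wrapper carries the impl and is constructed on the fly, exactly as `IndexError::Milli(e) => MilliError(e).error_code()` does above. A hypothetical one-off use outside that match:

    fn is_internal(error: &milli::Error) -> bool {
        // Wrap by reference; the newtype adds the `ErrorCode` impl without
        // taking ownership of the error.
        matches!(MilliError(error).error_code(), Code::Internal)
    }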
diff --git a/index/src/index.rs b/index/src/index.rs
index 1b3494a18..809a7dbdc 100644
--- a/index/src/index.rs
+++ b/index/src/index.rs
@@ -248,26 +248,20 @@ impl Index {
         limit: usize,
         attributes_to_retrieve: Option<Vec<S>>,
     ) -> Result<(u64, Vec<Document>)> {
-        let txn = self.read_txn()?;
-
-        let fields_ids_map = self.fields_ids_map(&txn)?;
-        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+        let rtxn = self.read_txn()?;
 
         let mut documents = Vec::new();
 
-        for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
-            let (_id, obkv) = entry?;
-            let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
+        for document in self.all_documents(&rtxn)?.skip(offset).take(limit) {
             let document = match &attributes_to_retrieve {
                 Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
-                    &document,
+                    &document?,
                     attributes_to_retrieve.iter().map(|s| s.as_ref()),
                 ),
-                None => document,
+                None => document?,
             };
 
             documents.push(document);
         }
-
-        let number_of_documents = self.number_of_documents(&txn)?;
+        let number_of_documents = self.number_of_documents(&rtxn)?;
 
         Ok((number_of_documents, documents))
     }
@@ -306,6 +300,21 @@ impl Index {
         Ok(document)
     }
 
+    pub fn all_documents<'a>(
+        &self,
+        rtxn: &'a RoTxn,
+    ) -> Result<impl Iterator<Item = Result<Document>> + 'a> {
+        let fields_ids_map = self.fields_ids_map(&rtxn)?;
+        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+
+        Ok(self.inner.all_documents(&rtxn)?.map(move |ret| {
+            ret.map_err(IndexError::from)
+                .and_then(|(_key, document)| -> Result<_> {
+                    Ok(obkv_to_json(&all_fields, &fields_ids_map, document)?)
+                })
+        }))
+    }
+
     pub fn size(&self) -> Result<u64> {
         Ok(self.inner.on_disk_size()?)
     }
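With `all_documents` now returning an iterator of already-converted JSON objects, callers stream documents without touching obkv at all; the obkv-to-JSON conversion stays inside the index crate. A short sketch, assuming `Document` is the crate's JSON-map alias as used in the listing method above:

    fn first_document(index: &Index) -> Result<Option<Document>> {
        let rtxn = index.read_txn()?;
        // The iterator yields `Result<Document>`; `transpose` converts the
        // `Option<Result<Document>>` from `next()` into `Result<Option<Document>>`.
        index.all_documents(&rtxn)?.next().transpose()
    }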