mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-25 11:35:05 +08:00
First batch of PR comment
This commit is contained in:
parent
32bcacefd5
commit
36962b943b
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -4483,7 +4483,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "raw-collections"
|
name = "raw-collections"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/dureuill/raw-collections.git#15e5d7bdebc0c149b2a28b2454f307c717d07f8a"
|
source = "git+https://github.com/meilisearch/raw-collections.git#15e5d7bdebc0c149b2a28b2454f307c717d07f8a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"allocator-api2",
|
"allocator-api2",
|
||||||
"bitpacking",
|
"bitpacking",
|
||||||
|
@ -22,7 +22,7 @@ flate2 = "1.0.30"
|
|||||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||||
meilisearch-types = { path = "../meilisearch-types" }
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
page_size = "0.6.0"
|
page_size = "0.6.0"
|
||||||
raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" }
|
raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" }
|
||||||
rayon = "1.10.0"
|
rayon = "1.10.0"
|
||||||
roaring = { version = "0.10.6", features = ["serde"] }
|
roaring = { version = "0.10.6", features = ["serde"] }
|
||||||
serde = { version = "1.0.204", features = ["derive"] }
|
serde = { version = "1.0.204", features = ["derive"] }
|
||||||
|
@ -1411,17 +1411,6 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||||
}
|
}
|
||||||
// else if primary_key_has_been_set {
|
|
||||||
// // Everything failed but we've set a primary key.
|
|
||||||
// // We need to remove it.
|
|
||||||
// let mut builder =
|
|
||||||
// milli::update::Settings::new(index_wtxn, index, indexer_config);
|
|
||||||
// builder.reset_primary_key();
|
|
||||||
// builder.execute(
|
|
||||||
// |indexing_step| tracing::trace!(update = ?indexing_step),
|
|
||||||
// || must_stop_processing.clone().get(),
|
|
||||||
// )?;
|
|
||||||
// }
|
|
||||||
|
|
||||||
Ok(tasks)
|
Ok(tasks)
|
||||||
}
|
}
|
||||||
|
@ -1365,6 +1365,7 @@ impl IndexScheduler {
|
|||||||
let ProcessingTasks { batch, processing, progress } =
|
let ProcessingTasks { batch, processing, progress } =
|
||||||
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
|
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
|
||||||
|
|
||||||
|
// ignored for now, might be added to batch details later
|
||||||
let _ = progress;
|
let _ = progress;
|
||||||
|
|
||||||
let ret = tasks.into_iter();
|
let ret = tasks.into_iter();
|
||||||
@ -5198,11 +5199,9 @@ mod tests {
|
|||||||
handle.advance_one_successful_batch();
|
handle.advance_one_successful_batch();
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
|
||||||
|
|
||||||
// The second batch should fail.
|
|
||||||
handle.advance_one_successful_batch();
|
handle.advance_one_successful_batch();
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
|
||||||
|
|
||||||
// The second batch should fail.
|
|
||||||
handle.advance_one_successful_batch();
|
handle.advance_one_successful_batch();
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails");
|
||||||
|
|
||||||
@ -5263,7 +5262,6 @@ mod tests {
|
|||||||
handle.advance_one_successful_batch();
|
handle.advance_one_successful_batch();
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
|
||||||
|
|
||||||
// The second batch should fail and contains two tasks.
|
|
||||||
handle.advance_one_successful_batch();
|
handle.advance_one_successful_batch();
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails");
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ flate2 = "1.0.30"
|
|||||||
fst = "0.4.7"
|
fst = "0.4.7"
|
||||||
memmap2 = "0.9.4"
|
memmap2 = "0.9.4"
|
||||||
milli = { path = "../milli" }
|
milli = { path = "../milli" }
|
||||||
raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" }
|
raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" }
|
||||||
roaring = { version = "0.10.6", features = ["serde"] }
|
roaring = { version = "0.10.6", features = ["serde"] }
|
||||||
serde = { version = "1.0.204", features = ["derive"] }
|
serde = { version = "1.0.204", features = ["derive"] }
|
||||||
serde-cs = "0.2.4"
|
serde-cs = "0.2.4"
|
||||||
|
@ -128,7 +128,6 @@ impl ErrorCode for DocumentFormatError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO remove that from the place I've borrowed it
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum AllowedType {
|
enum AllowedType {
|
||||||
String,
|
String,
|
||||||
@ -213,7 +212,7 @@ pub fn read_csv(input: &File, output: impl io::Write, delimiter: u8) -> Result<u
|
|||||||
|
|
||||||
/// Reads JSON from file and write it in NDJSON in a file checking it along the way.
|
/// Reads JSON from file and write it in NDJSON in a file checking it along the way.
|
||||||
pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
|
pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
|
||||||
// We memory map to be able to deserailize into a TopLevelMap<'pl> that
|
// We memory map to be able to deserialize into a TopLevelMap<'pl> that
|
||||||
// does not allocate when possible and only materialize the first/top level.
|
// does not allocate when possible and only materialize the first/top level.
|
||||||
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
|
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
|
||||||
let mut doc_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1MiB
|
let mut doc_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1MiB
|
||||||
@ -254,7 +253,7 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
|
|||||||
|
|
||||||
/// Reads NDJSON from file and write it in NDJSON in a file checking it along the way.
|
/// Reads NDJSON from file and write it in NDJSON in a file checking it along the way.
|
||||||
pub fn read_ndjson(input: &File, output: impl io::Write) -> Result<u64> {
|
pub fn read_ndjson(input: &File, output: impl io::Write) -> Result<u64> {
|
||||||
// We memory map to be able to deserailize into a TopLevelMap<'pl> that
|
// We memory map to be able to deserialize into a TopLevelMap<'pl> that
|
||||||
// does not allocate when possible and only materialize the first/top level.
|
// does not allocate when possible and only materialize the first/top level.
|
||||||
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
|
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
|
||||||
let mut output = BufWriter::new(output);
|
let mut output = BufWriter::new(output);
|
||||||
|
@ -172,7 +172,7 @@ async fn create_mock_with_template(
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 3. check API key
|
// 2. check API key
|
||||||
match req.headers.get("Authorization") {
|
match req.headers.get("Authorization") {
|
||||||
Some(api_key) if api_key == API_KEY_BEARER => {
|
Some(api_key) if api_key == API_KEY_BEARER => {
|
||||||
{}
|
{}
|
||||||
|
@ -95,7 +95,7 @@ ureq = { version = "2.10.0", features = ["json"] }
|
|||||||
url = "2.5.2"
|
url = "2.5.2"
|
||||||
rayon-par-bridge = "0.1.0"
|
rayon-par-bridge = "0.1.0"
|
||||||
hashbrown = "0.15.0"
|
hashbrown = "0.15.0"
|
||||||
raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" }
|
raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" }
|
||||||
bumpalo = "3.16.0"
|
bumpalo = "3.16.0"
|
||||||
thread_local = "1.1.8"
|
thread_local = "1.1.8"
|
||||||
allocator-api2 = "0.2.18"
|
allocator-api2 = "0.2.18"
|
||||||
|
@ -2,7 +2,7 @@ use std::io::{self, Write};
|
|||||||
|
|
||||||
use grenad::{CompressionType, WriterBuilder};
|
use grenad::{CompressionType, WriterBuilder};
|
||||||
use serde::de::Deserializer;
|
use serde::de::Deserializer;
|
||||||
use serde_json::{to_writer, Value};
|
use serde_json::to_writer;
|
||||||
|
|
||||||
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
|
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
|
||||||
use crate::documents::serde_impl::DocumentVisitor;
|
use crate::documents::serde_impl::DocumentVisitor;
|
||||||
@ -87,95 +87,6 @@ impl<W: Write> DocumentsBatchBuilder<W> {
|
|||||||
de.deserialize_any(&mut visitor)?
|
de.deserialize_any(&mut visitor)?
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Appends a new CSV file into the batch and updates the `DocumentsBatchIndex` accordingly.
|
|
||||||
pub fn append_csv<R: io::Read>(&mut self, mut reader: csv::Reader<R>) -> Result<(), Error> {
|
|
||||||
// Make sure that we insert the fields ids in order as the obkv writer has this requirement.
|
|
||||||
let mut typed_fields_ids: Vec<_> = reader
|
|
||||||
.headers()?
|
|
||||||
.into_iter()
|
|
||||||
.map(parse_csv_header)
|
|
||||||
.map(|(k, t)| (self.fields_index.insert(k), t))
|
|
||||||
.enumerate()
|
|
||||||
.collect();
|
|
||||||
// Make sure that we insert the fields ids in order as the obkv writer has this requirement.
|
|
||||||
typed_fields_ids.sort_unstable_by_key(|(_, (fid, _))| *fid);
|
|
||||||
|
|
||||||
let mut record = csv::StringRecord::new();
|
|
||||||
let mut line = 0;
|
|
||||||
while reader.read_record(&mut record)? {
|
|
||||||
// We increment here and not at the end of the while loop to take
|
|
||||||
// the header offset into account.
|
|
||||||
line += 1;
|
|
||||||
|
|
||||||
self.obkv_buffer.clear();
|
|
||||||
let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
|
|
||||||
|
|
||||||
for (i, (field_id, type_)) in typed_fields_ids.iter() {
|
|
||||||
self.value_buffer.clear();
|
|
||||||
|
|
||||||
let value = &record[*i];
|
|
||||||
let trimmed_value = value.trim();
|
|
||||||
match type_ {
|
|
||||||
AllowedType::Number => {
|
|
||||||
if trimmed_value.is_empty() {
|
|
||||||
to_writer(&mut self.value_buffer, &Value::Null)?;
|
|
||||||
} else if let Ok(integer) = trimmed_value.parse::<i64>() {
|
|
||||||
to_writer(&mut self.value_buffer, &integer)?;
|
|
||||||
} else {
|
|
||||||
match trimmed_value.parse::<f64>() {
|
|
||||||
Ok(float) => {
|
|
||||||
to_writer(&mut self.value_buffer, &float)?;
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
return Err(Error::ParseFloat {
|
|
||||||
error,
|
|
||||||
line,
|
|
||||||
value: value.to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AllowedType::Boolean => {
|
|
||||||
if trimmed_value.is_empty() {
|
|
||||||
to_writer(&mut self.value_buffer, &Value::Null)?;
|
|
||||||
} else {
|
|
||||||
match trimmed_value.parse::<bool>() {
|
|
||||||
Ok(bool) => {
|
|
||||||
to_writer(&mut self.value_buffer, &bool)?;
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
return Err(Error::ParseBool {
|
|
||||||
error,
|
|
||||||
line,
|
|
||||||
value: value.to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AllowedType::String => {
|
|
||||||
if value.is_empty() {
|
|
||||||
to_writer(&mut self.value_buffer, &Value::Null)?;
|
|
||||||
} else {
|
|
||||||
to_writer(&mut self.value_buffer, value)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We insert into the obkv writer the value buffer that has been filled just above.
|
|
||||||
writer.insert(*field_id, &self.value_buffer)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let internal_id = self.documents_count.to_be_bytes();
|
|
||||||
let document_bytes = writer.into_inner()?;
|
|
||||||
self.writer.insert(internal_id, &document_bytes)?;
|
|
||||||
self.documents_count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Flushes the content on disk and stores the final version of the `DocumentsBatchIndex`.
|
/// Flushes the content on disk and stores the final version of the `DocumentsBatchIndex`.
|
||||||
pub fn into_inner(mut self) -> io::Result<W> {
|
pub fn into_inner(mut self) -> io::Result<W> {
|
||||||
let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
|
let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
|
||||||
@ -189,35 +100,12 @@ impl<W: Write> DocumentsBatchBuilder<W> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum AllowedType {
|
|
||||||
String,
|
|
||||||
Boolean,
|
|
||||||
Number,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_csv_header(header: &str) -> (&str, AllowedType) {
|
|
||||||
// if there are several separators we only split on the last one.
|
|
||||||
match header.rsplit_once(':') {
|
|
||||||
Some((field_name, field_type)) => match field_type {
|
|
||||||
"string" => (field_name, AllowedType::String),
|
|
||||||
"boolean" => (field_name, AllowedType::Boolean),
|
|
||||||
"number" => (field_name, AllowedType::Number),
|
|
||||||
// if the pattern isn't recognized, we keep the whole field.
|
|
||||||
_otherwise => (header, AllowedType::String),
|
|
||||||
},
|
|
||||||
None => (header, AllowedType::String),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::documents::{obkv_to_object, DocumentsBatchReader};
|
use crate::documents::DocumentsBatchReader;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn add_single_documents_json() {
|
fn add_single_documents_json() {
|
||||||
@ -253,348 +141,4 @@ mod test {
|
|||||||
|
|
||||||
assert!(cursor.next_document().unwrap().is_none());
|
assert!(cursor.next_document().unwrap().is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn add_documents_csv() {
|
|
||||||
let csv_content = "id:number,field:string\n1,hello!\n2,blabla";
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
assert_eq!(builder.documents_count(), 2);
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
assert_eq!(index.len(), 2);
|
|
||||||
|
|
||||||
let document = cursor.next_document().unwrap().unwrap();
|
|
||||||
assert_eq!(document.iter().count(), 2);
|
|
||||||
|
|
||||||
let document = cursor.next_document().unwrap().unwrap();
|
|
||||||
assert_eq!(document.iter().count(), 2);
|
|
||||||
|
|
||||||
assert!(cursor.next_document().unwrap().is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn simple_csv_document() {
|
|
||||||
let csv_content = r#"city,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
assert!(cursor.next_document().unwrap().is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn coma_in_field() {
|
|
||||||
let csv_content = r#"city,country,pop
|
|
||||||
"Boston","United, States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United, States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn quote_in_field() {
|
|
||||||
let csv_content = r#"city,country,pop
|
|
||||||
"Boston","United"" States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United\" States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn integer_in_field() {
|
|
||||||
let csv_content = r#"city,country,pop:number
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": 4628910,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn integer_as_id() {
|
|
||||||
let csv_content = r#""id:number","title:string","comment:string"
|
|
||||||
"1239","Pride and Prejudice","A great book""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"id": 1239,
|
|
||||||
"title": "Pride and Prejudice",
|
|
||||||
"comment": "A great book",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn float_in_field() {
|
|
||||||
let csv_content = r#"city,country,pop:number
|
|
||||||
"Boston","United States","4628910.01""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": 4628910.01,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn several_colon_in_header() {
|
|
||||||
let csv_content = r#"city:love:string,country:state,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city:love": "Boston",
|
|
||||||
"country:state": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn ending_by_colon_in_header() {
|
|
||||||
let csv_content = r#"city:,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city:": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn starting_by_colon_in_header() {
|
|
||||||
let csv_content = r#":city,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
":city": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[ignore]
|
|
||||||
#[test]
|
|
||||||
fn starting_by_colon_in_header2() {
|
|
||||||
let csv_content = r#":string,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, _) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
assert!(cursor.next_document().is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn double_colon_in_header() {
|
|
||||||
let csv_content = r#"city::string,country,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
|
||||||
.unwrap()
|
|
||||||
.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
|
||||||
let val = obkv_to_object(doc, &index).map(Value::from).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
val,
|
|
||||||
json!({
|
|
||||||
"city:": "Boston",
|
|
||||||
"country": "United States",
|
|
||||||
"pop": "4628910",
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_type_in_header() {
|
|
||||||
let csv_content = r#"city,country:number,pop
|
|
||||||
"Boston","United States","4628910""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
assert!(builder.append_csv(csv).is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_column_count1() {
|
|
||||||
let csv_content = r#"city,country,pop
|
|
||||||
"Boston","United States","4628910", "too much
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content"#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
assert!(builder.append_csv(csv).is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn bad_column_count2() {
|
|
||||||
let csv_content = r#"city,country,pop
|
|
||||||
"Boston","United States""#;
|
|
||||||
let csv = csv::Reader::from_reader(Cursor::new(csv_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
assert!(builder.append_csv(csv).is_err());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -253,33 +253,4 @@ mod test {
|
|||||||
{"id": 2,"a": 0,"b": 0},
|
{"id": 2,"a": 0,"b": 0},
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn csv_types_dont_panic() {
|
|
||||||
let csv1_content =
|
|
||||||
"id:number,b:boolean,c,d:number\n1,,,\n2,true,doggo,2\n3,false,the best doggo,-2\n4,,\"Hello, World!\",2.5";
|
|
||||||
let csv1 = csv::Reader::from_reader(Cursor::new(csv1_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv1).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn out_of_order_csv_fields() {
|
|
||||||
let csv1_content = "id:number,b\n1,0";
|
|
||||||
let csv1 = csv::Reader::from_reader(Cursor::new(csv1_content));
|
|
||||||
|
|
||||||
let csv2_content = "id:number,a,b\n2,0,0";
|
|
||||||
let csv2 = csv::Reader::from_reader(Cursor::new(csv2_content));
|
|
||||||
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
||||||
builder.append_csv(csv1).unwrap();
|
|
||||||
builder.append_csv(csv2).unwrap();
|
|
||||||
let vector = builder.into_inner().unwrap();
|
|
||||||
|
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -102,6 +102,7 @@ impl Metadata {
|
|||||||
rules: &'rules [LocalizedAttributesRule],
|
rules: &'rules [LocalizedAttributesRule],
|
||||||
) -> Option<&'rules [Language]> {
|
) -> Option<&'rules [Language]> {
|
||||||
let localized_attributes_rule_id = self.localized_attributes_rule_id?.get();
|
let localized_attributes_rule_id = self.localized_attributes_rule_id?.get();
|
||||||
|
// - 1: `localized_attributes_rule_id` is NonZero
|
||||||
let rule = rules.get((localized_attributes_rule_id - 1) as usize).unwrap();
|
let rule = rules.get((localized_attributes_rule_id - 1) as usize).unwrap();
|
||||||
Some(rule.locales())
|
Some(rule.locales())
|
||||||
}
|
}
|
||||||
@ -160,6 +161,7 @@ impl MetadataBuilder {
|
|||||||
.iter()
|
.iter()
|
||||||
.flat_map(|v| v.iter())
|
.flat_map(|v| v.iter())
|
||||||
.position(|rule| rule.match_str(field))
|
.position(|rule| rule.match_str(field))
|
||||||
|
// saturating_add(1): make `id` `NonZero`
|
||||||
.map(|id| NonZeroU16::new(id.saturating_add(1).try_into().unwrap()).unwrap());
|
.map(|id| NonZeroU16::new(id.saturating_add(1).try_into().unwrap()).unwrap());
|
||||||
|
|
||||||
Metadata { searchable, filterable, sortable, localized_attributes_rule_id }
|
Metadata { searchable, filterable, sortable, localized_attributes_rule_id }
|
||||||
|
@ -46,6 +46,7 @@ fn encode_f64_into_ordered_bytes(
|
|||||||
f: f64,
|
f: f64,
|
||||||
buffer: &mut [u8; 16],
|
buffer: &mut [u8; 16],
|
||||||
) -> Result<(), InvalidGloballyOrderedFloatError> {
|
) -> Result<(), InvalidGloballyOrderedFloatError> {
|
||||||
|
// write the globally ordered float
|
||||||
let bytes = f64_into_bytes(f).ok_or(InvalidGloballyOrderedFloatError { float: f })?;
|
let bytes = f64_into_bytes(f).ok_or(InvalidGloballyOrderedFloatError { float: f })?;
|
||||||
buffer[..8].copy_from_slice(&bytes[..]);
|
buffer[..8].copy_from_slice(&bytes[..]);
|
||||||
// Then the f64 value just to be able to read it back
|
// Then the f64 value just to be able to read it back
|
||||||
|
@ -87,23 +87,10 @@ pub enum WriterOperation {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub enum ArroyOperation {
|
pub enum ArroyOperation {
|
||||||
/// TODO: call when deleting regular documents
|
DeleteVectors { docid: DocumentId },
|
||||||
DeleteVectors {
|
SetVectors { docid: DocumentId, embedder_id: u8, embeddings: Vec<Embedding> },
|
||||||
docid: DocumentId,
|
SetVector { docid: DocumentId, embedder_id: u8, embedding: Embedding },
|
||||||
},
|
Finish { configs: Vec<IndexEmbeddingConfig> },
|
||||||
SetVectors {
|
|
||||||
docid: DocumentId,
|
|
||||||
embedder_id: u8,
|
|
||||||
embeddings: Vec<Embedding>,
|
|
||||||
},
|
|
||||||
SetVector {
|
|
||||||
docid: DocumentId,
|
|
||||||
embedder_id: u8,
|
|
||||||
embedding: Embedding,
|
|
||||||
},
|
|
||||||
Finish {
|
|
||||||
configs: Vec<IndexEmbeddingConfig>,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct DbOperation {
|
pub struct DbOperation {
|
||||||
@ -334,7 +321,6 @@ impl DocidsSender for FacetDocidsSender<'_> {
|
|||||||
fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
let (facet_kind, key) = FacetKind::extract_from_key(key);
|
let (facet_kind, key) = FacetKind::extract_from_key(key);
|
||||||
let database = Database::from(facet_kind);
|
let database = Database::from(facet_kind);
|
||||||
// let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
|
|
||||||
let entry = match facet_kind {
|
let entry = match facet_kind {
|
||||||
// skip level group size
|
// skip level group size
|
||||||
FacetKind::String | FacetKind::Number => {
|
FacetKind::String | FacetKind::Number => {
|
||||||
|
@ -140,7 +140,6 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
|||||||
)?;
|
)?;
|
||||||
document_extractor_data.docids_delta.insert_add_u32(docid);
|
document_extractor_data.docids_delta.insert_add_u32(docid);
|
||||||
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
|
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
|
||||||
// extracted_dictionary_sender.send(self, dictionary: &[u8]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -137,7 +137,6 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
|||||||
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||||
Ok(RefCell::new(GeoExtractorData {
|
Ok(RefCell::new(GeoExtractorData {
|
||||||
removed: bumpalo::collections::Vec::new_in(extractor_alloc),
|
removed: bumpalo::collections::Vec::new_in(extractor_alloc),
|
||||||
// inserted: Uell::new_in(extractor_alloc),
|
|
||||||
inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
|
inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
|
||||||
spilled_inserted: None,
|
spilled_inserted: None,
|
||||||
spilled_removed: None,
|
spilled_removed: None,
|
||||||
@ -242,7 +241,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extracts and validate the latitude and latitude from a document geo field.
|
/// Extracts and validates the latitude and latitude from a document geo field.
|
||||||
///
|
///
|
||||||
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
|
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
|
||||||
pub fn extract_geo_coordinates(
|
pub fn extract_geo_coordinates(
|
||||||
|
@ -35,7 +35,6 @@ pub struct WordDocidsBalancedCaches<'extractor> {
|
|||||||
unsafe impl<'extractor> MostlySend for WordDocidsBalancedCaches<'extractor> {}
|
unsafe impl<'extractor> MostlySend for WordDocidsBalancedCaches<'extractor> {}
|
||||||
|
|
||||||
impl<'extractor> WordDocidsBalancedCaches<'extractor> {
|
impl<'extractor> WordDocidsBalancedCaches<'extractor> {
|
||||||
/// TODO Make sure to give the same max_memory to all of them, without splitting it
|
|
||||||
pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
|
pub fn new_in(buckets: usize, max_memory: Option<usize>, alloc: &'extractor Bump) -> Self {
|
||||||
Self {
|
Self {
|
||||||
word_fid_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
|
word_fid_docids: BalancedCaches::new_in(buckets, max_memory, alloc),
|
||||||
|
Loading…
Reference in New Issue
Block a user