mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
Add more timing logs to the Transform
This commit is contained in:
parent
0fb6acefc3
commit
0fc446c62f
@ -150,6 +150,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[logging_timer::time]
|
||||||
pub fn read_documents<R, FP, FA>(
|
pub fn read_documents<R, FP, FA>(
|
||||||
&mut self,
|
&mut self,
|
||||||
reader: EnrichedDocumentsBatchReader<R>,
|
reader: EnrichedDocumentsBatchReader<R>,
|
||||||
@ -162,6 +163,8 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
FP: Fn(UpdateIndexingStep) + Sync,
|
FP: Fn(UpdateIndexingStep) + Sync,
|
||||||
FA: Fn() -> bool + Sync,
|
FA: Fn() -> bool + Sync,
|
||||||
{
|
{
|
||||||
|
puffin::profile_function!();
|
||||||
|
|
||||||
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
|
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
|
||||||
let external_documents_ids = self.index.external_documents_ids();
|
let external_documents_ids = self.index.external_documents_ids();
|
||||||
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
|
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
|
||||||
@ -212,13 +215,12 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));
|
field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));
|
||||||
|
|
||||||
// Build the new obkv document.
|
// Build the new obkv document.
|
||||||
let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
|
let mut writer = KvWriter::new(&mut obkv_buffer);
|
||||||
for (k, v) in field_buffer_cache.iter() {
|
for (k, v) in field_buffer_cache.iter() {
|
||||||
writer.insert(*k, v)?;
|
writer.insert(*k, v)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut original_docid = None;
|
let mut original_docid = None;
|
||||||
|
|
||||||
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
|
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
|
||||||
HEntry::Occupied(entry) => *entry.get() as u32,
|
HEntry::Occupied(entry) => *entry.get() as u32,
|
||||||
HEntry::Vacant(entry) => {
|
HEntry::Vacant(entry) => {
|
||||||
@ -275,24 +277,19 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
&mut document_sorter_buffer,
|
&mut document_sorter_buffer,
|
||||||
)?;
|
)?;
|
||||||
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
|
let base_obkv = KvReader::new(base_obkv);
|
||||||
Some(flattened_obkv) => {
|
if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? {
|
||||||
// we recreate our buffer with the flattened documents
|
// we recreate our buffer with the flattened documents
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Addition as u8);
|
document_sorter_buffer.push(Operation::Addition as u8);
|
||||||
into_del_add_obkv(
|
into_del_add_obkv(
|
||||||
KvReaderU16::new(&flattened_obkv),
|
KvReaderU16::new(&flattened_obkv),
|
||||||
true,
|
true,
|
||||||
keep_original_version,
|
keep_original_version,
|
||||||
&mut document_sorter_buffer,
|
&mut document_sorter_buffer,
|
||||||
)?;
|
)?;
|
||||||
self.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
|
||||||
}
|
|
||||||
None => self
|
|
||||||
.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
|
|
||||||
}
|
}
|
||||||
|
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -310,23 +307,18 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
// We use the extracted/generated user id as the key for this document.
|
// We use the extracted/generated user id as the key for this document.
|
||||||
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
|
let flattened_obkv = KvReader::new(&obkv_buffer);
|
||||||
Some(flattened_obkv) => {
|
if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Addition as u8);
|
document_sorter_buffer.push(Operation::Addition as u8);
|
||||||
into_del_add_obkv(
|
into_del_add_obkv(
|
||||||
KvReaderU16::new(&flattened_obkv),
|
KvReaderU16::new(&obkv),
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
&mut document_sorter_buffer,
|
&mut document_sorter_buffer,
|
||||||
)?;
|
)?
|
||||||
self.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
|
||||||
}
|
|
||||||
None => self
|
|
||||||
.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
|
|
||||||
}
|
}
|
||||||
|
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
}
|
}
|
||||||
documents_count += 1;
|
documents_count += 1;
|
||||||
|
|
||||||
@ -361,6 +353,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
/// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db,
|
/// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db,
|
||||||
/// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids.
|
/// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids.
|
||||||
/// - If the document to remove was not present in either the db or the transform we do nothing.
|
/// - If the document to remove was not present in either the db or the transform we do nothing.
|
||||||
|
#[logging_timer::time]
|
||||||
pub fn remove_documents<FA>(
|
pub fn remove_documents<FA>(
|
||||||
&mut self,
|
&mut self,
|
||||||
mut to_remove: Vec<String>,
|
mut to_remove: Vec<String>,
|
||||||
@ -370,6 +363,8 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
where
|
where
|
||||||
FA: Fn() -> bool + Sync,
|
FA: Fn() -> bool + Sync,
|
||||||
{
|
{
|
||||||
|
puffin::profile_function!();
|
||||||
|
|
||||||
// there may be duplicates in the documents to remove.
|
// there may be duplicates in the documents to remove.
|
||||||
to_remove.sort_unstable();
|
to_remove.sort_unstable();
|
||||||
to_remove.dedup();
|
to_remove.dedup();
|
||||||
@ -439,24 +434,19 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
// flatten it and push it as to delete in the flattened_sorter
|
// flatten it and push it as to delete in the flattened_sorter
|
||||||
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
|
let flattened_obkv = KvReader::new(base_obkv);
|
||||||
Some(flattened_obkv) => {
|
if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
|
||||||
// we recreate our buffer with the flattened documents
|
// we recreate our buffer with the flattened documents
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Deletion as u8);
|
document_sorter_buffer.push(Operation::Deletion as u8);
|
||||||
into_del_add_obkv(
|
into_del_add_obkv(
|
||||||
KvReaderU16::new(&flattened_obkv),
|
KvReaderU16::new(&obkv),
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
&mut document_sorter_buffer,
|
&mut document_sorter_buffer,
|
||||||
)?;
|
)?;
|
||||||
self.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
|
||||||
}
|
|
||||||
None => self
|
|
||||||
.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
|
|
||||||
}
|
}
|
||||||
|
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
@ -591,42 +581,10 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_deleted_documents_from_field_distribution(
|
|
||||||
&self,
|
|
||||||
rtxn: &RoTxn,
|
|
||||||
field_distribution: &mut FieldDistribution,
|
|
||||||
) -> Result<()> {
|
|
||||||
for deleted_docid in self.replaced_documents_ids.iter() {
|
|
||||||
let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or(
|
|
||||||
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for (key, _) in obkv.iter() {
|
|
||||||
let name =
|
|
||||||
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
||||||
field_id: key,
|
|
||||||
process: "Computing field distribution in transform.",
|
|
||||||
})?;
|
|
||||||
// We checked that the document was in the db earlier. If we can't find it it means
|
|
||||||
// there is an inconsistency between the field distribution and the field id map.
|
|
||||||
let field =
|
|
||||||
field_distribution.get_mut(name).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
||||||
field_id: key,
|
|
||||||
process: "Accessing field distribution in transform.",
|
|
||||||
})?;
|
|
||||||
*field -= 1;
|
|
||||||
if *field == 0 {
|
|
||||||
// since we were able to get the field right before it's safe to unwrap here
|
|
||||||
field_distribution.remove(name).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generate the `TransformOutput` based on the given sorter that can be generated from any
|
/// Generate the `TransformOutput` based on the given sorter that can be generated from any
|
||||||
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
|
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
|
||||||
/// id for the user side and the value must be an obkv where keys are valid fields ids.
|
/// id for the user side and the value must be an obkv where keys are valid fields ids.
|
||||||
|
#[logging_timer::time]
|
||||||
pub(crate) fn output_from_sorter<F>(
|
pub(crate) fn output_from_sorter<F>(
|
||||||
self,
|
self,
|
||||||
wtxn: &mut heed::RwTxn,
|
wtxn: &mut heed::RwTxn,
|
||||||
@ -816,7 +774,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let (docid, obkv) = result?;
|
let (docid, obkv) = result?;
|
||||||
|
|
||||||
obkv_buffer.clear();
|
obkv_buffer.clear();
|
||||||
let mut obkv_writer = obkv::KvWriter::<_, FieldId>::new(&mut obkv_buffer);
|
let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);
|
||||||
|
|
||||||
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
|
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
|
||||||
for (id, name) in new_fields_ids_map.iter() {
|
for (id, name) in new_fields_ids_map.iter() {
|
||||||
|
Loading…
Reference in New Issue
Block a user