From 5c488e20cc07a66aff3794fd94c3c84d47170b31 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= <clement@meilisearch.com>
Date: Wed, 27 Nov 2024 18:03:45 +0100
Subject: [PATCH] Send the geo rtree through crossbeam channel

---
 crates/milli/src/update/new/channel.rs | 107 +++++++++++++------------
 1 file changed, 56 insertions(+), 51 deletions(-)

diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index 70c4a6042..26e375a5a 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -166,7 +166,6 @@ pub struct DbOperation {
 
 impl DbOperation {
     pub fn key_value<'a>(&self, frame: &'a FrameGrantR<'_>) -> (&'a [u8], Option<&'a [u8]>) {
-        /// TODO replace the return type by an enum Write | Delete
         let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
         match self.key_length {
             Some(key_length) => {
@@ -478,8 +477,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
 
     fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
         let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
-        self.write_key_value_with(database, key_length, value.len(), |buffer| {
-            let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
+        self.write_key_value_with(database, key_length, value.len(), |key_buffer, value_buffer| {
             key_buffer.copy_from_slice(key);
             value_buffer.copy_from_slice(value);
             Ok(())
@@ -494,7 +492,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         key_value_writer: F,
     ) -> crate::Result<()>
     where
-        F: FnOnce(&mut [u8]) -> crate::Result<()>,
+        F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
     {
         let capacity = self.capacity;
         let refcell = self.producers.get().unwrap();
@@ -519,7 +517,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let header_size = payload_header.header_size();
         let (header_bytes, remaining) = grant.split_at_mut(header_size);
         payload_header.serialize_into(header_bytes);
-        key_value_writer(remaining)?;
+        let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
+        key_value_writer(key_buffer, value_buffer)?;
 
         // We could commit only the used memory.
         grant.commit(total_length);
@@ -635,12 +634,16 @@ impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
     pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
         let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
         let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
-        self.sender.write_key_value_with(D::DATABASE, key_length, value_length, |buffer| {
-            let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
-            key_buffer.copy_from_slice(key);
-            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_buffer)?;
-            Ok(())
-        })
+        self.sender.write_key_value_with(
+            D::DATABASE,
+            key_length,
+            value_length,
+            |key_buffer, value_buffer| {
+                key_buffer.copy_from_slice(key);
+                CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_buffer)?;
+                Ok(())
+            },
+        )
     }
 
     pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
@@ -667,25 +670,29 @@ impl FacetDocidsSender<'_, '_> {
             FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length,
         };
 
-        self.sender.write_key_value_with(database, key_length, value_length, |buffer| {
-            let (key_out, value_out) = buffer.split_at_mut(key.len());
-            key_out.copy_from_slice(key);
+        self.sender.write_key_value_with(
+            database,
+            key_length,
+            value_length,
+            |key_out, value_out| {
+                key_out.copy_from_slice(key);
 
-            let value_out = match facet_kind {
-                // We must take the facet group size into account
-                // when we serialize strings and numbers.
-                FacetKind::String | FacetKind::Number => {
-                    let (first, remaining) = value_out.split_first_mut().unwrap();
-                    *first = 1;
-                    remaining
-                }
-                FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
-            };
+                let value_out = match facet_kind {
+                    // We must take the facet group size into account
+                    // when we serialize strings and numbers.
+                    FacetKind::String | FacetKind::Number => {
+                        let (first, remaining) = value_out.split_first_mut().unwrap();
+                        *first = 1;
+                        remaining
+                    }
+                    FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
+                };
 
-            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
+                CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
 
-            Ok(())
-        })
+                Ok(())
+            },
+        )
     }
 
     pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
@@ -777,32 +784,30 @@ pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
 
 impl GeoSender<'_, '_> {
     pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
-        todo!("set rtree from file")
-        // self.0
-        //     .send(WriterOperation::DbOperation(DbOperation {
-        //         database: Database::Main,
-        //         entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
-        //             GEO_RTREE_KEY.as_bytes(),
-        //             value,
-        //         )),
-        //     }))
-        //     .map_err(|_| SendError(()))
+        self.0
+            .sender
+            .send(ReceiverAction::LargeEntry {
+                database: Database::Main,
+                key: GEO_RTREE_KEY.to_string().into_bytes().into_boxed_slice(),
+                value,
+            })
+            .map_err(|_| SendError(()))
     }
 
-    pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
-        todo!("serialize directly into bbqueue (as a real roaringbitmap not a cbo)")
+    pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> crate::Result<()> {
+        let key = GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes();
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
+        let value_length = bitmap.serialized_size();
 
-        // let mut buffer = Vec::new();
-        // bitmap.serialize_into(&mut buffer).unwrap();
-
-        // self.0
-        //     .send(WriterOperation::DbOperation(DbOperation {
-        //         database: Database::Main,
-        //         entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
-        //             GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
-        //             &buffer,
-        //         )),
-        //     }))
-        //     .map_err(|_| SendError(()))
+        self.0.write_key_value_with(
+            Database::Main,
+            key_length,
+            value_length,
+            |key_buffer, value_buffer| {
+                key_buffer.copy_from_slice(key);
+                bitmap.serialize_into(value_buffer)?;
+                Ok(())
+            },
+        )
     }
 }