fix encoding bug

Irevoire 2021-06-09 17:10:10 +02:00 committed by mpostma
parent 2716c1aebb
commit 99551fc21b
9 changed files with 34 additions and 41 deletions

View File

@@ -81,7 +81,7 @@ impl UpdateHandler {
                 primary_key.as_deref(),
             ),
             ClearDocuments => index.clear_documents(update_builder),
-            DeleteDocuments => index.delete_documents(content, update_builder),
+            DeleteDocuments { documents } => index.delete_documents(documents.to_vec(), update_builder),
             Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
         };

View File

@@ -298,18 +298,14 @@ impl Index {
     pub fn delete_documents(
         &self,
-        document_ids: Option<impl io::Read>,
+        document_ids: Vec<String>,
         update_builder: UpdateBuilder,
     ) -> anyhow::Result<UpdateResult> {
-        let ids = match document_ids {
-            Some(reader) => serde_json::from_reader(reader)?,
-            None => Vec::<String>::new(),
-        };
         let mut txn = self.write_txn()?;
         let mut builder = update_builder.delete_documents(&mut txn, self)?;

         // We ignore unexisting document ids
-        ids.iter().for_each(|id| {
+        document_ids.iter().for_each(|id| {
             builder.delete_external_id(id);
         });

View File

@@ -200,18 +200,11 @@ impl IndexController {
     pub async fn delete_documents(
         &self,
         uid: String,
-        document_ids: Vec<String>,
+        documents: Vec<String>,
     ) -> anyhow::Result<UpdateStatus> {
         let uuid = self.uuid_resolver.get(uid).await?;
-        let meta = UpdateMeta::DeleteDocuments;
-        let (sender, receiver) = mpsc::channel(10);
-        tokio::task::spawn(async move {
-            let json = serde_json::to_vec(&document_ids).unwrap();
-            let bytes = Bytes::from(json);
-            let _ = sender.send(Ok(bytes)).await;
-        });
+        let meta = UpdateMeta::DeleteDocuments { documents };
+        let (_, receiver) = mpsc::channel(1);
         let status = self.update_handle.update(meta, receiver, uuid).await?;
         Ok(status)
     }

View File

@@ -106,7 +106,7 @@ where
         mut payload: mpsc::Receiver<PayloadData<D>>,
     ) -> Result<UpdateStatus> {
         let file_path = match meta {
-            UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => {
+            UpdateMeta::DocumentsAddition { .. } => {
                 let update_file_id = uuid::Uuid::new_v4();
                 let path = self
                     .path
@@ -181,10 +181,13 @@
     async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
         let store = self.store.clone();
+        tokio::task::spawn_blocking(move || {
             let result = store
                 .meta(uuid, id)?
                 .ok_or(UpdateError::UnexistingUpdate(id))?;
             Ok(result)
+        })
+        .await?
     }

     async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
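
The hunk above moves the store read into tokio::task::spawn_blocking, since the LMDB read is synchronous and would otherwise stall the async executor thread. A minimal sketch of the pattern, with a hypothetical Store/meta stand-in rather than the real types:

    use std::sync::Arc;

    // Hypothetical stand-ins for the real store and status types.
    struct Store;

    impl Store {
        fn meta(&self, _id: u64) -> anyhow::Result<String> {
            // Imagine a synchronous LMDB read here.
            Ok("processed".into())
        }
    }

    async fn get_meta(store: Arc<Store>, id: u64) -> anyhow::Result<String> {
        // spawn_blocking returns Result<T, JoinError>: the single `?`
        // surfaces a panicked or cancelled task, and the inner
        // anyhow::Result<String> becomes this function's own return value.
        tokio::task::spawn_blocking(move || store.meta(id)).await?
    }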

View File

@@ -75,10 +75,10 @@ impl<'a> BytesDecode<'a> for UpdateKeyCodec {
     type DItem = (Uuid, u64);

     fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
-        let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
+        let uuid_bytes = dbg!(bytes.get(0..size_of::<Uuid>())?.try_into().ok())?;
         let uuid = Uuid::from_bytes(uuid_bytes);

-        let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
+        let update_id_bytes = dbg!(bytes.get(size_of::<Uuid>()..)?.try_into().ok())?;
         let update_id = u64::from_be_bytes(update_id_bytes);

         Some((uuid, update_id))
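
This decoder reads the key layout documented on the updates database further down: 16 uuid bytes followed by a big-endian u64 id. A self-contained sketch of both directions (the encode half is not shown in this diff, so it is an assumption):

    use std::convert::TryInto;
    use std::mem::size_of;

    use uuid::Uuid;

    fn encode_key(uuid: &Uuid, id: u64) -> Vec<u8> {
        // 16 uuid bytes, then the id in big-endian so ids sort
        // numerically within one uuid prefix.
        let mut bytes = Vec::with_capacity(size_of::<Uuid>() + size_of::<u64>());
        bytes.extend_from_slice(uuid.as_bytes());
        bytes.extend_from_slice(&id.to_be_bytes());
        bytes
    }

    fn decode_key(bytes: &[u8]) -> Option<(Uuid, u64)> {
        let uuid = Uuid::from_bytes(bytes.get(0..size_of::<Uuid>())?.try_into().ok()?);
        let id = u64::from_be_bytes(bytes.get(size_of::<Uuid>()..)?.try_into().ok()?);
        Some((uuid, id))
    }

A key written through any other codec (for instance raw ByteSlice with a different layout) fails this decode, which is exactly the kind of mismatch a typed database rules out.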

View File

@@ -108,7 +108,6 @@ impl UpdateStore {
         let updates = self
             .updates
             .iter(txn)?
-            .remap_key_type::<UpdateKeyCodec>()
             .lazily_decode_data();

         for update in updates {

View File

@@ -97,7 +97,7 @@ pub struct UpdateStore {
     /// The keys are built as follow:
     /// |   Uuid   |   id    |
     /// | 16-bytes | 8-bytes |
-    updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
+    updates: Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>,
     /// Indicates the current state of the update store,
     state: Arc<StateLock>,
     /// Wake up the loop when a new event occurs.
@@ -244,6 +244,8 @@
         txn.commit()?;

+        dbg!("here");
+
         self.notification_sender
             .blocking_send(())
             .expect("Update store loop exited.");
@@ -269,7 +271,7 @@
             }
             _ => {
                 let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
-                self.updates.remap_key_type::<UpdateKeyCodec>().put(
+                self.updates.put(
                     wtxn,
                     &(index_uuid, update.id()),
                     &update,
@@ -324,6 +326,8 @@
         let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid));
         let update_id = processing.id();

+        dbg!(&processing);
+
         let file = match content_path {
             Some(ref path) => {
                 let file = File::open(path)?;
@@ -352,7 +356,7 @@
             Err(res) => res.into(),
         };

-        self.updates.remap_key_type::<UpdateKeyCodec>().put(
+        self.updates.put(
             &mut wtxn,
             &(index_uuid, update_id),
             &result,
@@ -381,7 +385,11 @@
             }
         }

-        let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
+        let updates = self
+            .updates
+            .remap_key_type::<ByteSlice>()
+            .prefix_iter(&txn, index_uuid.as_bytes())?;

         for entry in updates {
             let (_, update) = entry?;
             update_list.insert(update.id(), update);
@@ -412,26 +420,19 @@
         let txn = self.env.read_txn()?;
         // Else, check if it is in the updates database:
-        let update = self
-            .updates
-            .remap_key_type::<UpdateKeyCodec>()
-            .get(&txn, &(index_uuid, update_id))?;
+        let update = dbg!(self.updates.get(&txn, &(index_uuid, update_id)))?;

         if let Some(update) = update {
             return Ok(Some(update));
         }

         // If nothing was found yet, we resolve to iterate over the pending queue.
-        let pendings = self
-            .pending_queue
-            .remap_key_type::<UpdateKeyCodec>()
-            .iter(&txn)?
-            .lazily_decode_data();
+        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

         for entry in pendings {
-            let ((uuid, id), pending) = entry?;
+            let ((_, uuid, id), pending) = entry?;
             if uuid == index_uuid && id == update_id {
-                return Ok(Some(pending.decode()?.into()));
+                return Ok(Some(dbg!(pending.decode())?.into()));
             }
         }
@@ -461,6 +462,7 @@
         let mut updates = self
             .updates
+            .remap_key_type::<ByteSlice>()
             .prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
             .lazily_decode_data();
@@ -707,7 +709,6 @@ mod test {
         assert!(store.pending_queue.first(&txn).unwrap().is_none());

         let update = store
             .updates
-            .remap_key_type::<UpdateKeyCodec>()
             .get(&txn, &(uuid, 0))
             .unwrap()
             .unwrap();
@@ -715,7 +716,6 @@
         assert!(matches!(update, UpdateStatus::Processed(_)));

         let update = store
             .updates
-            .remap_key_type::<UpdateKeyCodec>()
             .get(&txn, &(uuid, 1))
             .unwrap()
             .unwrap();
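
The common thread in the UpdateStore hunks: with updates declared as Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>, every get/put/iter goes through one key codec, and remap_key_type survives only where raw bytes are wanted on purpose, for uuid-prefix scans. A sketch of such a scan against the heed API as it appears in these hunks (the crate's UpdateKeyCodec and UpdateStatus types are assumed in scope, and heed versions differ, so treat the signatures as approximate):

    use heed::types::{ByteSlice, SerdeJson};
    use heed::{Database, Result, RoTxn};
    use uuid::Uuid;

    fn updates_for_index(
        txn: &RoTxn,
        updates: &Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>,
        index_uuid: Uuid,
    ) -> Result<Vec<UpdateStatus>> {
        let mut out = Vec::new();
        // The 16 uuid bytes are exactly the first half of every key, so a
        // raw prefix scan selects all updates of one index. remap_key_type
        // only changes how keys are (de)coded; values still decode through
        // SerdeJson.
        for entry in updates
            .remap_key_type::<ByteSlice>()
            .prefix_iter(txn, index_uuid.as_bytes())?
        {
            let (_raw_key, update) = entry?;
            out.push(update);
        }
        Ok(out)
    }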

View File

@@ -23,7 +23,9 @@ pub enum UpdateMeta {
         primary_key: Option<String>,
     },
     ClearDocuments,
-    DeleteDocuments,
+    DeleteDocuments {
+        documents: Vec<String>
+    },
     Settings(Settings<Unchecked>),
 }
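
With serde's default externally tagged representation, turning DeleteDocuments from a unit variant into a struct variant changes its JSON from a bare string into a tagged object carrying the ids. A standalone illustration (simplified enum; the crate's actual serde attributes may differ):

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    enum Meta {
        ClearDocuments,
        DeleteDocuments { documents: Vec<String> },
    }

    fn main() {
        let meta = Meta::DeleteDocuments {
            documents: vec!["1".into(), "2".into()],
        };
        // Prints: {"DeleteDocuments":{"documents":["1","2"]}}
        // (the unit variant ClearDocuments would serialize as the bare
        // string "ClearDocuments")
        println!("{}", serde_json::to_string(&meta).unwrap());
    }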

View File

@@ -114,7 +114,7 @@ async fn delete_no_document_batch() {
     index.add_documents(json!([{ "id": 1, "content": "foobar" }, { "id": 0, "content": "foobar" }, { "id": 3, "content": "foobar" }]), Some("id")).await;
     index.wait_update_id(0).await;
     let (_response, code) = index.delete_batch(vec![]).await;
-    assert_eq!(code, 202);
+    assert_eq!(code, 202, "{}", _response);

     let _update = index.wait_update_id(1).await;
     let (response, code) = index