From 6e8a3fe8deea62818ac0cc9194939904a69f8e6b Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 28 Sep 2021 22:58:48 +0200 Subject: [PATCH] move csv parsing to document_formats --- Cargo.lock | 185 +++++------ meilisearch-lib/src/document_formats.rs | 303 +++++++++++++++++- .../updates/csv_documents_iter.rs | 282 ---------------- .../src/index_controller/updates/mod.rs | 28 +- 4 files changed, 384 insertions(+), 414 deletions(-) delete mode 100644 meilisearch-lib/src/index_controller/updates/csv_documents_iter.rs diff --git a/Cargo.lock b/Cargo.lock index af964f2cd..d7ab9dbe4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,7 +82,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2f86cd6857c135e6e9fe57b1619a88d1f94a7df34c00e11fe13e64fd3438837" dependencies = [ "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -218,7 +218,7 @@ dependencies = [ "actix-router", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -296,29 +296,15 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5ab7d9e73059c86c36473f459b52adbd99c3554a4fec492caef460806006f00" +checksum = "e6df5aef5c5830360ce5218cecb8f018af3438af5686ae945094affc86fdec63" [[package]] name = "as-slice" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45403b49e3954a4b8428a0ac21a4b7afadccf92bfd96273f1a58cd4812496ae0" -<<<<<<< HEAD -======= -dependencies = [ - "generic-array 0.12.4", - "generic-array 0.13.3", - "generic-array 0.14.4", - "stable_deref_trait", -] - -[[package]] -name = "assert-json-diff" -version = "1.0.1" -source = "git+https://github.com/qdequele/assert-json-diff?branch=master#9012a0c8866d0f2db0ef9a6242e4a19d1e8c67e4" ->>>>>>> 9d9543fd (Use an existing revision of milli) dependencies = [ "generic-array 0.12.4", "generic-array 0.13.3", @@ -344,7 +330,7 @@ checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -355,7 +341,7 @@ checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -478,9 +464,9 @@ dependencies = [ [[package]] name = "bstr" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90682c8d613ad3373e66de8c6411e0ae2ab2571e879d2efbf73558cc66f21279" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" dependencies = [ "lazy_static", "memchr", @@ -490,9 +476,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" +checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538" [[package]] name = "byte-tools" @@ -526,7 +512,7 @@ checksum = "8e215f8c2f9f79cb53c8335e687ffd07d5bfcb6fe5fc80723762d0be46e7cc54" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -593,9 +579,9 @@ dependencies = [ [[package]] name = "cedarwood" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963e82c7b94163808ca3a452608d260b64ba5bc7b5653b4af1af59887899f48d" +checksum = "fa312498f9f41452998d984d3deb84c84f86aeb8a2499d7505bb8106d78d147d" dependencies = [ "smallvec", ] @@ -668,7 +654,7 @@ checksum = "1df715824eb382e34b7afb7463b0247bf41538aeba731fba05241ecdb5dc3747" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -817,7 +803,7 @@ checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -830,7 +816,7 @@ dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", "rustc_version 0.3.3", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -895,7 +881,7 @@ checksum = "c134c37760b27a871ba422106eedbb8247da973a09e82558bf26d619c882b159" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -937,9 +923,9 @@ checksum = "31586bda1b136406162e381a3185a506cdfc1631708dd40cba2f6628d8634499" [[package]] name = "flate2" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80edafed416a46fb378521624fab1cfa2eb514784fd8921adbe8a8d8321da811" +checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f" dependencies = [ "cfg-if 1.0.0", "crc32fast", @@ -1033,7 +1019,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -1132,7 +1118,7 @@ dependencies = [ "proc-macro-error", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -1288,9 +1274,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "http" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" dependencies = [ "bytes", "fnv", @@ -1334,9 +1320,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.12" +version = "0.14.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13f67199e765030fa08fe0bd581af683f0d5bc04ea09c2b1102012c5fb90e7fd" +checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" dependencies = [ "bytes", "futures-channel", @@ -1395,9 +1381,9 @@ dependencies = [ [[package]] name = "instant" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" +checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd" dependencies = [ "cfg-if 1.0.0", ] @@ -1470,9 +1456,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.54" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1866b355d9c878e5e607473cbe3f63282c0b7aad2db1dbebf55076c686918254" +checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" dependencies = [ "wasm-bindgen", ] @@ -1500,9 +1486,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.101" +version = "0.2.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" +checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" [[package]] name = "libgit2-sys" @@ -1792,12 +1778,7 @@ dependencies = [ [[package]] name = "milli" -<<<<<<< HEAD version = "0.16.0" -======= -version = "0.14.0" -source = "git+https://github.com/meilisearch/milli.git?rev=9d9010e#9d9010e45ff1eddd8a7715423ad0988a35ee34b6" ->>>>>>> 9d9543fd (Use an existing revision of milli) dependencies = [ "bimap", "bincode", @@ -2071,7 +2052,7 @@ dependencies = [ "pest_meta", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -2140,7 +2121,7 @@ checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -2157,9 +2138,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" +checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb" [[package]] name = "ppv-lite86" @@ -2176,7 +2157,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", "version_check", ] @@ -2383,9 +2364,9 @@ dependencies = [ [[package]] name = "retain_mut" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9c17925a9027d298a4603d286befe3f9dc0e8ed02523141914eb628798d6e5b" +checksum = "448296241d034b96c11173591deaa1302f2c17b56092106c1f92c1bc0183a8c9" [[package]] name = "ring" @@ -2550,14 +2531,14 @@ checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] name = "serde_json" -version = "1.0.67" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7f9e390c27c3c0ce8bc5d725f6e4d30a29d26659494aa4b17535f7522c5c950" +checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ "indexmap", "itoa", @@ -2670,15 +2651,15 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" +checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" [[package]] name = "socket2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", "winapi", @@ -2729,7 +2710,7 @@ dependencies = [ "quote 1.0.9", "serde", "serde_derive", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -2745,7 +2726,7 @@ dependencies = [ "serde_derive", "serde_json", "sha1", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -2781,7 +2762,7 @@ dependencies = [ "proc-macro-error", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -2797,9 +2778,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.76" +version = "1.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84" +checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", @@ -2823,15 +2804,15 @@ checksum = "474aaa926faa1603c40b7885a9eaea29b444d1cb2850cb7c0e37bb1a4182f4fa" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", "unicode-xid 0.2.2", ] [[package]] name = "sysinfo" -version = "0.20.3" +version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92d77883450d697c0010e60db3d940ed130b0ed81d27485edee981621b434e52" +checksum = "ffff4a02fa61eee51f95210fc9c98ea6eeb46bb071adeafd61e1a0b9b22c6a6d" dependencies = [ "cfg-if 1.0.0", "core-foundation-sys", @@ -2902,7 +2883,7 @@ checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -2951,14 +2932,14 @@ dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", "standback", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] name = "tinyvec" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5241dd6f21443a3606b432718b166d3cedc962fd4b8bea54a8bc7f514ebda986" +checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7" dependencies = [ "tinyvec_macros", ] @@ -2971,9 +2952,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4efe6fc2395938c8155973d7be49fe8d03a843726e285e100a8a383cc0154ce" +checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" dependencies = [ "autocfg", "bytes", @@ -2997,7 +2978,7 @@ checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", ] [[package]] @@ -3053,9 +3034,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.26" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", @@ -3121,9 +3102,9 @@ checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" [[package]] name = "unicode-width" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" @@ -3240,9 +3221,9 @@ checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasm-bindgen" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c" +checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ "cfg-if 1.0.0", "serde", @@ -3252,24 +3233,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34c405b4f0658583dba0c1c7c9b694f3cac32655db463b56c254a1c75269523" +checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" dependencies = [ "bumpalo", "lazy_static", "log", "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.27" +version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a87d738d4abc4cf22f6eb142f5b9a81301331ee3c767f2fef2fda4e325492060" +checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -3279,9 +3260,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d5a6580be83b19dc570a8f9c324251687ab2184e57086f71625feb57ec77c8" +checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" dependencies = [ "quote 1.0.9", "wasm-bindgen-macro-support", @@ -3289,28 +3270,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3775a030dc6f5a0afd8a84981a21cc92a781eb429acef9ecce476d0c9113e92" +checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" dependencies = [ "proc-macro2 1.0.29", "quote 1.0.9", - "syn 1.0.76", + "syn 1.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c279e376c7a8e8752a8f1eaa35b7b0bee6bb9fb0cdacfa97cc3f1f289c87e2b4" +checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" [[package]] name = "web-sys" -version = "0.3.54" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a84d70d1ec7d2da2d26a5bd78f4bca1b8c3254805363ce743b7a05bc30d195a" +checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" dependencies = [ "js-sys", "wasm-bindgen", @@ -3346,9 +3327,9 @@ dependencies = [ [[package]] name = "whoami" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7741161a40200a867c96dfa5574544efa4178cf4c8f770b62dd1cc0362d7ae1" +checksum = "cabfe22aa4936611957e0b5ad9ed0472ac52b2bfb9aedac4a3f3a91a03bd1ff0" dependencies = [ "wasm-bindgen", "web-sys", @@ -3420,7 +3401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d498dbd1fd7beb83c86709ae1c33ca50942889473473d287d56ce4770a18edfb" dependencies = [ "proc-macro2 1.0.29", - "syn 1.0.76", + "syn 1.0.77", "synstructure", ] diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index 297c89831..f06a509c2 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -1,8 +1,7 @@ -use std::{ - fmt, - io::{Read, Seek, Write}, -}; +use std::io::{self, Read, Result as IoResult, Seek, Write}; +use std::fmt; +use csv::{Reader as CsvReader, StringRecordsIntoIter}; use milli::documents::DocumentBatchBuilder; use serde_json::{Deserializer, Map, Value}; @@ -12,6 +11,7 @@ type Result = std::result::Result; pub enum PayloadType { Jsonl, Json, + Csv, } impl fmt::Display for PayloadType { @@ -19,6 +19,7 @@ impl fmt::Display for PayloadType { match self { PayloadType::Jsonl => write!(f, "ndjson"), PayloadType::Json => write!(f, "json"), + PayloadType::Csv => write!(f, "csv"), } } } @@ -34,7 +35,7 @@ pub enum DocumentFormatError { ), } -internal_error!(DocumentFormatError: milli::documents::Error); +internal_error!(DocumentFormatError: milli::documents::Error, io::Error); macro_rules! malformed { ($type:path, $e:expr) => { @@ -42,6 +43,20 @@ macro_rules! malformed { }; } +pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<()> { + let mut builder = DocumentBatchBuilder::new(writer).unwrap(); + + let iter = CsvDocumentIter::from_reader(input)?; + for doc in iter { + let doc = doc?; + builder.add_documents(doc).unwrap(); + } + builder.finish().unwrap(); + + Ok(()) +} + + /// read jsonl from input and write an obkv batch to writer. pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> { let mut builder = DocumentBatchBuilder::new(writer)?; @@ -68,3 +83,281 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> { Ok(()) } + + +enum AllowedType { + String, + Number, +} + +fn parse_csv_header(header: &str) -> (String, AllowedType) { + // if there are several separators we only split on the last one. + match header.rsplit_once(':') { + Some((field_name, field_type)) => match field_type { + "string" => (field_name.to_string(), AllowedType::String), + "number" => (field_name.to_string(), AllowedType::Number), + // if the pattern isn't reconized, we keep the whole field. + _otherwise => (header.to_string(), AllowedType::String), + }, + None => (header.to_string(), AllowedType::String), + } +} + +pub struct CsvDocumentIter +where + R: Read, +{ + documents: StringRecordsIntoIter, + headers: Vec<(String, AllowedType)>, +} + +impl CsvDocumentIter { + pub fn from_reader(reader: R) -> IoResult { + let mut records = CsvReader::from_reader(reader); + + let headers = records + .headers()? + .into_iter() + .map(parse_csv_header) + .collect(); + + Ok(Self { + documents: records.into_records(), + headers, + }) + } +} + +impl Iterator for CsvDocumentIter { + type Item = Result>; + + fn next(&mut self) -> Option { + let csv_document = self.documents.next()?; + + match csv_document { + Ok(csv_document) => { + let mut document = Map::new(); + + for ((field_name, field_type), value) in + self.headers.iter().zip(csv_document.into_iter()) + { + let parsed_value = (|| match field_type { + AllowedType::Number => malformed!(PayloadType::Csv, value + .parse::() + .map(Value::from)), + AllowedType::String => Ok(Value::String(value.to_string())), + })(); + + match parsed_value { + Ok(value) => drop(document.insert(field_name.to_string(), value)), + Err(e) => return Some(Err(e)), + } + } + + Some(Ok(document)) + } + Err(e) => Some(Err(DocumentFormatError::MalformedPayload(Box::new(e), PayloadType::Csv))), + } + } +} + +#[cfg(test)] +mod test { + use serde_json::json; + + use super::*; + + #[test] + fn simple_csv_document() { + let documents = r#"city,country,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city": "Boston", + "country": "United States", + "pop": "4628910", + }) + ); + } + + #[test] + fn coma_in_field() { + let documents = r#"city,country,pop +"Boston","United, States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city": "Boston", + "country": "United, States", + "pop": "4628910", + }) + ); + } + + #[test] + fn quote_in_field() { + let documents = r#"city,country,pop +"Boston","United"" States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city": "Boston", + "country": "United\" States", + "pop": "4628910", + }) + ); + } + + #[test] + fn integer_in_field() { + let documents = r#"city,country,pop:number +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city": "Boston", + "country": "United States", + "pop": 4628910.0, + }) + ); + } + + #[test] + fn float_in_field() { + let documents = r#"city,country,pop:number +"Boston","United States","4628910.01""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city": "Boston", + "country": "United States", + "pop": 4628910.01, + }) + ); + } + + #[test] + fn several_colon_in_header() { + let documents = r#"city:love:string,country:state,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city:love": "Boston", + "country:state": "United States", + "pop": "4628910", + }) + ); + } + + #[test] + fn ending_by_colon_in_header() { + let documents = r#"city:,country,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city:": "Boston", + "country": "United States", + "pop": "4628910", + }) + ); + } + + #[test] + fn starting_by_colon_in_header() { + let documents = r#":city,country,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + ":city": "Boston", + "country": "United States", + "pop": "4628910", + }) + ); + } + + #[ignore] + #[test] + fn starting_by_colon_in_header2() { + let documents = r#":string,country,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert!(dbg!(csv_iter.next().unwrap()).is_err()); + } + + #[test] + fn double_colon_in_header() { + let documents = r#"city::string,country,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert_eq!( + Value::Object(csv_iter.next().unwrap().unwrap()), + json!({ + "city:": "Boston", + "country": "United States", + "pop": "4628910", + }) + ); + } + + #[test] + fn bad_type_in_header() { + let documents = r#"city,country:number,pop +"Boston","United States","4628910""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert!(csv_iter.next().unwrap().is_err()); + } + + #[test] + fn bad_column_count1() { + let documents = r#"city,country,pop +"Boston","United States","4628910", "too much""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert!(csv_iter.next().unwrap().is_err()); + } + + #[test] + fn bad_column_count2() { + let documents = r#"city,country,pop +"Boston","United States""#; + + let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); + + assert!(csv_iter.next().unwrap().is_err()); + } +} diff --git a/meilisearch-lib/src/index_controller/updates/csv_documents_iter.rs b/meilisearch-lib/src/index_controller/updates/csv_documents_iter.rs deleted file mode 100644 index 837240ceb..000000000 --- a/meilisearch-lib/src/index_controller/updates/csv_documents_iter.rs +++ /dev/null @@ -1,282 +0,0 @@ -use super::error::{Result, UpdateLoopError}; -use std::io::{Read, Result as IoResult}; - -use csv::{Reader as CsvReader, StringRecordsIntoIter}; -use serde_json::{Map, Value}; - -enum AllowedType { - String, - Number, -} - -fn parse_csv_header(header: &str) -> (String, AllowedType) { - // if there are several separators we only split on the last one. - match header.rsplit_once(':') { - Some((field_name, field_type)) => match field_type { - "string" => (field_name.to_string(), AllowedType::String), - "number" => (field_name.to_string(), AllowedType::Number), - // if the pattern isn't reconized, we keep the whole field. - _otherwise => (header.to_string(), AllowedType::String), - }, - None => (header.to_string(), AllowedType::String), - } -} - -pub struct CsvDocumentIter -where - R: Read, -{ - documents: StringRecordsIntoIter, - headers: Vec<(String, AllowedType)>, -} - -impl CsvDocumentIter { - pub fn from_reader(reader: R) -> IoResult { - let mut records = CsvReader::from_reader(reader); - - let headers = records - .headers()? - .into_iter() - .map(parse_csv_header) - .collect(); - - Ok(Self { - documents: records.into_records(), - headers, - }) - } -} - -impl Iterator for CsvDocumentIter { - type Item = Result>; - - fn next(&mut self) -> Option { - let csv_document = self.documents.next()?; - - match csv_document { - Ok(csv_document) => { - let mut document = Map::new(); - - for ((field_name, field_type), value) in - self.headers.iter().zip(csv_document.into_iter()) - { - let parsed_value = (|| match field_type { - AllowedType::Number => value - .parse::() - .map(Value::from) - .map_err(|e| UpdateLoopError::MalformedPayload(Box::new(e))), - AllowedType::String => Ok(Value::String(value.to_string())), - })(); - - match parsed_value { - Ok(value) => drop(document.insert(field_name.to_string(), value)), - Err(e) => return Some(Err(e)), - } - } - - Some(Ok(document)) - } - Err(e) => Some(Err(UpdateLoopError::MalformedPayload(Box::new(e)))), - } - } -} - -#[cfg(test)] -mod test { - use serde_json::json; - - use super::*; - - #[test] - fn simple_csv_document() { - let documents = r#"city,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn coma_in_field() { - let documents = r#"city,country,pop -"Boston","United, States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United, States", - "pop": "4628910", - }) - ); - } - - #[test] - fn quote_in_field() { - let documents = r#"city,country,pop -"Boston","United"" States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United\" States", - "pop": "4628910", - }) - ); - } - - #[test] - fn integer_in_field() { - let documents = r#"city,country,pop:number -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United States", - "pop": 4628910.0, - }) - ); - } - - #[test] - fn float_in_field() { - let documents = r#"city,country,pop:number -"Boston","United States","4628910.01""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city": "Boston", - "country": "United States", - "pop": 4628910.01, - }) - ); - } - - #[test] - fn several_double_dot_in_header() { - let documents = r#"city:love:string,country:state,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city:love": "Boston", - "country:state": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn ending_by_double_dot_in_header() { - let documents = r#"city:,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city:": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn starting_by_double_dot_in_header() { - let documents = r#":city,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - ":city": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn starting_by_double_dot_in_header2() { - let documents = r#":string,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } - - #[test] - fn double_double_dot_in_header() { - let documents = r#"city::string,country,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert_eq!( - Value::Object(csv_iter.next().unwrap().unwrap()), - json!({ - "city:": "Boston", - "country": "United States", - "pop": "4628910", - }) - ); - } - - #[test] - fn bad_type_in_header() { - let documents = r#"city,country:number,pop -"Boston","United States","4628910""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } - - #[test] - fn bad_column_count1() { - let documents = r#"city,country,pop -"Boston","United States","4628910", "too much""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } - - #[test] - fn bad_column_count2() { - let documents = r#"city,country,pop -"Boston","United States""#; - - let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - - assert!(csv_iter.next().unwrap().is_err()); - } -} diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 14f0a7c69..56ea779de 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -1,10 +1,8 @@ -mod csv_documents_iter; pub mod error; mod message; pub mod status; pub mod store; -use crate::index_controller::updates::csv_documents_iter::CsvDocumentIter; use std::io; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; @@ -15,7 +13,6 @@ use async_stream::stream; use bytes::Bytes; use futures::{Stream, StreamExt}; use log::trace; -use milli::documents::DocumentBatchBuilder; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -24,13 +21,13 @@ use uuid::Uuid; use self::error::{Result, UpdateLoopError}; pub use self::message::UpdateMsg; use self::store::{UpdateStore, UpdateStoreInfo}; -use crate::document_formats::read_json; +use crate::document_formats::{read_csv, read_json}; use crate::index::{Index, Settings, Unchecked}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; use super::index_resolver::HardStateIndexResolver; -use super::{DocumentAdditionFormat, Payload, Update}; +use super::{DocumentAdditionFormat, Update}; pub type UpdateSender = mpsc::Sender; @@ -198,6 +195,7 @@ impl UpdateLoop { tokio::task::spawn_blocking(move || -> Result<_> { match format { DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, + DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?, } update_file.persist()?; @@ -225,26 +223,6 @@ impl UpdateLoop { Ok(status.into()) } - async fn documents_from_csv(&self, payload: Payload) -> Result { - let file_store = self.update_file_store.clone(); - tokio::task::spawn_blocking(move || { - let (uuid, mut file) = file_store.new_update().unwrap(); - let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap(); - - let iter = CsvDocumentIter::from_reader(StreamReader::new(payload))?; - for doc in iter { - let doc = doc?; - builder.add_documents(doc).unwrap(); - } - builder.finish().unwrap(); - - file.persist(); - - Ok(uuid) - }) - .await? - } - async fn handle_list_updates(&self, uuid: Uuid) -> Result> { let update_store = self.store.clone(); tokio::task::spawn_blocking(move || {