diff --git a/Cargo.lock b/Cargo.lock index 6a86c9fda2..973aefd65d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2713,7 +2713,6 @@ dependencies = [ "serde", "snafu", "typetag", - "xxhash-rust", ] [[package]] @@ -2846,6 +2845,7 @@ dependencies = [ "mur3", "serde", "sha1 0.11.0-rc.0", + "xxhash-rust", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3568c27c1f..fd712d7aa2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -343,7 +343,7 @@ typed-builder = "0.20.0" typetag = "0.2.18" url = "2.4.0" uuid = {version = "1.18.1", features = ["v4"]} -xxhash-rust = "0.8.12" +xxhash-rust = {version = "0.8.15", features = ["const_xxh3", "const_xxh64", "const_xxh32", "xxh64", "xxh3", "xxh32"]} [workspace.dependencies.arrow2] features = ["serde_types"] diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 92b0631d80..28eb73637a 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1458,7 +1458,7 @@ class PySeries: num_hashes: int, ngram_size: int, seed: int = 1, - hash_function: Literal["murmurhash3", "xxhash", "sha1"] = "murmurhash3", + hash_function: Literal["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"] = "murmurhash3", ) -> PySeries: ... def __invert__(self) -> PySeries: ... def count(self, mode: CountMode) -> PySeries: ... diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 16241be133..7c508a76fc 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -1300,7 +1300,9 @@ def between(self, lower: int | builtins.float, upper: int | builtins.float) -> E return between(self, lower, upper) def hash( - self, seed: Any | None = None, hash_function: Literal["xxhash", "murmurhash3", "sha1"] | None = "xxhash" + self, + seed: Any | None = None, + hash_function: Literal["xxhash", "xxhash32", "xxhash64", "xxhash3_64", "murmurhash3", "sha1"] | None = "xxhash", ) -> Expression: """Hashes the values in the Expression. 
@@ -1317,7 +1319,7 @@ def minhash( num_hashes: int, ngram_size: int, seed: int = 1, - hash_function: Literal["murmurhash3", "xxhash", "sha1"] = "murmurhash3", + hash_function: Literal["murmurhash3", "xxhash", "xxhash32", "xxhash64", "xxhash3_64", "sha1"] = "murmurhash3", ) -> Expression: """Runs the MinHash algorithm on the series. diff --git a/daft/functions/misc.py b/daft/functions/misc.py index 00c1200ee8..d7a6794dff 100644 --- a/daft/functions/misc.py +++ b/daft/functions/misc.py @@ -269,7 +269,7 @@ def is_in(expr: Expression, other: Any) -> Expression: def hash( expr: Expression, seed: Any | None = None, - hash_function: Literal["xxhash", "murmurhash3", "sha1"] | None = "xxhash", + hash_function: Literal["xxhash", "xxhash32", "xxhash64", "xxhash3_64", "murmurhash3", "sha1"] | None = "xxhash", ) -> Expression: """Hashes the values in the Expression. @@ -278,7 +278,7 @@ def hash( Args: expr: The expression to hash. seed (optional): Seed used for generating the hash. Defaults to 0. - hash_function (optional): Hash function to use. One of "xxhash", "murmurhash3", or "sha1". Defaults to "xxhash". + hash_function (optional): Hash function to use. One of "xxhash" (alias for "xxhash3_64"), "xxhash32", "xxhash64", "xxhash3_64", "murmurhash3", or "sha1". Defaults to "xxhash" (alias for "xxhash3_64"). Returns: Expression (UInt64 Expression): The hashed expression. @@ -302,7 +302,7 @@ def minhash( num_hashes: int, ngram_size: int, seed: int = 1, - hash_function: Literal["murmurhash3", "xxhash", "sha1"] = "murmurhash3", + hash_function: Literal["murmurhash3", "xxhash", "xxhash32", "xxhash64", "xxhash3_64", "sha1"] = "murmurhash3", ) -> Expression: """Runs the MinHash algorithm on the series. @@ -314,11 +314,11 @@ def minhash( to normalize the strings yourself. Args: - text (String Expression): expression to hash. + text (String Expression): The expression to hash. num_hashes (int): The number of hash permutations to compute. 
ngram_size (int): The number of tokens in each shingle/ngram. seed (int, default=1): Seed used for generating permutations and the initial string hashes. Defaults to 1. - hash_function (str, default="murmurhash3"): Hash function to use for initial string hashing. One of "murmurhash3", "xxhash", or "sha1". Defaults to "murmurhash3". + hash_function (str, default="murmurhash3"): Hash function to use for initial string hashing. One of "murmurhash3", "xxhash" (alias for "xxhash3_64"), "xxhash32", "xxhash64", "xxhash3_64", or "sha1". Defaults to "murmurhash3". Returns: Expression (FixedSizedList[UInt32, num_hashes] Expression): diff --git a/daft/series.py b/daft/series.py index d7902983bf..29267c8094 100644 --- a/daft/series.py +++ b/daft/series.py @@ -620,7 +620,7 @@ def minhash( num_hashes: int, ngram_size: int, seed: int = 1, - hash_function: Literal["murmurhash3", "xxhash", "sha1"] = "murmurhash3", + hash_function: Literal["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"] = "murmurhash3", ) -> Series: """Runs the MinHash algorithm on the series. @@ -634,7 +634,7 @@ def minhash( num_hashes: The number of hash permutations to compute. ngram_size: The number of tokens in each shingle/ngram. seed (optional): Seed used for generating permutations and the initial string hashes. Defaults to 1. - hash_function (optional): Hash function to use for initial string hashing. One of "murmur3", "xxhash", or "sha1". Defaults to "murmur3". + hash_function (optional): Hash function to use for initial string hashing. One of "murmurhash3", "xxhash3_64" (or alias "xxhash"), "xxhash64", "xxhash32", or "sha1". Defaults to "murmurhash3".
""" if not isinstance(num_hashes, int): raise ValueError(f"expected an integer for num_hashes but got {type(num_hashes)}") @@ -644,11 +644,17 @@ def minhash( raise ValueError(f"expected an integer or None for seed but got {type(seed)}") if not isinstance(hash_function, str): raise ValueError(f"expected str for hash_function but got {type(hash_function)}") - assert hash_function in [ - "murmurhash3", - "xxhash", - "sha1", - ], f"hash_function must be one of 'murmurhash3', 'xxhash', 'sha1', got {hash_function}" + assert ( + hash_function + in [ + "murmurhash3", + "xxhash", + "xxhash3_64", + "xxhash64", + "xxhash32", + "sha1", + ] + ), f"hash_function must be one of 'murmurhash3', 'xxhash', 'xxhash3_64', 'xxhash64', 'xxhash32', 'sha1', got {hash_function}" return Series._from_pyseries(self._series.minhash(num_hashes, ngram_size, seed, hash_function)) diff --git a/src/daft-core/Cargo.toml b/src/daft-core/Cargo.toml index 810581f017..c1dbbba84d 100644 --- a/src/daft-core/Cargo.toml +++ b/src/daft-core/Cargo.toml @@ -54,10 +54,7 @@ pyo3 = {workspace = true, optional = true} rand = "0.9.1" serde = {workspace = true} sketches-ddsketch = {workspace = true} - -[dependencies.xxhash-rust] -features = ["xxh3", "const_xxh3", "xxh64"] -version = "0.8.5" +xxhash-rust = {workspace = true} [dev-dependencies] rstest = {workspace = true} diff --git a/src/daft-core/src/array/ops/hash.rs b/src/daft-core/src/array/ops/hash.rs index f98dd64311..8f5f6e1584 100644 --- a/src/daft-core/src/array/ops/hash.rs +++ b/src/daft-core/src/array/ops/hash.rs @@ -7,7 +7,11 @@ use arrow2::types::Index; use common_error::{DaftError, DaftResult}; use daft_hash::{HashFunctionKind, MurBuildHasher, Sha1Hasher}; use daft_schema::{dtype::DataType, field::Field}; -use xxhash_rust::xxh3::{xxh3_64, xxh3_64_with_seed}; +use xxhash_rust::{ + xxh3::{xxh3_64, xxh3_64_with_seed}, + xxh32::xxh32, + xxh64::xxh64, +}; use super::as_arrow::AsArrow; use crate::{ @@ -28,7 +32,7 @@ where T: DaftPrimitiveType, { pub fn 
hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -44,7 +48,7 @@ where impl Utf8Array { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -60,7 +64,7 @@ impl Utf8Array { impl BinaryArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -76,7 +80,7 @@ impl BinaryArray { impl FixedSizeBinaryArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -92,7 +96,7 @@ impl FixedSizeBinaryArray { impl BooleanArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -108,7 +112,7 @@ impl BooleanArray { impl NullArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -159,7 +163,15 @@ fn hash_list( .collect(); match hash_function { - HashFunctionKind::XxHash => { + HashFunctionKind::XxHash32 => { + let seed = cur_seed_opt.unwrap_or(0) as u32; + Some(xxh32(&child_bytes, seed) as u64) + } + HashFunctionKind::XxHash64 => { + let seed = cur_seed_opt.unwrap_or(0); + Some(xxh64(&child_bytes, seed)) + } + HashFunctionKind::XxHash3_64 => { if let Some(cur_seed) = cur_seed_opt { Some(xxh3_64_with_seed(&child_bytes, cur_seed)) } else { @@ -199,7 +211,9 @@ fn hash_list( let end = (offsets[i as usize + 1] as usize) * OFFSET; match 
hash_function { - HashFunctionKind::XxHash => Some(xxh3_64(&child_bytes[start..end])), + HashFunctionKind::XxHash32 => Some(xxh32(&child_bytes[start..end], 0) as u64), + HashFunctionKind::XxHash64 => Some(xxh64(&child_bytes[start..end], 0)), + HashFunctionKind::XxHash3_64 => Some(xxh3_64(&child_bytes[start..end])), HashFunctionKind::MurmurHash3 => { // Use 42 as default seed, // refer to: https://github.com/Eventual-Inc/Daft/blob/7be4b1ff9ed3fdc3a45947beefab7e7291cd3be7/src/daft-hash/src/lib.rs#L18 @@ -222,7 +236,7 @@ fn hash_list( impl ListArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -242,7 +256,7 @@ impl ListArray { impl FixedSizeListArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( &self, @@ -265,7 +279,7 @@ impl FixedSizeListArray { impl StructArray { pub fn hash(&self, seed: Option<&UInt64Array>) -> DaftResult { - self.hash_with(seed, HashFunctionKind::XxHash) + self.hash_with(seed, HashFunctionKind::XxHash3_64) } pub fn hash_with( diff --git a/src/daft-core/src/kernels/hashing.rs b/src/daft-core/src/kernels/hashing.rs index 2914a59f8a..e0a0a78a87 100644 --- a/src/daft-core/src/kernels/hashing.rs +++ b/src/daft-core/src/kernels/hashing.rs @@ -11,8 +11,7 @@ use arrow2::{ }; use daft_hash::{HashFunctionKind, MurBuildHasher, Sha1Hasher}; use xxhash_rust::{ - const_xxh3, - xxh3::{xxh3_64, xxh3_64_with_seed}, + const_xxh3, const_xxh32, const_xxh64, xxh3::xxh3_64_with_seed, xxh32::xxh32, xxh64::xxh64, }; fn hash_primitive( @@ -20,29 +19,33 @@ fn hash_primitive( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { + fn xxhash u64>( + array: &PrimitiveArray, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if 
let Some(seed) = seed { + array + .iter() + .zip(seed.values_iter()) + .map(|(v, s)| match v { + Some(v) => f(v.to_le_bytes().as_ref(), *s), + None => NULL_HASH, + }) + .collect::>() + } else { + array + .iter() + .map(|v| match v { + Some(v) => f(v.to_le_bytes().as_ref(), 0), + None => NULL_HASH, + }) + .collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } + match hash_function { - HashFunctionKind::XxHash => { - const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); - let hashes = if let Some(seed) = seed { - array - .iter() - .zip(seed.values_iter()) - .map(|(v, s)| match v { - Some(v) => xxh3_64_with_seed(v.to_le_bytes().as_ref(), *s), - None => NULL_HASH, - }) - .collect::>() - } else { - array - .iter() - .map(|v| match v { - Some(v) => xxh3_64(v.to_le_bytes().as_ref()), - None => NULL_HASH, - }) - .collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) - } HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); let hashes = array @@ -82,6 +85,18 @@ fn hash_primitive( .collect::>(); PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) } + HashFunctionKind::XxHash32 => { + const NULL_HASH: u64 = const_xxh32::xxh32(b"", 0) as u64; + xxhash::(array, seed, |v, s| xxh32(v, s as u32) as u64) + } + HashFunctionKind::XxHash64 => { + const NULL_HASH: u64 = const_xxh64::xxh64(b"", 0); + xxhash::(array, seed, xxh64) + } + HashFunctionKind::XxHash3_64 => { + const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); + xxhash::(array, seed, xxh3_64_with_seed) + } } } @@ -90,34 +105,40 @@ fn hash_boolean( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { - match hash_function { - HashFunctionKind::XxHash => { - const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); - const FALSE_HASH: u64 = const_xxh3::xxh3_64(b"0"); - const TRUE_HASH: u64 = const_xxh3::xxh3_64(b"1"); + fn xxhash< + const NULL_HASH: u64, + const TRUE_HASH: u64, + 
const FALSE_HASH: u64, + F: Fn(&[u8], u64) -> u64, + >( + array: &BooleanArray, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + array + .iter() + .zip(seed.values_iter()) + .map(|(v, s)| match v { + Some(true) => f(b"1", *s), + Some(false) => f(b"0", *s), + None => NULL_HASH, + }) + .collect::>() + } else { + array + .iter() + .map(|v| match v { + Some(true) => TRUE_HASH, + Some(false) => FALSE_HASH, + None => NULL_HASH, + }) + .collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } - let hashes = if let Some(seed) = seed { - array - .iter() - .zip(seed.values_iter()) - .map(|(v, s)| match v { - Some(true) => xxh3_64_with_seed(b"1", *s), - Some(false) => xxh3_64_with_seed(b"0", *s), - None => NULL_HASH, - }) - .collect::>() - } else { - array - .iter() - .map(|v| match v { - Some(true) => TRUE_HASH, - Some(false) => FALSE_HASH, - None => NULL_HASH, - }) - .collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) - } + match hash_function { HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); let hashes = array @@ -165,6 +186,26 @@ fn hash_boolean( .collect::>(); PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) } + HashFunctionKind::XxHash32 => { + const NULL_HASH: u64 = const_xxh32::xxh32(b"", 0) as u64; + const FALSE_HASH: u64 = const_xxh32::xxh32(b"0", 0) as u64; + const TRUE_HASH: u64 = const_xxh32::xxh32(b"1", 0) as u64; + xxhash::(array, seed, |v, s| { + xxh32(v, s as u32) as u64 + }) + } + HashFunctionKind::XxHash64 => { + const NULL_HASH: u64 = const_xxh64::xxh64(b"", 0); + const FALSE_HASH: u64 = const_xxh64::xxh64(b"0", 0); + const TRUE_HASH: u64 = const_xxh64::xxh64(b"1", 0); + xxhash::(array, seed, xxh64) + } + HashFunctionKind::XxHash3_64 => { + const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); + const FALSE_HASH: u64 = const_xxh3::xxh3_64(b"0"); + const TRUE_HASH: 
u64 = const_xxh3::xxh3_64(b"1"); + xxhash::(array, seed, xxh3_64_with_seed) + } } } @@ -173,18 +214,20 @@ fn hash_null( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { + fn xxhash u64>( + len: usize, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + seed.values_iter().map(|s| f(b"", *s)).collect::>() + } else { + (0..len).map(|_| NULL_HASH).collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } + match hash_function { - HashFunctionKind::XxHash => { - const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); - let hashes = if let Some(seed) = seed { - seed.values_iter() - .map(|s| xxh3_64_with_seed(b"", *s)) - .collect::>() - } else { - (0..array.len()).map(|_| NULL_HASH).collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) - } HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); let hashes = (0..array.len()) @@ -206,6 +249,18 @@ fn hash_null( .collect::>(); PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) } + HashFunctionKind::XxHash32 => { + const NULL_HASH: u64 = const_xxh32::xxh32(b"", 0) as u64; + xxhash::(array.len(), seed, |v, s| xxh32(v, s as u32) as u64) + } + HashFunctionKind::XxHash64 => { + const NULL_HASH: u64 = const_xxh64::xxh64(b"", 0); + xxhash::(array.len(), seed, xxh64) + } + HashFunctionKind::XxHash3_64 => { + const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); + xxhash::(array.len(), seed, xxh3_64_with_seed) + } } } @@ -214,19 +269,27 @@ fn hash_binary( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { + fn xxhash u64>( + array: &BinaryArray, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + array + .values_iter() + .zip(seed.values_iter()) + .map(|(v, s)| f(v, *s)) + .collect::>() + } else { + array.values_iter().map(|v| f(v, 
0)).collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } + match hash_function { - HashFunctionKind::XxHash => { - let hashes = if let Some(seed) = seed { - array - .values_iter() - .zip(seed.values_iter()) - .map(|(v, s)| xxh3_64_with_seed(v, *s)) - .collect::>() - } else { - array.values_iter().map(xxh3_64).collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) - } + HashFunctionKind::XxHash32 => xxhash(array, seed, |v, s| xxh32(v, s as u32) as u64), + HashFunctionKind::XxHash64 => xxhash(array, seed, xxh64), + HashFunctionKind::XxHash3_64 => xxhash(array, seed, xxh3_64_with_seed), HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); let hashes = array @@ -258,19 +321,27 @@ fn hash_fixed_size_binary( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { + fn xxhash u64>( + array: &FixedSizeBinaryArray, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + array + .values_iter() + .zip(seed.values_iter()) + .map(|(v, s)| f(v, *s)) + .collect::>() + } else { + array.values_iter().map(|v| f(v, 0)).collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } + match hash_function { - HashFunctionKind::XxHash => { - let hashes = if let Some(seed) = seed { - array - .values_iter() - .zip(seed.values_iter()) - .map(|(v, s)| xxh3_64_with_seed(v, *s)) - .collect::>() - } else { - array.values_iter().map(xxh3_64).collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) - } + HashFunctionKind::XxHash32 => xxhash(array, seed, |v, s| xxh32(v, s as u32) as u64), + HashFunctionKind::XxHash64 => xxhash(array, seed, xxh64), + HashFunctionKind::XxHash3_64 => xxhash(array, seed, xxh3_64_with_seed), HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); let hashes = 
array @@ -302,22 +373,30 @@ fn hash_utf8( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { + fn xxhash u64>( + array: &Utf8Array, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + array + .values_iter() + .zip(seed.values_iter()) + .map(|(v, s)| f(v.as_bytes(), *s)) + .collect::>() + } else { + array + .values_iter() + .map(|v| f(v.as_bytes(), 0)) + .collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } + match hash_function { - HashFunctionKind::XxHash => { - let hashes = if let Some(seed) = seed { - array - .values_iter() - .zip(seed.values_iter()) - .map(|(v, s)| xxh3_64_with_seed(v.as_bytes(), *s)) - .collect::>() - } else { - array - .values_iter() - .map(|v| xxh3_64(v.as_bytes())) - .collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) - } + HashFunctionKind::XxHash32 => xxhash(array, seed, |v, s| xxh32(v, s as u32) as u64), + HashFunctionKind::XxHash64 => xxhash(array, seed, xxh64), + HashFunctionKind::XxHash3_64 => xxhash(array, seed, xxh3_64_with_seed), HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); let hashes = array @@ -350,42 +429,59 @@ fn hash_timestamp_with_timezone( seed: Option<&PrimitiveArray>, hash_function: HashFunctionKind, ) -> PrimitiveArray { + fn xxhash u64>( + array: &PrimitiveArray, + timezone: &str, + seed: Option<&PrimitiveArray>, + f: F, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + array + .iter() + .zip(seed.values_iter()) + .map(|(v, s)| match v { + Some(v) => { + // Combine timestamp and timezone for hashing + let mut combined = Vec::new(); + combined.extend_from_slice(&v.to_le_bytes()); + combined.extend_from_slice(timezone.as_bytes()); + f(&combined, *s) + } + None => NULL_HASH, + }) + .collect::>() + } else { + array + .iter() + .map(|v| match v { + Some(v) => { + // Combine timestamp 
and timezone for hashing + let mut combined = Vec::new(); + combined.extend_from_slice(&v.to_le_bytes()); + combined.extend_from_slice(timezone.as_bytes()); + f(&combined, 0) + } + None => NULL_HASH, + }) + .collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } + // For timestamps with timezone, we combine the timestamp value with the timezone string // to ensure that the same instant in different timezones produces different hashes match hash_function { - HashFunctionKind::XxHash => { + HashFunctionKind::XxHash32 => { + const NULL_HASH: u64 = const_xxh32::xxh32(b"", 0) as u64; + xxhash::(array, timezone, seed, |v, s| xxh32(v, s as u32) as u64) + } + HashFunctionKind::XxHash64 => { + const NULL_HASH: u64 = const_xxh64::xxh64(b"", 0); + xxhash::(array, timezone, seed, xxh64) + } + HashFunctionKind::XxHash3_64 => { const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); - let hashes = if let Some(seed) = seed { - array - .iter() - .zip(seed.values_iter()) - .map(|(v, s)| match v { - Some(v) => { - // Combine timestamp and timezone for hashing - let mut combined = Vec::new(); - combined.extend_from_slice(&v.to_le_bytes()); - combined.extend_from_slice(timezone.as_bytes()); - xxh3_64_with_seed(&combined, *s) - } - None => NULL_HASH, - }) - .collect::>() - } else { - array - .iter() - .map(|v| match v { - Some(v) => { - // Combine timestamp and timezone for hashing - let mut combined = Vec::new(); - combined.extend_from_slice(&v.to_le_bytes()); - combined.extend_from_slice(timezone.as_bytes()); - xxh3_64(&combined) - } - None => NULL_HASH, - }) - .collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + xxhash::(array, timezone, seed, xxh3_64_with_seed) } HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); @@ -442,8 +538,7 @@ fn hash_decimal( // For decimal hashing, we preserve the exact representation including scale // Different scales should produce 
different hashes (123, 123.0, 123.00 are different) // We convert to string representation that preserves the scale information - - let format_decimal = |value: i128, scale: usize| -> Vec { + fn format_decimal(value: i128, scale: usize) -> Vec { if value == 0 { // For zero, return "0.000..." with the appropriate number of decimal places let mut result = String::from("0"); @@ -489,36 +584,53 @@ fn hash_decimal( } result.into_bytes() - }; + } + + fn xxhash u64>( + array: &PrimitiveArray, + seed: Option<&PrimitiveArray>, + f: F, + scale: usize, + ) -> PrimitiveArray { + let hashes = if let Some(seed) = seed { + array + .iter() + .zip(seed.values_iter()) + .map(|(v, s)| match v { + Some(v) => { + let formatted = format_decimal(*v, scale); + f(&formatted, *s) + } + None => NULL_HASH, + }) + .collect::>() + } else { + array + .iter() + .map(|v| match v { + Some(v) => { + let formatted = format_decimal(*v, scale); + f(&formatted, 0) + } + None => NULL_HASH, + }) + .collect::>() + }; + PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + } match hash_function { - HashFunctionKind::XxHash => { + HashFunctionKind::XxHash32 => { + const NULL_HASH: u64 = const_xxh32::xxh32(b"", 0) as u64; + xxhash::(array, seed, |v, s| xxh32(v, s as u32) as u64, scale) + } + HashFunctionKind::XxHash64 => { + const NULL_HASH: u64 = const_xxh64::xxh64(b"", 0); + xxhash::(array, seed, xxh64, scale) + } + HashFunctionKind::XxHash3_64 => { const NULL_HASH: u64 = const_xxh3::xxh3_64(b""); - let hashes = if let Some(seed) = seed { - array - .iter() - .zip(seed.values_iter()) - .map(|(v, s)| match v { - Some(v) => { - let formatted = format_decimal(*v, scale); - xxh3_64_with_seed(&formatted, *s) - } - None => NULL_HASH, - }) - .collect::>() - } else { - array - .iter() - .map(|v| match v { - Some(v) => { - let formatted = format_decimal(*v, scale); - xxh3_64(&formatted) - } - None => NULL_HASH, - }) - .collect::>() - }; - PrimitiveArray::::new(DataType::UInt64, hashes.into(), None) + 
xxhash::(array, seed, xxh3_64_with_seed, scale) } HashFunctionKind::MurmurHash3 => { let hasher = MurBuildHasher::new(seed.and_then(|s| s.get(0)).unwrap_or(42) as u32); diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index 8318e44e5a..70a41773d2 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -1,11 +1,10 @@ use std::{ - hash::BuildHasherDefault, ops::{Add, Div, Mul, Rem, Sub}, sync::Arc, }; use common_arrow_ffi as ffi; -use daft_hash::{HashFunctionKind, MurBuildHasher, Sha1Hasher}; +use daft_hash::HashFunctionKind; use daft_schema::python::PyDataType; use pyo3::{ exceptions::{PyIndexError, PyStopIteration, PyValueError}, @@ -275,25 +274,12 @@ impl PySeries { ))); } let seed = seed as u32; - let num_hashes = num_hashes as usize; let ngram_size = ngram_size as usize; - let result = match hash_function { - HashFunctionKind::MurmurHash3 => { - let hasher = MurBuildHasher::new(seed); - self.series.minhash(num_hashes, ngram_size, seed, &hasher) - } - HashFunctionKind::XxHash => { - let hasher = xxhash_rust::xxh64::Xxh64Builder::new(seed as u64); - self.series.minhash(num_hashes, ngram_size, seed, &hasher) - } - HashFunctionKind::Sha1 => { - let hasher = BuildHasherDefault::::default(); - self.series.minhash(num_hashes, ngram_size, seed, &hasher) - } - }?; - + let result = self + .series + .minhash(num_hashes, ngram_size, seed, hash_function)?; Ok(result.into()) } diff --git a/src/daft-core/src/series/ops/minhash.rs b/src/daft-core/src/series/ops/minhash.rs index bbcff86313..e33347bfe4 100644 --- a/src/daft-core/src/series/ops/minhash.rs +++ b/src/daft-core/src/series/ops/minhash.rs @@ -1,4 +1,7 @@ +use std::hash::BuildHasherDefault; + use common_error::{DaftError, DaftResult}; +use daft_hash::{HashFunctionKind, MurBuildHasher, Sha1Hasher, XxHash32BuildHasher}; use crate::{ array::ops::DaftMinHash, @@ -12,17 +15,41 @@ impl Series { num_hashes: usize, ngram_size: usize, seed: u32, - hasher: 
&impl std::hash::BuildHasher, + hash_function: HashFunctionKind, ) -> DaftResult { - match self.data_type() { - DataType::Utf8 => Ok(self - .utf8()? - .minhash(num_hashes, ngram_size, seed, hasher)? - .into_series()), - dt => Err(DaftError::TypeError(format!( - "minhash not implemented for {}", - dt - ))), - } + let arr = match self.data_type() { + DataType::Utf8 => self.utf8()?, + dt => { + return Err(DaftError::TypeError(format!( + "minhash not implemented for {}", + dt + ))); + } + }; + + let output = match hash_function { + HashFunctionKind::MurmurHash3 => { + let hasher = MurBuildHasher::new(seed); + arr.minhash(num_hashes, ngram_size, seed, &hasher) + } + HashFunctionKind::XxHash64 => { + let hasher = xxhash_rust::xxh64::Xxh64Builder::new(seed as u64); + arr.minhash(num_hashes, ngram_size, seed, &hasher) + } + HashFunctionKind::XxHash32 => { + let hasher = XxHash32BuildHasher::new(seed); + arr.minhash(num_hashes, ngram_size, seed, &hasher) + } + HashFunctionKind::XxHash3_64 => { + let hasher = xxhash_rust::xxh3::Xxh3Builder::new().with_seed(seed as u64); + arr.minhash(num_hashes, ngram_size, seed, &hasher) + } + HashFunctionKind::Sha1 => { + let hasher = BuildHasherDefault::::default(); + arr.minhash(num_hashes, ngram_size, seed, &hasher) + } + }?; + + Ok(output.into_series()) } } diff --git a/src/daft-functions/Cargo.toml b/src/daft-functions/Cargo.toml index 837f52dc87..feba8228df 100644 --- a/src/daft-functions/Cargo.toml +++ b/src/daft-functions/Cargo.toml @@ -6,9 +6,8 @@ daft-hash = {workspace = true} num-traits = {workspace = true} pyo3 = {workspace = true, optional = true} typetag = {workspace = true} -xxhash-rust = {workspace = true, features = ["xxh64"]} -serde.workspace = true -snafu.workspace = true +serde = {workspace = true} +snafu = {workspace = true} [features] python = [ diff --git a/src/daft-functions/src/hash.rs b/src/daft-functions/src/hash.rs index 24818dfa08..cdfabd2bab 100644 --- a/src/daft-functions/src/hash.rs +++ 
b/src/daft-functions/src/hash.rs @@ -34,7 +34,7 @@ impl ScalarUDF for HashFunction { let hash_function = hash_function .map(|s| s.parse::()) .transpose()? - .unwrap_or(HashFunctionKind::XxHash); + .unwrap_or(HashFunctionKind::XxHash3_64); if let Some(seed) = seed { match seed.len() { diff --git a/src/daft-functions/src/minhash.rs b/src/daft-functions/src/minhash.rs index 5bfff0e6c6..67a77af042 100644 --- a/src/daft-functions/src/minhash.rs +++ b/src/daft-functions/src/minhash.rs @@ -1,9 +1,7 @@ -use std::hash::BuildHasherDefault; - use common_error::DaftResult; use daft_core::prelude::*; use daft_dsl::functions::prelude::*; -use daft_hash::{HashFunctionKind, MurBuildHasher, Sha1Hasher}; +use daft_hash::HashFunctionKind; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] @@ -41,20 +39,7 @@ impl ScalarUDF for MinHashFunction { .transpose()? .unwrap_or_default(); - match hash_function { - HashFunctionKind::MurmurHash3 => { - let hasher = MurBuildHasher::new(seed); - input.minhash(num_hashes, ngram_size, seed, &hasher) - } - HashFunctionKind::XxHash => { - let hasher = xxhash_rust::xxh64::Xxh64Builder::new(seed as u64); - input.minhash(num_hashes, ngram_size, seed, &hasher) - } - HashFunctionKind::Sha1 => { - let hasher = BuildHasherDefault::::default(); - input.minhash(num_hashes, ngram_size, seed, &hasher) - } - } + input.minhash(num_hashes, ngram_size, seed, hash_function) } fn get_return_field( &self, diff --git a/src/daft-hash/Cargo.toml b/src/daft-hash/Cargo.toml index b51a86d3ea..04ce990563 100644 --- a/src/daft-hash/Cargo.toml +++ b/src/daft-hash/Cargo.toml @@ -3,6 +3,7 @@ common-error = {workspace = true} mur3 = {workspace = true} serde = {workspace = true, features = ["derive"]} sha1 = {workspace = true} +xxhash-rust = {workspace = true} [lints] workspace = true diff --git a/src/daft-hash/src/lib.rs b/src/daft-hash/src/lib.rs index 42ad539aa9..5d6bd50924 100644 --- a/src/daft-hash/src/lib.rs +++ 
b/src/daft-hash/src/lib.rs @@ -50,11 +50,47 @@ impl Hasher for Sha1Hasher { } } +pub struct XxHash32Hasher { + inner: xxhash_rust::xxh32::Xxh32, +} + +impl Hasher for XxHash32Hasher { + fn write(&mut self, bytes: &[u8]) { + self.inner.update(bytes); + } + + fn finish(&self) -> u64 { + self.inner.digest() as u64 + } +} + +pub struct XxHash32BuildHasher { + seed: u32, +} + +impl XxHash32BuildHasher { + pub fn new(seed: u32) -> Self { + Self { seed } + } +} + +impl BuildHasher for XxHash32BuildHasher { + type Hasher = XxHash32Hasher; + + fn build_hasher(&self) -> Self::Hasher { + XxHash32Hasher { + inner: xxhash_rust::xxh32::Xxh32::new(self.seed), + } + } +} + #[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum HashFunctionKind { #[default] MurmurHash3, - XxHash, + XxHash64, + XxHash32, + XxHash3_64, Sha1, } @@ -64,7 +100,9 @@ impl FromStr for HashFunctionKind { fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { "murmurhash3" => Ok(Self::MurmurHash3), - "xxhash" => Ok(Self::XxHash), + "xxhash64" => Ok(Self::XxHash64), + "xxhash32" => Ok(Self::XxHash32), + "xxhash3_64" | "xxhash" => Ok(Self::XxHash3_64), "sha1" => Ok(Self::Sha1), _ => Err(DaftError::ValueError(format!( "Invalid hash function: {}", diff --git a/tests/recordbatch/test_minhash.py b/tests/recordbatch/test_minhash.py index e94b7e76e9..71ee68f1ff 100644 --- a/tests/recordbatch/test_minhash.py +++ b/tests/recordbatch/test_minhash.py @@ -9,7 +9,7 @@ @pytest.mark.parametrize("num_hashes", [1, 2, 16, 128]) @pytest.mark.parametrize("ngram_size", [1, 2, 4, 5, 100]) @pytest.mark.parametrize("seed", [1, 2, 123, None]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_table_expr_minhash(num_hashes, ngram_size, seed, hash_function): df = daft.from_pydict( { diff --git a/tests/series/test_minhash.py 
b/tests/series/test_minhash.py index 08812c51a6..3871bf334b 100644 --- a/tests/series/test_minhash.py +++ b/tests/series/test_minhash.py @@ -12,7 +12,7 @@ def minhash_none( num_hashes: int, ngram_size: int, seed: int | None, - hash_function: Literal["murmurhash3", "xxhash", "sha1"] = "murmurhash3", + hash_function: Literal["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"] = "murmurhash3", ) -> list[list[int] | None]: if seed is None: return series.minhash(num_hashes, ngram_size, hash_function=hash_function).to_pylist() @@ -41,7 +41,7 @@ def minhash_none( @pytest.mark.parametrize("num_hashes", [1, 2, 16, 128]) @pytest.mark.parametrize("ngram_size", [1, 2, 4, 5, 100]) @pytest.mark.parametrize("seed", [1, -1, 123, None]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_minhash(num_hashes, ngram_size, seed, hash_function): minhash = minhash_none(test_series, num_hashes, ngram_size, seed, hash_function) assert minhash[4] is None and minhash[-1] is None @@ -109,7 +109,7 @@ def test_minhash_exact_values(num_hashes, ngram_size, seed, expected): @pytest.mark.parametrize("num_hashes", [0, -1, -100]) @pytest.mark.parametrize("ngram_size", [1, 2, 4, 5, 100]) @pytest.mark.parametrize("seed", [1, -1, 123, None]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_minhash_fails_nonpositive_num_hashes(num_hashes, ngram_size, seed, hash_function): with pytest.raises(ValueError, match="num_hashes must be positive"): minhash_none(test_series, num_hashes, ngram_size, seed, hash_function) @@ -118,7 +118,7 @@ def test_minhash_fails_nonpositive_num_hashes(num_hashes, ngram_size, seed, hash @pytest.mark.parametrize("num_hashes", [1, 2, 16, 128]) 
@pytest.mark.parametrize("ngram_size", [0, -1, -100]) @pytest.mark.parametrize("seed", [1, -1, 123, None]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_minhash_fails_nonpositive_ngram_size(num_hashes, ngram_size, seed, hash_function): with pytest.raises(ValueError, match="ngram_size must be positive"): minhash_none(test_series, num_hashes, ngram_size, seed, hash_function) @@ -127,7 +127,7 @@ def test_minhash_fails_nonpositive_ngram_size(num_hashes, ngram_size, seed, hash @pytest.mark.parametrize("num_hashes", [1, 2, 16, 128]) @pytest.mark.parametrize("ngram_size", [1, 2, 4, 5, 100]) @pytest.mark.parametrize("seed", [1, -1, 123, None]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_minhash_empty_series(num_hashes, ngram_size, seed, hash_function): series = Series.from_pylist([]).cast(DataType.string()) @@ -138,7 +138,7 @@ def test_minhash_empty_series(num_hashes, ngram_size, seed, hash_function): @pytest.mark.parametrize("num_hashes", [1, 2, 16, 128]) @pytest.mark.parametrize("ngram_size", [1, 2, 4, 5, 100]) @pytest.mark.parametrize("seed", [1, -1, 123, None]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_minhash_seed_consistency(num_hashes, ngram_size, seed, hash_function): minhash1 = minhash_none(test_series, num_hashes, ngram_size, seed, hash_function) minhash2 = minhash_none(test_series, num_hashes, ngram_size, seed, hash_function) @@ -148,7 +148,7 @@ def test_minhash_seed_consistency(num_hashes, ngram_size, seed, hash_function): @pytest.mark.parametrize("num_hashes", [1, 2, 16, 128]) 
@pytest.mark.parametrize("ngram_size", [1, 2, 4, 5, 100]) @pytest.mark.parametrize("seed_pair", [[1, 2], [1, 5], [None, 2], [123, 234]]) -@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "sha1"]) +@pytest.mark.parametrize("hash_function", ["murmurhash3", "xxhash", "xxhash3_64", "xxhash64", "xxhash32", "sha1"]) def test_minhash_seed_differences(num_hashes, ngram_size, seed_pair, hash_function): minhash1 = minhash_none(test_series, num_hashes, ngram_size, seed_pair[0], hash_function) minhash2 = minhash_none(test_series, num_hashes, ngram_size, seed_pair[1], hash_function)