IFWHYAZTPKL7ZVOV3AFZNIUTRLMVSOACYACA3CI4DSEEZU77YQEQC use pgx::extension_sql;use pgx::*;const TOL: f32 = 1.0e-8;#[pg_extern]fn cosine_similarity(a: Array<f32>, b: Array<f32>) -> f32 {let mut a_norm_sq = 0.0;let mut b_norm_sq = 0.0;let a_norm_sq_ref = &mut a_norm_sq;let b_norm_sq_ref = &mut b_norm_sq;let mut sq_sum: f32 = a.iter().zip(b.iter()).map(|pair| unwrap_pair(pair)).map(|(a, b)| {*a_norm_sq_ref += a * a;*b_norm_sq_ref += b * b;a * b}).sum();sq_sum /= TOL + a_norm_sq.sqrt();sq_sum /= TOL + b_norm_sq.sqrt();sq_sum.min(1.).max(-1.0)}#[pg_extern]fn l2_dist(a: Array<f32>, b: Array<f32>) -> f32 {let sq: f32 = a.iter().zip(b.iter()).map(|pair| unwrap_pair(pair)).map(|(a, b)| (a - b) * (a - b)).sum();sq.sqrt()}fn unwrap_pair((a, b): (Option<f32>, Option<f32>)) -> (f32, f32) {(a.unwrap_or(0.0), b.unwrap_or(0.0))}#[pg_extern]fn ip(a: Array<f32>, b: Array<f32>) -> f32 {a.iter().zip(b.iter()).map(|pair| unwrap_pair(pair)).map(|(a, b)| a * b).sum()}///extension_sql! {r#"///CREATE OPERATOR pg_catalog.<|> (/// PROCEDURE = ip,/// RESTRICT = restrict,/// LEFTARG = real[],/// RIGHTARG = real[],/// COMMUTATOR = '<|>'///);//////CREATE OPERATOR pg_catalog.|-| (/// PROCEDURE = l2_dist,/// RESTRICT = restrict,/// LEFTARG = real[],/// RIGHTARG = real[],/// COMMUTATOR = '<->'///);//////CREATE OPERATOR pg_catalog.<~> (/// PROCEDURE = cosine_similarity,/// RESTRICT = restrict,/// LEFTARG = real[],/// RIGHTARG = real[],/// COMMUTATOR = '<~>'///);/////////CREATE OPERATOR CLASS vector_ops DEFAULT FOR TYPE real[] USING vector AS/// OPERATOR 1 pg_catalog.<|>(real[]),/// OPERATOR 2 pg_catalog.|-|(real[]),/// OPERATOR 3 pg_catalog.<~>(real[]),/// STORAGE real[];///"#}#[cfg(any(test, feature = "pg_test"))]mod tests {use crate::vector_query::opclass::l2_dist;use crate::Vector;use core::mem::ManuallyDrop;use pgx::*;#[pg_test]fn test_hello_pgvector_rs() {let arr0 = (0..32).map(|x| x as f32).map(|x| x.sin()).collect::<Vec<_>>();let mut arr = arr0.iter().map(|x| x.into_datum()).map(|x| 
x.unwrap()).collect::<Vec<pg_sys::Datum>>();let mut nulls = arr.iter().map(|_| false).collect::<Vec<bool>>();let mut arr_2 = arr.clone();let mut nulls_2 = nulls.clone();println!("{:?}", arr);let vec1 = unsafe {let size = arr.len();let b = arr.as_mut_ptr();println!("{:?}", b);Array::over(b, nulls.as_mut_ptr(), size)};vec1.iter().for_each(|x| {println!("{:?}", x);});let vec2 = unsafe {let size = arr_2.len();Array::over(arr_2.as_mut_ptr(), nulls_2.as_mut_ptr(), size)};vec2.iter().zip(arr0.iter()).for_each(|(x, y)| {println!("{:?}", x.unwrap() - y);});let d = l2_dist(vec1, vec2);println!("distance: {}", d);assert!(false);}}
mod opclass;

/// Unconditionally panics with the message `ded!`.
///
/// NOTE(review): nothing in the visible code calls this; it appears to be a leftover stub.
fn yolo() {
    panic!("ded!");
}
{"rustc_fingerprint":16809209860643795384,"outputs":{"2797684049618456168":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"11097456331220016911":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"15262391909083379790":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"931469667778813386":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2
\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"11593133297143080531":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"10624890571507989382":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\
ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"17598535894874457435":{"success":true,"status":"","code":0,"stdout":"rustc 1.54.0-nightly (676ee1472 2021-05-06)\nbinary: rustc\ncommit-hash: 676ee14729462585b969bbc52f32c307403f4126\ncommit-date: 2021-05-06\nhost: x86_64-apple-darwin\nrelease: 1.54.0-nightly\nLLVM version: 12.0.0\n","stderr":""}},"successes":{}}
use pgx::extension_sql;use pgx::*;const TOL: f32 = 1.0e-8;#[pg_extern]fn cosine_similarity(a: Array<f32>, b: Array<f32>) -> f32 {let mut a_norm_sq = 0.0;let mut b_norm_sq = 0.0;let a_norm_sq_ref = &mut a_norm_sq;let b_norm_sq_ref = &mut b_norm_sq;let mut sq_sum: f32 = a.iter().zip(b.iter()).map(|pair| unwrap_pair(pair)).map(|(a, b)| {*a_norm_sq_ref += a * a;*b_norm_sq_ref += b * b;a * b}).sum();sq_sum /= TOL + a_norm_sq.sqrt();sq_sum /= TOL + b_norm_sq.sqrt();sq_sum.min(1.).max(-1.0)}#[pg_extern]fn l2_dist(a: Array<f32>, b: Array<f32>) -> f32 {let sq: f32 = a.iter().zip(b.iter()).map(|pair| unwrap_pair(pair)).map(|(a, b)| (a - b) * (a - b)).sum();sq.sqrt()}fn unwrap_pair((a, b): (Option<f32>, Option<f32>)) -> (f32, f32) {(a.unwrap_or(0.0), b.unwrap_or(0.0))}#[pg_extern]fn ip(a: Array<f32>, b: Array<f32>) -> f32 {a.iter().zip(b.iter()).map(|pair| unwrap_pair(pair)).map(|(a, b)| a * b).sum()}///extension_sql! {r#"///CREATE OPERATOR pg_catalog.<|> (/// PROCEDURE = ip,/// RESTRICT = restrict,/// LEFTARG = real[],/// RIGHTARG = real[],/// COMMUTATOR = '<|>'///);//////CREATE OPERATOR pg_catalog.|-| (/// PROCEDURE = l2_dist,/// RESTRICT = restrict,/// LEFTARG = real[],/// RIGHTARG = real[],/// COMMUTATOR = '<->'///);//////CREATE OPERATOR pg_catalog.<~> (/// PROCEDURE = cosine_similarity,/// RESTRICT = restrict,/// LEFTARG = real[],/// RIGHTARG = real[],/// COMMUTATOR = '<~>'///);/////////CREATE OPERATOR CLASS vector_ops DEFAULT FOR TYPE real[] USING vector AS/// OPERATOR 1 pg_catalog.<|>(real[]),/// OPERATOR 2 pg_catalog.|-|(real[]),/// OPERATOR 3 pg_catalog.<~>(real[]),/// STORAGE real[];///"#}#[cfg(any(test, feature = "pg_test"))]mod tests {use crate::vector_query::opclass::l2_dist;use crate::Vector;use core::mem::ManuallyDrop;use pgx::*;#[pg_test]fn test_hello_pgvector_rs() {let arr0 = (0..32).map(|x| x as f32).map(|x| x.sin()).collect::<Vec<_>>();let mut arr = arr0.iter().map(|x| x.into_datum()).map(|x| x.unwrap()).collect::<Vec<pg_sys::Datum>>();let mut nulls = 
arr.iter().map(|_| false).collect::<Vec<bool>>();let mut arr_2 = arr.clone();let mut nulls_2 = nulls.clone();println!("{:?}", arr);let vec1 = unsafe {let size = arr.len();let b = arr.as_mut_ptr();println!("{:?}", b);Array::over(b, nulls.as_mut_ptr(), size)};vec1.iter().for_each(|x| {println!("{:?}", x);});let vec2 = unsafe {let size = arr_2.len();Array::over(arr_2.as_mut_ptr(), nulls_2.as_mut_ptr(), size)};vec2.iter().zip(arr0.iter()).for_each(|(x, y)| {println!("{:?}", x.unwrap() - y);});let d = l2_dist(vec1, vec2);println!("distance: {}", d);assert!(false);}}
mod opclass;

/// Unconditionally panics with the message `ded!`.
///
/// NOTE(review): nothing in the visible code calls this; it appears to be a leftover stub.
fn yolo() {
    panic!("ded!");
}
use pgx::PGXSharedMemory;use pgx::PgBox;use pgx::PgMemoryContexts;use std::iter::repeat;use thiserror::Error;trait F32VecBlockData {}trait I64BlockData {}#[derive(Default, Debug)]pub struct F32Block<const SIZE: usize>(heapless::Vec<f32, SIZE>);unsafe impl<const SIZE: usize> PGXSharedMemory for F32Block<SIZE> {}pub type BlockResult<T> = Result<T, F32BlockError>;#[derive(Error, Debug, PartialEq, Eq)]pub enum F32BlockError {#[error("Trying to write over end of block: Start {write_start}, Length {write_length}, block ends at {block_end}")]Overflow {write_start: usize,write_length: usize,block_end: usize,},#[error("Trying to push past block limit")]LimitExceded {push_start: usize,push_size: usize,limit: usize,},#[error("Trying to free unallocated memory. Only allocated {data_len}. Segment to delete starts at {start}, and is of length {len}")]DeleteEmpty {data_len: usize,start: usize,len: usize,},#[error("Placeholder")]PlaceHolder,#[error("Trying to start at index {start} that is beyond the length of written data {data_len}")]StartEmpty { data_len: usize, start: usize },#[error("Only support pushing bounded iterators")]PushUnboundedIter,}impl F32BlockError {fn overflow(start: usize, length: usize, block: usize) -> F32BlockError {F32BlockError::Overflow {write_start: start,write_length: length,block_end: block,}}fn limit_exceded(start: usize, length: usize, limit: usize) -> F32BlockError {F32BlockError::LimitExceded {push_start: start,push_size: length,limit,}}}struct Padded<'a> {dimension: u16,data: &'a [f32],}impl<'a> Padded<'a> {fn iterator_with_dim(self) -> impl Iterator<Item = f32> + 'a {self.data.iter().map(|x| *x).chain(repeat(0.0)).take(self.dimension as usize)}}/// This will write over a piece of the blockimpl<const SIZE: usize> F32Block<SIZE> {fn write_slice<'a, 'b>(&'a mut self, vec: &'b [f32], start: usize) -> BlockResult<()> {let vec_len = vec.len();let data_len = self.0.len();let end = vec_len + start;if end <= data_len {for (new_val, old_val) in 
vec.iter().zip(self.0[start..].iter_mut()) {*old_val = *new_val;}Ok(())} else {Err(F32BlockError::overflow(start, vec_len, data_len))}}fn write_iter<IterType: Iterator<Item = f32>>(&mut self,start: usize,iter: IterType,) -> BlockResult<()> {if let (_, Some(upper_bound)) = iter.size_hint() {let data_len = self.0.len();if start + upper_bound > data_len {for (old_val, new_val) in self.0[start..].iter_mut().zip(iter) {*old_val = new_val;}Ok(())} else {Err(F32BlockError::overflow(start, upper_bound, data_len))}} else {Err(F32BlockError::PushUnboundedIter)}}fn push_slice<'a, 'b>(&'a mut self, vec: &'b [f32]) -> BlockResult<()> {let data_len = self.0.len();let vec_len = vec.len();self.0.extend_from_slice(vec).map_err(|_| F32BlockError::limit_exceded(data_len, vec_len, SIZE))}/// We allow pushing iterators, but we require an upper bound for the size hint of the/// iterator.fn push_iter<IterType: Iterator<Item = f32>>(&mut self, iter: IterType) -> BlockResult<()> {if let (_, Some(upper_bound)) = iter.size_hint() {let data_len = self.0.len();if data_len + upper_bound <= SIZE {iter.map(|el| self.0.push(el)).collect::<Result<Vec<_>, _>>().map_err(|_| F32BlockError::PushUnboundedIter).map(|_| ())} else {Err(F32BlockError::limit_exceded(data_len, upper_bound, SIZE))}} else {Err(F32BlockError::PushUnboundedIter)}}fn free(&mut self, start: usize, len: usize) -> Result<(), F32BlockError> {let mut end = start + len;let data_len = self.0.len();if end > data_len {Err(F32BlockError::overflow(start, len, data_len))} else {let mut loop_start = start;loop {let (initial, tail) = self.0.split_at_mut(end);for (to_delete, to_assign) in initial[loop_start..].iter_mut().zip(tail.iter()) {*to_delete = *to_assign;}end += len;loop_start += len;if end >= data_len {break;}}self.0.truncate(data_len - len);Ok(())}}fn read_data(&self, start: usize, len: usize) -> BlockResult<&[f32]> {let data_len = self.0.len();let end = start + len;if end > data_len {Err(F32BlockError::overflow(start, len, SIZE))} 
else {Ok(&self.0[start..end])}}}#[cfg(test)]mod f32_block_tests {const TOL: f32 = 1.0e-7;const BLOCK_SIZE: usize = 8192;use crate::vec_block_alloc::F32Block;use crate::vec_block_alloc::F32BlockError;/// test_push: given block of finite size, test that we can push a vector of finite dim/// test_drop:/// test_compact:#[test]fn test_write() {let good_start = 0;let vec_size = 512;let end = vec_size * 2;let bad_start = BLOCK_SIZE - (vec_size / 2);let vec = (0..vec_size).map(|x| (x as f32).sin()).collect::<Vec<_>>();let mut mem = F32Block::<BLOCK_SIZE>::default();mem.push_iter((0..end).map(|x| (x as f32))).unwrap();mem.write_slice(&vec, good_start).unwrap();let read = mem.read_data(good_start, vec_size).unwrap();let diff: f32 = read.iter().zip(vec.iter()).map(|(x, y)| (x - y).abs()).sum();assert!(diff <= TOL);assert_eq!(mem.write_slice(&vec, bad_start).unwrap_err(),F32BlockError::overflow(bad_start, vec_size, end));assert_eq!(mem.write_slice(&vec, BLOCK_SIZE + vec_size).unwrap_err(),F32BlockError::overflow(BLOCK_SIZE + vec_size, vec_size, end));}#[test]fn test_push() {let mut mem = F32Block::<BLOCK_SIZE>::default();let vec_size = 512;let vec = (0..vec_size).map(|x| (x as f32).sin()).collect::<Vec<_>>();let pushes = (0..20).map(|_| mem.push_slice(&vec)).collect::<Result<Vec<()>, F32BlockError>>();assert!(pushes.is_err());}#[test]fn test_delete() {let good_start = 1024; // something less than BLOCK_SIZE;let len = good_start * 1024; // something less than BLOCK_SIZE - good_start;let mut mem = F32Block::<BLOCK_SIZE>::default();assert_eq!(mem.free(good_start, len).unwrap_err(),F32BlockError::overflow(good_start, len, BLOCK_SIZE));mem.push_iter((0..len).map(|x| (x as f32).cos())).unwrap();assert!(mem.free(good_start, len).is_ok());assert!(mem.free(good_start, len).is_err());}}/// Later we can replace the individual fields with a seq of blocks#[derive(Default, Debug)]pub struct FiniteDimAllocator<const BlockSize: usize> {dimension: u16,len: usize,f32_data: 
F32Block<BlockSize>,}unsafe impl<const Size: usize> PGXSharedMemory for FiniteDimAllocator<Size> {}impl<const BlockSize: usize> FiniteDimAllocator<BlockSize> {fn new(dimension: u16) -> FiniteDimAllocator<BlockSize> {FiniteDimAllocator {dimension,len: 0,f32_data: F32Block::<BlockSize>::default(),}}pub fn is_set(&self) -> bool {self.dimension > 0}pub fn set_dim(&mut self, dimension: u16) {self.dimension = dimension}fn dim(&self) -> usize {self.dimension as usize}fn end(&self) -> usize {self.len * (self.dimension as usize)}pub fn add_vec(&mut self, vec: &[f32]) -> Result<(), F32BlockError> {//// Replace with custom error typelet padded = Padded {dimension: self.dimension,data: vec,};self.f32_data.push_iter(padded.iterator_with_dim())?;self.len += 1;Ok(())}fn remove_vec(&mut self, index: usize) -> Result<(), F32BlockError> {if index >= self.len {Err(F32BlockError::PlaceHolder)} else {let dimension = self.dim();let slice_start = index * dimension;let slice_end = slice_start + dimension;self.f32_data.free(slice_start, slice_end)?;self.len -= 1;Ok(())}}pub fn get_slice(&self, index: usize) -> Result<&[f32], F32BlockError> {let dimension = self.dim();let slice_start = dimension * index;let slice_end = slice_start + dimension;self.f32_data.read_data(slice_start, dimension)}fn pad<'b>(&'_ self, data: &'b [f32]) -> Padded<'b> {Padded {dimension: self.dimension,data,}}pub fn update_vec(&mut self, index: usize, data: &[f32]) -> Result<(), F32BlockError> {let padded = self.pad(data);self.f32_data.write_iter(index * self.dim(), padded.iterator_with_dim())?;Ok(())}// todo SIMD????pub fn ip_many(&self, vec: &Vec<f32>, indices: &Vec<usize>) -> Result<Vec<f32>, F32BlockError> {indices.iter().map(|&index| {let data = self.get_slice(index)?;Ok(data.iter().zip(vec.iter()).map(|(&a, &b)| a * b).sum())}).collect::<Result<Vec<f32>, F32BlockError>>()}}#[cfg(test)]mod test_finite_dim_allocator {use crate::util::random_vecs;use crate::vec_block_alloc::F32BlockError;use 
crate::vec_block_alloc::FiniteDimAllocator;#[test]fn test_insert() {const SIZE: usize = 100000;let dim: u16 = 32;let mut allocator = FiniteDimAllocator::<SIZE>::new(dim);let allocator_borr = &mut allocator;random_vecs(dim, 100).map(|vec| allocator_borr.add_vec(&vec)).collect::<Result<Vec<()>, F32BlockError>>().unwrap();}#[test]fn print_size() {const SIZE: usize = 523934;let size = std::mem::size_of::<FiniteDimAllocator<SIZE>>();print!("Size of allocator: {}", size);assert!(false);}}
const PI: f32 = 3.141592653;
const TAU: f32 = 2.0 * PI;

/// Lazily yields `n` test vectors of length `dim`.
///
/// Each vector samples a sine wave with period 16 components, shifted by a random phase
/// in `[0, TAU)` drawn once per vector. A non-positive `n` yields nothing.
pub fn random_vecs(dim: u16, n: i32) -> impl std::iter::Iterator<Item = Vec<f32>> {
    (0..n).map(move |_| {
        let phase = rand::random::<f32>() * TAU;
        (0..dim)
            .map(|k| (phase + TAU * (f32::from(k) / 16.0)).sin())
            .collect::<Vec<f32>>()
    })
}
{"rustc_fingerprint":16809209860643795384,"outputs":{"2797684049618456168":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"11097456331220016911":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"15262391909083379790":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"931469667778813386":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2
\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"11593133297143080531":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"10624890571507989382":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\
ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"17598535894874457435":{"success":true,"status":"","code":0,"stdout":"rustc 1.54.0-nightly (676ee1472 2021-05-06)\nbinary: rustc\ncommit-hash: 676ee14729462585b969bbc52f32c307403f4126\ncommit-date: 2021-05-06\nhost: x86_64-apple-darwin\nrelease: 1.54.0-nightly\nLLVM version: 12.0.0\n","stderr":""}},"successes":{}}
use crate::vec_block_alloc::F32Block;use crate::FiniteDimAllocator;use pgx::log;use pgx::pg_sys;use pgx::PgSharedMemoryInitialization;use pgx::{pg_guard, pg_shmem_init};use pgx::{PgLwLock, PgLwLockInner};use std::lazy::OnceCell;use uuid::Uuid;pub struct ShMemBlocks<const BLOCK_SIZE: usize> {size: OnceCell<usize>,name: OnceCell<&'static str>,pub inner: OnceCell<ShMemBlocksInner<BLOCK_SIZE>>,}unsafe impl<const BLOCK_SIZE: usize> Send for ShMemBlocks<BLOCK_SIZE> {}unsafe impl<const BLOCK_SIZE: usize> Sync for ShMemBlocks<BLOCK_SIZE> {}#[derive(Debug)]pub struct ShMemBlocksInner<const BLOCK_SIZE: usize> {pub start: *mut PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>,}impl<const BLOCK_SIZE: usize> ShMemBlocks<BLOCK_SIZE> {pub const fn new() -> Self {ShMemBlocks {size: OnceCell::new(),name: OnceCell::new(),inner: OnceCell::new(),}}pub fn get_name(&self) -> &'static str {match self.name.get() {None => {let name = Box::leak(Uuid::new_v4().to_string().into_boxed_str());self.name.set(name).unwrap();name}Some(name) => name,}}pub fn get_size(&self) -> usize {match self.size.get() {None => {let size = 1024; // todo: replace with GucStuffself.size.set(size).unwrap();size}Some(size) => *size,}}pub fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {if index > self.get_size() {None} else {// This is guaranteed to be safe because we initialise all addresses at startupself.inner.get().map(|inner| unsafe { inner.get_block(index) }).flatten()}}pub fn iter<'a>(&'a self,) -> impl Iterator<Item = &PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> + 'a {let size = self.get_size();(0..size).filter_map(move |index| self.get_block(index))}}impl<const BLOCK_SIZE: usize> ShMemBlocksInner<BLOCK_SIZE> {unsafe fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {self.start.add(index).as_ref()}}
use memoffset::*;use pgx::pg_sys::AsPgCStr;use pgx::*;use serde::Serialize;static mut RELOPT_KIND_PGVEC: u32 = 0; // Do I like this? Oh no I do not!#[derive(Clone, Debug, Serialize)]pub struct VecMemCacheOptions {/* varlena header (do not touch directly!) */#[allow(dead_code)]vl_len_: i32,dimension: u32,something: u32,}const DEFAULT_DIM: u32 = 512;#[allow(dead_code)]impl VecMemCacheOptions {pub(crate) fn from_relation(relation: &PgRelation) -> PgBox<VecMemCacheOptions> {if relation.rd_index.is_null() {panic!("'{} not a vector memcache index' ", relation.name())} else if relation.rd_options.is_null() {// use defaultslet mut opts = PgBox::<VecMemCacheOptions>::alloc0();opts.dimension = DEFAULT_DIM;opts} else {PgBox::from_pg(relation.rd_options as *mut VecMemCacheOptions)}}pub(crate) fn get_dim(&self) -> u16 {self.dimension as u16}}#[pg_guard]pub unsafe extern "C" fn vec_mem_cache_options(reloptions: pg_sys::Datum,validate: bool,) -> *mut pg_sys::varlena {let tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS] = [pg_sys::relopt_parse_elt {optname: "dimension".as_pg_cstr(),opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,offset: offset_of!(VecMemCacheOptions, dimension) as i32,},pg_sys::relopt_parse_elt {optname: "something".as_pg_cstr(),opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,offset: offset_of!(VecMemCacheOptions, something) as i32,},];build_relopts(reloptions, validate, tab)}// impl IntoDatum for *mut pg_sys::bytea {}const NUM_REL_OPTS: usize = 2;unsafe fn build_relopts(reloptions: pg_sys::Datum,validate: bool,tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS],) -> *mut pg_sys::bytea {let rdopts;/* Parse the user-given reloptions */rdopts = pg_sys::build_reloptions(reloptions,validate,RELOPT_KIND_PGVEC,std::mem::size_of::<VecMemCacheOptions>(),tab.as_ptr(),NUM_REL_OPTS as i32,);rdopts as *mut pg_sys::bytea}#[cfg(test)]mod test {}
pub mod build;mod insert;pub mod options;use pgx::*;#[pg_guard]pub extern "C" fn amvalidate(_opclassoid: pg_sys::Oid) -> bool {true}/// ```sql/// CREATE OR REPLACE FUNCTION amhandler(internal) RETURNS index_am_handler PARALLEL SAFE IMMUTABLE STRICT COST 0.0001 LANGUAGE c AS 'MODULE_PATHNAME', '@FUNCTION_NAME@';/// CREATE ACCESS METHOD vector TYPE INDEX HANDLER amhandler;/// ```#[pg_extern]pub fn amhandler(_fcinfi: pg_sys::FunctionCallInfo) -> PgBox<pg_sys::IndexAmRoutine> {let mut amroutine =PgBox::<pg_sys::IndexAmRoutine>::alloc_node(pg_sys::NodeTag_T_IndexAmRoutine);amroutine.amstrategies = 0;amroutine.amsupport = 4;amroutine.amcanorderbyop = true;amroutine.amoptionalkey = true;amroutine.amkeytype = pg_sys::InvalidOid;amroutine.ambuild = Some(build::ambuild);amroutine.ambuildempty = Some(build::vec_mem_cache_buildempty);amroutine.aminsert = None; // todo!();amroutine.ambulkdelete = None; // todo!();amroutine.amvacuumcleanup = None; // todo!();amroutine.amcostestimate = None; //todo!();amroutine.amoptions = Some(options::vec_mem_cache_options);amroutine.amvalidate = Some(amvalidate);amroutine.ambeginscan = None; //todo!();amroutine.amrescan = None; //todo!();amroutine.amgettuple = None; //todo!();amroutine.amendscan = None; //todo!();amroutine}#[cfg(test)]mod test {// We want to try out something like// ```sql// CREATE TABLE public.vecs_2//(// idx bigserial,// vec real[] CONSTRAINT vecs_2_dim512 CHECK (cardinality(vec) = 512),// PRIMARY KEY (idx)//);//// ```fn test_constraints() {todo!() // YOLO}}
// use crate::mem_cache_index::build::row_to_vec_iter;use pgx::*;pub unsafe extern "C" fn aminsert(index_relation: pg_sys::Relation,values: *mut pg_sys::Datum,_isnull: *mut bool,heap_tid: pg_sys::ItemPointer,_heap_relation: pg_sys::Relation,_check_unique: pg_sys::IndexUniqueCheck,_index_info: *mut pg_sys::IndexInfo,) -> bool {let index_relation = PgRelation::from_pg(index_relation);let values = std::slice::from_raw_parts(values, 1);let name = index_relation.name().bytes();// todo: try to get a pointer to the name as a sequence of i8 bytes.// let tupdesc = pg_sys::RelationNameGetTupleDesc(name);// let vec = row_to_vec_iter(tupdesc, values[0]).unwrap();true}
use super::options;use crate::vec_block_alloc;use crate::VECTOR_ALLOCATOR00;use pgx::*;pub extern "C" fn ambuild(heaprel: pg_sys::Relation,indexrel: pg_sys::Relation,index_info: *mut pg_sys::IndexInfo,) -> *mut pg_sys::IndexBuildResult {let mut vector_allocator = VECTOR_ALLOCATOR00.exclusive();// If index is already set, then panicif vector_allocator.is_set() {panic!("Index already set. We've only got room for one in here")}// Instantiate vector mem cachelet index_relation = unsafe { PgRelation::from_pg(indexrel) };let opts = options::VecMemCacheOptions::from_relation(&index_relation);vector_allocator.set_dim(opts.get_dim());// check that the indexrel has a realarray column and a bigint column// do heap scan to load as much data as possible into cache.let result = PgBox::<pg_sys::IndexBuildResult>::alloc0();result.into_pg()}struct BuildState<'a> {tupdesc: &'a PgTupleDesc<'a>,dimension: u16,// attributes: Vec<CategorizedAttribute<'a>>,memcxt: PgMemoryContexts,pub rows_added: u32,}impl<'a> BuildState<'a> {fn new(tupdesc: &'a PgTupleDesc,dimension: u16,// attributes: Vec<CategorizedAttribute<'a>>,) -> Self {BuildState {tupdesc,dimension,memcxt: PgMemoryContexts::new("pgvec_rs context"),rows_added: 0,}}}#[cfg(feature = "pg13")]#[pg_guard]unsafe extern "C" fn build_callback(_index: pg_sys::Relation,ctid: pg_sys::ItemPointer,values: *mut pg_sys::Datum,_isnull: *mut bool,_tuple_is_alive: bool,state: *mut std::os::raw::c_void,) {build_callback_internal(*ctid, values, state);}#[inline(always)]unsafe extern "C" fn build_callback_internal(ctid: pg_sys::ItemPointerData,values: *mut pg_sys::Datum,state: *mut std::os::raw::c_void,) {check_for_interrupts!();let state = (state as *mut BuildState).as_mut().unwrap();let old_context = state.memcxt.set_as_current();// TODO: Add check for structurelet values = std::slice::from_raw_parts(values, 1);let VecRow { id: _, data: vec } = row_to_vec_iter(&state.tupdesc, values[0]).unwrap();let mut vec_allocator = 
VECTOR_ALLOCATOR00.exclusive();vec_allocator.add_vec(&vec);state.rows_added += 1;}fn do_heap_scan<'a>(index_info: *mut pg_sys::IndexInfo,heap_relation: &'a PgRelation,index_relation: &'a PgRelation,tupdesc: &'a PgTupleDesc,) -> u32 {// Should be able a to get a dimension from the description, but we'll hard code it now YOLOlet dimension = 512;let mut state = BuildState::new(&tupdesc, dimension);unsafe {pg_sys::IndexBuildHeapScan(heap_relation.as_ptr(),index_relation.as_ptr(),index_info,Some(build_callback),&mut state,);}state.rows_added}#[derive(Default, Clone)]struct VecRowPartial {id: Option<i64>,data: Option<Vec<f32>>,}impl VecRowPartial {fn check(self) -> Result<VecRow, &'static str> {match self {VecRowPartial {id: Some(id),data: Some(data),} => Ok(VecRow { id, data }),VecRowPartial {id: None,data: Some(_),} => Err("Missing Id for vec row"),VecRowPartial {id: Some(_),data: None,} => Err("Missing vector data for vec row"),VecRowPartial {id: None,data: None,} => Err("Missing all data for vector"),}}}struct VecRow {id: i64,data: Vec<f32>,}#[inline]unsafe fn row_to_vec_iter<'a>(tupdesc: &'a PgTupleDesc,row: pg_sys::Datum,) -> Result<VecRow, &'static str> {let td = pg_sys::pg_detoast_datum(row as *mut pg_sys::varlena) as pg_sys::HeapTupleHeader;let mut tmptup = pg_sys::HeapTupleData {t_len: varsize(td as *mut pg_sys::varlena) as u32,t_self: Default::default(),t_tableOid: 0,t_data: td,};let mut datums = vec![0 as pg_sys::Datum; tupdesc.natts as usize];let mut nulls = vec![false; tupdesc.natts as usize];pg_sys::heap_deform_tuple(&mut tmptup,tupdesc.as_ptr(),datums.as_mut_ptr(),nulls.as_mut_ptr(),);let mut drop_cnt = 0;let mut vec_row = VecRowPartial::default();tupdesc.iter().map(|attribute| {let is_dropped = attribute.is_dropped();let array_type = unsafe { pg_sys::get_element_type(attribute.type_oid().value()) };let (base_oid, is_array) = if array_type != pg_sys::InvalidOid {(PgOid::from(array_type), true)} else {(attribute.type_oid(), false)};let typoid = 
base_oid;let fill_process: Result<(), &'static str> = match &typoid {PgOid::BuiltIn(builtin) => match (builtin, is_array) {(PgBuiltInOids::FLOAT4OID, true) => {if let None = vec_row.data {let data =unsafe { Vec::<f32>::from_datum(row, false, builtin.value()) }.unwrap();vec_row.data = Some(data);Ok(())} else {Err("Received more than one vector data entries in row")}}(PgBuiltInOids::INT8OID, false) => {if let None = vec_row.id {let id =unsafe { i64::from_datum(row, false, builtin.value()) }.unwrap();vec_row.id = Some(id);Ok(())} else {Err("Received more than one id in the row")}}(_, _) => {Err("This row is not the right shape. It should be (bigint, real[])")}},_ => Err("This element is not an accepted type"), // todo: communicate the right and wrong types};fill_process}).collect::<Result<Vec<()>, &'static str>>()?;vec_row.check()}#[pg_guard]pub extern "C" fn vec_mem_cache_buildempty(_index_relation: pg_sys::Relation) {}
#![feature(array_map)]#![feature(once_cell)]#![feature(const_extern_fn)]pub mod mem_cache_index;mod shmem_blocks;mod util;mod vec_block_alloc;pub mod vector_query;use crate::shmem_blocks::ShMemBlocksInner;use pgx::*;use rand::{distributions::Uniform, thread_rng, Rng};use serde::*;use std::fmt::format;use std::sync::Arc;use std::sync::Mutex;use vec_block_alloc::FiniteDimAllocator;#[macro_use]extern crate lazy_static;const MAX_SPACE: usize = 131_072;lazy_static! {static ref SHMEM_STEP: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));static ref LOCKS: Arc<Mutex<Vec<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>>> =Arc::new(Mutex::new(Vec::new()));static ref HOOK_LOCK: Mutex<bool> = Mutex::new(false);}pg_module_magic!();#[pg_extern]fn hello_pgvector_rs() -> &'static str {"Get ready to do some similarity search!"}fn unwrap_pair((a, b): (Option<f32>, Option<f32>)) -> (f32, f32) {(a.unwrap_or(0.0), b.unwrap_or(0.0))}#[derive(PostgresType, Deserialize, Serialize, Debug, PartialEq, Clone)]pub struct Vector(Vec<f32>);// we only seem able to allocate something that's about 2 MiB. 
Whelp let's start with that, and// figure out how to grow laterstatic VECTOR_ALLOCATOR00: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();static SHMEM_BLOCKS: shmem_blocks::ShMemBlocks<MAX_SPACE> =shmem_blocks::ShMemBlocks::<MAX_SPACE>::new();//static VECVECVEC: [PgLwLock<FiniteDimAllocator<MAX_SPACE>>; 400] =// [(); 400].map(|_| PgLwLock::new());#[pg_guard]pub extern "C" fn _PG_init() {// pg_shmem_init!(VECTOR_ALLOCATOR00);unsafe {let lock = std::ffi::CString::new(SHMEM_BLOCKS.get_name()).expect("CString::new failed");let size = SHMEM_BLOCKS.get_size();for _ in 0..size {let pg_lw_lock = PgLwLock::<FiniteDimAllocator<MAX_SPACE>>::new();let name = pg_lw_lock.get_name();let lock_name = std::ffi::CString::new(name).expect("CString::new failed");pg_sys::RequestAddinShmemSpace(size * std::mem::size_of::<FiniteDimAllocator<MAX_SPACE>>(),);pg_sys::RequestNamedLWLockTranche(lock_name.as_ptr(), 1);let mut locks = LOCKS.lock().unwrap();locks.push(pg_lw_lock)}pg_sys::RequestAddinShmemSpace(size * std::mem::size_of::<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>(),);pg_sys::RequestNamedLWLockTranche(lock.as_ptr(), 1);}unsafe {let size = SHMEM_BLOCKS.get_size();for _ in 0..size {let hook_lock = HOOK_LOCK.lock().expect("LOCKING SHMEM HOOKS");static mut PREV_SHMEM_STARTUP_HOOK: Option<(unsafe extern "C" fn(), usize)> = None;PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook.map(|i| (i, 0));pg_sys::shmem_startup_hook = Some(shmem_hook);println!("startup hook: {:?}", PREV_SHMEM_STARTUP_HOOK);println!("DONE SETTING SHMEM_HOOKS: {:?}", hook_lock);#[pg_guard]extern "C" fn shmem_hook() {unsafe {PREV_SHMEM_STARTUP_HOOK =PREV_SHMEM_STARTUP_HOOK.map(|(i, count)| (i, count + 1));if let Some((i, count)) = PREV_SHMEM_STARTUP_HOOK {if count < SHMEM_BLOCKS.get_size() {i()};}let addin_shmem_init_lock: *mut pg_sys::LWLock =&mut (*pg_sys::MainLWLockArray.add(21)).lock;pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode_LW_EXCLUSIVE);let mut found = false;let mut step_num = 
SHMEM_STEP.lock().unwrap();let locks_vec = LOCKS.lock().expect("Unable to lock locks vector");let lock = locks_vec.get(*step_num).unwrap();let lock_name = lock.get_name();let shm_name = std::ffi::CString::new(lock_name).expect("CString::new failed");println!("About to attach pointer");let fv_shmem = pg_sys::ShmemInitStruct(shm_name.into_raw(),std::mem::size_of::<FiniteDimAllocator<MAX_SPACE>>(),&mut found,) as *mut FiniteDimAllocator<MAX_SPACE>;lock.attach(fv_shmem);println!("About to write pointer");*step_num += 1;pg_sys::LWLockRelease(addin_shmem_init_lock);}}}static mut PREV_SHMEM_STARTUP_HOOK: Option<unsafe extern "C" fn()> = None;let hook_lock = HOOK_LOCK.lock().unwrap();PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook;pg_sys::shmem_startup_hook = Some(shmem_hook);#[pg_guard]extern "C" fn shmem_hook() {unsafe {if let Some(i) = PREV_SHMEM_STARTUP_HOOK {i();}let mut found = false;let addin_shmem_init_lock: *mut pg_sys::LWLock =&mut (*pg_sys::MainLWLockArray.add(21)).lock;pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode_LW_EXCLUSIVE);let shm_name =std::ffi::CString::new(SHMEM_BLOCKS.get_name()).expect("CString::new failed");let fv_shmem = pg_sys::ShmemInitStruct(shm_name.into_raw(),SHMEM_BLOCKS.get_size()* std::mem::size_of::<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>(),&mut found,) as *mut PgLwLock<FiniteDimAllocator<MAX_SPACE>>;SHMEM_BLOCKS.inner.set(ShMemBlocksInner { start: fv_shmem }).expect("Failed to set inner shmem");for offset in 0..SHMEM_BLOCKS.get_size() {let lock = LOCKS.lock().expect("Locks should be set").pop().expect("Locks should be initialised at this point");*lock.exclusive() = FiniteDimAllocator::<MAX_SPACE>::default();println!("written!");std::ptr::write(fv_shmem.add(offset), lock);let lock_ref = fv_shmem.add(offset).as_ref().unwrap();println!("{:?}", *lock_ref.share())}pg_sys::LWLockRelease(addin_shmem_init_lock);}}}}#[pg_extern]fn set_allocator_dim(block: i32, dimemsion: i16) {let error = format!("Block not found, only 
{:?}", SHMEM_BLOCKS.get_size());let block = SHMEM_BLOCKS.get_block(block as usize).expect(&error);block.exclusive().set_dim(dimemsion as u16)}#[pg_extern]fn fill_vec_allocator(block: i32, vecs: i32) {let mut rng = thread_rng();let unif = Uniform::new(0.0, 1.0);set_allocator_dim(block, 512);(0..vecs).for_each(|_| {let vec = (0..512).map(|_| rng.sample(unif)).collect::<Vec<_>>();SHMEM_BLOCKS.get_block(block as usize).unwrap().exclusive().add_vec(&vec);});}#[pg_extern]fn push_vec_to_mem(block: i32, vec: Array<f32>) {let vec = vec.into_iter().map(|x| x.unwrap_or(0.0)).collect::<Vec<_>>();SHMEM_BLOCKS.get_block(block as usize).unwrap().exclusive().add_vec(&vec);}#[pg_extern]fn query_vec(block: i32,vec: Array<f32>,ids: Array<i64>,) -> impl Iterator<Item = (name!(index, i64), name!(score, f32))> {let query_ids = ids.iter().filter(|x| x.is_some()).map(|x| x.unwrap() as usize).collect::<Vec<usize>>();let query_vec = vec.into_iter().map(|x| x.unwrap_or(0.0)).collect();let return_scores = SHMEM_BLOCKS.get_block(block as usize).unwrap().share().ip_many(&query_vec, &query_ids).unwrap();return_scores.iter().zip(query_ids.iter()).map(|(score, index)| (*index as i64, *score)).collect::<Vec<_>>().into_iter()}#[cfg(test)]pub mod pg_test {pub fn setup(_options: Vec<&str>) {// perform one-off initialization when the pg_test framework starts}pub fn postgresql_conf_options() -> Vec<&'static str> {// return any postgresql.conf settings that are required for your testsvec![]}}
use crate::vec_block_alloc::F32Block;use crate::FiniteDimAllocator;use pgx::log;use pgx::pg_sys;use pgx::PgSharedMemoryInitialization;use pgx::{pg_guard, pg_shmem_init};use pgx::{PgLwLock, PgLwLockInner};use std::lazy::OnceCell;use uuid::Uuid;pub struct ShMemBlocks<const BLOCK_SIZE: usize> {size: OnceCell<usize>,name: OnceCell<&'static str>,pub inner: OnceCell<ShMemBlocksInner<BLOCK_SIZE>>,}unsafe impl<const BLOCK_SIZE: usize> Send for ShMemBlocks<BLOCK_SIZE> {}unsafe impl<const BLOCK_SIZE: usize> Sync for ShMemBlocks<BLOCK_SIZE> {}#[derive(Debug)]pub struct ShMemBlocksInner<const BLOCK_SIZE: usize> {pub start: *mut PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>,}impl<const BLOCK_SIZE: usize> ShMemBlocks<BLOCK_SIZE> {pub const fn new() -> Self {ShMemBlocks {size: OnceCell::new(),name: OnceCell::new(),inner: OnceCell::new(),}}pub fn get_name(&self) -> &'static str {match self.name.get() {None => {let name = Box::leak(Uuid::new_v4().to_string().into_boxed_str());self.name.set(name).unwrap();name}Some(name) => name,}}pub fn get_size(&self) -> usize {match self.size.get() {None => {let size = 1024; // todo: replace with GucStuffself.size.set(size).unwrap();size}Some(size) => *size,}}pub fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {if index > self.get_size() {None} else {// This is guaranteed to be safe because we initialise all addresses at startupself.inner.get().map(|inner| unsafe { inner.get_block(index) }).flatten()}}pub fn iter<'a>(&'a self,) -> impl Iterator<Item = &PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> + 'a {let size = self.get_size();(0..size).filter_map(move |index| self.get_block(index))}}impl<const BLOCK_SIZE: usize> ShMemBlocksInner<BLOCK_SIZE> {unsafe fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {self.start.add(index).as_ref()}}
use memoffset::*;use pgx::pg_sys::AsPgCStr;use pgx::*;use serde::Serialize;static mut RELOPT_KIND_PGVEC: u32 = 0; // Do I like this? Oh no I do not!#[derive(Clone, Debug, Serialize)]pub struct VecMemCacheOptions {/* varlena header (do not touch directly!) */#[allow(dead_code)]vl_len_: i32,dimension: u32,something: u32,}const DEFAULT_DIM: u32 = 512;#[allow(dead_code)]impl VecMemCacheOptions {pub(crate) fn from_relation(relation: &PgRelation) -> PgBox<VecMemCacheOptions> {if relation.rd_index.is_null() {panic!("'{} not a vector memcache index' ", relation.name())} else if relation.rd_options.is_null() {// use defaultslet mut opts = PgBox::<VecMemCacheOptions>::alloc0();opts.dimension = DEFAULT_DIM;opts} else {PgBox::from_pg(relation.rd_options as *mut VecMemCacheOptions)}}pub(crate) fn get_dim(&self) -> u16 {self.dimension as u16}}#[pg_guard]pub unsafe extern "C" fn vec_mem_cache_options(reloptions: pg_sys::Datum,validate: bool,) -> *mut pg_sys::varlena {let tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS] = [pg_sys::relopt_parse_elt {optname: "dimension".as_pg_cstr(),opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,offset: offset_of!(VecMemCacheOptions, dimension) as i32,},pg_sys::relopt_parse_elt {optname: "something".as_pg_cstr(),opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,offset: offset_of!(VecMemCacheOptions, something) as i32,},];build_relopts(reloptions, validate, tab)}// impl IntoDatum for *mut pg_sys::bytea {}const NUM_REL_OPTS: usize = 2;unsafe fn build_relopts(reloptions: pg_sys::Datum,validate: bool,tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS],) -> *mut pg_sys::bytea {let rdopts;/* Parse the user-given reloptions */rdopts = pg_sys::build_reloptions(reloptions,validate,RELOPT_KIND_PGVEC,std::mem::size_of::<VecMemCacheOptions>(),tab.as_ptr(),NUM_REL_OPTS as i32,);rdopts as *mut pg_sys::bytea}#[cfg(test)]mod test {}
// use crate::mem_cache_index::build::row_to_vec_iter;use pgx::*;pub unsafe extern "C" fn aminsert(index_relation: pg_sys::Relation,values: *mut pg_sys::Datum,_isnull: *mut bool,heap_tid: pg_sys::ItemPointer,_heap_relation: pg_sys::Relation,_check_unique: pg_sys::IndexUniqueCheck,_index_info: *mut pg_sys::IndexInfo,) -> bool {let index_relation = PgRelation::from_pg(index_relation);let values = std::slice::from_raw_parts(values, 1);let name = index_relation.name().bytes();// todo: try to get a pointer to the name as a sequence of i8 bytes.// let tupdesc = pg_sys::RelationNameGetTupleDesc(name);// let vec = row_to_vec_iter(tupdesc, values[0]).unwrap();true}