IFWHYAZTPKL7ZVOV3AFZNIUTRLMVSOACYACA3CI4DSEEZU77YQEQC
use pgx::extension_sql;
use pgx::*;
/// Tolerance guard so zero-norm vectors never cause a division by zero.
const TOL: f32 = 1.0e-8;

/// Cosine similarity of two `real[]` vectors, clamped to `[-1, 1]`.
/// NULL elements are treated as 0.0 (see `unwrap_pair`).
#[pg_extern]
fn cosine_similarity(a: Array<f32>, b: Array<f32>) -> f32 {
    // Accumulate (dot product, |a|^2, |b|^2) in a single pass over both arrays.
    let (dot, a_sq, b_sq) = a
        .iter()
        .zip(b.iter())
        .map(|pair| unwrap_pair(pair))
        .fold((0.0f32, 0.0f32, 0.0f32), |(dot, asq, bsq), (x, y)| {
            (dot + x * y, asq + x * x, bsq + y * y)
        });
    // Normalise by each norm in turn; TOL keeps both divisions finite.
    let sim = dot / (TOL + a_sq.sqrt()) / (TOL + b_sq.sqrt());
    // Clamp floating-point rounding back into the valid cosine range.
    sim.min(1.).max(-1.0)
}
#[pg_extern]
fn l2_dist(a: Array<f32>, b: Array<f32>) -> f32 {
let sq: f32 = a
.iter()
.zip(b.iter())
.map(|pair| unwrap_pair(pair))
.map(|(a, b)| (a - b) * (a - b))
.sum();
sq.sqrt()
}
/// Replace missing (NULL) elements of an element pair with 0.0 so the
/// distance/similarity kernels can treat every position as present.
fn unwrap_pair(pair: (Option<f32>, Option<f32>)) -> (f32, f32) {
    let (left, right) = pair;
    (left.unwrap_or(0.0), right.unwrap_or(0.0))
}
/// Inner (dot) product of two `real[]` vectors; NULL elements count as 0.0.
#[pg_extern]
fn ip(a: Array<f32>, b: Array<f32>) -> f32 {
    let mut total = 0.0f32;
    for pair in a.iter().zip(b.iter()) {
        let (x, y) = unwrap_pair(pair);
        total += x * y;
    }
    total
}
///extension_sql! {r#"
///CREATE OPERATOR pg_catalog.<|> (
/// PROCEDURE = ip,
/// RESTRICT = restrict,
/// LEFTARG = real[],
/// RIGHTARG = real[],
/// COMMUTATOR = '<|>'
///);
///
///CREATE OPERATOR pg_catalog.|-| (
/// PROCEDURE = l2_dist,
/// RESTRICT = restrict,
/// LEFTARG = real[],
/// RIGHTARG = real[],
/// COMMUTATOR = '<->'
///);
///
///CREATE OPERATOR pg_catalog.<~> (
/// PROCEDURE = cosine_similarity,
/// RESTRICT = restrict,
/// LEFTARG = real[],
/// RIGHTARG = real[],
/// COMMUTATOR = '<~>'
///);
///
///
///CREATE OPERATOR CLASS vector_ops DEFAULT FOR TYPE real[] USING vector AS
/// OPERATOR 1 pg_catalog.<|>(real[]),
/// OPERATOR 2 pg_catalog.|-|(real[]),
/// OPERATOR 3 pg_catalog.<~>(real[]),
/// STORAGE real[];
///"#}
#[cfg(any(test, feature = "pg_test"))]
mod tests {
    use crate::vector_query::opclass::l2_dist;
    use crate::Vector;
    use core::mem::ManuallyDrop;
    use pgx::*;

    /// Smoke test: builds two pgx `Array`s over identical datum buffers and
    /// runs `l2_dist` end to end (distance should print as 0).
    /// NOTE(review): the trailing `assert!(false)` deliberately fails the
    /// test so the printed output is visible — remove once debugging is done.
    #[pg_test]
    fn test_hello_pgvector_rs() {
        // 32 sample values in [-1, 1].
        let arr0 = (0..32)
            .map(|x| x as f32)
            .map(|x| x.sin())
            .collect::<Vec<_>>();
        // Convert each f32 into a raw Postgres Datum.
        let mut arr = arr0
            .iter()
            .map(|x| x.into_datum())
            .map(|x| x.unwrap())
            .collect::<Vec<pg_sys::Datum>>();
        // No NULL elements in either array.
        let mut nulls = arr.iter().map(|_| false).collect::<Vec<bool>>();
        let mut arr_2 = arr.clone();
        let mut nulls_2 = nulls.clone();
        println!("{:?}", arr);
        // Assumption: `Array::over` borrows the datum/null buffers, which must
        // outlive the Array — here they live to the end of this fn. TODO confirm.
        let vec1 = unsafe {
            let size = arr.len();
            let b = arr.as_mut_ptr();
            println!("{:?}", b);
            Array::over(b, nulls.as_mut_ptr(), size)
        };
        vec1.iter().for_each(|x| {
            println!("{:?}", x);
        });
        let vec2 = unsafe {
            let size = arr_2.len();
            Array::over(arr_2.as_mut_ptr(), nulls_2.as_mut_ptr(), size)
        };
        // Should print ~0.0 per element: vec2 round-trips the arr0 values.
        vec2.iter().zip(arr0.iter()).for_each(|(x, y)| {
            println!("{:?}", x.unwrap() - y);
        });
        // Identical vectors => distance 0.
        let d = l2_dist(vec1, vec2);
        println!("distance: {}", d);
        assert!(false);
    }
}
mod opclass;
/// Scratch helper that always panics — presumably left over from
/// experimenting with panic handling; unused by the rest of this file.
fn yolo() {
    panic!("ded!")
}
{"rustc_fingerprint":16809209860643795384,"outputs":{"2797684049618456168":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"11097456331220016911":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"15262391909083379790":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"931469667778813386":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2
\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"11593133297143080531":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"10624890571507989382":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\
ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"17598535894874457435":{"success":true,"status":"","code":0,"stdout":"rustc 1.54.0-nightly (676ee1472 2021-05-06)\nbinary: rustc\ncommit-hash: 676ee14729462585b969bbc52f32c307403f4126\ncommit-date: 2021-05-06\nhost: x86_64-apple-darwin\nrelease: 1.54.0-nightly\nLLVM version: 12.0.0\n","stderr":""}},"successes":{}}
use pgx::extension_sql;
use pgx::*;
// NOTE(review): everything below duplicates the definitions near the top of
// this file — this looks like a file-concatenation artifact; confirm and
// deduplicate.

/// Tolerance guard so zero-norm vectors never cause a division by zero.
const TOL: f32 = 1.0e-8;
/// Cosine similarity of two `real[]` vectors, clamped to [-1, 1]; NULLs are 0.0.
#[pg_extern]
fn cosine_similarity(a: Array<f32>, b: Array<f32>) -> f32 {
    let mut a_norm_sq = 0.0;
    let mut b_norm_sq = 0.0;
    // Mutable references let the closure below accumulate both norms while
    // the dot product is summed in the same pass.
    let a_norm_sq_ref = &mut a_norm_sq;
    let b_norm_sq_ref = &mut b_norm_sq;
    let mut sq_sum: f32 = a
        .iter()
        .zip(b.iter())
        .map(|pair| unwrap_pair(pair))
        .map(|(a, b)| {
            *a_norm_sq_ref += a * a;
            *b_norm_sq_ref += b * b;
            a * b
        })
        .sum();
    // Normalise by both norms; TOL keeps the divisions finite.
    sq_sum /= TOL + a_norm_sq.sqrt();
    sq_sum /= TOL + b_norm_sq.sqrt();
    // Clamp rounding error into the valid cosine range.
    sq_sum.min(1.).max(-1.0)
}
/// Euclidean (L2) distance between two `real[]` vectors; NULLs are 0.0.
#[pg_extern]
fn l2_dist(a: Array<f32>, b: Array<f32>) -> f32 {
    let sq: f32 = a
        .iter()
        .zip(b.iter())
        .map(|pair| unwrap_pair(pair))
        .map(|(a, b)| (a - b) * (a - b))
        .sum();
    sq.sqrt()
}
/// Replace missing (NULL) pair elements with 0.0.
fn unwrap_pair((a, b): (Option<f32>, Option<f32>)) -> (f32, f32) {
    (a.unwrap_or(0.0), b.unwrap_or(0.0))
}
/// Inner (dot) product of two `real[]` vectors; NULLs are 0.0.
#[pg_extern]
fn ip(a: Array<f32>, b: Array<f32>) -> f32 {
    a.iter()
        .zip(b.iter())
        .map(|pair| unwrap_pair(pair))
        .map(|(a, b)| a * b)
        .sum()
}
///extension_sql! {r#"
///CREATE OPERATOR pg_catalog.<|> (
/// PROCEDURE = ip,
/// RESTRICT = restrict,
/// LEFTARG = real[],
/// RIGHTARG = real[],
/// COMMUTATOR = '<|>'
///);
///
///CREATE OPERATOR pg_catalog.|-| (
/// PROCEDURE = l2_dist,
/// RESTRICT = restrict,
/// LEFTARG = real[],
/// RIGHTARG = real[],
/// COMMUTATOR = '<->'
///);
///
///CREATE OPERATOR pg_catalog.<~> (
/// PROCEDURE = cosine_similarity,
/// RESTRICT = restrict,
/// LEFTARG = real[],
/// RIGHTARG = real[],
/// COMMUTATOR = '<~>'
///);
///
///
///CREATE OPERATOR CLASS vector_ops DEFAULT FOR TYPE real[] USING vector AS
/// OPERATOR 1 pg_catalog.<|>(real[]),
/// OPERATOR 2 pg_catalog.|-|(real[]),
/// OPERATOR 3 pg_catalog.<~>(real[]),
/// STORAGE real[];
///"#}
// NOTE(review): this test module duplicates the one near the top of the file
// (file-concatenation artifact); confirm and deduplicate.
#[cfg(any(test, feature = "pg_test"))]
mod tests {
    use crate::vector_query::opclass::l2_dist;
    use crate::Vector;
    use core::mem::ManuallyDrop;
    use pgx::*;

    /// Smoke test: builds two pgx `Array`s over identical datum buffers and
    /// runs `l2_dist` end to end.
    /// NOTE(review): the trailing `assert!(false)` deliberately fails the
    /// test so its printed output is shown — remove once debugging is done.
    #[pg_test]
    fn test_hello_pgvector_rs() {
        // 32 sample values in [-1, 1].
        let arr0 = (0..32)
            .map(|x| x as f32)
            .map(|x| x.sin())
            .collect::<Vec<_>>();
        // Convert each f32 into a raw Postgres Datum.
        let mut arr = arr0
            .iter()
            .map(|x| x.into_datum())
            .map(|x| x.unwrap())
            .collect::<Vec<pg_sys::Datum>>();
        // No NULL elements in either array.
        let mut nulls = arr.iter().map(|_| false).collect::<Vec<bool>>();
        let mut arr_2 = arr.clone();
        let mut nulls_2 = nulls.clone();
        println!("{:?}", arr);
        // Assumption: `Array::over` borrows the datum/null buffers, which
        // must outlive the Array — they live to the end of this fn. TODO confirm.
        let vec1 = unsafe {
            let size = arr.len();
            let b = arr.as_mut_ptr();
            println!("{:?}", b);
            Array::over(b, nulls.as_mut_ptr(), size)
        };
        vec1.iter().for_each(|x| {
            println!("{:?}", x);
        });
        let vec2 = unsafe {
            let size = arr_2.len();
            Array::over(arr_2.as_mut_ptr(), nulls_2.as_mut_ptr(), size)
        };
        // Should print ~0.0 per element: vec2 round-trips the arr0 values.
        vec2.iter().zip(arr0.iter()).for_each(|(x, y)| {
            println!("{:?}", x.unwrap() - y);
        });
        // Identical vectors => distance 0.
        let d = l2_dist(vec1, vec2);
        println!("distance: {}", d);
        assert!(false);
    }
}
mod opclass;
/// Scratch helper that always panics; unused by the rest of this file.
fn yolo() {
    panic!("ded!")
}
use pgx::PGXSharedMemory;
use pgx::PgBox;
use pgx::PgMemoryContexts;
use std::iter::repeat;
use thiserror::Error;
// Marker traits — declared but not implemented or used anywhere visible here.
trait F32VecBlockData {}
trait I64BlockData {}
/// Fixed-capacity, inline f32 buffer intended to live in Postgres shared
/// memory. `SIZE` is the capacity in elements, fixed at compile time.
#[derive(Default, Debug)]
pub struct F32Block<const SIZE: usize>(heapless::Vec<f32, SIZE>);
// SAFETY assumption: heapless::Vec stores its elements inline (no pointers
// into the process-local heap), which is what PGXSharedMemory requires —
// TODO confirm for the heapless version in use.
unsafe impl<const SIZE: usize> PGXSharedMemory for F32Block<SIZE> {}
/// Convenience alias for fallible block operations.
pub type BlockResult<T> = Result<T, F32BlockError>;
/// Errors produced by `F32Block` operations.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum F32BlockError {
    /// A read/write range runs past the end of the initialised data.
    #[error("Trying to write over end of block: Start {write_start}, Length {write_length}, block ends at {block_end}")]
    Overflow {
        write_start: usize,
        write_length: usize,
        block_end: usize,
    },
    /// A push would exceed the block's compile-time capacity.
    #[error("Trying to push past block limit")]
    LimitExceded {
        push_start: usize,
        push_size: usize,
        limit: usize,
    },
    /// Attempt to free a range that was never allocated.
    #[error("Trying to free unallocated memory. Only allocated {data_len}. Segment to delete starts at {start}, and is of length {len}")]
    DeleteEmpty {
        data_len: usize,
        start: usize,
        len: usize,
    },
    /// Temporary catch-all used while the error taxonomy is incomplete.
    #[error("Placeholder")]
    PlaceHolder,
    /// Operation starting beyond the initialised data.
    #[error(
        "Trying to start at index {start} that is beyond the length of written data {data_len}"
    )]
    StartEmpty { data_len: usize, start: usize },
    /// Iterator-based APIs require an upper `size_hint` bound.
    #[error("Only support pushing bounded iterators")]
    PushUnboundedIter,
}
impl F32BlockError {
fn overflow(start: usize, length: usize, block: usize) -> F32BlockError {
F32BlockError::Overflow {
write_start: start,
write_length: length,
block_end: block,
}
}
fn limit_exceded(start: usize, length: usize, limit: usize) -> F32BlockError {
F32BlockError::LimitExceded {
push_start: start,
push_size: length,
limit,
}
}
}
/// A borrowed f32 slice paired with a target dimension.
///
/// `iterator_with_dim` yields exactly `dimension` elements: the slice's
/// values, zero-padded at the end if the slice is shorter than `dimension`,
/// and truncated if it is longer.
struct Padded<'a> {
    dimension: u16,
    data: &'a [f32],
}
impl<'a> Padded<'a> {
    /// Iterate the data padded/truncated to exactly `dimension` elements.
    fn iterator_with_dim(self) -> impl Iterator<Item = f32> + 'a {
        // `.copied()` replaces the manual `.map(|x| *x)` dereference (idiom).
        self.data
            .iter()
            .copied()
            .chain(repeat(0.0))
            .take(self.dimension as usize)
    }
}
impl<const SIZE: usize> F32Block<SIZE> {
    /// Overwrite `vec.len()` elements starting at `start`.
    /// Only already-initialised data may be overwritten; running past
    /// `self.0.len()` is an `Overflow` error.
    fn write_slice<'a, 'b>(&'a mut self, vec: &'b [f32], start: usize) -> BlockResult<()> {
        let vec_len = vec.len();
        let data_len = self.0.len();
        let end = vec_len + start;
        if end <= data_len {
            for (new_val, old_val) in vec.iter().zip(self.0[start..].iter_mut()) {
                *old_val = *new_val;
            }
            Ok(())
        } else {
            Err(F32BlockError::overflow(start, vec_len, data_len))
        }
    }
    /// Overwrite elements starting at `start` from a bounded iterator.
    /// The iterator must expose an upper bound via `size_hint`.
    fn write_iter<IterType: Iterator<Item = f32>>(
        &mut self,
        start: usize,
        iter: IterType,
    ) -> BlockResult<()> {
        if let (_, Some(upper_bound)) = iter.size_hint() {
            let data_len = self.0.len();
            // BUG FIX: this bounds check was inverted — it performed the
            // write when the range would overflow the initialised data and
            // errored when it would fit. Mirror write_slice: write only when
            // the range fits.
            if start + upper_bound <= data_len {
                for (old_val, new_val) in self.0[start..].iter_mut().zip(iter) {
                    *old_val = new_val;
                }
                Ok(())
            } else {
                Err(F32BlockError::overflow(start, upper_bound, data_len))
            }
        } else {
            Err(F32BlockError::PushUnboundedIter)
        }
    }
    /// Append a slice, failing when the compile-time capacity would be hit.
    fn push_slice<'a, 'b>(&'a mut self, vec: &'b [f32]) -> BlockResult<()> {
        let data_len = self.0.len();
        let vec_len = vec.len();
        self.0
            .extend_from_slice(vec)
            .map_err(|_| F32BlockError::limit_exceded(data_len, vec_len, SIZE))
    }
    /// We allow pushing iterators, but we require an upper bound for the size hint of the
    /// iterator.
    fn push_iter<IterType: Iterator<Item = f32>>(&mut self, iter: IterType) -> BlockResult<()> {
        if let (_, Some(upper_bound)) = iter.size_hint() {
            let data_len = self.0.len();
            if data_len + upper_bound <= SIZE {
                // Capacity was verified above, so the individual pushes
                // cannot actually fail; the error arm is defensive.
                iter.map(|el| self.0.push(el))
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|_| F32BlockError::PushUnboundedIter)
                    .map(|_| ())
            } else {
                Err(F32BlockError::limit_exceded(data_len, upper_bound, SIZE))
            }
        } else {
            Err(F32BlockError::PushUnboundedIter)
        }
    }
    /// Delete `len` elements starting at `start`, shifting the tail left in
    /// chunks of `len` to keep data contiguous, then shrinking by `len`.
    fn free(&mut self, start: usize, len: usize) -> Result<(), F32BlockError> {
        let mut end = start + len;
        let data_len = self.0.len();
        if end > data_len {
            Err(F32BlockError::overflow(start, len, data_len))
        } else {
            // BUG FIX (robustness): a zero-length free used to spin forever
            // because `end` never advanced; it is a no-op by definition.
            if len == 0 {
                return Ok(());
            }
            let mut loop_start = start;
            loop {
                // Copy up to `len` elements of the tail over the hole.
                let (initial, tail) = self.0.split_at_mut(end);
                for (to_delete, to_assign) in initial[loop_start..].iter_mut().zip(tail.iter()) {
                    *to_delete = *to_assign;
                }
                end += len;
                loop_start += len;
                if end >= data_len {
                    break;
                }
            }
            self.0.truncate(data_len - len);
            Ok(())
        }
    }
    /// Borrow `len` initialised elements starting at `start`.
    fn read_data(&self, start: usize, len: usize) -> BlockResult<&[f32]> {
        let data_len = self.0.len();
        let end = start + len;
        if end > data_len {
            // Consistency fix: report where the initialised data ends (was
            // the compile-time capacity SIZE), matching the other bounds
            // errors in this impl.
            Err(F32BlockError::overflow(start, len, data_len))
        } else {
            Ok(&self.0[start..end])
        }
    }
}
#[cfg(test)]
mod f32_block_tests {
    const TOL: f32 = 1.0e-7;
    const BLOCK_SIZE: usize = 8192;
    use crate::vec_block_alloc::F32Block;
    use crate::vec_block_alloc::F32BlockError;
    /// test_push: given block of finite size, test that we can push a vector of finite dim
    /// test_drop:
    /// test_compact:
    #[test]
    fn test_write() {
        let good_start = 0;
        let vec_size = 512;
        // The block is seeded with `end` initialised elements below.
        let end = vec_size * 2;
        let bad_start = BLOCK_SIZE - (vec_size / 2);
        let vec = (0..vec_size).map(|x| (x as f32).sin()).collect::<Vec<_>>();
        let mut mem = F32Block::<BLOCK_SIZE>::default();
        mem.push_iter((0..end).map(|x| (x as f32))).unwrap();
        mem.write_slice(&vec, good_start).unwrap();
        let read = mem.read_data(good_start, vec_size).unwrap();
        let diff: f32 = read
            .iter()
            .zip(vec.iter())
            .map(|(x, y)| (x - y).abs())
            .sum();
        assert!(diff <= TOL);
        // Both bad writes overflow the initialised region, which ends at `end`.
        assert_eq!(
            mem.write_slice(&vec, bad_start).unwrap_err(),
            F32BlockError::overflow(bad_start, vec_size, end)
        );
        assert_eq!(
            mem.write_slice(&vec, BLOCK_SIZE + vec_size).unwrap_err(),
            F32BlockError::overflow(BLOCK_SIZE + vec_size, vec_size, end)
        );
    }
    #[test]
    fn test_push() {
        let mut mem = F32Block::<BLOCK_SIZE>::default();
        let vec_size = 512;
        let vec = (0..vec_size).map(|x| (x as f32).sin()).collect::<Vec<_>>();
        // 20 * 512 = 10240 elements cannot fit in an 8192-element block, so
        // the collected Result must short-circuit to an error.
        let pushes = (0..20)
            .map(|_| mem.push_slice(&vec))
            .collect::<Result<Vec<()>, F32BlockError>>();
        assert!(pushes.is_err());
    }
    /// BUG FIX: the old constants made `len` (1024 * 1024) vastly exceed
    /// BLOCK_SIZE, so the push below could never succeed and the expected
    /// error never matched what `free` actually reports (the initialised
    /// length, not BLOCK_SIZE). Values and expectations are now consistent.
    #[test]
    fn test_delete() {
        let good_start = 1024;
        let len = 2048; // good_start + len fits comfortably inside BLOCK_SIZE
        let mut mem = F32Block::<BLOCK_SIZE>::default();
        // Freeing from an empty block overflows; initialised length is 0.
        assert_eq!(
            mem.free(good_start, len).unwrap_err(),
            F32BlockError::overflow(good_start, len, 0)
        );
        mem.push_iter((0..(good_start + len)).map(|x| (x as f32).cos()))
            .unwrap();
        assert!(mem.free(good_start, len).is_ok());
        // The freed range no longer exists, so a second free must fail.
        assert!(mem.free(good_start, len).is_err());
    }
}
/// Later we can replace the individual fields with a seq of blocks
/// Fixed-dimension vector arena: stores `len` vectors of `dimension` f32
/// elements each, laid out contiguously inside a single `F32Block`.
/// NOTE(review): const parameter `BlockSize` is non-conventional casing
/// (UPPER_SNAKE is idiomatic for const generics); renaming touches every
/// use site, so it is left as-is here.
#[derive(Default, Debug)]
pub struct FiniteDimAllocator<const BlockSize: usize> {
    // f32 elements per stored vector; 0 means "not yet configured".
    dimension: u16,
    // Number of vectors currently stored.
    len: usize,
    f32_data: F32Block<BlockSize>,
}
// SAFETY assumption: all fields are inline data (see the F32Block note) —
// TODO confirm before relying on this in shared memory.
unsafe impl<const Size: usize> PGXSharedMemory for FiniteDimAllocator<Size> {}
impl<const BlockSize: usize> FiniteDimAllocator<BlockSize> {
    /// Allocator for vectors of `dimension` f32 elements each.
    fn new(dimension: u16) -> FiniteDimAllocator<BlockSize> {
        FiniteDimAllocator {
            dimension,
            len: 0,
            f32_data: F32Block::<BlockSize>::default(),
        }
    }
    /// A dimension of 0 means "not yet configured".
    pub fn is_set(&self) -> bool {
        self.dimension > 0
    }
    /// Configure the per-vector dimension.
    pub fn set_dim(&mut self, dimension: u16) {
        self.dimension = dimension
    }
    /// Vector dimension widened to usize for index arithmetic.
    fn dim(&self) -> usize {
        self.dimension as usize
    }
    /// Total number of f32 elements currently in use.
    fn end(&self) -> usize {
        self.len * (self.dimension as usize)
    }
    /// Append a vector, zero-padding / truncating it to `dimension` elements.
    pub fn add_vec(&mut self, vec: &[f32]) -> Result<(), F32BlockError> {
        //
        // Replace with custom error type
        let padded = Padded {
            dimension: self.dimension,
            data: vec,
        };
        self.f32_data.push_iter(padded.iterator_with_dim())?;
        self.len += 1;
        Ok(())
    }
    /// Remove the vector at `index`, compacting the underlying storage.
    fn remove_vec(&mut self, index: usize) -> Result<(), F32BlockError> {
        if index >= self.len {
            Err(F32BlockError::PlaceHolder)
        } else {
            let dimension = self.dim();
            let slice_start = index * dimension;
            // BUG FIX: `free` takes (start, len) but was given (start, end),
            // which freed the wrong span for every index > 0. The length of
            // one stored vector is exactly `dimension`.
            self.f32_data.free(slice_start, dimension)?;
            self.len -= 1;
            Ok(())
        }
    }
    /// Borrow the vector stored at `index`.
    pub fn get_slice(&self, index: usize) -> Result<&[f32], F32BlockError> {
        let dimension = self.dim();
        let slice_start = dimension * index;
        self.f32_data.read_data(slice_start, dimension)
    }
    /// Wrap `data` with this allocator's dimension for padded iteration.
    fn pad<'b>(&'_ self, data: &'b [f32]) -> Padded<'b> {
        Padded {
            dimension: self.dimension,
            data,
        }
    }
    /// Overwrite the vector at `index` in place (padded to `dimension`).
    pub fn update_vec(&mut self, index: usize, data: &[f32]) -> Result<(), F32BlockError> {
        let padded = self.pad(data);
        self.f32_data
            .write_iter(index * self.dim(), padded.iterator_with_dim())?;
        Ok(())
    }
    // todo SIMD????
    /// Inner product of `vec` against each stored vector named in `indices`.
    /// NOTE(review): `&Vec<_>` params would be more idiomatic as slices, but
    /// the signature is kept unchanged for existing callers.
    pub fn ip_many(&self, vec: &Vec<f32>, indices: &Vec<usize>) -> Result<Vec<f32>, F32BlockError> {
        indices
            .iter()
            .map(|&index| {
                let data = self.get_slice(index)?;
                Ok(data.iter().zip(vec.iter()).map(|(&a, &b)| a * b).sum())
            })
            .collect::<Result<Vec<f32>, F32BlockError>>()
    }
}
#[cfg(test)]
mod test_finite_dim_allocator {
    use crate::util::random_vecs;
    use crate::vec_block_alloc::F32BlockError;
    use crate::vec_block_alloc::FiniteDimAllocator;
    /// 100 random 32-dim vectors must all fit in a 100_000-element block.
    #[test]
    fn test_insert() {
        const SIZE: usize = 100000;
        let dim: u16 = 32;
        let mut allocator = FiniteDimAllocator::<SIZE>::new(dim);
        let allocator_borr = &mut allocator;
        random_vecs(dim, 100)
            .map(|vec| allocator_borr.add_vec(&vec))
            .collect::<Result<Vec<()>, F32BlockError>>()
            .unwrap();
    }
    /// Prints the in-memory size of the allocator type.
    /// NOTE(review): the `assert!(false)` deliberately fails the test so the
    /// printed size is shown — remove once no longer needed.
    #[test]
    fn print_size() {
        const SIZE: usize = 523934;
        let size = std::mem::size_of::<FiniteDimAllocator<SIZE>>();
        print!("Size of allocator: {}", size);
        assert!(false);
    }
}
// Use the std-provided π rather than a hand-typed literal — identical at f32
// precision, but cannot drift or be mistyped.
const PI: f32 = std::f32::consts::PI;
const TAU: f32 = 2.0 * PI;
/// Lazily produce `n` pseudo-random vectors of length `dim`.
/// Each vector samples a sine wave (period 16 steps) at a random phase.
pub fn random_vecs(dim: u16, n: i32) -> impl std::iter::Iterator<Item = Vec<f32>> {
    (0..n).map(move |_| {
        let phase = rand::random::<f32>() * TAU;
        (0..dim)
            .map(|step| (phase + TAU * ((step as f32) / 16.0)).sin())
            .collect::<Vec<f32>>()
    })
}
{"rustc_fingerprint":16809209860643795384,"outputs":{"2797684049618456168":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"11097456331220016911":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"15262391909083379790":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"931469667778813386":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2
\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"11593133297143080531":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n","stderr":""},"10624890571507989382":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/jan/.rustup/toolchains/nightly-x86_64-apple-darwin\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_feature=\"ssse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store=\"128\"\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\
ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"apple\"\nunix\n","stderr":""},"17598535894874457435":{"success":true,"status":"","code":0,"stdout":"rustc 1.54.0-nightly (676ee1472 2021-05-06)\nbinary: rustc\ncommit-hash: 676ee14729462585b969bbc52f32c307403f4126\ncommit-date: 2021-05-06\nhost: x86_64-apple-darwin\nrelease: 1.54.0-nightly\nLLVM version: 12.0.0\n","stderr":""}},"successes":{}}
use crate::vec_block_alloc::F32Block;
use crate::FiniteDimAllocator;
use pgx::log;
use pgx::pg_sys;
use pgx::PgSharedMemoryInitialization;
use pgx::{pg_guard, pg_shmem_init};
use pgx::{PgLwLock, PgLwLockInner};
use std::lazy::OnceCell;
use uuid::Uuid;
/// Registry of shared-memory blocks, each guarded by a Postgres LwLock.
/// Fields are lazily initialised via OnceCell so a value can be a `static`
/// created before shared memory exists.
pub struct ShMemBlocks<const BLOCK_SIZE: usize> {
    // Number of blocks (lazily defaulted; see get_size).
    size: OnceCell<usize>,
    // Unique shared-memory segment name (lazily generated; see get_name).
    name: OnceCell<&'static str>,
    pub inner: OnceCell<ShMemBlocksInner<BLOCK_SIZE>>,
}
// SAFETY assumption: OnceCell is not itself thread-safe, so these impls are
// only sound if initialisation happens single-threaded (postmaster startup)
// and access is effectively read-only afterwards — TODO confirm.
unsafe impl<const BLOCK_SIZE: usize> Send for ShMemBlocks<BLOCK_SIZE> {}
unsafe impl<const BLOCK_SIZE: usize> Sync for ShMemBlocks<BLOCK_SIZE> {}
/// Raw pointer to the first LwLock-guarded allocator in shared memory.
#[derive(Debug)]
pub struct ShMemBlocksInner<const BLOCK_SIZE: usize> {
    pub start: *mut PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>,
}
impl<const BLOCK_SIZE: usize> ShMemBlocks<BLOCK_SIZE> {
    /// Const constructor so a value can live in a `static`.
    pub const fn new() -> Self {
        ShMemBlocks {
            size: OnceCell::new(),
            name: OnceCell::new(),
            inner: OnceCell::new(),
        }
    }
    /// Lazily-initialised unique shared-memory name (a leaked UUID string).
    /// NOTE(review): `set(..).unwrap()` can panic if two threads race here;
    /// Sync is unsafely implemented, so confirm init is single-threaded.
    pub fn get_name(&self) -> &'static str {
        match self.name.get() {
            None => {
                let name = Box::leak(Uuid::new_v4().to_string().into_boxed_str());
                self.name.set(name).unwrap();
                name
            }
            Some(name) => name,
        }
    }
    /// Number of blocks; lazily defaulted to 1024.
    pub fn get_size(&self) -> usize {
        match self.size.get() {
            None => {
                let size = 1024; // todo: replace with GucStuff
                self.size.set(size).unwrap();
                size
            }
            Some(size) => *size,
        }
    }
    /// Fetch block `index`, or None when out of range or uninitialised.
    pub fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {
        // BUG FIX: valid indices are 0..size, so `index == size` must be
        // rejected too — the old `>` check allowed a one-past-the-end read
        // through the raw pointer in ShMemBlocksInner.
        if index >= self.get_size() {
            None
        } else {
            // This is guaranteed to be safe because we initialise all addresses at startup
            self.inner
                .get()
                .and_then(|inner| unsafe { inner.get_block(index) })
        }
    }
    /// Iterate over every initialised block.
    pub fn iter<'a>(
        &'a self,
    ) -> impl Iterator<Item = &PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> + 'a {
        let size = self.get_size();
        (0..size).filter_map(move |index| self.get_block(index))
    }
}
impl<const BLOCK_SIZE: usize> ShMemBlocksInner<BLOCK_SIZE> {
    /// Pointer arithmetic into the shared-memory array of locks.
    ///
    /// # Safety
    /// `start` must point to initialised shared memory and `index` must be
    /// within the allocated array: `as_ref` only checks for null, not bounds.
    unsafe fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {
        self.start.add(index).as_ref()
    }
}
use memoffset::*;
use pgx::pg_sys::AsPgCStr;
use pgx::*;
use serde::Serialize;
// Relopt kind id assigned by Postgres at extension init.
// NOTE(review): mutable global — acceptable only if written exactly once
// during single-threaded _PG_init; confirm.
static mut RELOPT_KIND_PGVEC: u32 = 0; // Do I like this? Oh no I do not!
/// Index reloptions for the vector mem-cache AM, laid out to match the
/// varlena that Postgres' build_reloptions hands back.
#[derive(Clone, Debug, Serialize)]
pub struct VecMemCacheOptions {
    /* varlena header (do not touch directly!) */
    #[allow(dead_code)]
    vl_len_: i32,
    // Vector dimension configured on the index.
    dimension: u32,
    // Placeholder second option.
    something: u32,
}
// Dimension used when the index was created without explicit options.
const DEFAULT_DIM: u32 = 512;
#[allow(dead_code)]
impl VecMemCacheOptions {
    /// Read options off an index relation, falling back to defaults when the
    /// relation carries no rd_options.
    pub(crate) fn from_relation(relation: &PgRelation) -> PgBox<VecMemCacheOptions> {
        if relation.rd_index.is_null() {
            panic!("'{} not a vector memcache index' ", relation.name())
        } else if relation.rd_options.is_null() {
            // use defaults
            let mut opts = PgBox::<VecMemCacheOptions>::alloc0();
            opts.dimension = DEFAULT_DIM;
            opts
        } else {
            PgBox::from_pg(relation.rd_options as *mut VecMemCacheOptions)
        }
    }
    /// Dimension narrowed to u16.
    /// NOTE(review): silently truncates values > u16::MAX — consider
    /// validating at option-parse time instead.
    pub(crate) fn get_dim(&self) -> u16 {
        self.dimension as u16
    }
}
// amoptions hook: describes how each supported reloption maps onto a field
// offset inside VecMemCacheOptions, then delegates parsing to Postgres.
// Safety: called by Postgres with a valid reloptions Datum.
#[pg_guard]
pub unsafe extern "C" fn vec_mem_cache_options(
    reloptions: pg_sys::Datum,
    validate: bool,
) -> *mut pg_sys::varlena {
    // One parse entry per accepted option; `offset` tells Postgres where to
    // write the parsed value inside the struct.
    let tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS] = [
        pg_sys::relopt_parse_elt {
            optname: "dimension".as_pg_cstr(),
            opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,
            offset: offset_of!(VecMemCacheOptions, dimension) as i32,
        },
        pg_sys::relopt_parse_elt {
            optname: "something".as_pg_cstr(),
            opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,
            offset: offset_of!(VecMemCacheOptions, something) as i32,
        },
    ];
    build_relopts(reloptions, validate, tab)
}
// impl IntoDatum for *mut pg_sys::bytea {}
/// Number of entries in the reloption parse table (dimension, something).
const NUM_REL_OPTS: usize = 2;
/// Parse user-supplied reloptions into a `VecMemCacheOptions` varlena.
///
/// # Safety
/// `reloptions` must be a valid reloptions Datum; intended to be called from
/// the amoptions hook.
unsafe fn build_relopts(
    reloptions: pg_sys::Datum,
    validate: bool,
    tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS],
) -> *mut pg_sys::bytea {
    /* Parse the user-given reloptions */
    // Idiom: initialise in the declaration rather than `let x; x = ...;`.
    let rdopts = pg_sys::build_reloptions(
        reloptions,
        validate,
        RELOPT_KIND_PGVEC,
        std::mem::size_of::<VecMemCacheOptions>(),
        tab.as_ptr(),
        NUM_REL_OPTS as i32,
    );
    rdopts as *mut pg_sys::bytea
}
#[cfg(test)]
mod test {} // placeholder: no unit tests for the reloptions code yet
pub mod build;
mod insert;
pub mod options;
use pgx::*;
// AM validate hook: accepts any opclass for now.
#[pg_guard]
pub extern "C" fn amvalidate(_opclassoid: pg_sys::Oid) -> bool {
    true
}
// Builds the IndexAmRoutine describing this access method's capabilities;
// most callbacks are still unimplemented (None). The ```sql``` block below is
// consumed by pgx's schema generation — kept verbatim.
/// ```sql
/// CREATE OR REPLACE FUNCTION amhandler(internal) RETURNS index_am_handler PARALLEL SAFE IMMUTABLE STRICT COST 0.0001 LANGUAGE c AS 'MODULE_PATHNAME', '@FUNCTION_NAME@';
/// CREATE ACCESS METHOD vector TYPE INDEX HANDLER amhandler;
/// ```
#[pg_extern]
pub fn amhandler(_fcinfi: pg_sys::FunctionCallInfo) -> PgBox<pg_sys::IndexAmRoutine> {
    let mut amroutine =
        PgBox::<pg_sys::IndexAmRoutine>::alloc_node(pg_sys::NodeTag_T_IndexAmRoutine);
    amroutine.amstrategies = 0;
    amroutine.amsupport = 4;
    amroutine.amcanorderbyop = true;
    amroutine.amoptionalkey = true;
    amroutine.amkeytype = pg_sys::InvalidOid;
    amroutine.ambuild = Some(build::ambuild);
    amroutine.ambuildempty = Some(build::vec_mem_cache_buildempty);
    amroutine.aminsert = None; // todo!();
    amroutine.ambulkdelete = None; // todo!();
    amroutine.amvacuumcleanup = None; // todo!();
    amroutine.amcostestimate = None; //todo!();
    amroutine.amoptions = Some(options::vec_mem_cache_options);
    amroutine.amvalidate = Some(amvalidate);
    amroutine.ambeginscan = None; //todo!();
    amroutine.amrescan = None; //todo!();
    amroutine.amgettuple = None; //todo!();
    amroutine.amendscan = None; //todo!();
    amroutine
}
#[cfg(test)]
mod test {
    // We want to try out something like
    // ```sql
    // CREATE TABLE public.vecs_2
    //(
    // idx bigserial,
    // vec real[] CONSTRAINT vecs_2_dim512 CHECK (cardinality(vec) = 512),
    // PRIMARY KEY (idx)
    //);
    //
    // ```
    // NOTE(review): not annotated with #[test]/#[pg_test], so this never
    // runs; it also still todo!()s.
    fn test_constraints() {
        todo!() // YOLO
    }
}
// use crate::mem_cache_index::build::row_to_vec_iter;
use pgx::*;
// Index AM insert callback (work in progress): currently accepts the row
// without actually indexing it — the decode/insert path is commented out.
// Safety: called by Postgres with valid relation/values pointers.
pub unsafe extern "C" fn aminsert(
    index_relation: pg_sys::Relation,
    values: *mut pg_sys::Datum,
    _isnull: *mut bool,
    heap_tid: pg_sys::ItemPointer,
    _heap_relation: pg_sys::Relation,
    _check_unique: pg_sys::IndexUniqueCheck,
    _index_info: *mut pg_sys::IndexInfo,
) -> bool {
    // NOTE(review): heap_tid, index_relation, values and name are built but
    // unused until the commented-out decode below is finished.
    let index_relation = PgRelation::from_pg(index_relation);
    // Assumes a single indexed column — TODO confirm once the AM is wired up.
    let values = std::slice::from_raw_parts(values, 1);
    let name = index_relation.name().bytes();
    // todo: try to get a pointer to the name as a sequence of i8 bytes.
    // let tupdesc = pg_sys::RelationNameGetTupleDesc(name);
    // let vec = row_to_vec_iter(tupdesc, values[0]).unwrap();
    // Returning true tells Postgres the insert "succeeded" even though
    // nothing was added to the cache yet.
    true
}
use super::options;
use crate::vec_block_alloc;
use crate::VECTOR_ALLOCATOR00;
use pgx::*;
// Index AM build callback: claims the single global vector allocator and
// records the indexed vector dimension taken from the index's reloptions.
pub extern "C" fn ambuild(
    heaprel: pg_sys::Relation,
    indexrel: pg_sys::Relation,
    index_info: *mut pg_sys::IndexInfo,
) -> *mut pg_sys::IndexBuildResult {
    // Take the exclusive lock on the one shared allocator.
    let mut vector_allocator = VECTOR_ALLOCATOR00.exclusive();
    // If index is already set, then panic
    if vector_allocator.is_set() {
        panic!("Index already set. We've only got room for one in here")
    }
    // Instantiate vector mem cache
    // Assumption: Postgres hands us a valid relation pointer here.
    let index_relation = unsafe { PgRelation::from_pg(indexrel) };
    let opts = options::VecMemCacheOptions::from_relation(&index_relation);
    vector_allocator.set_dim(opts.get_dim());
    // check that the indexrel has a realarray column and a bigint column
    // do heap scan to load as much data as possible into cache.
    // NOTE(review): heaprel/index_info are not used yet — the heap scan that
    // would populate the cache (see do_heap_scan) is not invoked here.
    // Return a zeroed IndexBuildResult handed over to Postgres memory.
    let result = PgBox::<pg_sys::IndexBuildResult>::alloc0();
    result.into_pg()
}
/// Per-build scratch state threaded through the heap-scan callback.
struct BuildState<'a> {
    tupdesc: &'a PgTupleDesc<'a>,
    // Expected vector dimension for rows being indexed.
    dimension: u16,
    // attributes: Vec<CategorizedAttribute<'a>>,
    // Memory context scoped to this build; switched to inside the callback.
    memcxt: PgMemoryContexts,
    pub rows_added: u32,
}
impl<'a> BuildState<'a> {
    /// Fresh state with a dedicated memory context and zero rows added.
    fn new(
        tupdesc: &'a PgTupleDesc,
        dimension: u16,
        // attributes: Vec<CategorizedAttribute<'a>>,
    ) -> Self {
        BuildState {
            tupdesc,
            dimension,
            memcxt: PgMemoryContexts::new("pgvec_rs context"),
            rows_added: 0,
        }
    }
}
// Heap-scan callback (PG13 signature): forwards each tuple to the shared
// internal handler below.
#[cfg(feature = "pg13")]
#[pg_guard]
unsafe extern "C" fn build_callback(
    _index: pg_sys::Relation,
    ctid: pg_sys::ItemPointer,
    values: *mut pg_sys::Datum,
    _isnull: *mut bool,
    _tuple_is_alive: bool,
    state: *mut std::os::raw::c_void,
) {
    build_callback_internal(*ctid, values, state);
}
// Decode one heap tuple into a vector and append it to the shared allocator.
// NOTE(review): `old_context` (the previous memory context) is captured but
// never restored, and the Result of `add_vec` is discarded — both look like
// oversights worth confirming. `ctid` is currently unused.
#[inline(always)]
unsafe extern "C" fn build_callback_internal(
    ctid: pg_sys::ItemPointerData,
    values: *mut pg_sys::Datum,
    state: *mut std::os::raw::c_void,
) {
    check_for_interrupts!();
    let state = (state as *mut BuildState).as_mut().unwrap();
    let old_context = state.memcxt.set_as_current();
    // TODO: Add check for structure
    // Assumes exactly one indexed column per tuple — TODO confirm.
    let values = std::slice::from_raw_parts(values, 1);
    let VecRow { id: _, data: vec } = row_to_vec_iter(&state.tupdesc, values[0]).unwrap();
    let mut vec_allocator = VECTOR_ALLOCATOR00.exclusive();
    vec_allocator.add_vec(&vec);
    state.rows_added += 1;
}
// Scan the heap relation, feeding every live row through `build_callback`,
// and return the number of rows added to the cache.
fn do_heap_scan<'a>(
    index_info: *mut pg_sys::IndexInfo,
    heap_relation: &'a PgRelation,
    index_relation: &'a PgRelation,
    tupdesc: &'a PgTupleDesc,
) -> u32 {
    // Should be able a to get a dimension from the description, but we'll hard code it now YOLO
    let dimension = 512;
    let mut state = BuildState::new(&tupdesc, dimension);
    // Assumption: the relations and tupdesc outlive the scan and Postgres
    // drives the callback synchronously — TODO confirm.
    unsafe {
        pg_sys::IndexBuildHeapScan(
            heap_relation.as_ptr(),
            index_relation.as_ptr(),
            index_info,
            Some(build_callback),
            &mut state,
        );
    }
    state.rows_added
}
/// Accumulator for a row whose id/data columns may arrive in any order;
/// promoted to a complete `VecRow` by `check()`.
#[derive(Default, Clone)]
struct VecRowPartial {
    id: Option<i64>,
    data: Option<Vec<f32>>,
}
impl VecRowPartial {
    /// Promote the partial row to a complete `VecRow`, or report exactly
    /// which piece (id, data, or both) is still missing.
    fn check(self) -> Result<VecRow, &'static str> {
        match (self.id, self.data) {
            (Some(id), Some(data)) => Ok(VecRow { id, data }),
            (None, Some(_)) => Err("Missing Id for vec row"),
            (Some(_), None) => Err("Missing vector data for vec row"),
            (None, None) => Err("Missing all data for vector"),
        }
    }
}
/// A fully-decoded row: the bigint key plus its real[] vector payload.
struct VecRow {
    id: i64,
    data: Vec<f32>,
}
/// Decode a composite-row datum of shape (bigint, real[]) into a `VecRow`.
///
/// The heap tuple is detoasted and deformed into one datum per attribute;
/// each attribute is then matched by type oid and slotted into the partial
/// row, which `check()` validates at the end.
///
/// # Safety
/// `row` must be a valid composite datum described by `tupdesc`.
#[inline]
unsafe fn row_to_vec_iter<'a>(
    tupdesc: &'a PgTupleDesc,
    row: pg_sys::Datum,
) -> Result<VecRow, &'static str> {
    let td = pg_sys::pg_detoast_datum(row as *mut pg_sys::varlena) as pg_sys::HeapTupleHeader;
    // Wrap the header in a stack HeapTupleData so heap_deform_tuple can walk it.
    let mut tmptup = pg_sys::HeapTupleData {
        t_len: varsize(td as *mut pg_sys::varlena) as u32,
        t_self: Default::default(),
        t_tableOid: 0,
        t_data: td,
    };
    let mut datums = vec![0 as pg_sys::Datum; tupdesc.natts as usize];
    let mut nulls = vec![false; tupdesc.natts as usize];
    pg_sys::heap_deform_tuple(
        &mut tmptup,
        tupdesc.as_ptr(),
        datums.as_mut_ptr(),
        nulls.as_mut_ptr(),
    );
    let mut vec_row = VecRowPartial::default();
    for (idx, attribute) in tupdesc.iter().enumerate() {
        // Dropped columns still occupy a slot in the deformed tuple; skip them.
        if attribute.is_dropped() {
            continue;
        }
        // NULL attributes stay unset; check() reports whatever is missing.
        if nulls[idx] {
            continue;
        }
        // Bug fix: decode each attribute from its own deformed datum
        // (`datums[idx]`) — previously the whole-row datum `row` was passed
        // to from_datum for every attribute, ignoring the deformed values.
        let datum = datums[idx];
        let array_type = pg_sys::get_element_type(attribute.type_oid().value());
        let (base_oid, is_array) = if array_type != pg_sys::InvalidOid {
            (PgOid::from(array_type), true)
        } else {
            (attribute.type_oid(), false)
        };
        match &base_oid {
            PgOid::BuiltIn(builtin) => match (builtin, is_array) {
                (PgBuiltInOids::FLOAT4OID, true) => {
                    if vec_row.data.is_none() {
                        let data = Vec::<f32>::from_datum(datum, false, builtin.value())
                            .ok_or("Failed to decode real[] vector data")?;
                        vec_row.data = Some(data);
                    } else {
                        return Err("Received more than one vector data entries in row");
                    }
                }
                (PgBuiltInOids::INT8OID, false) => {
                    if vec_row.id.is_none() {
                        let id = i64::from_datum(datum, false, builtin.value())
                            .ok_or("Failed to decode bigint id")?;
                        vec_row.id = Some(id);
                    } else {
                        return Err("Received more than one id in the row");
                    }
                }
                (_, _) => {
                    return Err("This row is not the right shape. It should be (bigint, real[])")
                }
            },
            // todo: communicate the right and wrong types
            _ => return Err("This element is not an accepted type"),
        }
    }
    vec_row.check()
}
// No-op build callback; presumably wired as the AM's ambuildempty hook —
// TODO confirm against the IndexAmRoutine registration (not visible here).
#[pg_guard]
pub extern "C" fn vec_mem_cache_buildempty(_index_relation: pg_sys::Relation) {}
#![feature(array_map)]
#![feature(once_cell)]
#![feature(const_extern_fn)]
pub mod mem_cache_index;
mod shmem_blocks;
mod util;
mod vec_block_alloc;
pub mod vector_query;
use crate::shmem_blocks::ShMemBlocksInner;
use pgx::*;
use rand::{distributions::Uniform, thread_rng, Rng};
use serde::*;
use std::fmt::format;
use std::sync::Arc;
use std::sync::Mutex;
use vec_block_alloc::FiniteDimAllocator;
#[macro_use]
extern crate lazy_static;
// Capacity parameter for each FiniteDimAllocator shared-memory block.
const MAX_SPACE: usize = 131_072;
lazy_static! {
    // Next block index the startup hook should attach (advanced in _PG_init's hook).
    static ref SHMEM_STEP: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
    // Locks created during _PG_init, later popped and moved into shared memory.
    static ref LOCKS: Arc<Mutex<Vec<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>>> =
        Arc::new(Mutex::new(Vec::new()));
    // Serialises registration of the shmem startup hooks in _PG_init.
    static ref HOOK_LOCK: Mutex<bool> = Mutex::new(false);
}
pg_module_magic!();
/// Smoke-test SQL function: `SELECT hello_pgvector_rs();`.
#[pg_extern]
fn hello_pgvector_rs() -> &'static str {
    "Get ready to do some similarity search!"
}
/// Replace missing (SQL NULL) elements of a zipped pair with 0.0.
fn unwrap_pair((a, b): (Option<f32>, Option<f32>)) -> (f32, f32) {
    let left = a.unwrap_or_default();
    let right = b.unwrap_or_default();
    (left, right)
}
/// Dense f32 vector; `#[derive(PostgresType)]` exposes it as a SQL type
/// serialised via serde.
#[derive(PostgresType, Deserialize, Serialize, Debug, PartialEq, Clone)]
pub struct Vector(Vec<f32>);
// we only seem able to allocate something that's about 2 MiB. Whelp let's start with that, and
// figure out how to grow later
// Allocator lock used by the index build callback (build_callback_internal).
static VECTOR_ALLOCATOR00: PgLwLock<FiniteDimAllocator<MAX_SPACE>> = PgLwLock::new();
// Registry of per-block allocators living in Postgres shared memory;
// populated by the startup hooks installed in _PG_init.
static SHMEM_BLOCKS: shmem_blocks::ShMemBlocks<MAX_SPACE> =
    shmem_blocks::ShMemBlocks::<MAX_SPACE>::new();
//static VECVECVEC: [PgLwLock<FiniteDimAllocator<MAX_SPACE>>; 400] =
// [(); 400].map(|_| PgLwLock::new());
/// Extension entry point, run at shared_preload_libraries time.
///
/// Phase 1 (first unsafe block): request shared-memory space and named
/// LWLock tranches — one per block plus one for the block array itself.
/// Phase 2 (second unsafe block): chain shmem startup hooks that, once
/// Postgres has created shared memory, carve out the segments and attach
/// the pre-created locks to them.
#[pg_guard]
pub extern "C" fn _PG_init() {
    // pg_shmem_init!(VECTOR_ALLOCATOR00);
    unsafe {
        let lock = std::ffi::CString::new(SHMEM_BLOCKS.get_name()).expect("CString::new failed");
        let size = SHMEM_BLOCKS.get_size();
        for _ in 0..size {
            let pg_lw_lock = PgLwLock::<FiniteDimAllocator<MAX_SPACE>>::new();
            let name = pg_lw_lock.get_name();
            let lock_name = std::ffi::CString::new(name).expect("CString::new failed");
            // NOTE(review): this requests `size * sizeof(block)` bytes on EVERY
            // iteration (size^2 * sizeof total). Looks like an over-request —
            // confirm whether it should be hoisted out of the loop or shrunk
            // to one block's worth per iteration.
            pg_sys::RequestAddinShmemSpace(
                size * std::mem::size_of::<FiniteDimAllocator<MAX_SPACE>>(),
            );
            pg_sys::RequestNamedLWLockTranche(lock_name.as_ptr(), 1);
            let mut locks = LOCKS.lock().unwrap();
            locks.push(pg_lw_lock)
        }
        // Space + tranche for the array of PgLwLock handles itself.
        pg_sys::RequestAddinShmemSpace(
            size * std::mem::size_of::<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>(),
        );
        pg_sys::RequestNamedLWLockTranche(lock.as_ptr(), 1);
    }
    unsafe {
        let size = SHMEM_BLOCKS.get_size();
        for _ in 0..size {
            let hook_lock = HOOK_LOCK.lock().expect("LOCKING SHMEM HOOKS");
            // NOTE(review): after the first iteration shmem_startup_hook is
            // already this same shmem_hook, so PREV saves the hook pointing at
            // itself; the (fn, count) pair bounds the resulting self-chaining
            // to `size` invocations. Deliberate but fragile — confirm.
            static mut PREV_SHMEM_STARTUP_HOOK: Option<(unsafe extern "C" fn(), usize)> = None;
            PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook.map(|i| (i, 0));
            pg_sys::shmem_startup_hook = Some(shmem_hook);
            println!("startup hook: {:?}", PREV_SHMEM_STARTUP_HOOK);
            println!("DONE SETTING SHMEM_HOOKS: {:?}", hook_lock);
            // Per-block startup hook: attaches the next pre-created lock
            // (indexed by SHMEM_STEP) to its named shmem segment.
            #[pg_guard]
            extern "C" fn shmem_hook() {
                unsafe {
                    // Bump the chain counter, then call the previous hook
                    // while fewer than `size` links have run.
                    PREV_SHMEM_STARTUP_HOOK =
                        PREV_SHMEM_STARTUP_HOOK.map(|(i, count)| (i, count + 1));
                    if let Some((i, count)) = PREV_SHMEM_STARTUP_HOOK {
                        if count < SHMEM_BLOCKS.get_size() {
                            i()
                        };
                    }
                    // NOTE(review): index 21 into MainLWLockArray is a magic
                    // constant — presumably AddinShmemInitLock; confirm it
                    // matches this Postgres version's lwlocknames.
                    let addin_shmem_init_lock: *mut pg_sys::LWLock =
                        &mut (*pg_sys::MainLWLockArray.add(21)).lock;
                    pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode_LW_EXCLUSIVE);
                    let mut found = false;
                    let mut step_num = SHMEM_STEP.lock().unwrap();
                    let locks_vec = LOCKS.lock().expect("Unable to lock locks vector");
                    let lock = locks_vec.get(*step_num).unwrap();
                    let lock_name = lock.get_name();
                    let shm_name = std::ffi::CString::new(lock_name).expect("CString::new failed");
                    println!("About to attach pointer");
                    // NOTE(review): into_raw leaks the CString — likely fine
                    // for a once-per-startup path, but worth confirming.
                    let fv_shmem = pg_sys::ShmemInitStruct(
                        shm_name.into_raw(),
                        std::mem::size_of::<FiniteDimAllocator<MAX_SPACE>>(),
                        &mut found,
                    ) as *mut FiniteDimAllocator<MAX_SPACE>;
                    lock.attach(fv_shmem);
                    println!("About to write pointer");
                    *step_num += 1;
                    pg_sys::LWLockRelease(addin_shmem_init_lock);
                }
            }
        }
        // Final hook (distinct static/fn from the loop's): allocates the
        // array segment and moves the locks from LOCKS into shared memory.
        static mut PREV_SHMEM_STARTUP_HOOK: Option<unsafe extern "C" fn()> = None;
        let hook_lock = HOOK_LOCK.lock().unwrap();
        PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook;
        pg_sys::shmem_startup_hook = Some(shmem_hook);
        #[pg_guard]
        extern "C" fn shmem_hook() {
            unsafe {
                // Run the previously-registered chain first.
                if let Some(i) = PREV_SHMEM_STARTUP_HOOK {
                    i();
                }
                let mut found = false;
                let addin_shmem_init_lock: *mut pg_sys::LWLock =
                    &mut (*pg_sys::MainLWLockArray.add(21)).lock;
                pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode_LW_EXCLUSIVE);
                let shm_name =
                    std::ffi::CString::new(SHMEM_BLOCKS.get_name()).expect("CString::new failed");
                let fv_shmem = pg_sys::ShmemInitStruct(
                    shm_name.into_raw(),
                    SHMEM_BLOCKS.get_size()
                        * std::mem::size_of::<PgLwLock<FiniteDimAllocator<MAX_SPACE>>>(),
                    &mut found,
                ) as *mut PgLwLock<FiniteDimAllocator<MAX_SPACE>>;
                SHMEM_BLOCKS
                    .inner
                    .set(ShMemBlocksInner { start: fv_shmem })
                    .expect("Failed to set inner shmem");
                // Move every pre-created lock into its slot in the shmem array.
                // NOTE(review): pop() takes locks in reverse creation order —
                // confirm slot order does not need to match creation order.
                for offset in 0..SHMEM_BLOCKS.get_size() {
                    let lock = LOCKS
                        .lock()
                        .expect("Locks should be set")
                        .pop()
                        .expect("Locks should be initialised at this point");
                    *lock.exclusive() = FiniteDimAllocator::<MAX_SPACE>::default();
                    println!("written!");
                    std::ptr::write(fv_shmem.add(offset), lock);
                    let lock_ref = fv_shmem.add(offset).as_ref().unwrap();
                    println!("{:?}", *lock_ref.share())
                }
                pg_sys::LWLockRelease(addin_shmem_init_lock);
            }
        }
    }
}
/// SQL-callable: set the vector dimension for shared-memory block `block`.
///
/// NOTE(review): the parameter is spelled `dimemsion`; renaming it would
/// change the SQL argument name pg_extern generates, so the typo is kept
/// for interface compatibility.
#[pg_extern]
fn set_allocator_dim(block: i32, dimemsion: i16) {
    // Build the panic message lazily instead of format!-ing on every call.
    let block = SHMEM_BLOCKS
        .get_block(block as usize)
        .unwrap_or_else(|| panic!("Block not found, only {:?}", SHMEM_BLOCKS.get_size()));
    block.exclusive().set_dim(dimemsion as u16)
}
/// SQL-callable test helper: fill block `block` with `vecs` random
/// 512-dimensional vectors drawn uniformly from [0, 1).
#[pg_extern]
fn fill_vec_allocator(block: i32, vecs: i32) {
    let mut rng = thread_rng();
    let dist = Uniform::new(0.0, 1.0);
    set_allocator_dim(block, 512);
    for _ in 0..vecs {
        let random_vec = (0..512).map(|_| rng.sample(dist)).collect::<Vec<_>>();
        SHMEM_BLOCKS
            .get_block(block as usize)
            .unwrap()
            .exclusive()
            .add_vec(&random_vec);
    }
}
/// SQL-callable: append one vector to block `block`, mapping SQL NULL
/// elements to 0.0.
#[pg_extern]
fn push_vec_to_mem(block: i32, vec: Array<f32>) {
    let dense: Vec<f32> = vec.into_iter().map(|x| x.unwrap_or_default()).collect();
    SHMEM_BLOCKS
        .get_block(block as usize)
        .unwrap()
        .exclusive()
        .add_vec(&dense);
}
/// SQL-callable: score `vec` against the vectors stored at `ids` in block
/// `block`, returning (index, score) rows via inner product (`ip_many`).
#[pg_extern]
fn query_vec(
    block: i32,
    vec: Array<f32>,
    ids: Array<i64>,
) -> impl Iterator<Item = (name!(index, i64), name!(score, f32))> {
    // flatten() drops SQL NULL ids directly, replacing the
    // filter(is_some) + unwrap() pair.
    let query_ids = ids
        .iter()
        .flatten()
        .map(|id| id as usize)
        .collect::<Vec<usize>>();
    // NULL vector elements are treated as 0.0.
    let query_vec = vec.into_iter().map(|x| x.unwrap_or(0.0)).collect();
    let return_scores = SHMEM_BLOCKS
        .get_block(block as usize)
        .unwrap()
        .share()
        .ip_many(&query_vec, &query_ids)
        .unwrap();
    // Collect to owned pairs so the returned iterator borrows nothing local.
    return_scores
        .iter()
        .zip(query_ids.iter())
        .map(|(score, index)| (*index as i64, *score))
        .collect::<Vec<_>>()
        .into_iter()
}
/// Hooks consumed by the pgx test framework.
#[cfg(test)]
pub mod pg_test {
    pub fn setup(_options: Vec<&str>) {
        // perform one-off initialization when the pg_test framework starts
    }
    pub fn postgresql_conf_options() -> Vec<&'static str> {
        // return any postgresql.conf settings that are required for your tests
        vec![]
    }
}
use crate::vec_block_alloc::F32Block;
use crate::FiniteDimAllocator;
use pgx::log;
use pgx::pg_sys;
use pgx::PgSharedMemoryInitialization;
use pgx::{pg_guard, pg_shmem_init};
use pgx::{PgLwLock, PgLwLockInner};
use std::lazy::OnceCell;
use uuid::Uuid;
/// Handle to an array of shared-memory blocks, each guarded by a `PgLwLock`.
///
/// `name` and `size` are lazily initialised (see `get_name`/`get_size`);
/// `inner` holds the shmem base pointer once the startup hook has run.
pub struct ShMemBlocks<const BLOCK_SIZE: usize> {
    size: OnceCell<usize>,
    name: OnceCell<&'static str>,
    pub inner: OnceCell<ShMemBlocksInner<BLOCK_SIZE>>,
}
// SAFETY(review): OnceCell is not itself thread-safe; these impls assume all
// initialisation happens single-threaded at postmaster startup before any
// concurrent access — TODO confirm (std::sync::OnceLock would make this
// sound by construction).
unsafe impl<const BLOCK_SIZE: usize> Send for ShMemBlocks<BLOCK_SIZE> {}
unsafe impl<const BLOCK_SIZE: usize> Sync for ShMemBlocks<BLOCK_SIZE> {}
/// Base pointer to the array of block locks placed in shared memory
/// (written by the final startup hook in `_PG_init`).
#[derive(Debug)]
pub struct ShMemBlocksInner<const BLOCK_SIZE: usize> {
    pub start: *mut PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>,
}
impl<const BLOCK_SIZE: usize> ShMemBlocks<BLOCK_SIZE> {
    /// Const constructor so the registry can live in a `static`.
    pub const fn new() -> Self {
        ShMemBlocks {
            size: OnceCell::new(),
            name: OnceCell::new(),
            inner: OnceCell::new(),
        }
    }
    /// Lazily-generated, process-stable shmem segment name (a leaked UUID).
    /// NOTE(review): a concurrent first call would make `set` fail and panic —
    /// relies on single-threaded startup.
    pub fn get_name(&self) -> &'static str {
        match self.name.get() {
            None => {
                let name = Box::leak(Uuid::new_v4().to_string().into_boxed_str());
                self.name.set(name).unwrap();
                name
            }
            Some(name) => name,
        }
    }
    /// Number of blocks; fixed at first call.
    pub fn get_size(&self) -> usize {
        match self.size.get() {
            None => {
                let size = 1024; // todo: replace with GucStuff
                self.size.set(size).unwrap();
                size
            }
            Some(size) => *size,
        }
    }
    /// Look up the lock for block `index`, or None when the index is out of
    /// range or shared memory has not been attached yet.
    pub fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {
        // Bug fix: valid indices are 0..size, so `index == size` must be
        // rejected too (the startup hook only initialises offsets 0..size;
        // the old `index > size` check let one-past-the-end through).
        if index >= self.get_size() {
            None
        } else {
            // This is guaranteed to be safe because we initialise all addresses at startup
            self.inner
                .get()
                .and_then(|inner| unsafe { inner.get_block(index) })
        }
    }
    /// Iterate over every attached block lock.
    pub fn iter<'a>(
        &'a self,
    ) -> impl Iterator<Item = &PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> + 'a {
        let size = self.get_size();
        (0..size).filter_map(move |index| self.get_block(index))
    }
}
impl<const BLOCK_SIZE: usize> ShMemBlocksInner<BLOCK_SIZE> {
    /// Reference to the lock at `index`.
    ///
    /// # Safety
    /// `self.start` must point to at least `index + 1` initialised locks:
    /// `as_ref` only guards against a null pointer, not out-of-bounds reads.
    unsafe fn get_block(&self, index: usize) -> Option<&PgLwLock<FiniteDimAllocator<BLOCK_SIZE>>> {
        self.start.add(index).as_ref()
    }
}
use memoffset::*;
use pgx::pg_sys::AsPgCStr;
use pgx::*;
use serde::Serialize;
// Relopt "kind" id for this AM, consumed by build_relopts below.
// NOTE(review): nothing visible here assigns it — confirm where registration
// (add_reloption_kind) happens.
static mut RELOPT_KIND_PGVEC: u32 = 0; // Do I like this? Oh no I do not!
/// Index reloptions (`CREATE INDEX ... WITH (dimension = ..., something = ...)`).
/// Field layout must match the relopt_parse_elt offsets in
/// `vec_mem_cache_options`.
#[derive(Clone, Debug, Serialize)]
pub struct VecMemCacheOptions {
    /* varlena header (do not touch directly!) */
    #[allow(dead_code)]
    vl_len_: i32,
    // vector dimension for this index (falls back to DEFAULT_DIM)
    dimension: u32,
    // second parsed option; semantics not defined anywhere in this file
    something: u32,
}
/// Dimension used when the index was created without a `dimension` reloption.
const DEFAULT_DIM: u32 = 512;
#[allow(dead_code)]
impl VecMemCacheOptions {
    /// Fetch the reloptions attached to an index relation, falling back to
    /// defaults when the index was created without a WITH (...) clause.
    ///
    /// Panics if `relation` is not an index.
    pub(crate) fn from_relation(relation: &PgRelation) -> PgBox<VecMemCacheOptions> {
        if relation.rd_index.is_null() {
            // Fixed malformed message: was "'{} not a vector memcache index' "
            // with a misplaced quote and trailing space.
            panic!("'{}' is not a vector memcache index", relation.name())
        } else if relation.rd_options.is_null() {
            // No stored reloptions: alloc0 zero-fills, then apply defaults.
            let mut opts = PgBox::<VecMemCacheOptions>::alloc0();
            opts.dimension = DEFAULT_DIM;
            opts
        } else {
            PgBox::from_pg(relation.rd_options as *mut VecMemCacheOptions)
        }
    }
    /// Dimension as u16, the width the in-memory allocator uses.
    /// NOTE(review): silently truncates values > u16::MAX — confirm reloption
    /// validation bounds `dimension` appropriately.
    pub(crate) fn get_dim(&self) -> u16 {
        self.dimension as u16
    }
}
/// amoptions callback: parse this AM's reloptions into a VecMemCacheOptions
/// varlena using Postgres' build_reloptions machinery.
#[pg_guard]
pub unsafe extern "C" fn vec_mem_cache_options(
    reloptions: pg_sys::Datum,
    validate: bool,
) -> *mut pg_sys::varlena {
    // Table mapping SQL option names to struct-field offsets; order/offsets
    // must stay in sync with the VecMemCacheOptions definition above.
    let tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS] = [
        pg_sys::relopt_parse_elt {
            optname: "dimension".as_pg_cstr(),
            opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,
            offset: offset_of!(VecMemCacheOptions, dimension) as i32,
        },
        pg_sys::relopt_parse_elt {
            optname: "something".as_pg_cstr(),
            opttype: pg_sys::relopt_type_RELOPT_TYPE_INT,
            offset: offset_of!(VecMemCacheOptions, something) as i32,
        },
    ];
    build_relopts(reloptions, validate, tab)
}
// impl IntoDatum for *mut pg_sys::bytea {}
/// Number of entries in the relopt_parse_elt table (dimension + something).
const NUM_REL_OPTS: usize = 2;
/// Hand the option table to Postgres' build_reloptions and cast the parsed
/// result to a bytea pointer.
///
/// # Safety
/// `reloptions` must be a valid reloptions datum; RELOPT_KIND_PGVEC must be
/// the registered relopt kind for this AM.
unsafe fn build_relopts(
    reloptions: pg_sys::Datum,
    validate: bool,
    tab: [pg_sys::relopt_parse_elt; NUM_REL_OPTS],
) -> *mut pg_sys::bytea {
    /* Parse the user-given reloptions */
    let parsed = pg_sys::build_reloptions(
        reloptions,
        validate,
        RELOPT_KIND_PGVEC,
        std::mem::size_of::<VecMemCacheOptions>(),
        tab.as_ptr(),
        NUM_REL_OPTS as i32,
    );
    parsed as *mut pg_sys::bytea
}
#[cfg(test)]
mod test {}
// use crate::mem_cache_index::build::row_to_vec_iter;
use pgx::*;
/// aminsert callback stub, invoked for each tuple inserted into the heap.
///
/// NOTE(review): work in progress — `values` and `name` are computed but
/// unused and nothing is actually inserted; the commented-out lines show the
/// intended decode path. Also, unlike the other callbacks in this chunk it
/// lacks #[pg_guard] — confirm whether that is deliberate.
pub unsafe extern "C" fn aminsert(
    index_relation: pg_sys::Relation,
    values: *mut pg_sys::Datum,
    _isnull: *mut bool,
    heap_tid: pg_sys::ItemPointer,
    _heap_relation: pg_sys::Relation,
    _check_unique: pg_sys::IndexUniqueCheck,
    _index_info: *mut pg_sys::IndexInfo,
) -> bool {
    let index_relation = PgRelation::from_pg(index_relation);
    // Only the first datum is examined (same assumption as the build path).
    let values = std::slice::from_raw_parts(values, 1);
    let name = index_relation.name().bytes();
    // todo: try to get a pointer to the name as a sequence of i8 bytes.
    // let tupdesc = pg_sys::RelationNameGetTupleDesc(name);
    // let vec = row_to_vec_iter(tupdesc, values[0]).unwrap();
    true
}