PMKFKETRBAZWKGX2ZBWZAUBXVKD2V22AEHFKA2HCSSTGI75DPEAAC
DBCRYUN5Q3DIFGBVVCOWRNBMMWCGILT4TY2OY22IYVH6U7TQIYTAC
IFWHYAZTPKL7ZVOV3AFZNIUTRLMVSOACYACA3CI4DSEEZU77YQEQC
UV2RTMNSNI3AJFVVD66OT3CEF7WU7TLAPJKACGX7BBSSWAEBAAAAC
DH3RVE6JRQBOSRUBVIIMKSJYFS4CK5PI25KV2NAJ7EP4Q7NNKSJQC
SIHQ3OG5GIBQLSMOBKSD2IHFTGFXU4VDCX45BH2ABS42BV3N2UIQC
LECAQCCL2UKTMH3LQSGEFVBOF6CYCGJGJZM2I2T4R7QPTBVW4VYQC
}
#[pg_test]
fn test_score_vecs() {
println!("Starting test");
let mut rng = thread_rng();
let poisson = Poisson::new(1024.0).expect("Couldn't Poissonify");
let n_vecs = poisson.sample(&mut rng) as usize;
let dim = 512u16;
let mut shmem_handler = SharedMemHandler {};
println!("INITIALISING TABLES");
init_index_table();
clear_allocations();
println!("Building vectors");
let vectors = VectorsBuilder::new().init("vectors_2", dim);
insert_random_vecs(&mut rng, &vectors, n_vecs);
println!("DONE Building vectors");
let mut mem_cache_vectors = MemCacheVectorsBuilder::new().init(&vectors);
mem_cache_vectors.push_all_possible(&mut shmem_handler);
let vec = std::iter::repeat(1.0)
.take(dim as usize)
.collect::<Vec<_>>();
Spi::connect(|client| {
let vecs_and_scores_db = vectors.score_vecs_ip(&client, &vec);
let vecs_and_scores_cache = mem_cache_vectors.score_vecs_ip(&client, &vec);
let score: f32 = vecs_and_scores_cache
.iter()
.map(|(idx, score)| {
let db_score = vecs_and_scores_db
.get(idx)
.expect(&format!("Missing index {} from db", idx));
(*score - db_score).abs()
})
.sum();
assert!(score <= 1.0e-7);
Ok(Some(score))
})
.expect("Couln't unwrap score");
.map(|heap_tuple| {
let vec_table_name = heap_tuple
.by_ordinal(1)
.expect("Unable to read vector table name")
.value::<String>()
.expect("Unable to cast vector table name to String");
let index_table_name = heap_tuple
.by_ordinal(2)
.expect("Unabled to read index table name")
.value::<String>()
.expect("Unable to cast index table name to String");
let dim = heap_tuple
.by_ordinal(3)
.expect("Unable to read dimension")
.value::<i32>()
.expect("Unable to cast dimension to int");
.select_help()
.iter_three::<String, String, i32>()
.map(|(vec_table_opt, index_table_opt, dim_opt)| {
let vec_table_name =
vec_table_opt.expect("Unable to cast vector table name to String");
let index_table_name =
index_table_opt.expect("Unable to cast index table name to String");
let dim = dim_opt.expect("Unable to cast dimension to int");
}
pub(crate) fn score_vecs_ip(&self, client: &SpiClient, vec: &[f32]) -> HashMap<i64, f32> {
client
.select(
&format!(
"
SELECT
vec.id, ip(vec.vec, $1) ip
FROM {vector_table} vec
",
vector_table = &self.name
),
None,
Some(vec![(
PgBuiltInOids::FLOAT4ARRAYOID.oid(),
vec.into_datum(),
)]),
)
.select_help()
.iter_two::<i64, f32>()
.filter_map(|(idx_opt, score_opt)| {
let idx = idx_opt?;
let score = score_opt?;
Some((idx, score))
})
.collect()
.ok_or({
println!("Doing some Stuff");
SpiError::Transaction
})?; // Probably unwrap is better
{
let vecs_with_ids = self.get_vecs_block(&client, count).collect::<Vec<_>>();
let inserts = vecs_with_ids
.iter()
.map(|vec_with_id| {
self.push_vector(allocator_index as usize, &mut client, vec_with_id)
})
.collect::<Result<Vec<()>, String>>()
.map_err(|_| SpiError::Transaction)?
.len(); // todo if one of these fails, there should be a recovery step.
println!(
"Insert n_vecs {}, in allocator {} with dim {}",
vecs_with_ids.len(),
allocator_index,
self.vector_table.dim
);
let vecs_with_ids = self.get_vecs_block(&client, count).collect::<Vec<_>>();
let inserts = vecs_with_ids
.iter()
.map(|vec_with_id| self.push_vector(allocator_index as usize, &mut client, vec_with_id))
.collect::<Result<Vec<()>, String>>()
.map_err(|_| SpiError::Transaction)?
.len(); // todo if one of these fails, there should be a recovery step.
println!(
"Insert n_vecs {}, in allocator {} with dim {}",
vecs_with_ids.len(),
allocator_index,
self.vector_table.dim
);
Ok(inserts.wrap_with_client(client))
Ok(inserts.wrap_with_client(client))
} else {
Ok(0.wrap_with_client(client))
}
COUNT(vector_id) n_vecs
FROM {}
GROUP BY allocator
ORDER BY n_vecs
)
SELECT
allocator,
n_vecs
FROM allocators_by_vec
WHERE n_vecs < $1",
n_vecs
FROM allocators_by_vec
WHERE n_vecs < $1",
}
pub(crate) fn score_vecs_ip(&self, client: &SpiClient, vec: &[f32]) -> HashMap<i64, f32> {
client
.select(
&format!(
"
SELECT
mit.vector_id id,
get_block_vector_ip(mit.allocator, mit.allocator_index, $1) ip
FROM {memcache_index_table} mit
UNION
SELECT
vec.id, ip(vec.vec, $1) ip
FROM {vector_table} vec
LEFT JOIN {memcache_index_table} mit2
ON mit2.vector_id = vec.id
WHERE mit2.id IS NULL
",
memcache_index_table = &self.name,
vector_table = &self.vector_table.name
),
None,
Some(vec![(
PgBuiltInOids::FLOAT4ARRAYOID.oid(),
vec.into_datum(),
)]),
)
.select_help()
.iter_two::<i64, f32>()
.filter_map(|(idx_opt, score_opt)| {
let idx = idx_opt?;
let score = score_opt?;
Some((idx, score))
})
.collect::<HashMap<_, _>>()
/// Computes the inner product between a vector held in a shared-memory block and `comp_array`.
///
/// `block_index` selects the block inside `SHMEM_BLOCKS` and `vector_index` selects the vector
/// inside that block. Elements of `comp_array` that are `None` contribute `0.0` to the sum.
/// The two sequences are paired with `zip`, so if their lengths differ the surplus elements of
/// the longer one are ignored.
///
/// Returns `None` when `SHMEM_BLOCKS.get_block` yields nothing for `block_index`.
///
/// # Panics
///
/// * if `block_index` is negative or not smaller than `SHMEM_BLOCKS.get_size()`;
/// * if the block exists and `vector_index` is negative or `get_slice` finds no vector for it.
fn get_block_vector_ip(block_index: i32, vector_index: i32, comp_array: Array<f32>) -> Option<f32> {
    // A negative index passes the upper-bound check below and would then wrap to a huge
    // value in the `as usize` cast, so reject it explicitly first.
    if block_index < 0 {
        panic!(
            "Tried to access negative block index {} when calculating inner product",
            block_index
        )
    }
    if block_index >= SHMEM_BLOCKS.get_size() as i32 {
        panic!(
            "Tried to access block index {} bigger than block size {} when calculating inner product",
            block_index,
            SHMEM_BLOCKS.get_size()
        )
    }
    SHMEM_BLOCKS.get_block(block_index as usize).map(|block| {
        // Same wrap-around hazard as above: fail with the established message instead of
        // handing a wrapped index to `get_slice`. Checked only once the block is known to
        // exist, so a missing block still yields `None` as before.
        if vector_index < 0 {
            panic!("vector_index out of bounds")
        }
        let shared_ref = block.share();
        shared_ref
            .get_slice(vector_index as usize)
            .expect("vector_index out of bounds")
            .into_iter()
            .zip(comp_array.into_iter())
            .map(|(x, y_opt)| x * y_opt.unwrap_or(0.0))
            .sum::<f32>()
    })
}
#[pg_extern]