package com.github.jonathanxd.dracon.cache
import com.github.jonathanxd.dracon.coroutine.DRACON_SCHEDULER
import com.intellij.execution.process.mediator.util.blockingGet
import com.intellij.openapi.project.Project
import com.intellij.openapi.project.getProjectDataPath
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.trySendBlocking
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import kotlin.concurrent.withLock
/**
 * Denotes the current version of the data cache format; must be incremented whenever the
 * on-disk cache format changes so that stale cache files are discarded instead of misread.
 *
 * NOTE(review): this constant is not referenced anywhere in this file — verify it is the
 * version that [ActorDataCache] is supposed to pass to its [ActorDataCacheManager].
 */
const val ACTOR_DATA_CACHE_VERSION: Long = 3
/**
 * Actor-based data cache storage with support for cache misses and falling back to a default
 * resolver. Disk writes complete asynchronously via [ActorDataCacheManager], so holding this
 * cache's lock does not freeze IntelliJ IDEA when data is requested from the UI thread.
 *
 * The in-memory map is the source of truth; the on-disk file is a compressed snapshot that is
 * rewritten after every mutation.
 */
class ActorDataCache<K, V>(val project: Project,
                           val name: String) {
    private val cachePath = project.getProjectDataPath("com.github.jonathanxd.dracon")
    private val cacheFile = cachePath.resolve("$name.gz")
    // Tag the on-disk format with the version declared in this file so stale formats are
    // discarded on load (the original passed DATA_CACHE_VERSION, leaving
    // ACTOR_DATA_CACHE_VERSION unused — presumably a copy-paste slip).
    private val manager = ActorDataCacheManager<K, CachedValue<V>>(this.cacheFile, ACTOR_DATA_CACHE_VERSION)
    private val lock = ReentrantLock()
    private val inMemory = ConcurrentHashMap<K, CachedValue<V>>()

    init {
        // Populate the in-memory cache from disk asynchronously; until the load completes,
        // queries simply fall back to their compute functions.
        this.manager.load {
            inMemory.putAll(it)
        }
    }

    /**
     * Executes [f] while holding this cache's lock and returns its result.
     */
    fun <R> withLock(f: () -> R) = this.lock.withLock(f)

    /**
     * Removes [key] from the in-memory cache and schedules an asynchronous disk write.
     */
    fun unload(key: K) {
        this.lock.withLock {
            this.inMemory.remove(key)
            // Persist while still holding the lock, consistent with [invalidate], so the
            // snapshot written to disk reflects the removal atomically.
            writeToDisk()
        }
    }

    /**
     * Returns the cached value for [key]; when absent, computes it with [compute], stores it
     * in memory and schedules a disk write.
     */
    fun queryOrLoad(key: K, compute: (K) -> V): V {
        return this.lock.withLock {
            val cached = this.inMemory[key]
            if (cached == null) {
                val computed = CachedValue(Instant.now(), compute(key))
                this.inMemory[key] = computed
                writeToDisk()
                computed
            } else {
                cached
            }.value
        }
    }

    /**
     * Returns the value for [key] in the intermediate representation [I].
     *
     * When absent, the value is computed with [compute]; results rejected by [filter] are
     * returned as-is without being cached. Accepted results are converted for storage with
     * [iMapper] (`I -> V`), and cache hits are converted back with [vMapper] (`V -> I`).
     */
    fun <I> queryOrLoad(key: K,
                        compute: (K) -> I,
                        filter: (I) -> Boolean,
                        iMapper: (I) -> V,
                        vMapper: (V) -> I): I {
        return this.lock.withLock {
            val cached = this.inMemory[key]
            if (cached == null) {
                val computed = compute(key)
                if (!filter(computed)) {
                    // Not cacheable: hand the computed value back without persisting it.
                    return computed
                }
                this.inMemory[key] = CachedValue(Instant.now(), iMapper(computed))
                writeToDisk()
                computed
            } else {
                vMapper(cached.value)
            }
        }
    }

    /**
     * Persists a snapshot of the in-memory cache; the actual file write completes asynchronously.
     *
     * A copy is taken here so the writer serializes a stable snapshot rather than the live map,
     * which may be mutated again before the asynchronous write runs.
     */
    private fun writeToDisk() {
        this.manager.write(HashMap(this.inMemory))
    }

    /**
     * Clears the whole cache and schedules a disk write of the (now empty) state.
     *
     * @see CacheService.invalidate
     */
    fun invalidate() {
        this.lock.withLock {
            this.inMemory.clear()
            this.writeToDisk()
        }
    }

    /**
     * Removes [key] from the cache and schedules a disk write.
     *
     * @see CacheService.invalidate
     */
    fun invalidate(key: K) {
        this.lock.withLock {
            this.inMemory.remove(key)
            this.writeToDisk()
        }
    }

    /**
     * Replaces the value for [key] when the cached entry is absent or older than the time of
     * this call. NOTE(review): the update is not persisted here — presumably flushed by a later
     * [writeToDisk]; confirm this is intentional.
     *
     * @see CacheService.updateCache
     */
    fun updateCache(key: K, newValueCompute: () -> V) {
        val instant = Instant.now()
        this.lock.withLock {
            val inMemoryValue = this.inMemory[key]
            if (inMemoryValue == null || inMemoryValue.instant.isBefore(instant)) {
                this.inMemory[key] = CachedValue(instant, newValueCompute())
            }
        }
    }

    /**
     * Bulk variant of [updateCache]: replaces each key's value when its cached entry is absent
     * or older than the time of this call. All keys share one timestamp and one lock acquisition.
     *
     * @see CacheService.updateCache
     */
    fun updateCache(keys: List<K>, newValueCompute: (K) -> V) {
        val instant = Instant.now()
        this.lock.withLock {
            for (key in keys) {
                val inMemoryValue = this.inMemory[key]
                if (inMemoryValue == null || inMemoryValue.instant.isBefore(instant)) {
                    this.inMemory[key] = CachedValue(instant, newValueCompute(key))
                }
            }
        }
    }
}
/**
 * Messages processed by the [ActorDataCacheManager] actor. Each command carries a
 * [CompletableDeferred] that the actor completes once the request has been handled,
 * allowing callers to await the result without blocking the actor.
 */
sealed class CacheCommand<K, V> {
    /** Requests the on-disk cache to be read; completed with `null` when no usable cache file exists. */
    class CacheLoad<K, V>(val completion: CompletableDeferred<Map<K, V>?>): CacheCommand<K, V>()
    /** Requests [data] to be written to the on-disk cache; completed once the write has run. */
    class CacheSave<K, V>(val data: Map<K, V>, val completion: CompletableDeferred<Unit>): CacheCommand<K, V>()
}
/**
 * Persists a cache map to a single GZIP-compressed file.
 *
 * All file I/O is funneled through one actor on [Dispatchers.IO] so concurrent load/save
 * requests are serialized without blocking callers — on-disk storage is slow due to
 * compression, and callers may be on the UI thread (they accept a MISS while a write is
 * in flight rather than waiting).
 */
class ActorDataCacheManager<K, V>(val path: Path, val version: Long) {
    @Suppress("BlockingMethodInNonBlockingContext")
    private val loader = CoroutineScope(Dispatchers.IO).actor<CacheCommand<K, V>> {
        for (msg in channel) {
            when (msg) {
                is CacheCommand.CacheLoad<K, V> -> msg.completion.complete(readFromDisk())
                is CacheCommand.CacheSave<K, V> -> {
                    try {
                        writeToDisk(msg.data)
                        msg.completion.complete(Unit)
                    } catch (t: Throwable) {
                        // Never leave awaiters hanging: previously a failed write completed
                        // nothing (every `write` caller awaited forever) and the uncaught
                        // exception killed the actor loop, breaking all later commands.
                        msg.completion.completeExceptionally(t)
                    }
                }
            }
        }
    }

    init {
        Files.createDirectories(this.path.parent)
    }

    /**
     * Reads the cache file synchronously.
     *
     * @return `null` when the file does not exist or is unreadable (a corrupt file is deleted
     *         best-effort), an empty map when the stored version does not match [version],
     *         otherwise the deserialized cache contents.
     */
    private fun readFromDisk(): Map<K, V>? {
        if (!Files.exists(path)) {
            return null
        }
        return try {
            Files.newInputStream(path).use { stream ->
                GZIPInputStream(stream).use { gz ->
                    ObjectInputStream(gz).use { reader ->
                        val storedVersion = reader.readLong()
                        if (storedVersion != this.version) {
                            // Outdated format: discard contents; the caller repopulates lazily.
                            emptyMap()
                        } else {
                            @Suppress("UNCHECKED_CAST")
                            reader.readObject() as Map<K, V>
                        }
                    }
                }
            }
        } catch (t: Throwable) {
            t.printStackTrace()
            // Drop the corrupt file. Deletion is best-effort so that a locked or
            // already-removed file cannot throw and kill the actor loop.
            runCatching { Files.delete(path) }
            null
        }
    }

    /**
     * Writes [data] to the cache file synchronously, replacing any previous contents.
     * The [version] prefix lets [readFromDisk] detect format changes.
     */
    private fun writeToDisk(data: Map<K, V>) {
        Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING).use { writer ->
            GZIPOutputStream(writer).use { compress ->
                ObjectOutputStream(compress).use { oos ->
                    oos.writeLong(this.version)
                    oos.writeObject(data)
                }
            }
        }
    }

    /**
     * Loads the cache asynchronously and invokes [onLoad] with the result when a cache file
     * existed and was readable; [onLoad] is not invoked otherwise.
     */
    fun load(onLoad: suspend (Map<K, V>) -> Unit) {
        CoroutineScope(DRACON_SCHEDULER).launch {
            val complete = CompletableDeferred<Map<K, V>?>()
            loader.send(CacheCommand.CacheLoad(complete))
            val load = complete.await()
            if (load != null) {
                onLoad(load)
            }
        }
    }

    /**
     * Saves [data] asynchronously, invoking [onComplete] once the file write has finished.
     * If the write fails, the failure propagates through the launched coroutine and
     * [onComplete] is not invoked.
     */
    fun write(data: Map<K, V>, onComplete: suspend () -> Unit = {}) {
        CoroutineScope(DRACON_SCHEDULER).launch {
            val complete = CompletableDeferred<Unit>()
            loader.send(CacheCommand.CacheSave(data, complete))
            complete.await()
            onComplete()
        }
    }
}