TaskManager.kt (5894B) - raw


      1 package me.rhunk.snapenhance.task
      2 
      3 import android.content.ContentValues
      4 import android.content.Context
      5 import android.database.sqlite.SQLiteDatabase
      6 import kotlinx.coroutines.asCoroutineDispatcher
      7 import kotlinx.coroutines.launch
      8 import kotlinx.coroutines.runBlocking
      9 import me.rhunk.snapenhance.RemoteSideContext
     10 import me.rhunk.snapenhance.common.util.SQLiteDatabaseHelper
     11 import me.rhunk.snapenhance.common.util.ktx.getLong
     12 import me.rhunk.snapenhance.common.util.ktx.getStringOrNull
     13 import java.util.concurrent.Executors
     14 import kotlin.coroutines.suspendCoroutine
     15 
     16 class TaskManager(
     17     private val remoteSideContext: RemoteSideContext
     18 ) {
     19     private lateinit var taskDatabase: SQLiteDatabase
     20     private val queueExecutor = Executors.newSingleThreadExecutor()
     21 
     22     fun init() {
     23         taskDatabase = remoteSideContext.androidContext.openOrCreateDatabase("tasks", Context.MODE_PRIVATE, null).apply {
     24             SQLiteDatabaseHelper.createTablesFromSchema(this, mapOf(
     25                 "tasks" to listOf(
     26                     "id INTEGER PRIMARY KEY AUTOINCREMENT",
     27                     "hash VARCHAR UNIQUE",
     28                     "title VARCHAR(255) NOT NULL",
     29                     "author VARCHAR(255)",
     30                     "type VARCHAR(255) NOT NULL",
     31                     "status VARCHAR(255) NOT NULL",
     32                     "extra TEXT"
     33                 )
     34             ))
     35         }
     36     }
     37 
     38     private val activeTasks = mutableMapOf<Long, PendingTask>()
     39 
     40     private fun readTaskFromCursor(cursor: android.database.Cursor): Task {
     41         val task = Task(
     42             type = TaskType.fromKey(cursor.getStringOrNull("type")!!),
     43             title = cursor.getStringOrNull("title")!!,
     44             author = cursor.getStringOrNull("author"),
     45             hash = cursor.getStringOrNull("hash")!!
     46         )
     47         task.status = TaskStatus.fromKey(cursor.getStringOrNull("status")!!)
     48         task.extra = cursor.getStringOrNull("extra")
     49         task.changeListener = {
     50             updateTask(cursor.getLong("id"), task)
     51         }
     52         return task
     53     }
     54 
     55     private fun putNewTask(task: Task): Long {
     56         return runBlocking {
     57             suspendCoroutine {
     58                 queueExecutor.execute {
     59                     taskDatabase.rawQuery("SELECT * FROM tasks WHERE hash = ?", arrayOf(task.hash)).use { cursor ->
     60                         if (cursor.moveToNext()) {
     61                             it.resumeWith(Result.success(cursor.getLong("id")))
     62                             return@execute
     63                         }
     64                     }
     65 
     66                     val result = taskDatabase.insert("tasks", null, ContentValues().apply {
     67                         put("type", task.type.key)
     68                         put("hash", task.hash)
     69                         put("author", task.author)
     70                         put("title", task.title)
     71                         put("status", task.status.key)
     72                         put("extra", task.extra)
     73                     })
     74 
     75                     it.resumeWith(Result.success(result))
     76                 }
     77             }
     78         }
     79     }
     80 
     81     private fun updateTask(id: Long, task: Task) {
     82         queueExecutor.execute {
     83             taskDatabase.execSQL("UPDATE tasks SET status = ?, extra = ? WHERE id = ?",
     84                 arrayOf(
     85                     task.status.key,
     86                     task.extra,
     87                     id.toString()
     88                 )
     89             )
     90         }
     91     }
     92 
     93     fun clearAllTasks() {
     94         runBlocking {
     95             launch(queueExecutor.asCoroutineDispatcher()) {
     96                 taskDatabase.execSQL("DELETE FROM tasks")
     97             }
     98         }
     99     }
    100 
    101     fun removeTask(task: Task) {
    102         runBlocking {
    103             activeTasks.entries.find { it.value.task == task }?.let {
    104                 activeTasks.remove(it.key)
    105                 runCatching {
    106                     it.value.cancel()
    107                 }.onFailure {
    108                     remoteSideContext.log.warn("Failed to cancel task ${task.hash}")
    109                 }
    110             }
    111             launch(queueExecutor.asCoroutineDispatcher()) {
    112                 taskDatabase.execSQL("DELETE FROM tasks WHERE hash = ?", arrayOf(task.hash))
    113             }
    114         }
    115     }
    116 
    117     fun createPendingTask(task: Task): PendingTask {
    118         val taskId = putNewTask(task)
    119         task.changeListener = {
    120             updateTask(taskId, task)
    121         }
    122 
    123         val pendingTask = PendingTask(taskId, task)
    124         activeTasks[taskId] = pendingTask
    125         return pendingTask
    126     }
    127 
    128     fun getTaskByHash(hash: String?): Task? {
    129         if (hash == null) return null
    130         taskDatabase.rawQuery("SELECT * FROM tasks WHERE hash = ?", arrayOf(hash)).use { cursor ->
    131             if (cursor.moveToNext()) {
    132                 return readTaskFromCursor(cursor)
    133             }
    134         }
    135         return null
    136     }
    137 
    138     fun getActiveTasks() = activeTasks
    139 
    140     fun fetchStoredTasks(lastId: Long = Long.MAX_VALUE, limit: Int = 10): Map<Long, Task> {
    141         val tasks = mutableMapOf<Long, Task>()
    142         val invalidTasks = mutableListOf<Long>()
    143 
    144         taskDatabase.rawQuery("SELECT * FROM tasks WHERE id < ? ORDER BY id DESC LIMIT ?", arrayOf(lastId.toString(), limit.toString())).use { cursor ->
    145             while (cursor.moveToNext()) {
    146                 runCatching {
    147                     val task = readTaskFromCursor(cursor)
    148                     if (!task.status.isFinalStage()) { task.status = TaskStatus.FAILURE }
    149                     tasks[cursor.getLong("id")] = task
    150                 }.onFailure {
    151                     invalidTasks.add(cursor.getLong("id"))
    152                     remoteSideContext.log.warn("Failed to read task ${cursor.getLong("id")}")
    153                 }
    154             }
    155         }
    156 
    157         invalidTasks.forEach {
    158             queueExecutor.execute {
    159                 taskDatabase.execSQL("DELETE FROM tasks WHERE id = ?", arrayOf(it.toString()))
    160             }
    161         }
    162 
    163         return tasks
    164     }
    165 }