TaskManager.kt (5894B) - raw
package me.rhunk.snapenhance.task

import android.content.ContentValues
import android.content.Context
import android.database.sqlite.SQLiteDatabase
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import me.rhunk.snapenhance.RemoteSideContext
import me.rhunk.snapenhance.common.util.SQLiteDatabaseHelper
import me.rhunk.snapenhance.common.util.ktx.getLong
import me.rhunk.snapenhance.common.util.ktx.getStringOrNull
import java.util.concurrent.Executors
import kotlin.coroutines.suspendCoroutine

/**
 * Stores [Task] records in a private SQLite database named "tasks" and keeps track of the
 * [PendingTask]s created through [createPendingTask].
 *
 * Inserts, updates and deletes are submitted to a single-threaded executor so they run one at a
 * time; the reads in [getTaskByHash] and [fetchStoredTasks] run on the calling thread.
 *
 * [init] opens the database and must be called before any other method.
 */
class TaskManager(
    private val remoteSideContext: RemoteSideContext
) {
    private lateinit var taskDatabase: SQLiteDatabase

    // Single worker thread: every database write goes through it, in submission order.
    private val queueExecutor = Executors.newSingleThreadExecutor()

    // Coroutine view of the same executor, created once instead of on every call.
    private val queueDispatcher = queueExecutor.asCoroutineDispatcher()

    // Pending tasks created by createPendingTask, keyed by their database row id.
    private val activeTasks = mutableMapOf<Long, PendingTask>()

    /**
     * Opens (or creates) the "tasks" database and makes sure the `tasks` table exists.
     */
    fun init() {
        taskDatabase = remoteSideContext.androidContext
            .openOrCreateDatabase("tasks", Context.MODE_PRIVATE, null)
            .apply {
                SQLiteDatabaseHelper.createTablesFromSchema(this, mapOf(
                    "tasks" to listOf(
                        "id INTEGER PRIMARY KEY AUTOINCREMENT",
                        "hash VARCHAR UNIQUE",
                        "title VARCHAR(255) NOT NULL",
                        "author VARCHAR(255)",
                        "type VARCHAR(255) NOT NULL",
                        "status VARCHAR(255) NOT NULL",
                        "extra TEXT"
                    )
                ))
            }
    }

    /**
     * Builds a [Task] from the row the cursor is currently positioned on.
     *
     * The returned task's change listener writes the task's status and extra back to that row.
     *
     * @param cursor a cursor over the `tasks` table, positioned on a valid row
     * @throws NullPointerException if the `type`, `title`, `hash` or `status` column is null
     */
    private fun readTaskFromCursor(cursor: android.database.Cursor): Task {
        // Capture the row id immediately. The listener fires later, when the cursor has
        // usually been closed or advanced, so reading it lazily would fail or hit another row.
        val rowId = cursor.getLong("id")

        val task = Task(
            type = TaskType.fromKey(cursor.getStringOrNull("type")!!),
            title = cursor.getStringOrNull("title")!!,
            author = cursor.getStringOrNull("author"),
            hash = cursor.getStringOrNull("hash")!!
        )
        task.status = TaskStatus.fromKey(cursor.getStringOrNull("status")!!)
        task.extra = cursor.getStringOrNull("extra")
        task.changeListener = {
            updateTask(rowId, task)
        }
        return task
    }

    /**
     * Inserts [task] unless a row with the same hash already exists, blocking the caller until
     * the write queue has processed the request.
     *
     * @return the id of the existing row with the same hash, otherwise the value returned by
     * `SQLiteDatabase.insert` (per the Android documentation, -1 when the insert failed)
     * @throws Throwable whatever the database query or insert threw on the queue thread
     */
    private fun putNewTask(task: Task): Long = runBlocking {
        suspendCoroutine<Long> { continuation ->
            queueExecutor.execute {
                // runCatching guarantees the continuation is resumed on every path; without it
                // a database error would leave the blocked caller waiting forever.
                val outcome = runCatching {
                    val existingId = taskDatabase
                        .rawQuery("SELECT * FROM tasks WHERE hash = ?", arrayOf(task.hash))
                        .use { cursor ->
                            if (cursor.moveToNext()) cursor.getLong("id") else null
                        }

                    existingId ?: taskDatabase.insert("tasks", null, ContentValues().apply {
                        put("type", task.type.key)
                        put("hash", task.hash)
                        put("author", task.author)
                        put("title", task.title)
                        put("status", task.status.key)
                        put("extra", task.extra)
                    })
                }
                continuation.resumeWith(outcome)
            }
        }
    }

    /**
     * Queues a write of the current status and extra of [task] to the row with the given [id].
     * Returns immediately; the update runs later on the write queue.
     */
    private fun updateTask(id: Long, task: Task) {
        queueExecutor.execute {
            taskDatabase.execSQL(
                "UPDATE tasks SET status = ?, extra = ? WHERE id = ?",
                arrayOf(
                    task.status.key,
                    task.extra,
                    id.toString()
                )
            )
        }
    }

    /**
     * Deletes every row of the `tasks` table, blocking until the delete has run on the write queue.
     */
    fun clearAllTasks() {
        runBlocking {
            launch(queueDispatcher) {
                taskDatabase.execSQL("DELETE FROM tasks")
            }
        }
    }

    /**
     * Cancels and forgets the pending task wrapping [task] (if there is one), then deletes the
     * stored row with the same hash. Blocks until the delete has run on the write queue.
     *
     * A failure while cancelling is logged and does not prevent the row from being deleted.
     */
    fun removeTask(task: Task) {
        runBlocking {
            activeTasks.entries.find { it.value.task == task }?.let { entry ->
                activeTasks.remove(entry.key)
                runCatching {
                    entry.value.cancel()
                }.onFailure {
                    remoteSideContext.log.warn("Failed to cancel task ${task.hash}")
                }
            }
            launch(queueDispatcher) {
                taskDatabase.execSQL("DELETE FROM tasks WHERE hash = ?", arrayOf(task.hash))
            }
        }
    }

    /**
     * Stores [task] (or reuses the row that already has its hash), hooks its change listener up
     * to the database and registers a [PendingTask] for it.
     *
     * @return the newly created pending task, also reachable through [getActiveTasks]
     */
    fun createPendingTask(task: Task): PendingTask {
        val taskId = putNewTask(task)
        task.changeListener = {
            updateTask(taskId, task)
        }

        val pendingTask = PendingTask(taskId, task)
        activeTasks[taskId] = pendingTask
        return pendingTask
    }

    /**
     * Looks up a stored task by its hash.
     *
     * @return the stored task, or `null` when [hash] is `null` or no row matches
     */
    fun getTaskByHash(hash: String?): Task? {
        if (hash == null) return null
        return taskDatabase
            .rawQuery("SELECT * FROM tasks WHERE hash = ?", arrayOf(hash))
            .use { cursor ->
                if (cursor.moveToNext()) readTaskFromCursor(cursor) else null
            }
    }

    /**
     * Returns the live map of pending tasks keyed by row id (the backing map itself, not a copy).
     */
    fun getActiveTasks() = activeTasks

    /**
     * Reads one page of stored tasks, newest first.
     *
     * Returned tasks whose stored status is not a final stage have their status set to
     * [TaskStatus.FAILURE]. Rows that cannot be read are logged, left out of the result and
     * queued for deletion.
     *
     * @param lastId only rows with an id strictly below this value are returned
     * @param limit maximum number of rows to read
     * @return the tasks that could be read, keyed by row id, in descending id order
     */
    fun fetchStoredTasks(lastId: Long = Long.MAX_VALUE, limit: Int = 10): Map<Long, Task> {
        val tasks = mutableMapOf<Long, Task>()
        val invalidTasks = mutableListOf<Long>()

        taskDatabase.rawQuery(
            "SELECT * FROM tasks WHERE id < ? ORDER BY id DESC LIMIT ?",
            arrayOf(lastId.toString(), limit.toString())
        ).use { cursor ->
            while (cursor.moveToNext()) {
                runCatching {
                    val task = readTaskFromCursor(cursor)
                    if (!task.status.isFinalStage()) {
                        task.status = TaskStatus.FAILURE
                    }
                    tasks[cursor.getLong("id")] = task
                }.onFailure {
                    val invalidId = cursor.getLong("id")
                    invalidTasks.add(invalidId)
                    remoteSideContext.log.warn("Failed to read task $invalidId")
                }
            }
        }

        // Unreadable rows are removed asynchronously so the caller is not blocked on cleanup.
        invalidTasks.forEach { invalidId ->
            queueExecutor.execute {
                taskDatabase.execSQL("DELETE FROM tasks WHERE id = ?", arrayOf(invalidId.toString()))
            }
        }

        return tasks
    }
}