Move DownloadService to Coroutine (1/2)

This commit is contained in:
LooKeR 2021-12-04 00:28:44 +05:30
parent 0dd23c23b1
commit e7184ecccc
2 changed files with 56 additions and 47 deletions

View File

@ -24,13 +24,13 @@ import com.looker.droidify.utility.extension.android.*
import com.looker.droidify.utility.extension.resources.*
import com.looker.droidify.utility.extension.text.*
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.collect
import java.io.File
import java.security.MessageDigest
import java.util.concurrent.TimeUnit
import kotlin.math.*
class DownloadService : ConnectionService<DownloadService.Binder>() {
@ -41,7 +41,8 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
private const val EXTRA_CACHE_FILE_NAME =
"${BuildConfig.APPLICATION_ID}.intent.extra.CACHE_FILE_NAME"
private val downloadingSubject = PublishSubject.create<State.Downloading>()
private val mutableDownloadState = MutableSharedFlow<State.Downloading>()
private val downloadState = mutableDownloadState.asSharedFlow()
}
val scope = CoroutineScope(Dispatchers.Default)
@ -81,15 +82,14 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
State(packageName, name)
class Success(
packageName: String, name: String, val release: Release,
val consume: () -> Unit,
packageName: String, name: String, val release: Release
) : State(packageName, name)
class Error(packageName: String, name: String) : State(packageName, name)
class Cancel(packageName: String, name: String) : State(packageName, name)
}
private val stateSubject = PublishSubject.create<State>()
private val mutableStateSubject = MutableSharedFlow<State>()
private class Task(
val packageName: String, val name: String, val release: Release,
@ -106,9 +106,7 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
private var currentTask: CurrentTask? = null
inner class Binder : android.os.Binder() {
fun events(packageName: String): Observable<State> {
return stateSubject.filter { it.packageName == packageName }
}
val stateSubject = mutableStateSubject.asSharedFlow()
fun enqueue(packageName: String, name: String, repository: Repository, release: Release) {
val task = Task(
@ -128,7 +126,7 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
if (currentTask == null) {
handleDownload()
} else {
stateSubject.onNext(State.Pending(packageName, name))
scope.launch { mutableStateSubject.emit(State.Pending(packageName, name)) }
}
}
}
@ -138,18 +136,11 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
cancelCurrentTask(packageName)
handleDownload()
}
fun getState(packageName: String): State? = currentTask
?.let { if (it.task.packageName == packageName) it.lastState else null }
?: tasks.find { it.packageName == packageName }
?.let { State.Pending(it.packageName, it.name) }
}
private val binder = Binder()
override fun onBind(intent: Intent): Binder = binder
private var downloadingDisposable: Disposable? = null
override fun onCreate() {
super.onCreate()
@ -162,16 +153,14 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
.let(notificationManager::createNotificationChannel)
}
downloadingDisposable = downloadingSubject
.sample(500L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.subscribe { publishForegroundState(false, it) }
scope.launch {
downloadState.collect { publishForegroundState(false, it) }
}
}
override fun onDestroy() {
super.onDestroy()
downloadingDisposable?.dispose()
downloadingDisposable = null
scope.cancel()
cancelTasks(null)
cancelCurrentTask(null)
@ -187,7 +176,7 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
private fun cancelTasks(packageName: String?) {
tasks.removeAll {
(packageName == null || it.packageName == packageName) && run {
stateSubject.onNext(State.Cancel(it.packageName, it.name))
scope.launch { mutableStateSubject.emit(State.Cancel(it.packageName, it.name)) }
true
}
}
@ -197,7 +186,14 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
currentTask?.let {
if (packageName == null || it.task.packageName == packageName) {
currentTask = null
stateSubject.onNext(State.Cancel(it.task.packageName, it.task.name))
scope.launch {
mutableStateSubject.emit(
State.Cancel(
it.task.packageName,
it.task.name
)
)
}
it.disposable.dispose()
}
}
@ -307,9 +303,10 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
private fun publishSuccess(task: Task) {
var consumed = false
stateSubject.onNext(State.Success(task.packageName, task.name, task.release) {
scope.launch {
mutableStateSubject.emit(State.Success(task.packageName, task.name, task.release))
consumed = true
})
}
if (!consumed) {
if (rootInstallerEnabled) {
scope.launch {
@ -414,7 +411,7 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
}
}::class
}.build())
stateSubject.onNext(state)
scope.launch { mutableStateSubject.emit(state) }
}
}
@ -441,14 +438,16 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
task.authentication
) { read, total ->
if (!disposable.isDisposed) {
downloadingSubject.onNext(
State.Downloading(
task.packageName,
task.name,
read,
total
scope.launch {
mutableDownloadState.emit(
State.Downloading(
task.packageName,
task.name,
read,
total
)
)
)
}
}
}
.observeOn(AndroidSchedulers.mainThread())
@ -460,7 +459,14 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
task,
if (result != null) ErrorType.Http else ErrorType.Network
)
stateSubject.onNext(State.Error(task.packageName, task.name))
scope.launch {
mutableStateSubject.emit(
State.Error(
task.packageName,
task.name
)
)
}
} else {
val validationError = validatePackage(task, partialReleaseFile)
if (validationError == null) {
@ -471,7 +477,14 @@ class DownloadService : ConnectionService<DownloadService.Binder>() {
} else {
partialReleaseFile.delete()
showNotificationError(task, ErrorType.Validation(validationError))
stateSubject.onNext(State.Error(task.packageName, task.name))
scope.launch {
mutableStateSubject.emit(
State.Error(
task.packageName,
task.name
)
)
}
}
}
handleDownload()

View File

@ -35,6 +35,8 @@ import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
@ -83,15 +85,12 @@ class AppDetailFragment() : ScreenFragment(), AppDetailAdapter.Callbacks {
private var recyclerView: RecyclerView? = null
private var productDisposable: Disposable? = null
private var downloadDisposable: Disposable? = null
private val downloadConnection = Connection(DownloadService::class.java, onBind = { _, binder ->
lifecycleScope.launch { updateDownloadState(binder.getState(packageName)) }
downloadDisposable = binder.events(packageName).subscribe {
lifecycleScope.launch { updateDownloadState(it) }
lifecycleScope.launch {
binder.stateSubject.filter { it.packageName == packageName }.collect {
updateDownloadState(it)
}
}
}, onUnbind = { _, _ ->
downloadDisposable?.dispose()
downloadDisposable = null
})
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
@ -238,8 +237,6 @@ class AppDetailFragment() : ScreenFragment(), AppDetailAdapter.Callbacks {
productDisposable?.dispose()
productDisposable = null
downloadDisposable?.dispose()
downloadDisposable = null
downloadConnection.unbind(requireContext())
}
@ -363,7 +360,6 @@ class AppDetailFragment() : ScreenFragment(), AppDetailAdapter.Callbacks {
(recyclerView?.adapter as? AppDetailAdapter)?.setStatus(status)
if (state is DownloadService.State.Success && isResumed) {
withContext(Dispatchers.Default) {
state.consume()
AppInstaller.getInstance(context)?.defaultInstaller?.install(state.release.cacheFileName)
}
}