Revert: Revert Back to RxJava for Now

This commit is contained in:
LooKeR 2021-11-13 10:13:15 +05:30
parent bc44c9de15
commit ed49230d4e

View File

@ -29,34 +29,27 @@ import com.looker.droidify.utility.extension.android.notificationManager
import com.looker.droidify.utility.extension.resources.getColorFromAttr
import com.looker.droidify.utility.extension.text.formatSize
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers.io
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.collect
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import kotlin.math.roundToInt
class SyncService : ConnectionService<SyncService.Binder>() {
private val scope = CoroutineScope(Dispatchers.Default)
companion object {
private const val ACTION_CANCEL = "${BuildConfig.APPLICATION_ID}.intent.action.CANCEL"
private val mutableStateSubject = MutableSharedFlow<State>()
private val mutableFinishSubject = MutableSharedFlow<Unit>()
private val stateSubject = mutableStateSubject.asSharedFlow()
private val finishSubject = mutableFinishSubject.asSharedFlow()
private val stateSubject = PublishSubject.create<State>()
private val finishSubject = PublishSubject.create<Unit>()
}
private sealed class State {
data class Connecting(val name: String) : State()
data class Syncing(
val name: String, val stage: RepositoryUpdater.Stage,
val read: Long, val total: Long?,
val read: Long, val total: Long?
) : State()
object Finishing : State()
@ -65,7 +58,7 @@ class SyncService : ConnectionService<SyncService.Binder>() {
private class Task(val repositoryId: Long, val manual: Boolean)
private data class CurrentTask(
val task: Task?, val disposable: Disposable,
val hasUpdates: Boolean, val lastState: State,
val hasUpdates: Boolean, val lastState: State
)
private enum class Started { NO, AUTO, MANUAL }
@ -79,7 +72,7 @@ class SyncService : ConnectionService<SyncService.Binder>() {
enum class SyncRequest { AUTO, MANUAL, FORCE }
inner class Binder : android.os.Binder() {
val finish: SharedFlow<Unit>
val finish: Observable<Unit>
get() = finishSubject
private fun sync(ids: List<Long>, request: SyncRequest) {
@ -159,6 +152,8 @@ class SyncService : ConnectionService<SyncService.Binder>() {
private val binder = Binder()
override fun onBind(intent: Intent): Binder = binder
private var stateDisposable: Disposable? = null
override fun onCreate() {
super.onCreate()
@ -175,17 +170,17 @@ class SyncService : ConnectionService<SyncService.Binder>() {
)
.let(notificationManager::createNotificationChannel)
}
scope.launch(Dispatchers.Default) {
stateSubject.collect {
publishForegroundState(false,
it)
}
}
stateDisposable = stateSubject
.sample(500L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.subscribe { publishForegroundState(false, it) }
}
override fun onDestroy() {
super.onDestroy()
scope.cancel()
stateDisposable?.dispose()
stateDisposable = null
cancelTasks { true }
cancelCurrentTask { true }
}
@ -354,20 +349,18 @@ class SyncService : ConnectionService<SyncService.Binder>() {
lateinit var disposable: Disposable
disposable = RepositoryUpdater
.update(this, repository, unstable) { stage, progress, total ->
scope.launch(Dispatchers.Default) {
if (!disposable.isDisposed) {
mutableStateSubject.emit(
State.Syncing(
repository.name,
stage,
progress,
total
)
if (!disposable.isDisposed) {
stateSubject.onNext(
State.Syncing(
repository.name,
stage,
progress,
total
)
}
)
}
}
.observeOn(io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { result, throwable ->
currentTask = null
throwable?.printStackTrace()
@ -398,7 +391,7 @@ class SyncService : ConnectionService<SyncService.Binder>() {
.toList()
}
}
.subscribeOn(io())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { result, throwable ->
throwable?.printStackTrace()
@ -411,7 +404,7 @@ class SyncService : ConnectionService<SyncService.Binder>() {
}
currentTask = CurrentTask(null, disposable, true, State.Finishing)
} else {
scope.launch(Dispatchers.Main) { mutableFinishSubject.emit(Unit) }
finishSubject.onNext(Unit)
val needStop = started == Started.MANUAL
started = Started.NO
if (needStop) {
@ -479,20 +472,23 @@ class SyncService : ConnectionService<SyncService.Binder>() {
class Job : JobService() {
private var syncParams: JobParameters? = null
private var syncDisposable: Disposable? = null
private val syncConnection =
Connection(SyncService::class.java, onBind = { connection, binder ->
MainScope().launch {
binder.finish.collect {
val params = syncParams
if (params != null) {
syncParams = null
connection.unbind(this@Job)
jobFinished(params, false)
}
syncDisposable = binder.finish.subscribe {
val params = syncParams
if (params != null) {
syncParams = null
syncDisposable?.dispose()
syncDisposable = null
connection.unbind(this)
jobFinished(params, false)
}
binder.sync(SyncRequest.AUTO)
}
binder.sync(SyncRequest.AUTO)
}, onUnbind = { _, binder ->
syncDisposable?.dispose()
syncDisposable = null
binder.cancelAuto()
val params = syncParams
if (params != null) {
@ -509,6 +505,8 @@ class SyncService : ConnectionService<SyncService.Binder>() {
override fun onStopJob(params: JobParameters): Boolean {
syncParams = null
syncDisposable?.dispose()
syncDisposable = null
val reschedule = syncConnection.binder?.cancelAuto() == true
syncConnection.unbind(this)
return reschedule