From ed49230d4ebde1b43f909e31d7e44c56b3e2a893 Mon Sep 17 00:00:00 2001 From: LooKeR Date: Sat, 13 Nov 2021 10:13:15 +0530 Subject: [PATCH] Revert: Revert Back to RxJava for Now --- .../looker/droidify/service/SyncService.kt | 90 +++++++++---------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/src/main/kotlin/com/looker/droidify/service/SyncService.kt b/src/main/kotlin/com/looker/droidify/service/SyncService.kt index 34ee5af1..2306107d 100644 --- a/src/main/kotlin/com/looker/droidify/service/SyncService.kt +++ b/src/main/kotlin/com/looker/droidify/service/SyncService.kt @@ -29,34 +29,27 @@ import com.looker.droidify.utility.extension.android.notificationManager import com.looker.droidify.utility.extension.resources.getColorFromAttr import com.looker.droidify.utility.extension.text.formatSize import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers +import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.disposables.Disposable -import io.reactivex.rxjava3.schedulers.Schedulers.io -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.collect +import io.reactivex.rxjava3.schedulers.Schedulers +import io.reactivex.rxjava3.subjects.PublishSubject import java.lang.ref.WeakReference +import java.util.concurrent.TimeUnit import kotlin.math.roundToInt class SyncService : ConnectionService() { - private val scope = CoroutineScope(Dispatchers.Default) - companion object { private const val ACTION_CANCEL = "${BuildConfig.APPLICATION_ID}.intent.action.CANCEL" - private val mutableStateSubject = MutableSharedFlow() - private val mutableFinishSubject = MutableSharedFlow() - - private val stateSubject = mutableStateSubject.asSharedFlow() - private val finishSubject = mutableFinishSubject.asSharedFlow() + private val stateSubject = PublishSubject.create() + private val finishSubject = PublishSubject.create() } private sealed class State { data class Connecting(val name: String) : State() data class Syncing( val name: String, val stage: RepositoryUpdater.Stage, - val read: Long, val total: Long?, + val read: Long, val total: Long? ) : State() object Finishing : State() @@ -65,7 +58,7 @@ class SyncService : ConnectionService() { private class Task(val repositoryId: Long, val manual: Boolean) private data class CurrentTask( val task: Task?, val disposable: Disposable, - val hasUpdates: Boolean, val lastState: State, + val hasUpdates: Boolean, val lastState: State ) private enum class Started { NO, AUTO, MANUAL } @@ -79,7 +72,7 @@ class SyncService : ConnectionService() { enum class SyncRequest { AUTO, MANUAL, FORCE } inner class Binder : android.os.Binder() { - val finish: SharedFlow + val finish: Observable get() = finishSubject private fun sync(ids: List, request: SyncRequest) { @@ -159,6 +152,8 @@ class SyncService : ConnectionService() { private val binder = Binder() override fun onBind(intent: Intent): Binder = binder + private var stateDisposable: Disposable? = null + override fun onCreate() { super.onCreate() @@ -175,17 +170,17 @@ class SyncService : ConnectionService() { ) .let(notificationManager::createNotificationChannel) } - scope.launch(Dispatchers.Default) { - stateSubject.collect { - publishForegroundState(false, - it) - } - } + + stateDisposable = stateSubject + .sample(500L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) + .subscribe { publishForegroundState(false, it) } } override fun onDestroy() { super.onDestroy() - scope.cancel() + + stateDisposable?.dispose() + stateDisposable = null cancelTasks { true } cancelCurrentTask { true } } @@ -354,20 +349,18 @@ class SyncService : ConnectionService() { lateinit var disposable: Disposable disposable = RepositoryUpdater .update(this, repository, unstable) { stage, progress, total -> - scope.launch(Dispatchers.Default) { - if (!disposable.isDisposed) { - mutableStateSubject.emit( - State.Syncing( - repository.name, - stage, - progress, - total - ) + if (!disposable.isDisposed) { + stateSubject.onNext( + State.Syncing( + repository.name, + stage, + progress, + total ) - } + ) } } - .observeOn(io()) + .observeOn(AndroidSchedulers.mainThread()) .subscribe { result, throwable -> currentTask = null throwable?.printStackTrace() @@ -398,7 +391,7 @@ class SyncService : ConnectionService() { .toList() } } - .subscribeOn(io()) + .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { result, throwable -> throwable?.printStackTrace() @@ -411,7 +404,7 @@ class SyncService : ConnectionService() { } currentTask = CurrentTask(null, disposable, true, State.Finishing) } else { - scope.launch(Dispatchers.Main) { mutableFinishSubject.emit(Unit) } + finishSubject.onNext(Unit) val needStop = started == Started.MANUAL started = Started.NO if (needStop) { @@ -479,20 +472,23 @@ class SyncService : ConnectionService() { class Job : JobService() { private var syncParams: JobParameters? = null + private var syncDisposable: Disposable? = null private val syncConnection = Connection(SyncService::class.java, onBind = { connection, binder -> - MainScope().launch { - binder.finish.collect { - val params = syncParams - if (params != null) { - syncParams = null - connection.unbind(this@Job) - jobFinished(params, false) - } + syncDisposable = binder.finish.subscribe { + val params = syncParams + if (params != null) { + syncParams = null + syncDisposable?.dispose() + syncDisposable = null + connection.unbind(this) + jobFinished(params, false) } - binder.sync(SyncRequest.AUTO) } + binder.sync(SyncRequest.AUTO) }, onUnbind = { _, binder -> + syncDisposable?.dispose() + syncDisposable = null binder.cancelAuto() val params = syncParams if (params != null) { @@ -509,9 +505,11 @@ class SyncService : ConnectionService() { override fun onStopJob(params: JobParameters): Boolean { syncParams = null + syncDisposable?.dispose() + syncDisposable = null val reschedule = syncConnection.binder?.cancelAuto() == true syncConnection.unbind(this) return reschedule } } -} +} \ No newline at end of file