diff --git a/src/main/kotlin/com/looker/droidify/service/SyncService.kt b/src/main/kotlin/com/looker/droidify/service/SyncService.kt index 82592c7e..3982b6fb 100644 --- a/src/main/kotlin/com/looker/droidify/service/SyncService.kt +++ b/src/main/kotlin/com/looker/droidify/service/SyncService.kt @@ -29,20 +29,22 @@ import com.looker.droidify.utility.extension.android.notificationManager import com.looker.droidify.utility.extension.resources.getColorFromAttr import com.looker.droidify.utility.extension.text.formatSize import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers -import io.reactivex.rxjava3.core.Observable import io.reactivex.rxjava3.disposables.Disposable import io.reactivex.rxjava3.schedulers.Schedulers -import io.reactivex.rxjava3.subjects.PublishSubject +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import java.lang.ref.WeakReference -import java.util.concurrent.TimeUnit import kotlin.math.roundToInt class SyncService : ConnectionService() { companion object { private const val ACTION_CANCEL = "${BuildConfig.APPLICATION_ID}.intent.action.CANCEL" - private val stateSubject = PublishSubject.create() - private val finishSubject = PublishSubject.create() + private val mutableStateSubject = MutableSharedFlow() + private val mutableFinishState = MutableSharedFlow() + + private val stateSubject = mutableStateSubject.asSharedFlow() + private val finishState = mutableFinishState.asSharedFlow() } private sealed class State { @@ -63,6 +65,8 @@ class SyncService : ConnectionService() { private enum class Started { NO, AUTO, MANUAL } + private val scope = CoroutineScope(Dispatchers.Default) + private var started = Started.NO private val tasks = mutableListOf() private var currentTask: CurrentTask? = null @@ -72,8 +76,8 @@ class SyncService : ConnectionService() { enum class SyncRequest { AUTO, MANUAL, FORCE } inner class Binder : android.os.Binder() { - val finish: Observable - get() = finishSubject + val finish: SharedFlow + get() = finishState private fun sync(ids: List, request: SyncRequest) { val cancelledTask = @@ -152,8 +156,6 @@ class SyncService : ConnectionService() { private val binder = Binder() override fun onBind(intent: Intent): Binder = binder - private var stateDisposable: Disposable? = null - override fun onCreate() { super.onCreate() @@ -171,16 +173,11 @@ class SyncService : ConnectionService() { .let(notificationManager::createNotificationChannel) } - stateDisposable = stateSubject - .sample(500L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) - .subscribe { publishForegroundState(false, it) } + stateSubject.onEach { publishForegroundState(false, it) }.launchIn(scope) } override fun onDestroy() { super.onDestroy() - - stateDisposable?.dispose() - stateDisposable = null cancelTasks { true } cancelCurrentTask { true } } @@ -350,14 +347,16 @@ class SyncService : ConnectionService() { disposable = RepositoryUpdater .update(this, repository, unstable) { stage, progress, total -> if (!disposable.isDisposed) { - stateSubject.onNext( - State.Syncing( - repository.name, - stage, - progress, - total + scope.launch { + mutableStateSubject.emit( + State.Syncing( + repository.name, + stage, + progress, + total + ) ) - ) + } } } .observeOn(AndroidSchedulers.mainThread()) @@ -404,7 +403,7 @@ class SyncService : ConnectionService() { } currentTask = CurrentTask(null, disposable, true, State.Finishing) } else { - finishSubject.onNext(Unit) + scope.launch { mutableFinishState.emit(Unit) } val needStop = started == Started.MANUAL started = Started.NO if (needStop) { @@ -471,25 +470,24 @@ class SyncService : ConnectionService() { } class Job : JobService() { + private val jobScope = CoroutineScope(Dispatchers.Default) private var syncParams: JobParameters? = null - private var syncDisposable: Disposable? = null private val syncConnection = Connection(SyncService::class.java, onBind = { connection, binder -> - syncDisposable = binder.finish.subscribe { - val params = syncParams - if (params != null) { - syncParams = null - syncDisposable?.dispose() - syncDisposable = null - connection.unbind(this) - jobFinished(params, false) + jobScope.launch { + binder.finish.collect { + val params = syncParams + if (params != null) { + syncParams = null + connection.unbind(this@Job) + jobFinished(params, false) + } } } binder.sync(SyncRequest.AUTO) }, onUnbind = { _, binder -> - syncDisposable?.dispose() - syncDisposable = null binder.cancelAuto() + jobScope.cancel() val params = syncParams if (params != null) { syncParams = null @@ -505,8 +503,7 @@ class SyncService : ConnectionService() { override fun onStopJob(params: JobParameters): Boolean { syncParams = null - syncDisposable?.dispose() - syncDisposable = null + jobScope.cancel() val reschedule = syncConnection.binder?.cancelAuto() == true syncConnection.unbind(this) return reschedule