Check if media processing finished before sending status (#2458)
* make MastodonApi.createStatus suspending * check if media processing has finished before sending status * add backoff for retrying processed media check
This commit is contained in:
parent
6062ec6b9e
commit
671d2c6a45
4 changed files with 103 additions and 83 deletions
|
@ -251,13 +251,15 @@ class ComposeViewModel @Inject constructor(
|
||||||
val sendObservable = media
|
val sendObservable = media
|
||||||
.filter { items -> items.all { it.uploadPercent == -1 } }
|
.filter { items -> items.all { it.uploadPercent == -1 } }
|
||||||
.map {
|
.map {
|
||||||
val mediaIds = ArrayList<String>()
|
val mediaIds: MutableList<String> = mutableListOf()
|
||||||
val mediaUris = ArrayList<Uri>()
|
val mediaUris: MutableList<Uri> = mutableListOf()
|
||||||
val mediaDescriptions = ArrayList<String>()
|
val mediaDescriptions: MutableList<String> = mutableListOf()
|
||||||
|
val mediaProcessed: MutableList<Boolean> = mutableListOf()
|
||||||
for (item in media.value!!) {
|
for (item in media.value!!) {
|
||||||
mediaIds.add(item.id!!)
|
mediaIds.add(item.id!!)
|
||||||
mediaUris.add(item.uri)
|
mediaUris.add(item.uri)
|
||||||
mediaDescriptions.add(item.description ?: "")
|
mediaDescriptions.add(item.description ?: "")
|
||||||
|
mediaProcessed.add(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
val tootToSend = StatusToSend(
|
val tootToSend = StatusToSend(
|
||||||
|
@ -276,7 +278,8 @@ class ComposeViewModel @Inject constructor(
|
||||||
accountId = accountManager.activeAccount!!.id,
|
accountId = accountManager.activeAccount!!.id,
|
||||||
draftId = draftId,
|
draftId = draftId,
|
||||||
idempotencyKey = randomAlphanumericString(16),
|
idempotencyKey = randomAlphanumericString(16),
|
||||||
retries = 0
|
retries = 0,
|
||||||
|
mediaProcessed = mediaProcessed
|
||||||
)
|
)
|
||||||
|
|
||||||
serviceClient.sendToot(tootToSend)
|
serviceClient.sendToot(tootToSend)
|
||||||
|
|
|
@ -156,13 +156,18 @@ interface MastodonApi {
|
||||||
@Field("description") description: String
|
@Field("description") description: String
|
||||||
): Result<Attachment>
|
): Result<Attachment>
|
||||||
|
|
||||||
|
@GET("api/v1/media/{mediaId}")
|
||||||
|
suspend fun getMedia(
|
||||||
|
@Path("mediaId") mediaId: String
|
||||||
|
): Response<MediaUploadResult>
|
||||||
|
|
||||||
@POST("api/v1/statuses")
|
@POST("api/v1/statuses")
|
||||||
fun createStatus(
|
suspend fun createStatus(
|
||||||
@Header("Authorization") auth: String,
|
@Header("Authorization") auth: String,
|
||||||
@Header(DOMAIN_HEADER) domain: String,
|
@Header(DOMAIN_HEADER) domain: String,
|
||||||
@Header("Idempotency-Key") idempotencyKey: String,
|
@Header("Idempotency-Key") idempotencyKey: String,
|
||||||
@Body status: NewStatus
|
@Body status: NewStatus
|
||||||
): Call<Status>
|
): Result<Status>
|
||||||
|
|
||||||
@GET("api/v1/statuses/{id}")
|
@GET("api/v1/statuses/{id}")
|
||||||
fun status(
|
fun status(
|
||||||
|
|
|
@ -100,7 +100,8 @@ class SendStatusBroadcastReceiver : BroadcastReceiver() {
|
||||||
accountId = account.id,
|
accountId = account.id,
|
||||||
draftId = -1,
|
draftId = -1,
|
||||||
idempotencyKey = randomAlphanumericString(16),
|
idempotencyKey = randomAlphanumericString(16),
|
||||||
retries = 0
|
retries = 0,
|
||||||
|
mediaProcessed = mutableListOf()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import android.content.Intent
|
||||||
import android.os.Build
|
import android.os.Build
|
||||||
import android.os.IBinder
|
import android.os.IBinder
|
||||||
import android.os.Parcelable
|
import android.os.Parcelable
|
||||||
|
import android.util.Log
|
||||||
import androidx.core.app.NotificationCompat
|
import androidx.core.app.NotificationCompat
|
||||||
import androidx.core.app.ServiceCompat
|
import androidx.core.app.ServiceCompat
|
||||||
import androidx.core.content.ContextCompat
|
import androidx.core.content.ContextCompat
|
||||||
|
@ -29,13 +30,12 @@ import com.keylesspalace.tusky.network.MastodonApi
|
||||||
import dagger.android.AndroidInjection
|
import dagger.android.AndroidInjection
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.SupervisorJob
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.parcelize.Parcelize
|
import kotlinx.parcelize.Parcelize
|
||||||
import retrofit2.Call
|
import retrofit2.HttpException
|
||||||
import retrofit2.Callback
|
|
||||||
import retrofit2.Response
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
@ -55,7 +55,7 @@ class SendStatusService : Service(), Injectable {
|
||||||
private val serviceScope = CoroutineScope(Dispatchers.Main + supervisorJob)
|
private val serviceScope = CoroutineScope(Dispatchers.Main + supervisorJob)
|
||||||
|
|
||||||
private val statusesToSend = ConcurrentHashMap<Int, StatusToSend>()
|
private val statusesToSend = ConcurrentHashMap<Int, StatusToSend>()
|
||||||
private val sendCalls = ConcurrentHashMap<Int, Call<Status>>()
|
private val sendJobs = ConcurrentHashMap<Int, Job>()
|
||||||
|
|
||||||
private val notificationManager by lazy { getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager }
|
private val notificationManager by lazy { getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager }
|
||||||
|
|
||||||
|
@ -64,12 +64,9 @@ class SendStatusService : Service(), Injectable {
|
||||||
super.onCreate()
|
super.onCreate()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onBind(intent: Intent): IBinder? {
|
override fun onBind(intent: Intent): IBinder? = null
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int {
|
override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int {
|
||||||
|
|
||||||
if (intent.hasExtra(KEY_STATUS)) {
|
if (intent.hasExtra(KEY_STATUS)) {
|
||||||
val statusToSend = intent.getParcelableExtra<StatusToSend>(KEY_STATUS)
|
val statusToSend = intent.getParcelableExtra<StatusToSend>(KEY_STATUS)
|
||||||
?: throw IllegalStateException("SendStatusService started without $KEY_STATUS extra")
|
?: throw IllegalStateException("SendStatusService started without $KEY_STATUS extra")
|
||||||
|
@ -129,6 +126,25 @@ class SendStatusService : Service(), Injectable {
|
||||||
|
|
||||||
statusToSend.retries++
|
statusToSend.retries++
|
||||||
|
|
||||||
|
sendJobs[statusId] = serviceScope.launch {
|
||||||
|
try {
|
||||||
|
var mediaCheckRetries = 0
|
||||||
|
while (statusToSend.mediaProcessed.any { !it }) {
|
||||||
|
delay(1000L * mediaCheckRetries)
|
||||||
|
statusToSend.mediaProcessed.forEachIndexed { index, processed ->
|
||||||
|
if (!processed) {
|
||||||
|
// Mastodon returns 206 if the media was not yet processed
|
||||||
|
statusToSend.mediaProcessed[index] = mastodonApi.getMedia(statusToSend.mediaIds[index]).code() == 200
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mediaCheckRetries ++
|
||||||
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Log.w(TAG, "failed getting media status", e)
|
||||||
|
retrySending(statusId)
|
||||||
|
return@launch
|
||||||
|
}
|
||||||
|
|
||||||
val newStatus = NewStatus(
|
val newStatus = NewStatus(
|
||||||
statusToSend.text,
|
statusToSend.text,
|
||||||
statusToSend.warningText,
|
statusToSend.warningText,
|
||||||
|
@ -140,37 +156,32 @@ class SendStatusService : Service(), Injectable {
|
||||||
statusToSend.poll
|
statusToSend.poll
|
||||||
)
|
)
|
||||||
|
|
||||||
val sendCall = mastodonApi.createStatus(
|
mastodonApi.createStatus(
|
||||||
"Bearer " + account.accessToken,
|
"Bearer " + account.accessToken,
|
||||||
account.domain,
|
account.domain,
|
||||||
statusToSend.idempotencyKey,
|
statusToSend.idempotencyKey,
|
||||||
newStatus
|
newStatus
|
||||||
)
|
).fold({ sentStatus ->
|
||||||
|
|
||||||
sendCalls[statusId] = sendCall
|
|
||||||
|
|
||||||
val callback = object : Callback<Status> {
|
|
||||||
override fun onResponse(call: Call<Status>, response: Response<Status>) {
|
|
||||||
serviceScope.launch {
|
|
||||||
|
|
||||||
val scheduled = !statusToSend.scheduledAt.isNullOrEmpty()
|
|
||||||
statusesToSend.remove(statusId)
|
statusesToSend.remove(statusId)
|
||||||
|
|
||||||
if (response.isSuccessful) {
|
|
||||||
// If the status was loaded from a draft, delete the draft and associated media files.
|
// If the status was loaded from a draft, delete the draft and associated media files.
|
||||||
if (statusToSend.draftId != 0) {
|
if (statusToSend.draftId != 0) {
|
||||||
draftHelper.deleteDraftAndAttachments(statusToSend.draftId)
|
draftHelper.deleteDraftAndAttachments(statusToSend.draftId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val scheduled = !statusToSend.scheduledAt.isNullOrEmpty()
|
||||||
|
|
||||||
if (scheduled) {
|
if (scheduled) {
|
||||||
response.body()?.let(::StatusScheduledEvent)?.let(eventHub::dispatch)
|
eventHub.dispatch(StatusScheduledEvent(sentStatus))
|
||||||
} else {
|
} else {
|
||||||
response.body()?.let(::StatusComposedEvent)?.let(eventHub::dispatch)
|
eventHub.dispatch(StatusComposedEvent(sentStatus))
|
||||||
}
|
}
|
||||||
|
|
||||||
notificationManager.cancel(statusId)
|
notificationManager.cancel(statusId)
|
||||||
} else {
|
}, { throwable ->
|
||||||
|
Log.w(TAG, "failed sending status", throwable)
|
||||||
|
if (throwable is HttpException) {
|
||||||
// the server refused to accept the status, save status & show error message
|
// the server refused to accept the status, save status & show error message
|
||||||
|
statusesToSend.remove(statusId)
|
||||||
saveStatusToDrafts(statusToSend)
|
saveStatusToDrafts(statusToSend)
|
||||||
|
|
||||||
val builder = NotificationCompat.Builder(this@SendStatusService, CHANNEL_ID)
|
val builder = NotificationCompat.Builder(this@SendStatusService, CHANNEL_ID)
|
||||||
|
@ -186,26 +197,24 @@ class SendStatusService : Service(), Injectable {
|
||||||
|
|
||||||
notificationManager.cancel(statusId)
|
notificationManager.cancel(statusId)
|
||||||
notificationManager.notify(errorNotificationId--, builder.build())
|
notificationManager.notify(errorNotificationId--, builder.build())
|
||||||
|
} else {
|
||||||
|
// a network problem occurred, let's retry sending the status
|
||||||
|
retrySending(statusId)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
stopSelfWhenDone()
|
stopSelfWhenDone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFailure(call: Call<Status>, t: Throwable) {
|
private suspend fun retrySending(statusId: Int) {
|
||||||
serviceScope.launch {
|
// when statusToSend == null, sending has been canceled
|
||||||
var backoff = TimeUnit.SECONDS.toMillis(statusToSend.retries.toLong())
|
val statusToSend = statusesToSend[statusId] ?: return
|
||||||
if (backoff > MAX_RETRY_INTERVAL) {
|
|
||||||
backoff = MAX_RETRY_INTERVAL
|
val backoff = TimeUnit.SECONDS.toMillis(statusToSend.retries.toLong()).coerceAtMost(MAX_RETRY_INTERVAL)
|
||||||
}
|
|
||||||
|
|
||||||
delay(backoff)
|
delay(backoff)
|
||||||
sendStatus(statusId)
|
sendStatus(statusId)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sendCall.enqueue(callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun stopSelfWhenDone() {
|
private fun stopSelfWhenDone() {
|
||||||
|
|
||||||
|
@ -218,8 +227,8 @@ class SendStatusService : Service(), Injectable {
|
||||||
private fun cancelSending(statusId: Int) = serviceScope.launch {
|
private fun cancelSending(statusId: Int) = serviceScope.launch {
|
||||||
val statusToCancel = statusesToSend.remove(statusId)
|
val statusToCancel = statusesToSend.remove(statusId)
|
||||||
if (statusToCancel != null) {
|
if (statusToCancel != null) {
|
||||||
val sendCall = sendCalls.remove(statusId)
|
val sendJob = sendJobs.remove(statusId)
|
||||||
sendCall?.cancel()
|
sendJob?.cancel()
|
||||||
|
|
||||||
saveStatusToDrafts(statusToCancel)
|
saveStatusToDrafts(statusToCancel)
|
||||||
|
|
||||||
|
@ -263,6 +272,7 @@ class SendStatusService : Service(), Injectable {
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
private const val TAG = "SendStatusService"
|
||||||
|
|
||||||
private const val KEY_STATUS = "status"
|
private const val KEY_STATUS = "status"
|
||||||
private const val KEY_CANCEL = "cancel_id"
|
private const val KEY_CANCEL = "cancel_id"
|
||||||
|
@ -319,5 +329,6 @@ data class StatusToSend(
|
||||||
val accountId: Long,
|
val accountId: Long,
|
||||||
val draftId: Int,
|
val draftId: Int,
|
||||||
val idempotencyKey: String,
|
val idempotencyKey: String,
|
||||||
var retries: Int
|
var retries: Int,
|
||||||
|
val mediaProcessed: MutableList<Boolean>
|
||||||
) : Parcelable
|
) : Parcelable
|
||||||
|
|
Loading…
Reference in a new issue