From 6b0eda14a11ba243e7adf3900aae75ba23980c96 Mon Sep 17 00:00:00 2001 From: ThibG Date: Sat, 16 Mar 2019 20:18:47 +0100 Subject: [PATCH] Avoid race condition when streaming deleted statuses (#10280) * Avoid race condition when streaming deleted statuses * Move redis lock to DistributionWorker to avoid extra Redis value --- app/services/remove_status_service.rb | 28 ++++++++++++++++++--------- app/workers/distribution_worker.rb | 8 +++++++- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 98875429d..747f209f3 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -14,16 +14,22 @@ class RemoveStatusService < BaseService @stream_entry = status.stream_entry @options = options - remove_from_self if status.account.local? - remove_from_followers - remove_from_lists - remove_from_affected - remove_reblogs - remove_from_hashtags - remove_from_public - remove_from_media if status.media_attachments.any? + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + remove_from_self if status.account.local? + remove_from_followers + remove_from_lists + remove_from_affected + remove_reblogs + remove_from_hashtags + remove_from_public + remove_from_media if status.media_attachments.any? - @status.destroy! + @status.destroy! + else + raise Mastodon::RaceConditionError + end + end # There is no reason to send out Undo activities when the # cause is that the original object has been removed, since @@ -156,4 +162,8 @@ class RemoveStatusService < BaseService redis.publish('timeline:public:media', @payload) redis.publish('timeline:public:local:media', @payload) if @status.local? end + + def lock_options + { redis: Redis.current, key: "distribute:#{@status.id}" } + end end diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index f423d43ae..4e20ef31b 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -4,7 +4,13 @@ class DistributionWorker include Sidekiq::Worker def perform(status_id) - FanOutOnWriteService.new.call(Status.find(status_id)) + RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}") do |lock| + if lock.acquired? + FanOutOnWriteService.new.call(Status.find(status_id)) + else + raise Mastodon::RaceConditionError + end + end rescue ActiveRecord::RecordNotFound true end