2d12948220
There is an idempotency key generated by clients when authoring a post, and stored in Redis, to ensure that if a user or client retries posting the same status, we don't get a duplicate. Hachyderm.io has been experiencing some filesystem and database performance issues, causing database writes to be slow. This can mean that there are successful posts, but the reverse proxy returns 504 Gateway Timeout before the idempotency status has been updated; users or clients who retry (such as Tusky which retries automatically, see tuskyapp/Tusky#2951) can re-try the same post with the same idempotency key before it has actually been recorded in Redis, leading to duplicate posts. To address this issue, move all of the database updates after the initial transaction that creates the status into the `postprocess_status!` method, so we can insert the idempotency key immediately after the status has been created, significantly reducing the window in which the status could be created but the idempotency key not yet stored. Note: this has not yet been tested; I'm submitting this PR for discussion and to offer to the Hachyderm.io admins to try out to fix the multiple posting problem. Co-authored-by: Brian Campbell <brcampbell@beta.team>
200 lines
6.3 KiB
Ruby
200 lines
6.3 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
class PostStatusService < BaseService
|
|
include Redisable
|
|
include LanguagesHelper
|
|
|
|
MIN_SCHEDULE_OFFSET = 5.minutes.freeze
|
|
|
|
# Post a text status update, fetch and notify remote users mentioned
|
|
# @param [Account] account Account from which to post
|
|
# @param [Hash] options
|
|
# @option [String] :text Message
|
|
# @option [Status] :thread Optional status to reply to
|
|
# @option [Boolean] :sensitive
|
|
# @option [String] :visibility
|
|
# @option [String] :spoiler_text
|
|
# @option [String] :language
|
|
# @option [String] :scheduled_at
|
|
# @option [Hash] :poll Optional poll to attach
|
|
# @option [Enumerable] :media_ids Optional array of media IDs to attach
|
|
# @option [Doorkeeper::Application] :application
|
|
# @option [String] :idempotency Optional idempotency key
|
|
# @option [Boolean] :with_rate_limit
|
|
# @return [Status]
|
|
def call(account, options = {})
|
|
@account = account
|
|
@options = options
|
|
@text = @options[:text] || ''
|
|
@in_reply_to = @options[:thread]
|
|
|
|
return idempotency_duplicate if idempotency_given? && idempotency_duplicate?
|
|
|
|
validate_media!
|
|
preprocess_attributes!
|
|
|
|
if scheduled?
|
|
schedule_status!
|
|
else
|
|
process_status!
|
|
end
|
|
|
|
redis.setex(idempotency_key, 3_600, @status.id) if idempotency_given?
|
|
|
|
unless scheduled?
|
|
postprocess_status!
|
|
bump_potential_friendship!
|
|
end
|
|
|
|
@status
|
|
end
|
|
|
|
private
|
|
|
|
def preprocess_attributes!
|
|
@sensitive = (@options[:sensitive].nil? ? @account.user&.setting_default_sensitive : @options[:sensitive]) || @options[:spoiler_text].present?
|
|
@text = @options.delete(:spoiler_text) if @text.blank? && @options[:spoiler_text].present?
|
|
@visibility = @options[:visibility] || @account.user&.setting_default_privacy
|
|
@visibility = :unlisted if @visibility&.to_sym == :public && @account.silenced?
|
|
@scheduled_at = @options[:scheduled_at]&.to_datetime
|
|
@scheduled_at = nil if scheduled_in_the_past?
|
|
rescue ArgumentError
|
|
raise ActiveRecord::RecordInvalid
|
|
end
|
|
|
|
def process_status!
|
|
# The following transaction block is needed to wrap the UPDATEs to
|
|
# the media attachments when the status is created
|
|
|
|
ApplicationRecord.transaction do
|
|
@status = @account.statuses.create!(status_attributes)
|
|
end
|
|
end
|
|
|
|
def schedule_status!
|
|
status_for_validation = @account.statuses.build(status_attributes)
|
|
|
|
if status_for_validation.valid?
|
|
# Marking the status as destroyed is necessary to prevent the status from being
|
|
# persisted when the associated media attachments get updated when creating the
|
|
# scheduled status.
|
|
status_for_validation.destroy
|
|
|
|
# The following transaction block is needed to wrap the UPDATEs to
|
|
# the media attachments when the scheduled status is created
|
|
|
|
ApplicationRecord.transaction do
|
|
@status = @account.scheduled_statuses.create!(scheduled_status_attributes)
|
|
end
|
|
else
|
|
raise ActiveRecord::RecordInvalid
|
|
end
|
|
end
|
|
|
|
def postprocess_status!
|
|
process_hashtags_service.call(@status)
|
|
process_mentions_service.call(@status)
|
|
Trends.tags.register(@status)
|
|
LinkCrawlWorker.perform_async(@status.id)
|
|
DistributionWorker.perform_async(@status.id)
|
|
ActivityPub::DistributionWorker.perform_async(@status.id)
|
|
PollExpirationNotifyWorker.perform_at(@status.poll.expires_at, @status.poll.id) if @status.poll
|
|
end
|
|
|
|
def validate_media!
|
|
if @options[:media_ids].blank? || !@options[:media_ids].is_a?(Enumerable)
|
|
@media = []
|
|
return
|
|
end
|
|
|
|
raise Mastodon::ValidationError, I18n.t('media_attachments.validations.too_many') if @options[:media_ids].size > 4 || @options[:poll].present?
|
|
|
|
@media = @account.media_attachments.where(status_id: nil).where(id: @options[:media_ids].take(4).map(&:to_i))
|
|
|
|
raise Mastodon::ValidationError, I18n.t('media_attachments.validations.images_and_video') if @media.size > 1 && @media.find(&:audio_or_video?)
|
|
raise Mastodon::ValidationError, I18n.t('media_attachments.validations.not_ready') if @media.any?(&:not_processed?)
|
|
end
|
|
|
|
def process_mentions_service
|
|
ProcessMentionsService.new
|
|
end
|
|
|
|
def process_hashtags_service
|
|
ProcessHashtagsService.new
|
|
end
|
|
|
|
def scheduled?
|
|
@scheduled_at.present?
|
|
end
|
|
|
|
def idempotency_key
|
|
"idempotency:status:#{@account.id}:#{@options[:idempotency]}"
|
|
end
|
|
|
|
def idempotency_given?
|
|
@options[:idempotency].present?
|
|
end
|
|
|
|
def idempotency_duplicate
|
|
if scheduled?
|
|
@account.schedule_statuses.find(@idempotency_duplicate)
|
|
else
|
|
@account.statuses.find(@idempotency_duplicate)
|
|
end
|
|
end
|
|
|
|
def idempotency_duplicate?
|
|
@idempotency_duplicate = redis.get(idempotency_key)
|
|
end
|
|
|
|
def scheduled_in_the_past?
|
|
@scheduled_at.present? && @scheduled_at <= Time.now.utc + MIN_SCHEDULE_OFFSET
|
|
end
|
|
|
|
def bump_potential_friendship!
|
|
return if !@status.reply? || @account.id == @status.in_reply_to_account_id
|
|
ActivityTracker.increment('activity:interactions')
|
|
return if @account.following?(@status.in_reply_to_account_id)
|
|
PotentialFriendshipTracker.record(@account.id, @status.in_reply_to_account_id, :reply)
|
|
end
|
|
|
|
def status_attributes
|
|
{
|
|
text: @text,
|
|
media_attachments: @media || [],
|
|
ordered_media_attachment_ids: (@options[:media_ids] || []).map(&:to_i) & @media.map(&:id),
|
|
thread: @in_reply_to,
|
|
poll_attributes: poll_attributes,
|
|
sensitive: @sensitive,
|
|
spoiler_text: @options[:spoiler_text] || '',
|
|
visibility: @visibility,
|
|
language: valid_locale_cascade(@options[:language], @account.user&.preferred_posting_language, I18n.default_locale),
|
|
application: @options[:application],
|
|
rate_limit: @options[:with_rate_limit],
|
|
}.compact
|
|
end
|
|
|
|
def scheduled_status_attributes
|
|
{
|
|
scheduled_at: @scheduled_at,
|
|
media_attachments: @media || [],
|
|
params: scheduled_options,
|
|
}
|
|
end
|
|
|
|
def poll_attributes
|
|
return if @options[:poll].blank?
|
|
|
|
@options[:poll].merge(account: @account, voters_count: 0)
|
|
end
|
|
|
|
def scheduled_options
|
|
@options.tap do |options_hash|
|
|
options_hash[:in_reply_to_id] = options_hash.delete(:thread)&.id
|
|
options_hash[:application_id] = options_hash.delete(:application)&.id
|
|
options_hash[:scheduled_at] = nil
|
|
options_hash[:idempotency] = nil
|
|
options_hash[:with_rate_limit] = false
|
|
end
|
|
end
|
|
end
|