Change sidekiq-bulk's batch size from 10,000 to 1,000 jobs in one Redis call (#24034)
This commit is contained in:
		
					parent
					
						
							
								ed887271f3
							
						
					
				
			
			
				commit
				
					
						1d0ad558ff
					
				
			
		
					 10 changed files with 16 additions and 16 deletions
				
			
		|  | @ -12,7 +12,7 @@ class ActivityPub::Forwarder | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def forward! |   def forward! | ||||||
|     ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url| |     ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [payload, signature_account_id, inbox_url] |       [payload, signature_account_id, inbox_url] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -257,11 +257,11 @@ class DeleteAccountService < BaseService | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def delete_actor! |   def delete_actor! | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(delivery_inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(delivery_inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [delete_actor_json, @account.id, inbox_url] |       [delete_actor_json, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     ActivityPub::LowPriorityDeliveryWorker.push_bulk(low_priority_delivery_inboxes) do |inbox_url| |     ActivityPub::LowPriorityDeliveryWorker.push_bulk(low_priority_delivery_inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [delete_actor_json, @account.id, inbox_url] |       [delete_actor_json, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -88,7 +88,7 @@ class RemoveStatusService < BaseService | ||||||
| 
 | 
 | ||||||
|     status_reach_finder = StatusReachFinder.new(@status, unsafe: true) |     status_reach_finder = StatusReachFinder.new(@status, unsafe: true) | ||||||
| 
 | 
 | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [signed_activity_json, @account.id, inbox_url] |       [signed_activity_json, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -31,21 +31,21 @@ class SuspendAccountService < BaseService | ||||||
|     # counterpart to this operation, i.e. you can't then force a remote |     # counterpart to this operation, i.e. you can't then force a remote | ||||||
|     # account to re-follow you, so this part is not reversible. |     # account to re-follow you, so this part is not reversible. | ||||||
| 
 | 
 | ||||||
|     follows = Follow.where(account: @account).to_a |     Follow.where(account: @account).find_in_batches do |follows| | ||||||
| 
 |  | ||||||
|       ActivityPub::DeliveryWorker.push_bulk(follows) do |follow| |       ActivityPub::DeliveryWorker.push_bulk(follows) do |follow| | ||||||
|         [Oj.dump(serialize_payload(follow, ActivityPub::RejectFollowSerializer)), follow.target_account_id, @account.inbox_url] |         [Oj.dump(serialize_payload(follow, ActivityPub::RejectFollowSerializer)), follow.target_account_id, @account.inbox_url] | ||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
|       follows.each(&:destroy) |       follows.each(&:destroy) | ||||||
|     end |     end | ||||||
|  |   end | ||||||
| 
 | 
 | ||||||
|   def distribute_update_actor! |   def distribute_update_actor! | ||||||
|     return unless @account.local? |     return unless @account.local? | ||||||
| 
 | 
 | ||||||
|     account_reach_finder = AccountReachFinder.new(@account) |     account_reach_finder = AccountReachFinder.new(@account) | ||||||
| 
 | 
 | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [signed_activity_json, @account.id, inbox_url] |       [signed_activity_json, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -41,7 +41,7 @@ class UnsuspendAccountService < BaseService | ||||||
| 
 | 
 | ||||||
|     account_reach_finder = AccountReachFinder.new(@account) |     account_reach_finder = AccountReachFinder.new(@account) | ||||||
| 
 | 
 | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(account_reach_finder.inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [signed_activity_json, @account.id, inbox_url] |       [signed_activity_json, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -22,7 +22,7 @@ class UpdateAccountService < BaseService | ||||||
|   def authorize_all_follow_requests(account) |   def authorize_all_follow_requests(account) | ||||||
|     follow_requests = FollowRequest.where(target_account: account) |     follow_requests = FollowRequest.where(target_account: account) | ||||||
|     follow_requests = follow_requests.preload(:account).select { |req| !req.account.silenced? } |     follow_requests = follow_requests.preload(:account).select { |req| !req.account.silenced? } | ||||||
|     AuthorizeFollowWorker.push_bulk(follow_requests) do |req| |     AuthorizeFollowWorker.push_bulk(follow_requests, limit: 1_000) do |req| | ||||||
|       [req.account_id, req.target_account_id] |       [req.account_id, req.target_account_id] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -12,7 +12,7 @@ class ActivityPub::DistributePollUpdateWorker | ||||||
| 
 | 
 | ||||||
|     return unless @status.preloadable_poll |     return unless @status.preloadable_poll | ||||||
| 
 | 
 | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [payload, @account.id, inbox_url] |       [payload, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -10,7 +10,7 @@ class ActivityPub::MoveDistributionWorker | ||||||
|     @migration = AccountMigration.find(migration_id) |     @migration = AccountMigration.find(migration_id) | ||||||
|     @account   = @migration.account |     @account   = @migration.account | ||||||
| 
 | 
 | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [signed_payload, @account.id, inbox_url] |       [signed_payload, @account.id, inbox_url] | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -25,7 +25,7 @@ class ActivityPub::RawDistributionWorker | ||||||
|   def distribute! |   def distribute! | ||||||
|     return if inboxes.empty? |     return if inboxes.empty? | ||||||
| 
 | 
 | ||||||
|     ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| |     ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| | ||||||
|       [payload, source_account_id, inbox_url, options] |       [payload, source_account_id, inbox_url, options] | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
|  |  | ||||||
|  | @ -131,7 +131,7 @@ module Mastodon | ||||||
|         json = Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(account)) |         json = Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(account)) | ||||||
| 
 | 
 | ||||||
|         unless options[:dry_run] |         unless options[:dry_run] | ||||||
|           ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| |           ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| | ||||||
|             [json, account.id, inbox_url] |             [json, account.id, inbox_url] | ||||||
|           end |           end | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue