2022-05-23 06:16:43 +10:00
# frozen_string_literal: true
# Shared scaffolding for importers that copy database records into a Chewy
# index using concurrent work units. Subclasses must implement {#import!}
# and {#index}.
class Importer::BaseImporter
  # @param [Integer] batch_size
  # @param [Concurrent::ThreadPoolExecutor] executor
  def initialize(batch_size:, executor:)
    @batch_size = batch_size
    @executor   = executor
    @wait_for   = Concurrent::Set.new
  end

  # Callback to run when a concurrent work unit completes
  # @yield the value the work unit's block returned ({#clean_up!} returns a
  #   two-element array of counts)
  # @return [Proc, nil] the stored callback
  def on_progress(&block)
    @on_progress = block
  end

  # Callback to run when a concurrent work unit fails
  # @yield the reason the work unit was rejected
  # @return [Proc, nil] the stored callback
  def on_failure(&block)
    @on_failure = block
  end

  # Reduce resource usage during and improve speed of indexing
  # by disabling the periodic index refresh
  def optimize_for_import!
    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } }
  end

  # Restore original index settings
  def optimize_for_search!
    # `dig` tolerates an index definition without an explicit refresh interval;
    # nil is then sent as the value.
    # NOTE(review): Elasticsearch documents null as "reset to default" -- confirm
    refresh_interval = index.settings_hash.dig(:settings, :index, :refresh_interval)

    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: refresh_interval } }
  end

  # Estimate the amount of documents that would be indexed. Not exact!
  # Reads the planner statistics instead of running a full COUNT.
  # @return [Integer] zero when no estimate is available
  def estimate!
    ActiveRecord::Base.connection_pool.with_connection do |connection|
      # `quote` wraps the name in quotes and escapes it, so no manual quoting here
      table_name = connection.quote(index.adapter.target.table_name)
      row        = connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = #{table_name}")
      estimate   = row ? row['estimate'].to_i : 0

      # PostgreSQL reports -1 for tables that have never been analyzed
      [estimate, 0].max
    end
  end

  # Import data from the database into the index
  # @raise [NotImplementedError] unless overridden by the subclass
  def import!
    raise NotImplementedError
  end

  # Remove documents from the index that no longer exist in the database
  def clean_up!
    index.scroll_batches do |documents|
      ids          = documents.pluck('_id')
      # Document IDs are compared as strings, hence the `to_s` on database IDs
      existing_ids = index.adapter.target.where(id: ids).pluck(:id).map(&:to_s)
      missing_ids  = ids - existing_ids

      next if missing_ids.empty?

      in_work_unit(missing_ids) do |deleted_ids|
        bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body

        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)

        # Value handed to the on_progress callback
        # NOTE(review): presumably [indexed, deleted] counts -- confirm against callers
        [0, bulk.size]
      end
    end

    wait!
  end

  protected

  # Schedule a block on the executor and track it until it resolves
  # @param args arguments passed through to the block
  # @return [Concurrent::Set] the set of work units still being tracked
  def in_work_unit(*args, &block)
    work_unit = Concurrent::Promises.future_on(@executor, *args, &block)

    # Track the unit before registering the clean-up callback: a callback added
    # to an already resolved future runs immediately, so registering first could
    # delete the unit before it was added and leave a stale entry behind
    @wait_for << work_unit

    work_unit.on_fulfillment!(&@on_progress) if @on_progress
    work_unit.on_rejection!(&@on_failure) if @on_failure
    work_unit.on_resolution! { @wait_for.delete(work_unit) }

    @wait_for
  rescue Concurrent::RejectedExecutionError
    # A stopped executor never accepts work again; retrying would spin forever
    raise unless @executor.running?

    sleep(0.1) # Backpressure
    retry
  end

  # Block until every tracked work unit has resolved
  def wait!
    Concurrent::Promises.zip(*@wait_for).wait
  end

  # The Chewy index this importer writes to
  # @raise [NotImplementedError] unless overridden by the subclass
  def index
    raise NotImplementedError
  end
end