~cytrogen/masto-fe

9bb2fb6b1484c90c5b2c6cc52ce148019e82a3e2 — Claire 2 years ago ecd76fa
Change importers to avoid a few inefficiencies (#26721)

M app/lib/importer/accounts_index_importer.rb => app/lib/importer/accounts_index_importer.rb +3 -3
@@ 4,10 4,10 @@ class Importer::AccountsIndexImporter < Importer::BaseImporter
  def import!
    scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp|
      in_work_unit(tmp) do |accounts|
        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body
        bulk = build_bulk_body(accounts)

        indexed = bulk.count { |entry| entry[:index] }
        deleted = bulk.count { |entry| entry[:delete] }
        indexed = bulk.size
        deleted = 0

        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)


M app/lib/importer/base_importer.rb => app/lib/importer/base_importer.rb +8 -0
@@ 68,6 68,14 @@ class Importer::BaseImporter

  protected

  def build_bulk_body(to_import)
    # Hand-rolled replacement for `Chewy::Index::Import::BulkBuilder#bulk_body`.
    # None of our fields are join fields and we do not need `BulkBuilder`'s
    # versatility, so we skip its extra work and emit one `index` action per
    # record, in the order the records are given.
    shared_crutches = Chewy::Index::Crutch::Crutches.new(index, to_import)

    to_import.map do |record|
      action = { _id: record.id }
      action[:data] = index.compose(record, shared_crutches, fields: [])
      { index: action }
    end
  end

  def in_work_unit(...)
    work_unit = Concurrent::Promises.future_on(@executor, ...)


M app/lib/importer/instances_index_importer.rb => app/lib/importer/instances_index_importer.rb +3 -3
@@ 4,10 4,10 @@ class Importer::InstancesIndexImporter < Importer::BaseImporter
  def import!
    index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp|
      in_work_unit(tmp) do |instances|
        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: instances).bulk_body
        bulk = build_bulk_body(instances)

        indexed = bulk.count { |entry| entry[:index] }
        deleted = bulk.count { |entry| entry[:delete] }
        indexed = bulk.size
        deleted = 0

        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)


M app/lib/importer/public_statuses_index_importer.rb => app/lib/importer/public_statuses_index_importer.rb +3 -3
@@ 5,11 5,11 @@ class Importer::PublicStatusesIndexImporter < Importer::BaseImporter
    scope.select(:id).find_in_batches(batch_size: @batch_size) do |batch|
      in_work_unit(batch.pluck(:id)) do |status_ids|
        bulk = ActiveRecord::Base.connection_pool.with_connection do
          Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll, :preview_cards).where(id: status_ids)).bulk_body
          build_bulk_body(index.adapter.default_scope.where(id: status_ids))
        end

        indexed = bulk.count { |entry| entry[:index] }
        deleted = bulk.count { |entry| entry[:delete] }
        indexed = bulk.size
        deleted = 0

        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)


M app/lib/importer/statuses_index_importer.rb => app/lib/importer/statuses_index_importer.rb +14 -21
@@ 13,32 13,25 @@ class Importer::StatusesIndexImporter < Importer::BaseImporter

      scope.find_in_batches(batch_size: @batch_size) do |tmp|
        in_work_unit(tmp.map(&:status_id)) do |status_ids|
          bulk = ActiveRecord::Base.connection_pool.with_connection do
            Chewy::Index::Import::BulkBuilder.new(index, to_index: index.adapter.default_scope.where(id: status_ids)).bulk_body
          end

          indexed = 0
          deleted = 0

          # We can't use the delete_if proc to do the filtering because delete_if
          # is called before rendering the data and we need to filter based
          # on the results of the filter, so this filtering happens here instead
          bulk.map! do |entry|
            new_entry = if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
                          { delete: entry[:index].except(:data) }
                        else
                          entry
                        end

            if new_entry[:index]
              indexed += 1
            else
              deleted += 1
          bulk = ActiveRecord::Base.connection_pool.with_connection do
            to_index = index.adapter.default_scope.where(id: status_ids)
            crutches = Chewy::Index::Crutch::Crutches.new index, to_index
            to_index.map do |object|
              # This is unlikely to happen, but the post may have been
              # un-interacted with since it was queued for indexing
              if object.searchable_by.empty?
                deleted += 1
                { delete: { _id: object.id } }
              else
                { index: { _id: object.id, data: index.compose(object, crutches, fields: []) } }
              end
            end

            new_entry
          end

          indexed = bulk.size - deleted

          Chewy::Index::Import::BulkRequest.new(index).perform(bulk)

          [indexed, deleted]

M app/lib/importer/tags_index_importer.rb => app/lib/importer/tags_index_importer.rb +3 -3
@@ 4,10 4,10 @@ class Importer::TagsIndexImporter < Importer::BaseImporter
  def import!
    index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp|
      in_work_unit(tmp) do |tags|
        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body
        bulk = build_bulk_body(tags)

        indexed = bulk.count { |entry| entry[:index] }
        deleted = bulk.count { |entry| entry[:delete] }
        indexed = bulk.size
        deleted = 0

        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)