结合限制和顺序的 ActiveRecord find_each

我试图使用 ActiveRecord 的 find_each方法运行一个约50,000条记录的查询,但它似乎忽略了我的其他参数,比如:

Thing.active.order("created_at DESC").limit(50000).find_each {|t| puts t.id }

下面是通过 完整的数据集执行的结果查询,而不是我希望的50,000个数据停止并按 created_at排序:

Thing Load (198.8ms)  SELECT "things".* FROM "things" WHERE "things"."active" = 't' AND ("things"."id" > 373343) ORDER BY "things"."id" ASC LIMIT 1000

有没有办法得到类似的行为,find_each,但总的最大限制,并尊重我的排序标准?

42874 次浏览

find_each uses find_in_batches under the hood.

Its not possible to select the order of the records, as described in find_in_batches, is automatically set to ascending on the primary key (“id ASC”) to make the batch ordering work.

However, the criteria is applied, what you can do is:

Thing.active.find_each(batch_size: 50000) { |t| puts t.id }

Regarding the limit, it wasn't implemented yet: https://github.com/rails/rails/pull/5696


Answering to your second question, you can create the logic yourself:

total_records = 50000
batch = 1000
(0..(total_records - batch)).step(batch) do |i|
puts Thing.active.order("created_at DESC").offset(i).limit(batch).to_sql
end

The documentation says that find_each and find_in_batches don't retain sort order and limit because:

  • Sorting ASC on the PK is used to make the batch ordering work.
  • Limit is used to control the batch sizes.

You could write your own version of this function like @rorra did. But you can get into trouble when mutating the objects. If for example you sort by created_at and save the object it might come up again in one of the next batches. Similarly you might skip objects because the order of results has changed when executing the query to get the next batch. Only use that solution with read only objects.

Now my primary concern was that I didn't want to load 30000+ objects into memory at once. My concern was not the execution time of the query itself. Therefore I used a solution that executes the original query but only caches the ID's. It then divides the array of ID's into chunks and queries/creates the objects per chunk. This way you can safely mutate the objects because the sort order is kept in memory.

Here is a minimal example similar to what I did:

batch_size = 512
ids = Thing.order('created_at DESC').pluck(:id) # Replace .order(:created_at) with your own scope
ids.each_slice(batch_size) do |chunk|
Thing.find(chunk, :order => "field(id, #{chunk.join(',')})").each do |thing|
# Do things with thing
end
end

The trade-offs to this solution are:

  • The complete query is executed to get the ID's
  • An array of all the ID's is kept in memory
  • Uses the MySQL specific FIELD() function

Hope this helps!

I was looking for the same behaviour and thought up of this solution. This DOES NOT order by created_at but I thought I would post anyways.

max_records_to_retrieve = 50000
last_index = Thing.count
start_index = [(last_index - max_records_to_retrieve), 0].max
Thing.active.find_each(:start => start_index) do |u|
# do stuff
end

Drawbacks of this approach: - You need 2 queries (first one should be fast) - This guarantees a max of 50K records but if ids are skipped you will get less.

Retrieving the ids first and processing the in_groups_of

ordered_photo_ids = Photo.order(likes_count: :desc).pluck(:id)


ordered_photo_ids.in_groups_of(1000, false).each do |photo_ids|
photos = Photo.order(likes_count: :desc).where(id: photo_ids)


# ...
end

It's important to also add the ORDER BY query to the inner call.

One option is to put an implementation tailored for your particular model into the model itself (speaking of which, id is usually a better choice for ordering records, created_at may have duplicates):

class Thing < ActiveRecord::Base
def self.find_each_desc limit
batch_size = 1000
i = 1
records = self.order(created_at: :desc).limit(batch_size)
while records.any?
records.each do |task|
yield task, i
i += 1
return if i > limit
end
records = self.order(created_at: :desc).where('id < ?', records.last.id).limit(batch_size)
end
end
end

Or else you can generalize things a bit, and make it work for all the models:

lib/active_record_extensions.rb:

ActiveRecord::Batches.module_eval do
def find_each_desc limit
batch_size = 1000
i = 1
records = self.order(id: :desc).limit(batch_size)
while records.any?
records.each do |task|
yield task, i
i += 1
return if i > limit
end
records = self.order(id: :desc).where('id < ?', records.last.id).limit(batch_size)
end
end
end


ActiveRecord::Querying.module_eval do
delegate :find_each_desc, :to => :all
end

config/initializers/extensions.rb:

require "active_record_extensions"

P.S. I'm putting the code in files according to this answer.

You can iterate backwards by standard ruby iterators:

Thing.last.id.step(0,-1000) do |i|
Thing.where(id: (i-1000+1)..i).order('id DESC').each do |thing|
#...
end
end

Note: +1 is because BETWEEN which will be in query includes both bounds but we need include only one.

Sure, with this approach there could be fetched less than 1000 records in batch because some of them are deleted already but this is ok in my case.

You can try ar-as-batches Gem.

From their documentation you can do something like this

Users.where(country_id: 44).order(:joined_at).offset(200).as_batches do |user|
user.party_all_night!
end

Do it in one query and avoid iterating:

User.offset(2).order('name DESC').last(3)

will product a query like this

SELECT "users".* FROM "users" ORDER BY name ASC LIMIT $1 OFFSET $2 [["LIMIT", 3], ["OFFSET", 2]

Using Kaminari or something other it will be easy.

Create batch loader class.

module BatchLoader
extend ActiveSupport::Concern


def batch_by_page(options = {})
options = init_batch_options!(options)


next_page = 1


loop do
next_page = yield(next_page, options[:batch_size])


break next_page if next_page.nil?
end
end


private


def default_batch_options
{
batch_size: 50
}
end


def init_batch_options!(options)
options ||= {}
default_batch_options.merge!(options)
end
end

Create Repository

class ThingRepository
include BatchLoader


# @param [Integer] per_page
# @param [Proc] block
def batch_changes(per_page=100, &block)
relation = Thing.active.order("created_at DESC")


batch_by_page do |next_page|
query = relation.page(next_page).per(per_page)
yield query if block_given?
query.next_page
end
end
end

Use the repository

repo = ThingRepository.new
repo.batch_changes(5000).each do |g|
g.each do |t|
#...
end
end

As remarked by @Kirk in one of the comments, find_each supports limit as of version 5.1.0.

Example from the changelog:

Post.limit(10_000).find_each do |post|
# ...
end

The documentation says:

Limits are honored, and if present there is no requirement for the batch size: it can be less than, equal to, or greater than the limit.

(setting a custom order is still not supported though)

Adding find_in_batches_with_order did solve my usecase, where I was having ids already but need batching and ordering. It was inspired by @dirk-geurs solution

# Create file config/initializers/find_in_batches_with_order.rb with follwing code.
ActiveRecord::Batches.class_eval do
## Only flat order structure is supported now
## example: [:forename, :surname] is supported but [:forename, {surname: :asc}] is not supported
def find_in_batches_with_order(ids: nil, order: [], batch_size: 1000)
relation = self
arrangement = order.dup
index = order.find_index(:id)


unless index
arrangement.push(:id)
index = arrangement.length - 1
end


ids ||= relation.order(*arrangement).pluck(*arrangement).map{ |tupple| tupple[index] }
ids.each_slice(batch_size) do |chunk_ids|
chunk_relation = relation.where(id: chunk_ids).order(*order)
yield(chunk_relation)
end
end
end

Leaving Gist here https://gist.github.com/the-spectator/28b1176f98cc2f66e870755bb2334545

I had the same problem with a query with DISTINCT ON where you need an ORDER BY with that field, so this is my approach with Postgres:

def filtered_model_ids
Model.joins(:father_model)
.select('DISTINCT ON (model.field) model.id')
.order(:field)
.map(&:id)
end


def processor
filtered_model_ids.each_slice(BATCH_SIZE).lazy.each do |batch|
Model.find(batch).each do |record|
# Code
end
end
end

Rails 6.1 adds support for descending order in find_each, find_in_batches and in_batches.

My code

batch_size = 100
total_count = klass.count
offset = 0
processed_count = 0
while processed_count < total_count
relation = klass.order({ active_at: :asc, created_at: :desc }).offset(offset).limit(batch_size)
relation.each do |record|
record.process
end
processed_count += batch_size
end