This question is more about the overall approach than about a specific piece of source code.
I have an ES 2.x node that holds over 1.2 TB of data spread across 40+ indexes, each with at least one type. ES 2.x is used here as a database rather than as a search engine. The original source that was used to load the data into ES 2.x has been lost. In addition, the data is not normalized: a single ES document contains several embedded documents. Our goal is to recreate the data source and normalize the data in the process.
Here is what we plan to do:
- Extract the data from ES, analyze it, and load it into new MongoDB collections while preserving the relationships between the data, i.e. store it in normalized form (see the model sketch right after this list).
- Index the new MongoDB data into a new ES 6 node.
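To make "normalized form" a bit more concrete, here is a minimal sketch of what the target MongoDB models could look like. The names (Customer, Order) and fields are purely hypothetical placeholders; the real schema depends on the embedded documents we find in ES.

# Hypothetical target models -- names and fields are placeholders, not the real schema.
class Customer
  include Mongoid::Document
  field :es_id, type: String   # original ES document id, kept for traceability
  field :name,  type: String
  has_many :orders
end

class Order
  include Mongoid::Document
  field :es_id,      type: String
  field :ordered_at, type: DateTime
  belongs_to :customer          # a relation instead of an embedded sub-document
end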
We use JRuby 9.1.15.0, Rails 5, Ruby 2.4 and Sidekiq.
We currently retrieve data from ES for a specific date-time range. Sometimes a range returns 0 records and sometimes 100,000+. The problem is handling the ranges that return a huge number of records.
Here is an example script that works when the data for a date range is small but fails when it is large. (With 1.2 TB spread over 40+ indexes, an average index is roughly 30 GB.)
class DataRetrieverWorker
  include Sidekiq::Worker
  include Sidekiq::Status::Worker

  # Number of documents fetched per scroll request (value assumed; not in the original snippet).
  SCROLL_SIZE = 1_000

  def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
    client = ElasticSearchClient.instance.client

    if start_time.nil? && end_time.nil?
      # No range given: work out the next date range and re-enqueue ourselves with it.
      last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
      start_time, end_time =
        if last_retrieved_at
          [last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
        else
          # Nothing processed yet: start from the oldest document in the index.
          data = client.search(index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }])
          first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
          [first_day.beginning_of_day, first_day.end_of_day]
        end
      DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
    else
      query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601,
                                        lt: DateTime.parse(end_time).utc.iso8601 } } }
      data = client.search(index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query })

      # Skip ranges that were already retrieved and move on to the next one.
      ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
      if ri
        DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
        return
      end

      ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time,
                                  documents_cnt: data['hits']['total'])

      if data['hits']['total'] > 0
        if data['hits']['total'] > 2000
          # Large result set: hand whole scroll pages to the bulk worker.
          BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          while (data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m')) &&
                data['hits']['hits'].any?
            BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          end
        else
          # Small result set: schedule each document individually.
          data['hits']['hits'].each do |r|
            schedule(r)
            ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
          end
          while (data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m')) &&
                data['hits']['hits'].any?
            data['hits']['hits'].each do |r|
              schedule(r)
              ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
            end
          end
        end
      else
        # Empty range: just move on to the next one.
        DataRetrieverWorker.perform_async(indx_name, interval)
        return
      end

      # Range processed: queue the next one.
      DataRetrieverWorker.perform_async(indx_name, interval)
    end
  end

  private

  def schedule(data)
    DataPersisterWorker.perform_async(data)
  end
end
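For context, BulkJobsHandlerWorker and DataPersisterWorker are our own workers and are not shown above. The bulk worker receives one scroll page; a simplified sketch of what it might look like (the body here is illustrative, not the exact code):

# Simplified sketch: one job per scroll page.
class BulkJobsHandlerWorker
  include Sidekiq::Worker

  def perform(retrieved_index_id, hits)
    ri = RetrievedIndex.find(retrieved_index_id)
    hits.each do |r|
      # Track which ES documents have been picked up for this range...
      ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: r['_index'])
      # ...and hand the raw document over for persistence.
      DataPersisterWorker.perform_async(r)
    end
  end
end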
Questions:
- What would be the ideal approach to extract data from ES 2.x? We extract data by date range and then use the scroll API to page through the result set. Is that correct?
- What should we do when a time range returns a very large result set? Sometimes we get 20,000+ records for a range of just a few minutes. What would be the ideal approach here?
- Is Sidekiq a reasonable choice for orchestrating this kind of job, and are we using it correctly?
- How should we handle the fact that a range can return anywhere from 0 to 100,000+ records (see the sketch right after this list)?
- Is there a way to split the work into more uniform chunks?
- With the scroll API and a page size of 100, a large range means on the order of 100 scroll requests (100 API calls to ES). Is that acceptable?
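To illustrate the point about wildly uneven ranges: one idea would be to count the documents in a range up front (with a size: 0 search) and split the range before scrolling it if the count is too high. A rough sketch of that idea; SPLIT_THRESHOLD and the halving strategy are arbitrary placeholders:

# Purely illustrative: pre-count a range and split it until each piece is small enough.
SPLIT_THRESHOLD = 10_000  # arbitrary placeholder

def count_in_range(client, index, start_time, end_time)
  query = { range: { insert_time: { gt: start_time.utc.iso8601, lt: end_time.utc.iso8601 } } }
  # size: 0 returns only the total hit count, not the documents themselves.
  client.search(index: index, size: 0, body: { query: query })['hits']['total']
end

def split_range(client, index, start_time, end_time, ranges = [])
  if count_in_range(client, index, start_time, end_time) > SPLIT_THRESHOLD
    mid = start_time + (end_time - start_time) / 2
    split_range(client, index, start_time, mid, ranges)
    split_range(client, index, mid, end_time, ranges)
  else
    ranges << [start_time, end_time]
  end
  ranges
end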
For each retrieved document we do some processing in Ruby (parsing and normalizing the embedded documents) and then persist the result to MongoDB via Mongoid.