Dump Elasticsearch 2.x to MongoDB and back to ES 6.x

This question is more about the overall approach than about specific source code.

I have an ES 2.x node with over 1.2 TB of data. We have 40+ indexes, each of which has at least one type. ES 2.x is used here as a database rather than as a search engine. The source that was used to load the data into ES 2.x is lost. In addition, the data is not normalized: each ES document has several embedded documents. Our goal is to recreate the data source and normalize the data at the same time.

What do we plan:

  • Extract the data from ES, analyze it and load it into specific collections in a new MongoDB, maintaining the relationships between the data, i.e. storing it in normalized form.
  • Index the new MongoDB data into a new ES 6 node.

We use JRuby 9.1.15.0, Rails 5, Ruby 2.4 and Sidekiq.

We are currently retrieving data from ES for a specific date-time range. Sometimes we get 0 records and sometimes 100,000+. The problem is the ranges that return a huge number of records.

Here is an example script that works when the data for a date range is small but fails when it is large. (1.2 TB across 40 indexes means an average index is roughly 30 GB.)

class DataRetrieverWorker
  include Sidekiq::Worker
  include Sidekiq::Status::Worker

  def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
    client = ElasticSearchClient.instance.client # needed in both branches below

    unless start_time || end_time
      last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
      start_time, end_time = if last_retrieved_at
                               # retrieve the next time slot, usually 24 hrs
                               [last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
                             else
                               # no history yet: start with the day of the oldest document
                               data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }]
                               first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
                               [first_day.beginning_of_day, first_day.end_of_day]
                             end
      DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
    else
      # start a scroll over the specified range and retrieve the data
      query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } }
      data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query }
      ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
      if ri
        DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
        return
      end
      ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total'])
      if data['hits']['total'] > 0
        if data['hits']['total'] > 2000
          BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
            BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          end
        else
          data['hits']['hits'].each do |r|
            schedule(r)
            ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
          end
          while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
            data['hits']['hits'].each do |r|
              schedule(r)
              ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
            end
          end
        end
      else
        DataRetrieverWorker.perform_async(indx_name, interval)
        return
      end
      DataRetrieverWorker.perform_async(indx_name, interval)
    end
  end

  private

  def schedule(data)
    DataPersisterWorker.perform_async(data)
  end
end

Questions:

  • What is the ideal approach to extract data from ES 2.x? We extract data by date range and then use the scroll API to retrieve the result set. Is that correct?
  • What should be done when a time range returns a large result? Sometimes we get 20,000+ records for a range of just a few minutes. What is the ideal approach here?
  • Is Sidekiq the right choice for this kind of processing?
  • Are there any pitfalls in pushing this amount of work through Sidekiq?
  • Is our approach of querying by date range correct, given that the data is not evenly distributed over time? Some ranges return 0 records and others 100,000+.
  • Is there a better way to get a uniform number of records per request?
  • Is the use of the scroll API correct when a date range returns a large number of records (say 100,000), which means looping roughly 100 times (100 scroll API calls to ES)?

Each ES document embeds several related documents, which we split into separate MongoDB collections. On the Ruby side we use Mongoid to write the normalized data to MongoDB.

Any pointers would be appreciated.

Answer 1:
  • What is the ideal approach to extract data from ES 2.x? We extract data by date range and then use the scroll API. Is that correct?

Why restrict yourself to date ranges at all? Why not simply scroll over the whole index in ES?

  • What should be done when a time range returns a large result? Sometimes we get 20,000+ records for a range of a few minutes. What is the ideal approach?

Use the scroll API; it hands the result set back in batches of whatever size you request. Also have a look at the Sliced Scroll API.
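
For reference, a minimal sketch of a sliced scroll with the elasticsearch-ruby client used in the question. Note that slicing was only added in Elasticsearch 5.0, so it would apply to the new 6.x cluster rather than the 2.x source; the index name and slice count are illustrative:

    # Each slice is an independent scroll over a disjoint part of the index,
    # so each one can run in its own Sidekiq job.
    SLICE_COUNT = 4 # illustrative: roughly one slice per worker

    SLICE_COUNT.times do |slice_id|
      response = client.search(
        index: 'my_index',                       # hypothetical index name
        scroll: '5m',
        size: 1_000,
        body: {
          slice: { id: slice_id, max: SLICE_COUNT },
          query: { match_all: {} }
        }
      )
      # ...then page through this slice with client.scroll, exactly as in the
      # question's worker, until the hits array comes back empty.
    end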

  • Is Sidekiq the right choice for this kind of processing?

Sidekiq will handle this kind of workload just fine.

  • Are there any pitfalls in pushing this amount of work through Sidekiq?

What problems are you actually running into with Sidekiq?

  • Is our approach of querying by date range correct, given that the data is not evenly distributed over time? Some ranges return 0 records and others 100,000+.

100,000 documents is not a problem in itself. Since you need all of the data anyway, query ES with match_all: {} and the scroll API instead of slicing by date range; the scroll walks the whole index and returns it in fixed-size batches, so you do not have to guess how many documents a range contains.
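
A minimal sketch of that suggestion, reusing the client and SCROLL_SIZE style from the question (the method name is made up; each batch is yielded to the caller):

    # Scroll an entire index with match_all instead of slicing it by date range.
    def scroll_entire_index(client, index_name, batch_size)
      response = client.search(
        index: index_name,
        scroll: '10m',
        size: batch_size,
        body: { query: { match_all: {} }, sort: ['_doc'] } # _doc: cheapest order for scrolling
      )
      loop do
        hits = response['hits']['hits']
        break if hits.empty?
        yield hits # hand each fixed-size batch to a Sidekiq worker, Mongo writer, etc.
        response = client.scroll(body: { scroll_id: response['_scroll_id'] }, scroll: '10m')
      end
    ensure
      # release the search context as soon as we are done (see the notes further down)
      client.clear_scroll(body: { scroll_id: response['_scroll_id'] }) if response
    end

Each yielded batch can then be handed to the existing BulkJobsHandlerWorker or persisted directly.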

  • Is there a better way to get a uniform number of records per request?

Yes: with a plain scroll you always get batches of the size you ask for instead of 0 or 100,000+ documents per date range; the batching is handled by the scroll API itself.

  • Is the use of the scroll API correct when a date range returns a large number of records (say 100,000), which means looping roughly 100 times (100 scroll API calls to ES)?

Yes, that is fine. Writing the documents into MongoDB is far more likely to become the bottleneck than the scroll requests to ES.
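
If MongoDB writes do turn out to be the limiting factor, batching them helps. A rough sketch with Mongoid (assumed from the question's stack); the model name and field mapping are made up:

    # One bulk write per batch of hits instead of one insert per document.
    def persist_batch(hits)
      docs = hits.map do |h|
        h['_source'].merge('es_id' => h['_id'], 'es_index' => h['_index'])
      end
      # NormalizedDocument is a hypothetical Mongoid model; .collection exposes the
      # underlying Ruby driver collection, which supports insert_many.
      NormalizedDocument.collection.insert_many(docs)
    end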

A few additional notes on the scroll API:

  • The initial search request and each subsequent scroll request return a new scroll_id; always pass the most recently received scroll_id to the next scroll call.

  • Scroll requests have optimizations that make them faster when the sort order is _doc. If you just want to iterate over all documents regardless of order, this is the most efficient option.

  • The scroll parameter tells Elasticsearch how long to keep the search context alive. It only needs to be long enough to process one batch of results (for example, 1 minute), not the whole export, because every scroll request sets a new expiry time.

  • Keeping a scroll open prevents the background merge process from deleting old segments while they are still in use, so open scroll contexts cost extra disk space, memory and file handles. You can check how many search contexts are open with the nodes stats API.

  • Search contexts are removed automatically when the scroll timeout expires, but keeping scrolls open has a cost, so as soon as you are done scrolling you should release the context explicitly with the Clear Scroll API.
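
With the Ruby client that last step looks roughly like this (assuming last_response holds the final scroll response):

    # Explicitly drop the search context instead of waiting for the scroll timeout.
    client.clear_scroll(body: { scroll_id: last_response['_scroll_id'] })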

Answer 2:

First of all, I would split the migration into two phases: collecting what has to be migrated and then actually migrating it.

IMHO, the first phase should fetch only the IDs of all documents; reading just the IDs out of ElasticSearch is cheap.

An importer worker (say, ImporterWorker) then receives a batch of IDs, fetches exactly those documents from ElasticSearch and writes them to MongoDB.

For example, if an index contains 1,000,000 IDs, you split them into N batches (200 x 5_000) and enqueue N jobs.

This gives you:

  • Uniform, predictable batches: every job processes a fixed number of documents fetched from ElasticSearch, regardless of how the data is distributed over time in ElasticSearch.

  • Easy recovery: if something goes wrong (a job fails or data is lost), you can re-enqueue an ImporterWorker with exactly the same list of IDs and re-process only that batch, as in the sketch below. The collected IDs also let you verify afterwards that nothing was missed.
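
A rough sketch of that two-phase idea on top of the question's stack; the worker names, the ids query and the batch sizes are illustrative rather than taken from the original answer:

    class IdCollectorWorker
      include Sidekiq::Worker

      BATCH_SIZE = 5_000 # batch size from the answer's example (200 x 5_000)

      def perform(index_name)
        client = ElasticSearchClient.instance.client
        ids = []
        # Phase 1: scroll over the index collecting only document ids (no _source).
        response = client.search(index: index_name, scroll: '5m', size: 1_000,
                                 body: { query: { match_all: {} }, _source: false, sort: ['_doc'] })
        loop do
          hits = response['hits']['hits']
          break if hits.empty?
          ids.concat(hits.map { |h| h['_id'] })
          response = client.scroll(body: { scroll_id: response['_scroll_id'] }, scroll: '5m')
        end
        # Phase 2: fan the ids out in fixed-size, independently retryable batches.
        ids.each_slice(BATCH_SIZE) { |batch| ImporterWorker.perform_async(index_name, batch) }
      end
    end

    class ImporterWorker
      include Sidekiq::Worker

      def perform(index_name, ids)
        client = ElasticSearchClient.instance.client
        hits = client.search(index: index_name, size: ids.size,
                             body: { query: { ids: { values: ids } } })['hits']['hits']
        # ...normalize `hits` and write them to MongoDB; a failed job can simply be
        # retried with exactly the same list of ids without touching other batches.
      end
    end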

Answer 3:

You can dump and restore Elasticsearch data with Elasticdump.

Alternatively, the Elasticsearch snapshot API can back up indices to a file-system repository or to Amazon S3.

Here is how Elasticdump is used:

  • Install it with node's npm:

    npm i elasticdump -g
    
  • Back up on demand into a gzipped file:

    elasticdump --input='http://username:password@localhost:9200/myindex' --searchBody '{"query" : {"range" :{"timestamp" : {"lte": 1483228800000}}}}' --output=$ --limit=1000 | gzip > /backups/myindex.gz
    
  • Restore from the gzipped file:

    zcat /backups/myindex.gz | elasticdump --input=$ --output=http://username:password@localhost:9200/index_name
    

Examples of backing up and restoring data with snapshots to Amazon S3 or to files

Set up your snapshot destination first

  • Example S3

    curl 'localhost:9200/_snapshot/my_repository?pretty' \
    -XPUT -H 'Content-Type: application/json' \
    -d '{
    "type" : "s3",
    "settings" : {
       "bucket" : "test-bucket",
       "base_path" : "backup-2017-01",
       "max_restore_bytes_per_sec" : "1gb",
       "max_snapshot_bytes_per_sec" : "1gb",
       "compress" : "true",
       "access_key" : "<ACCESS_KEY_HERE>",
       "secret_key" : "<SECRET_KEY_HERE>"
       }
    }'
    
  • Local disk or mounted NFS example

    curl 'localhost:9200/_snapshot/my_repository?pretty' -XPUT -H 'Content-Type: application/json' -d '{
    "type" : "fs",
    "settings" : {
       "location": "<PATH … for example /mnt/storage/backup>"
    }
    }'
    
  • Trigger snapshot

    curl -XPUT 'localhost:9200/_snapshot/my_repository/<snapshot_name>'
    
  • Show all backups

    curl 'localhost:9200/_snapshot/my_repository/_all'
    
  • Restore - the most important part of a backup is to verify that restoring the backup really works!

    curl -XPOST 'localhost:9200/_snapshot/my_repository/<snapshot_name>/_restore'
    

This text was found at:
https://sematext.com/blog/elasticsearch-security-authentication-encryption-backup/


Source: https://habr.com/ru/post/1693139/

