Elasticsearch-hadoop & Elasticsearch-spark sql - tracking scan and scroll instructions

We are trying to integrate the ES cluster (1.7.2, 4 node) with Spark (1.5.1, compiled with a hive and chaop with a scala 2.11, 4 node cluster), there are hdfs included in the equation (hadoop 2.7.4 nodes) and a lean server jdbc and elasticsearch-hadoop-2.2.0-m1.jar

Thus, there are two ways to execute instructions for ES.

  • SQL sparks with scala

    val conf = new  SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
    conf.set("spark.logConf", "true")
    conf.set("spark.cores.max","20")
    conf.set("es.index.auto.create", "false")
    conf.set("es.batch.size.bytes", "100mb")
    conf.set("es.batch.size.entries", "10000")
    conf.set("es.scroll.size", "10000")
    conf.set("es.nodes", "node2:39200")
    conf.set("es.nodes.discovery","true")
    conf.set("pushdown", "true")
    
    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
    sc.addJar("executorLib/scala-library-2.10.1.jar")
    
    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" )
    
    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
    .....
    
  • Thrift Server (code executed in sparklight)

    ....
    polledDataSource = new ComboPooledDataSource()
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
    polledDataSource.setMaxPoolSize(5)
    dbConnection = polledDataSource.getConnection
    dbStatement = dbConnection.createStatement
    
    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")
    
    dbStatement.setFetchSize(50000)
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
    .....
    

I have the following problems and due to the fact that they are related, I decided to pack them into one question on the stack:

  • , , Spark SQL, pushdown , WHERE ( , es.query ), . 1 pushgow , (*) ES, , - ES , Spark SQL . , , .

  • , pushdown true false,

  • № 2, , pushdown, , -, WHERE , , , , . , - pushdown ES

  • , :

    //logging.yml
    index.search.slowlog: TRACE, index_search_slow_log_file
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file
    
    additivity:
      index.search.slowlog: true
      index.indexing.slowlog: true
    

index.search.slowlog.threshold.query, index.search.slowlog.threshold.fetch index.indexing.slowlog.threshold.index 0ms. , ( ). Spark SQL , ES. , scan & scroll statement, , , . - ES?

+4
2
  • , . , , , . SparkSQL , . . JDBC?

  • , conf.set("pushdown", "true") - . , OPTION, . es

  • . PostgreSQL, .

+2

, elasticsearch, , :

:

  • Scala 2.11, Scala 2.10. : Scala, elasticsearch-spark , elasticsearch-hadoop Scala 2.10.

  • pushdown Spark DataSource. , pushdown ES ( Spark). , pushdown .

  • , ES-Hadoop es. - pushdown location, Spark DataSource ( Spark, Spark DS)

  • , pushdown. , , ; - .

  • Count Spark. , -, Databricks, . , dataFrame.rdd.esCount. .

  • , Thrift , Hive. , org.elasticsearch.hadoop.spark DEBUG. , SQL DSL.

, !

+1

Source: https://habr.com/ru/post/1615699/


All Articles