We are trying to integrate an ES cluster (1.7.2, 4 nodes) with Spark (1.5.1, built with Hive and Hadoop support for Scala 2.11, 4-node cluster). HDFS is also involved (Hadoop 2.7, 4 nodes), along with the Spark Thrift JDBC server and elasticsearch-hadoop-2.2.0-m1.jar.
So there are two ways we send queries to ES:
1. Spark SQL from Scala:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

val conf = new SparkConf()
  .setAppName("QueryRemoteES")
  .setMaster("spark://node1:37077")
  .set("spark.executor.memory", "2g")
conf.set("spark.logConf", "true")
conf.set("spark.cores.max", "20")
// es.* entries are picked up by elasticsearch-hadoop from the Spark configuration
conf.set("es.index.auto.create", "false")
conf.set("es.batch.size.bytes", "100mb")
conf.set("es.batch.size.entries", "10000")
conf.set("es.scroll.size", "10000")
conf.set("es.nodes", "node2:39200")
conf.set("es.nodes.discovery", "true")
conf.set("pushdown", "true")

val sc = new SparkContext(conf)
sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
sc.addJar("executorLib/scala-library-2.10.1.jar")
val sqlContext = new SQLContext(sc)

// Register the ES index/type geo_2/kafkain as a temporary Spark SQL table
sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')")
val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
// .....
```
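Conceptually, pushdown means turning a Spark SQL predicate such as `transmittersID='262021306841042'` into an Elasticsearch query, so that ES filters the documents instead of Spark. Below is a minimal sketch of that idea for a single equality predicate, assuming a plain `term` query is the target. elasticsearch-hadoop 2.2.0-m1 may generate a different (but equivalent) query, and `EqualityPushdown`/`termQuery` are hypothetical names used only for illustration:

```scala
object EqualityPushdown {
  // Quote a value as a JSON string, escaping double quotes and backslashes
  private def jsonString(s: String): String = {
    val escaped = s.map {
      case '"'  => "\\\""
      case '\\' => "\\\\"
      case c    => c.toString
    }.mkString
    "\"" + escaped + "\""
  }

  // Translate `field = 'value'` into an ES term query body
  def termQuery(field: String, value: String): String =
    s"""{"query":{"term":{${jsonString(field)}:${jsonString(value)}}}}"""

  def main(args: Array[String]): Unit =
    println(termQuery("transmittersID", "262021306841042"))
}
```

Running `main` prints `{"query":{"term":{"transmittersID":"262021306841042"}}}`, the same body that is passed by hand as `es.query` in the Thrift Server variant.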
2. Thrift Server over JDBC (code executed in sparklight):
```scala
import java.sql.{Connection, ResultSet, Statement}
import com.mchange.v2.c3p0.ComboPooledDataSource

// ....
// c3p0 connection pool for the Spark Thrift server (HiveServer2 JDBC protocol)
val pooledDataSource = new ComboPooledDataSource()
pooledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
pooledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
pooledDataSource.setMaxPoolSize(5)
val dbConnection: Connection = pooledDataSource.getConnection
val dbStatement: Statement = dbConnection.createStatement
// Hive table backed by the ES index; es.query restricts which documents are read from ES
val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")
dbStatement.setFetchSize(50000)
val dbResultSet: ResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
// .....
```
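Hand-escaping the `es.query` JSON inside a Scala string and SQL single quotes is error-prone. A small sketch of a hypothetical helper (`TblProperties.render`, not part of elasticsearch-hadoop or Hive) that assembles the TBLPROPERTIES clause from ordered key/value pairs. It assumes no key or value contains a single quote and rejects such input rather than guessing Hive's escaping rules:

```scala
object TblProperties {
  // Render ordered key/value pairs as a HiveQL TBLPROPERTIES clause
  def render(props: Seq[(String, String)]): String = {
    require(props.forall { case (k, v) => !k.contains("'") && !v.contains("'") },
      "single quotes in keys or values are not supported by this sketch")
    props.map { case (k, v) => s"'$k' = '$v'" }.mkString("TBLPROPERTIES(", ",", ")")
  }

  def main(args: Array[String]): Unit = {
    val clause = render(Seq(
      "es.resource" -> "geo_2/kafkain",
      "es.query"    -> """{"query":{"term":{"transmittersID":"262021306841042"}}}""",
      "es.nodes"    -> "node2",
      "es.port"     -> "39200"
    ))
    println(clause)
  }
}
```

Keeping the properties in a `Seq` rather than a `Map` preserves their order in the generated statement, which makes it easier to compare with a hand-written one.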
I have the following problems, and since they are related, I decided to put them into a single question:
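1. As I understand it, with Spark SQL and pushdown enabled, the WHERE clause is translated (roughly into an es.query) and sent to ES. But in case 1, even with pushdown, count(*) is not computed by ES: the matching documents are fetched from ES and then counted by Spark SQL. […]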
As far as I can tell, this happens whether pushdown is set to true or false […]
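To see why letting Spark do the counting is expensive compared with a count done inside ES: with scan & scroll, every matching document is streamed to Spark in pages of `es.scroll.size` (10000 in the configuration above), so the number of round trips grows with the number of matches, while a count computed in ES needs a single request. A back-of-the-envelope sketch, assuming one Spark partition per shard, pages of up to `es.scroll.size` documents per shard, and ignoring the initial scan request and the final empty page. `ScrollCost` is a hypothetical name and the per-shard hit counts are made-up numbers for five shards (the ES 1.x default):

```scala
object ScrollCost {
  // Pages needed to stream `hits` documents at `scrollSize` documents per page
  def pages(hits: Long, scrollSize: Int): Long = {
    require(hits >= 0 && scrollSize > 0)
    (hits + scrollSize - 1) / scrollSize
  }

  // Total pages when every shard is read by its own Spark partition
  def totalPages(hitsPerShard: Seq[Long], scrollSize: Int): Long =
    hitsPerShard.map(pages(_, scrollSize)).sum

  def main(args: Array[String]): Unit = {
    // Hypothetical per-shard match counts, for illustration only
    val hitsPerShard = Seq(120000L, 95000L, 101000L, 87000L, 99000L)
    println(s"scroll pages: ${totalPages(hitsPerShard, 10000)} (vs. one count request)")
  }
}
```

With these made-up numbers the sketch prints 52 scroll pages, each carrying full documents over the network, for a query whose answer is a single number.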
2. In case 2 (the Thrift Server), I am not sure pushdown happens at all: […] the WHERE clause […]. Is there any pushdown to ES in this case?
To check which queries actually reach ES, I enabled the slow log in logging.yml:
```yaml
logger:
  index.search.slowlog: TRACE, index_search_slow_log_file
  index.indexing.slowlog: TRACE, index_indexing_slow_log_file

additivity:
  index.search.slowlog: true
  index.indexing.slowlog: true
```
and set index.search.slowlog.threshold.query, index.search.slowlog.threshold.fetch and index.indexing.slowlog.threshold.index to 0ms.
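The thresholds are index-level settings, and in ES 1.x they can also be changed at runtime through the update index settings API (for example `PUT /geo_2/_settings`) instead of editing the configuration and restarting. A minimal sketch that only builds the JSON body for such a request; `SlowlogSettings` is a hypothetical helper, and actually sending the request (with curl or any HTTP client) is left out:

```scala
object SlowlogSettings {
  // The three slow log thresholds listed above; "0ms" logs every operation
  val keys: Seq[String] = Seq(
    "index.search.slowlog.threshold.query",
    "index.search.slowlog.threshold.fetch",
    "index.indexing.slowlog.threshold.index"
  )

  // JSON body for PUT /<index>/_settings setting all thresholds to `threshold`
  def body(threshold: String): String =
    keys.map(k => "\"" + k + "\":\"" + threshold + "\"").mkString("{", ",", "}")

  def main(args: Array[String]): Unit =
    println(body("0ms"))
}
```

Setting the thresholds back to a higher value (or to `-1`, which disables that threshold) afterwards keeps the log from growing with every request.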
[…] (…). With Spark SQL, […] ES. However, in the slow log I only see the scan & scroll statements, […]. Is there a way to see what is actually sent to ES?