在一些场景下我们可能会遇到需要从es导出一部分特定的数据然后进行处理这样的情况,这个时候spark会是一个不错的选择。
第一步 maven配置这里一些工具使用的版本:
spark:2.3.4
elasticsearch:7.0.0
scala:2.11.8
maven配置如下:
org.scala-lang
scala-library
2.11.8
org.apache.spark
spark-core_2.11
2.3.4
org.elasticsearch
elasticsearch-spark-20_2.11
7.0.0
第二步 代码
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
object ReadESTest {
def run():Unit={
//本地运行
val conf =new SparkConf().setAppName("ReadESTest").setMaster("local")
//是否自动创建索引
conf.set("es.index.auto.create", "true")
//es节点ip,可写多个
conf.set("es.nodes","127.0.0.1")
//es端口
conf.set("es.port","9200")
//es用户密码,没有则不用配置
conf.set("es.net.http.auth.user", "username")
conf.set("es.net.http.auth.pass", "password")
//本地运行配置为false,服务器上运行配置为true
conf.set("es.nodes.discovery", "false")
//是否只读es的data节点
conf.set("es.nodes.data.only", "false")
//设置scroll
conf.set("es.scroll.size","10000")
conf.set("es.scroll.keepalive","10m")
//设置需要读取哪些字段,可不配置
conf.set("es.read.source.filter","@timestamp,userId,userName")
val sc =new SparkContext(conf)
//写dsl语句
val query:String =s"""{
"query": {
"range": {
"@timestamp": {
"from": "2020-02-15T16:00:00.000Z",
"to": "2020-02-16T16:01:00.000Z",
"include_lower": true,
"include_upper": false
}
}
}
}"""
//设置要查询的索引名,支持*通配符
val index="test-*"
//拿到数据后取第二列,是一个map
val rdd = sc.esRDD(index,query).map(_._2)
//取十条打印
rdd.take(10).foreach(println)
sc.stop()
}
def main(args: Array[String]): Unit = {
run()
}
}
第三步 踩坑
1、maven依赖的各项包版本要选好
2、如果在本地运行时遇到
Connection timed out: connect
Node [ip1:9200] failed (java.net.ConnectException: Connection timed out: connect); selected next node [ip2:9200]
这样的报错,将以下这个设置修改为false:
conf.set(“es.nodes.discovery”, “false”)
3、org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: invalid response
这个报错是因为查询的索引被冻结了,可以将索引解冻或跳过被冻结的索引
用spark读es不适合一次性读取大批量的数据,笔者实际尝试后发现性能较差,而且对es负担比较重,不过针对一些临时的需要处理不算很大的数据量的需求还是很方便的