A First Look at ES Client Performance Testing

Time: 2023-02-21 14:20:41    Source: Tencent Cloud

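Recently I helped our developers optimize ES at work, and the effect was obvious: performance nearly doubled. Besides measuring ES performance through the various business interfaces, you can also send requests to the ES interface directly and bypass the service, which should give more accurate data. So it was time to learn the ES Client.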



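Preparation

First, I set up an ES service; I won't go into the details here. When you try this yourself, make sure the ES Server and ES Client versions match. Next, create a new project and add the dependency.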
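The article does not show the dependency itself. As a sketch, assuming a Maven project and the high-level REST client that the wrapper class at the end of the article is built on (`RestHighLevelClient`), the entry would look roughly like this. The version number is an assumption taken from the 6.7 documentation link in the next section; use whatever matches your ES Server.

```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <!-- Assumed version: keep it identical to the ES Server version -->
    <version>6.7.0</version>
</dependency>
```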

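Learning resources

A quick search turns up plenty of ES learning material. I suggest first reading some introductory material published by the big tech companies to get a feel for what ES can do, and then going straight to the ES API. The official ES documentation is here: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/java-rest-high-search.html

If you have access to your own company's project source code, study how the developers use ES; it helps you understand the ES API in the context of the business.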

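ES Client

HTTP requests

A note here: many ES query features are carried out over HTTP, using a GET request with the parameters passed in the body, which confused me at first. After some reading I found that I had to implement an HTTP GET request that carries a body myself. Here is my implementation: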

package com.funtester.httpclient

import org.apache.http.client.methods.HttpEntityEnclosingRequestBase

import javax.annotation.concurrent.NotThreadSafe

/**
 * HttpGet request that carries body parameters
 */
@NotThreadSafe
class HttpGetByBody extends HttpEntityEnclosingRequestBase {

    static final String METHOD_NAME = "GET";

    /**
     * Returns the HTTP method (must be overridden)
     *
     * @return
     */
    @Override
    String getMethod() {
        return METHOD_NAME;
    }

    /**
     * PS: do not copy {@link org.apache.http.client.methods.HttpPost} verbatim
     * @param uri
     */
    HttpGetByBody(final String uri) {
        this(new URI(uri))
    }

    HttpGetByBody(final URI uri) {
        super();
        setURI(uri);
    }

    HttpGetByBody() {
        super();
    }

}
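For comparison, here is a minimal sketch of the same idea without Apache HttpClient, assuming Java 11 or later: the JDK's `java.net.http.HttpRequest` builder has a generic `method(name, bodyPublisher)` call that accepts a body for any method name, so no subclass is needed. The class name `GetWithBodySketch` and the query body are illustrative assumptions; the sketch only builds the request and does not send it, so whether a given server accepts it still needs to be checked.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class GetWithBodySketch {

    /** Builds (but does not send) a GET request that carries a JSON body. */
    public static HttpRequest searchRequest(String url, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/json")
                // method(...) takes an arbitrary method name plus a body publisher
                .method("GET", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Index "fun" and field "time" are borrowed from the test cases in this article
        HttpRequest request = searchRequest(
                "http://127.0.0.1:9200/fun/_search",
                "{\"query\":{\"match\":{\"time\":5}}}");
        System.out.println(request.method() + " " + request.uri());
        System.out.println("has body: " + request.bodyPublisher().isPresent());
    }
}
```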

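ES Client

If you drive ES through the HTTP interface, you have to assemble multi-level nested parameters, which is tedious to write, hard to read, and error-prone. So I use the ES Client as the base framework for operating on ES.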
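To make the nesting concrete, here is a small sketch of what assembling a search body by hand looks like. The class and method names are hypothetical, and the field `time` is borrowed from the test cases later in the article. With the ES Client, the same request is expressed through `QueryBuilders.matchQuery(...)` and `SearchSourceBuilder`, as the wrapper class at the end does.

```java
public class RawSearchBodySketch {

    /** Hand-assembles the JSON body of a match query with paging for the raw HTTP API. */
    public static String matchQueryBody(String field, int value, int from, int size) {
        // Three levels of nesting for a single condition; one misplaced brace breaks the request
        return "{\"query\":{\"match\":{\"" + field + "\":" + value + "}},"
                + "\"from\":" + from + ",\"size\":" + size + "}";
    }

    public static void main(String[] args) {
        // prints {"query":{"match":{"time":5}},"from":0,"size":10}
        System.out.println(matchQueryBody("time", 5, 0, 10));
    }
}
```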

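If you read the ES Client source, it too ultimately sends HTTP requests through HttpClient, with a lot of wrapping in between. Here is the part of the ES Client code that creates the HTTP client: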

    private CloseableHttpAsyncClient createHttpClient() {
        //default timeouts are all infinite
        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
                .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
        if (requestConfigCallback != null) {
            requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
        }

        try {
            HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
                //default settings for connection pooling may be too constraining
                .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
                .setSSLContext(SSLContext.getDefault())
                .setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
            if (httpClientConfigCallback != null) {
                httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
            }

            final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
            return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
                @Override
                public CloseableHttpAsyncClient run() {
                    return finalBuilder.build();
                }
            });
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("could not create the default ssl context", e);
        }
    }

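As you can see, the ES Client uses HttpClient's asynchronous client. My guess is that it uses a future to return the response synchronously; I have not looked closely, so please point it out if I am wrong. This also answers a question I had: the ES Client supports concurrent use.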
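The guess about futures can be illustrated with a general pattern. The sketch below is not the ES Client's actual code: `executeAsync` is a hypothetical stand-in for any callback-based async call. The synchronous facade lets the callback complete a `CompletableFuture` and makes the caller block on it. It assumes Java 9 or later for `orTimeout`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SyncOverAsyncSketch {

    // Daemon threads so the pool never keeps the JVM alive
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    });

    /** Hypothetical async API: handles the request on a pool thread and passes the result to a callback. */
    public static void executeAsync(String request, Consumer<String> onResponse) {
        POOL.submit(() -> onResponse.accept("response to " + request));
    }

    /** Synchronous facade: the callback completes a future and the caller blocks until it is done. */
    public static String executeSync(String request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        executeAsync(request, future::complete);
        // Fail after 5 seconds instead of blocking forever if no response arrives
        return future.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        // prints response to GET /fun/_search
        System.out.println(executeSync("GET /fun/_search"));
    }
}
```

Because every caller blocks on its own future, many threads can call `executeSync` at the same time while sharing one underlying async client, which is the property a concurrent performance test relies on.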

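Wrapping the ES Client

From what I have seen, the ES Client is already very highly encapsulated and can be used as is. I was worried that in a few days I would no longer remember how to use these ES Client APIs, so I wrapped it once more, treating the wrapper as a set of study notes.

The wrapper code is fairly long, so I put it at the end of the article.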

Test cases

Adding data

This can be used to load some data into ES.

package com.funtest.groovytest

import com.alibaba.fastjson.JSONObject
import com.funtester.es.ESClient
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent

import java.util.concurrent.atomic.AtomicInteger

class ESC extends SourceCode {

    static void main(String[] args) {
        def client = new ESClient("127.0.0.1", 9200, "http")
        def data = new JSONObject()
        data.name = "FunTester"
        data.age = getRandomInt(100)
        def index = new AtomicInteger(0)
        // Each call writes one document with an increasing "time" value
        def test = {
            data.put("time", index.getAndIncrement())
            client.index("fun", "tt", data)
        }
        new FunQpsConcurrent(test, "ES add data").start()
    }

}
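`FunQpsConcurrent` comes from the FunTester framework and its source is not shown here. As a rough idea of what such a driver does with the `test` closure, here is a simplified JDK-only sketch. It is not FunTester's implementation; the class name, the fixed call count, and the thread count are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SimpleLoadSketch {

    /**
     * Runs the task totalCalls times across the given number of threads
     * and returns how many calls finished without throwing.
     */
    public static long run(Runnable task, int threads, int totalCalls) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicLong succeeded = new AtomicLong();
        long start = System.nanoTime();
        for (int i = 0; i < totalCalls; i++) {
            pool.submit(() -> {
                try {
                    task.run();
                    succeeded.incrementAndGet();
                } catch (RuntimeException e) {
                    // A failed call is simply not counted in this sketch
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
        System.out.printf("calls=%d ok=%d qps=%.1f%n", totalCalls, succeeded.get(), succeeded.get() / seconds);
        return succeeded.get();
    }

    public static void main(String[] args) {
        // The counter stands in for the real work, e.g. client.index("fun", "tt", data)
        AtomicLong counter = new AtomicLong();
        run(counter::incrementAndGet, 8, 1000);
    }
}
```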

To test adding and then deleting, just change the body of the test closure:

        def test = {
            data.put("time", index.getAndIncrement())
            // index(...) returns the new document id, which is passed straight to delete(...)
            client.delete("fun", "tt", client.index("fun", "tt", data))
        }

Here is the performance test case for search:

package com.funtest.groovytest

import com.alibaba.fastjson.JSONObject
import com.funtester.es.ESClient
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.elasticsearch.index.query.QueryBuilders

import java.util.concurrent.atomic.AtomicInteger

class ESC extends SourceCode {

    static void main(String[] args) {
        def client = new ESClient("127.0.0.1", 9200, "http")
        def data = new JSONObject()
        data.name = "FunTester"
        data.age = getRandomInt(100)
        def index = new AtomicInteger(0)
        // Each call searches for a random "time" value
        def test = {
            client.search("fun", QueryBuilders.matchQuery("time", getRandomInt(10)))
        }
        new FunQpsConcurrent(test, "ES search").start()
    }

}

ES Client API wrapper class

package com.funtester.es

import com.funtester.frame.SourceCode
import groovy.util.logging.Log4j2
import org.apache.http.HttpHost
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.search.SearchScrollRequest
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.search.SearchHits
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.search.fetch.subphase.FetchSourceContext

import java.util.concurrent.TimeUnit

/**
 * Practice class for the ES client API
 */
@Log4j2
class ESClient extends SourceCode {

    String host

    int port

    String scheme

    RestHighLevelClient client

    ESClient(String host, int port = 9200, String scheme = "http") {
        this.host = host
        this.port = port
        this.scheme = scheme
        // Set up the credentials: fill in the user name and password
        //        CredentialsProvider credentialsProvider = new BasicCredentialsProvider()
        //        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "passwd"))
        def builder = RestClient.builder(new HttpHost(host, port, scheme))
        // Apply the credentials to the HTTP client
        //        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        //
        //            @Override
        //            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        //                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
        //            }
        //        })
        builder.setMaxRetryTimeoutMillis(1000)
        client = new RestHighLevelClient(builder)
    }

    /**
     * Add a document
     * @param index
     * @param type
     * @param data
     * @return the id of the indexed document
     */
    def index(String index, type, Map data) {
        IndexRequest indexRequest = new IndexRequest(index, type).source(data)
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT)
        indexResponse.getId()
    }

    /**
     * Get a document
     * @param index
     * @param type
     * @param id
     * @return the document source as a string, or null if it does not exist
     */
    def get(String index, type, id) {
        // Look up the document
        GetRequest getRequest = new GetRequest(index, type, id)
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT)
        if (getResponse.isExists()) {
            getResponse.getSourceAsString()
        }
    }

    /**
     * Check whether a document exists
     * @param index
     * @param type
     * @param id
     * @return
     */
    def exists(String index, type, id) {
        GetRequest getRequest = new GetRequest(index, type, id)
        // Skip fetching _source and stored fields; only existence matters
        getRequest.fetchSourceContext(new FetchSourceContext(false))
        getRequest.storedFields("_none_")
        client.exists(getRequest, RequestOptions.DEFAULT)
    }

    /**
     * Delete a document
     * @param index
     * @param type
     * @param id
     * @return
     */
    def delete(String index, type, id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, type, id)
        client.delete(deleteRequest, RequestOptions.DEFAULT)
    }

    /**
     * Search documents
     * @param index
     * @param query
     * @param size
     * @return
     */
    def search(String index, QueryBuilder query, int size = 10) {
        SearchRequest searchRequest = new SearchRequest(index)
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        sourceBuilder.query(query)
        sourceBuilder.from(0)
        sourceBuilder.size(size)
        sourceBuilder.timeout(new TimeValue(1, TimeUnit.SECONDS))
        searchRequest.source(sourceBuilder)
        client.search(searchRequest, RequestOptions.DEFAULT)
    }

    /**
     * Scroll search
     * @param index
     * @param query
     * @param size
     */
    def searchScroll(String index, QueryBuilder query, int size = 10) {
        SearchRequest searchRequest = new SearchRequest(index)
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
        searchSourceBuilder.query(query)
        searchSourceBuilder.size(size)
        searchRequest.source(searchSourceBuilder)
        searchRequest.scroll(TimeValue.timeValueMinutes(1L))
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT)
        String scrollId = searchResponse.getScrollId()
        SearchHits hits = searchResponse.getHits()
        def searchHits = hits.getHits()
        // Keep pulling pages until a page comes back empty
        while (searchHits != null && searchHits.length > 0) {
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
            scrollRequest.scroll(TimeValue.timeValueMinutes(1L))
            searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT)
            scrollId = searchResponse.getScrollId()
            searchHits = searchResponse.getHits().getHits()
        }
    }

    def close() {
        client.close()
    }

}


-- By FunTester

Keywords: HTTP Java Apache