本文介绍: 使用Flink输出sink)到Elasticsearch时,出现了上面的错误,是因为登录到ES,ES启用了账号密码模式,因此必须要使用账号密码才能够访问它。

一、报错内容

Exception in thread "main" ElasticsearchStatusException[Elasticsearch exception [type=security_exception, reason=missing authentication credentials for REST request [/user2?master_timeout=30s&ignore_unavailable=false&expand_wildcards=open%2Cclosed&allow_no_indices=true&ignore_throttled=false&timeout=30s]]]
	at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:176)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2011)
	at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1988)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1745)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1702)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1672)
	at org.elasticsearch.client.IndicesClient.delete(IndicesClient.java:103)
	at com.xxx.elasticsearch.operation.main(operation.java:26)
	Suppressed: org.elasticsearch.client.ResponseException: method [DELETE], host [http://10.219.87.159:32705], URI [/user2?master_timeout=30s&ignore_unavailable=false&expand_wildcards=open%2Cclosed&allow_no_indices=true&ignore_throttled=false&timeout=30s], status line [HTTP/1.1 401 Unauthorized]
{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication credentials for REST request [/user2?master_timeout=30s&ignore_unavailable=false&expand_wildcards=open%2Cclosed&allow_no_indices=true&ignore_throttled=false&timeout=30s]","header":{"WWW-Authenticate":"Basic realm="security" charset="UTF-8""}}],"type":"security_exception","reason":"missing authentication credentials for REST request [/user2?master_timeout=30s&ignore_unavailable=false&expand_wildcards=open%2Cclosed&allow_no_indices=true&ignore_throttled=false&timeout=30s]","header":{"WWW-Authenticate":"Basic realm="security" charset="UTF-8""}},"status":401}
		at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:326)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:296)
		at org.elasticsearch.client.RestClient.performRequest(RestClient.java:270)
		at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2082)
		at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1732)
		... 4 more

二、报错内容

使用Flink输出(sink)到Elasticsearch时,出现了上面的错误,是因为登录到ES,ES启用了账号密码模式,因此必须要使用账号密码才能够访问它。

三、报错解决

于是新增如下代码后,问题解决

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
       new ElasticsearchSinkFunction<String>() {
           public IndexRequest createIndexRequest(String element) {
               Map<String, String> json = new HashMap<>();
               json.put("data", element);
               return Requests.indexRequest()
                       .index("您的索引名")
                       //.type("my-type")
                       .source(json);
           }

           @Override
           public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
               indexer.add(createIndexRequest(element));
           }
       }
);
// 鉴权,正对写 es 需要密码场景
esSinkBuilder.setRestClientFactory(new HDRestClientFactory("您的账号名", "您的密码"));

public class HDRestClientFactory implements RestClientFactory {

    private String userName;
    private String password;
    transient CredentialsProvider credentialsProvider;

    public HDRestClientFactory(String userName, String password) {
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        if (credentialsProvider == null) {
            credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        }
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
                builder.setConnectTimeout(5000);
                builder.setSocketTimeout(60000);
                builder.setConnectionRequestTimeout(2000);
                return builder;
            }
        });
    }
}

原文地址:https://blog.csdn.net/wstever/article/details/129679329

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_6399.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注