package org.apache.flink.streaming.connectors.elasticsearch;

import java.io.Serializable;
import java.lang.AutoCloseable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.ActionRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.DocWriteRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkItemResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkProcessor;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.ByteSizeUnit;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.ByteSizeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.rest.RestStatus;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.class */
public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {
    private static final long serialVersionUID = -1007596293618451942L;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
    private final Integer bulkProcessorFlushMaxActions;
    private final Integer bulkProcessorFlushMaxSizeMb;
    private final Long bulkProcessorFlushIntervalMillis;
    private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
    private final Map<String, String> userConfig;
    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
    private final ActionRequestFailureHandler failureHandler;
    private transient RequestIndexer requestIndexer;
    private transient BufferingNoOpRequestIndexer failureRequestIndexer;
    private final ElasticsearchApiCallBridge<C> callBridge;
    private transient C client;
    private transient BulkProcessor bulkProcessor;
    private boolean flushOnCheckpoint = true;
    private AtomicLong numPendingRequests = new AtomicLong(0);
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase$BulkFlushBackoffPolicy.class */
    public static class BulkFlushBackoffPolicy implements Serializable {
        private static final long serialVersionUID = -6022851996101826049L;
        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
        private int maxRetryCount = 8;
        private long delayMillis = 50;

        public FlushBackoffType getBackoffType() {
            return this.backoffType;
        }

        public int getMaxRetryCount() {
            return this.maxRetryCount;
        }

        public long getDelayMillis() {
            return this.delayMillis;
        }

        public void setBackoffType(FlushBackoffType flushBackoffType) {
            this.backoffType = (FlushBackoffType) Preconditions.checkNotNull(flushBackoffType);
        }

        public void setMaxRetryCount(int i) {
            Preconditions.checkArgument(i >= 0);
            this.maxRetryCount = i;
        }

        public void setDelayMillis(long j) {
            Preconditions.checkArgument(j >= 0);
            this.delayMillis = j;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase$BulkProcessorListener.class */
    private class BulkProcessorListener implements BulkProcessor.Listener {
        private BulkProcessorListener() {
        }

        @Override // org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void beforeBulk(long j, BulkRequest bulkRequest) {
        }

        @Override // org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            if (bulkResponse.hasFailures()) {
                for (int i = 0; i < bulkResponse.getItems().length; i++) {
                    try {
                        BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
                        Throwable extractFailureCauseFromBulkItemResponse = ElasticsearchSinkBase.this.callBridge.extractFailureCauseFromBulkItemResponse(bulkItemResponse);
                        if (extractFailureCauseFromBulkItemResponse != null) {
                            ElasticsearchSinkBase.LOG.error("Failed Elasticsearch item request: {}", bulkItemResponse.getFailureMessage(), extractFailureCauseFromBulkItemResponse);
                            RestStatus status = bulkItemResponse.getFailure().getStatus();
                            Object obj = (DocWriteRequest) bulkRequest.requests().get(i);
                            if (status == null) {
                                if (!(obj instanceof ActionRequest)) {
                                    throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
                                }
                                ElasticsearchSinkBase.this.failureHandler.onFailure((ActionRequest) obj, extractFailureCauseFromBulkItemResponse, -1, ElasticsearchSinkBase.this.failureRequestIndexer);
                            } else {
                                if (!(obj instanceof ActionRequest)) {
                                    throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
                                }
                                ElasticsearchSinkBase.this.failureHandler.onFailure((ActionRequest) obj, extractFailureCauseFromBulkItemResponse, status.getStatus(), ElasticsearchSinkBase.this.failureRequestIndexer);
                            }
                        }
                    } catch (Throwable th) {
                        ElasticsearchSinkBase.this.failureThrowable.compareAndSet(null, th);
                    }
                }
            }
            if (ElasticsearchSinkBase.this.flushOnCheckpoint) {
                ElasticsearchSinkBase.this.numPendingRequests.getAndAdd(-bulkRequest.numberOfActions());
            }
        }

        @Override // org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            ElasticsearchSinkBase.LOG.error("Failed Elasticsearch bulk request: {}", th.getMessage(), th);
            try {
                for (Object obj : bulkRequest.requests()) {
                    if (!(obj instanceof ActionRequest)) {
                        throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
                    }
                    ElasticsearchSinkBase.this.failureHandler.onFailure((ActionRequest) obj, th, -1, ElasticsearchSinkBase.this.failureRequestIndexer);
                }
            } catch (Throwable th2) {
                ElasticsearchSinkBase.this.failureThrowable.compareAndSet(null, th2);
            }
            if (ElasticsearchSinkBase.this.flushOnCheckpoint) {
                ElasticsearchSinkBase.this.numPendingRequests.getAndAdd(-bulkRequest.numberOfActions());
            }
        }
    }

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase$FlushBackoffType.class */
    public enum FlushBackoffType {
        CONSTANT,
        EXPONENTIAL
    }

    public ElasticsearchSinkBase(ElasticsearchApiCallBridge<C> elasticsearchApiCallBridge, Map<String, String> map, ElasticsearchSinkFunction<T> elasticsearchSinkFunction, ActionRequestFailureHandler actionRequestFailureHandler) {
        this.callBridge = (ElasticsearchApiCallBridge) Preconditions.checkNotNull(elasticsearchApiCallBridge);
        this.elasticsearchSinkFunction = (ElasticsearchSinkFunction) Preconditions.checkNotNull(elasticsearchSinkFunction);
        this.failureHandler = (ActionRequestFailureHandler) Preconditions.checkNotNull(actionRequestFailureHandler);
        Preconditions.checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction), "The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.");
        Preconditions.checkArgument(InstantiationUtil.isSerializable(actionRequestFailureHandler), "The implementation of the provided ActionRequestFailureHandler is not serializable. The object probably contains or references non-serializable fields.");
        Preconditions.checkNotNull(map);
        HashMap hashMap = new HashMap(map);
        ParameterTool fromMap = ParameterTool.fromMap(hashMap);
        if (fromMap.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            this.bulkProcessorFlushMaxActions = Integer.valueOf(fromMap.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
            hashMap.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
        } else {
            this.bulkProcessorFlushMaxActions = null;
        }
        if (fromMap.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            this.bulkProcessorFlushMaxSizeMb = Integer.valueOf(fromMap.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB));
            hashMap.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
        } else {
            this.bulkProcessorFlushMaxSizeMb = null;
        }
        if (fromMap.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            this.bulkProcessorFlushIntervalMillis = Long.valueOf(fromMap.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS));
            hashMap.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
        } else {
            this.bulkProcessorFlushIntervalMillis = null;
        }
        boolean z = fromMap.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
        hashMap.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
        if (z) {
            this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
            if (fromMap.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
                this.bulkProcessorFlushBackoffPolicy.setBackoffType(FlushBackoffType.valueOf(fromMap.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
                hashMap.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
            }
            if (fromMap.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
                this.bulkProcessorFlushBackoffPolicy.setMaxRetryCount(fromMap.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
                hashMap.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
            }
            if (fromMap.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
                this.bulkProcessorFlushBackoffPolicy.setDelayMillis(fromMap.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
                hashMap.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
            }
        } else {
            this.bulkProcessorFlushBackoffPolicy = null;
        }
        this.userConfig = hashMap;
    }

    public void disableFlushOnCheckpoint() {
        this.flushOnCheckpoint = false;
    }

    public void open(Configuration configuration) throws Exception {
        this.client = this.callBridge.createClient(this.userConfig);
        this.callBridge.verifyClientConnection(this.client);
        this.bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
        this.requestIndexer = this.callBridge.createBulkProcessorIndexer(this.bulkProcessor, this.flushOnCheckpoint, this.numPendingRequests);
        this.failureRequestIndexer = new BufferingNoOpRequestIndexer();
        this.elasticsearchSinkFunction.open();
    }

    public void invoke(T t, SinkFunction.Context context) throws Exception {
        checkAsyncErrorsAndRequests();
        this.elasticsearchSinkFunction.process(t, getRuntimeContext(), this.requestIndexer);
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkAsyncErrorsAndRequests();
        if (this.flushOnCheckpoint) {
            while (this.numPendingRequests.get() != 0) {
                this.bulkProcessor.flush();
                checkAsyncErrorsAndRequests();
            }
        }
    }

    public void close() throws Exception {
        if (this.bulkProcessor != null) {
            this.bulkProcessor.close();
            this.bulkProcessor = null;
        }
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
        this.callBridge.cleanup();
        checkErrorAndRethrow();
    }

    @VisibleForTesting
    protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
        Preconditions.checkNotNull(listener);
        BulkProcessor.Builder createBulkProcessorBuilder = this.callBridge.createBulkProcessorBuilder(this.client, listener);
        createBulkProcessorBuilder.setConcurrentRequests(0);
        if (this.bulkProcessorFlushMaxActions != null) {
            createBulkProcessorBuilder.setBulkActions(this.bulkProcessorFlushMaxActions.intValue());
        }
        if (this.bulkProcessorFlushMaxSizeMb != null) {
            createBulkProcessorBuilder.setBulkSize(new ByteSizeValue(this.bulkProcessorFlushMaxSizeMb.intValue(), ByteSizeUnit.MB));
        }
        if (this.bulkProcessorFlushIntervalMillis != null) {
            createBulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(this.bulkProcessorFlushIntervalMillis.longValue()));
        }
        this.callBridge.configureBulkProcessorBackoff(createBulkProcessorBuilder, this.bulkProcessorFlushBackoffPolicy);
        return createBulkProcessorBuilder.build();
    }

    private void checkErrorAndRethrow() {
        Throwable th = this.failureThrowable.get();
        if (th != null) {
            throw new RuntimeException("An error occurred in ElasticsearchSink.", th);
        }
    }

    private void checkAsyncErrorsAndRequests() {
        checkErrorAndRethrow();
        this.failureRequestIndexer.processBufferedRequests(this.requestIndexer);
    }

    @VisibleForTesting
    long getNumPendingRequests() {
        if (this.flushOnCheckpoint) {
            return this.numPendingRequests.get();
        }
        throw new UnsupportedOperationException("The number of pending requests is not maintained when flushing on checkpoint is disabled.");
    }
}
