/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.DocWriteRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkItemResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.ByteSizeUnit;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.ByteSizeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.rest.RestStatus;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Elasticsearch sinks that buffer requests in a {@link BulkProcessor}.
 *
 * <p>The sink delegates all version-specific client calls to an {@link ElasticsearchApiCallBridge}
 * and turns each incoming element into requests via an {@link ElasticsearchSinkFunction}. Failed
 * requests are handed to an {@link ActionRequestFailureHandler}; anything that handler throws is
 * remembered and rethrown from the next {@link #invoke}, {@link #snapshotState} or {@link #close}
 * call.
 *
 * <p>The {@code bulk.flush.*} entries of the user configuration are consumed by this class; every
 * other entry is passed on unchanged to {@link ElasticsearchApiCallBridge#createClient}.
 *
 * @param <T> type of the elements handled by this sink
 * @param <C> type of the Elasticsearch client, which must be {@link AutoCloseable}
 */
@Internal
public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable>
        extends RichSinkFunction<T>
        implements CheckpointedFunction {

    private static final long serialVersionUID = -1007596293618451942L;

    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBase.class);

    // ------------------------------------------------------------------------
    //  User-facing configuration keys (all consumed by the constructor)
    // ------------------------------------------------------------------------

    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
    public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

    // ------------------------------------------------------------------------
    //  Bulk processor settings; null means "not configured by the user"
    // ------------------------------------------------------------------------

    private final Integer bulkProcessorFlushMaxActions;
    private final Integer bulkProcessorFlushMaxSizeMb;
    private final Long bulkProcessorFlushIntervalMillis;

    /** Backoff settings, or {@code null} when backoff was disabled through the configuration. */
    private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;

    /** The user configuration with all {@code bulk.flush.*} keys handled here stripped out. */
    private final Map<String, String> userConfig;

    private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;

    private final ActionRequestFailureHandler failureHandler;

    /** When true, {@link #snapshotState} flushes until no requests are pending. */
    private boolean flushOnCheckpoint = true;

    /** Indexer handed to the sink function; created in {@link #open}. */
    private transient RequestIndexer requestIndexer;

    /** Buffers requests re-added by the failure handler until they can be replayed. */
    private transient BufferingNoOpRequestIndexer failureRequestIndexer;

    private final ElasticsearchApiCallBridge<C> callBridge;

    /**
     * Number of requests not yet acknowledged. Shared with the indexer created by the call bridge
     * and decremented by the bulk listener, but only while {@link #flushOnCheckpoint} is set.
     */
    private final AtomicLong numPendingRequests = new AtomicLong(0L);

    private transient C client;

    private transient BulkProcessor bulkProcessor;

    /** First error raised while handling bulk results; rethrown by {@link #checkErrorAndRethrow}. */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    /**
     * Creates the sink and extracts the bulk-flush settings from {@code userConfig}.
     *
     * <p>The given map is copied, never modified. Backoff is enabled unless {@link
     * #CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE} is set to a non-true value.
     *
     * @param callBridge bridge used for all client-specific calls; must not be null
     * @param userConfig user configuration; must not be null
     * @param elasticsearchSinkFunction function that creates requests from elements; must be
     *     non-null and serializable
     * @param failureHandler handler for failed requests; must be non-null and serializable
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if the sink function or failure handler is not serializable
     */
    public ElasticsearchSinkBase(
            ElasticsearchApiCallBridge<C> callBridge,
            Map<String, String> userConfig,
            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
            ActionRequestFailureHandler failureHandler) {
        this.callBridge = Preconditions.checkNotNull(callBridge);
        this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
        this.failureHandler = Preconditions.checkNotNull(failureHandler);

        // Fail early instead of at job submission if the user code cannot be shipped.
        Preconditions.checkArgument(
                InstantiationUtil.isSerializable(elasticsearchSinkFunction),
                "The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.");
        Preconditions.checkArgument(
                InstantiationUtil.isSerializable(failureHandler),
                "The implementation of the provided ActionRequestFailureHandler is not serializable. The object probably contains or references non-serializable fields.");

        Preconditions.checkNotNull(userConfig);

        // Work on a private copy so that the caller's map stays untouched while the keys
        // handled by this class are removed.
        Map<String, String> remainingConfig = new HashMap<>(userConfig);
        ParameterTool params = ParameterTool.fromMap(remainingConfig);

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            this.bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
            remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
        } else {
            this.bulkProcessorFlushMaxActions = null;
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            this.bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
            remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
        } else {
            this.bulkProcessorFlushMaxSizeMb = null;
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            this.bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
            remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
        } else {
            this.bulkProcessorFlushIntervalMillis = null;
        }

        boolean backoffEnabled = params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
        remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);

        if (backoffEnabled) {
            BulkFlushBackoffPolicy policy = new BulkFlushBackoffPolicy();
            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
                policy.setBackoffType(FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
                remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
            }
            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
                policy.setMaxRetryCount(params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
                remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
            }
            if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
                policy.setDelayMillis(params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
                remainingConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
            }
            this.bulkProcessorFlushBackoffPolicy = policy;
        } else {
            // Note: the backoff type/retries/delay keys are left in the config in this case.
            this.bulkProcessorFlushBackoffPolicy = null;
        }

        this.userConfig = remainingConfig;
    }

    /**
     * Disables flushing of pending requests in {@link #snapshotState}.
     *
     * <p>With flushing disabled the pending-request counter is no longer maintained by the bulk
     * listener, and {@link #getNumPendingRequests()} throws.
     */
    public void disableFlushOnCheckpoint() {
        this.flushOnCheckpoint = false;
    }

    /**
     * Creates the client, verifies its connection, builds the bulk processor and indexers, and
     * finally opens the user's sink function.
     *
     * @param parameters unused by this implementation
     * @throws Exception if client creation, connection verification or the sink function's
     *     {@code open()} fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        client = callBridge.createClient(userConfig);
        callBridge.verifyClientConnection(client);
        bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
        requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
        failureRequestIndexer = new BufferingNoOpRequestIndexer();
        elasticsearchSinkFunction.open();
    }

    /**
     * Rethrows any recorded asynchronous failure, replays requests buffered by the failure
     * handler, and then lets the sink function process {@code value}.
     *
     * @param value the element to write
     * @param context sink context; not used by this implementation
     * @throws Exception if a previous bulk failure was recorded or processing fails
     */
    @Override
    public void invoke(T value, SinkFunction.Context context) throws Exception {
        checkAsyncErrorsAndRequests();
        elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
    }

    /** No state is restored by this sink. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Checks for recorded failures and, if flushing on checkpoint is enabled, keeps flushing the
     * bulk processor until the pending-request counter reaches zero.
     *
     * <p>Errors are re-checked after every flush because the failure handler may have re-added
     * requests or recorded a failure while the flush was running.
     *
     * @param context snapshot context; not used by this implementation
     * @throws Exception if a bulk failure was recorded
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkAsyncErrorsAndRequests();

        if (!flushOnCheckpoint) {
            return;
        }
        while (numPendingRequests.get() != 0L) {
            bulkProcessor.flush();
            checkAsyncErrorsAndRequests();
        }
    }

    /**
     * Releases the bulk processor, the client and the call bridge, then rethrows any recorded
     * asynchronous failure.
     *
     * <p>Every release step is attempted even if an earlier one throws, so that a failing bulk
     * processor cannot leak the client. The first exception raised while releasing is thrown once
     * all steps have run; later ones are attached to it as suppressed exceptions.
     *
     * @throws Exception the first failure raised while releasing resources, or a
     *     {@link RuntimeException} wrapping a recorded bulk failure
     */
    @Override
    public void close() throws Exception {
        Exception releaseFailure = null;

        if (bulkProcessor != null) {
            try {
                bulkProcessor.close();
            } catch (Exception e) {
                releaseFailure = e;
            } finally {
                bulkProcessor = null;
            }
        }

        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                releaseFailure = firstOrSuppressed(releaseFailure, e);
            } finally {
                client = null;
            }
        }

        try {
            callBridge.cleanup();
        } catch (Exception e) {
            releaseFailure = firstOrSuppressed(releaseFailure, e);
        }

        if (releaseFailure != null) {
            throw releaseFailure;
        }

        // Only reached when all resources were released cleanly.
        checkErrorAndRethrow();
    }

    /**
     * Builds the bulk processor from the configured flush settings.
     *
     * <p>Only settings that were present in the user configuration are applied; the builder's own
     * defaults are kept otherwise. Backoff configuration is delegated to the call bridge, which
     * receives {@code null} when backoff is disabled.
     *
     * @param listener listener notified before and after each bulk request; must not be null
     * @return the configured bulk processor
     */
    @VisibleForTesting
    protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
        Preconditions.checkNotNull(listener);

        BulkProcessor.Builder builder = callBridge.createBulkProcessorBuilder(client, listener);

        // NOTE(review): 0 concurrent requests presumably makes flush() block until the bulk
        // request has completed, which snapshotState's flush loop relies on - confirm against
        // the BulkProcessor documentation.
        builder.setConcurrentRequests(0);

        if (bulkProcessorFlushMaxActions != null) {
            builder.setBulkActions(bulkProcessorFlushMaxActions);
        }
        if (bulkProcessorFlushMaxSizeMb != null) {
            builder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb.intValue(), ByteSizeUnit.MB));
        }
        if (bulkProcessorFlushIntervalMillis != null) {
            builder.setFlushInterval(TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
        }

        callBridge.configureBulkProcessorBackoff(builder, bulkProcessorFlushBackoffPolicy);
        return builder.build();
    }

    /**
     * Combines two failures: returns {@code next} if there is no earlier failure, otherwise
     * records {@code next} as suppressed on {@code first} and returns {@code first}.
     */
    private static Exception firstOrSuppressed(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        // addSuppressed rejects self-suppression, so guard against the same instance.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /** Throws a {@link RuntimeException} wrapping the recorded bulk failure, if there is one. */
    private void checkErrorAndRethrow() {
        Throwable recorded = failureThrowable.get();
        if (recorded != null) {
            throw new RuntimeException("An error occurred in ElasticsearchSink.", recorded);
        }
    }

    /**
     * Rethrows a recorded failure and then replays the requests that the failure handler
     * buffered into {@link #failureRequestIndexer} through the real indexer.
     */
    private void checkAsyncErrorsAndRequests() {
        checkErrorAndRethrow();
        failureRequestIndexer.processBufferedRequests(requestIndexer);
    }

    /**
     * Returns the current number of pending requests.
     *
     * @throws UnsupportedOperationException if flushing on checkpoint is disabled, because the
     *     counter is not maintained in that mode
     */
    @VisibleForTesting
    long getNumPendingRequests() {
        if (!flushOnCheckpoint) {
            throw new UnsupportedOperationException("The number of pending requests is not maintained when flushing on checkpoint is disabled.");
        }
        return numPendingRequests.get();
    }

    /**
     * Listener that routes failed requests to the {@link ActionRequestFailureHandler} and keeps
     * the pending-request counter up to date.
     *
     * <p>Anything thrown while handling failures is stored in {@link #failureThrowable} (first
     * one wins) rather than propagated to the bulk processor.
     */
    private class BulkProcessorListener
            implements BulkProcessor.Listener {

        /** Nothing to do before a bulk request is sent. */
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
        }

        /**
         * Called when a bulk request completed. Each failed item is logged and passed to the
         * failure handler together with its REST status code, or {@code -1} if the item carries
         * no status.
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                try {
                    BulkItemResponse[] items = response.getItems();
                    for (int i = 0; i < items.length; i++) {
                        BulkItemResponse itemResponse = items[i];
                        Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                        if (failure == null) {
                            // This item succeeded.
                            continue;
                        }

                        LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);

                        RestStatus restStatus = itemResponse.getFailure().getStatus();
                        // Response items are matched to their originating request by index.
                        DocWriteRequest<?> originalRequest = request.requests().get(i);
                        int statusCode = restStatus == null ? -1 : restStatus.getStatus();

                        handOverToFailureHandler(originalRequest, failure, statusCode);
                    }
                } catch (Throwable t) {
                    failureThrowable.compareAndSet(null, t);
                }
            }

            markRequestsCompleted(request);
        }

        /**
         * Called when the whole bulk request failed. Every contained request is passed to the
         * failure handler with status code {@code -1}.
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);

            try {
                for (DocWriteRequest<?> writeRequest : request.requests()) {
                    handOverToFailureHandler(writeRequest, failure, -1);
                }
            } catch (Throwable t) {
                failureThrowable.compareAndSet(null, t);
            }

            markRequestsCompleted(request);
        }

        /**
         * Passes one failed request to the failure handler.
         *
         * @throws UnsupportedOperationException if the request is not an {@link ActionRequest}
         * @throws Throwable whatever the failure handler decides to throw
         */
        private void handOverToFailureHandler(DocWriteRequest<?> writeRequest, Throwable failure, int restStatusCode) throws Throwable {
            if (!(writeRequest instanceof ActionRequest)) {
                throw new UnsupportedOperationException("The sink currently only supports ActionRequests");
            }
            failureHandler.onFailure((ActionRequest) writeRequest, failure, restStatusCode, failureRequestIndexer);
        }

        /** Subtracts the request's action count from the pending counter, if it is maintained. */
        private void markRequestsCompleted(BulkRequest request) {
            if (flushOnCheckpoint) {
                numPendingRequests.getAndAdd(-request.numberOfActions());
            }
        }
    }

    /**
     * Backoff settings for retrying bulk requests.
     *
     * <p>Defaults: {@link FlushBackoffType#EXPONENTIAL}, at most 8 retries, 50 ms delay. How
     * these values are applied is decided by the call bridge's
     * {@code configureBulkProcessorBackoff}.
     */
    public static class BulkFlushBackoffPolicy
            implements Serializable {

        private static final long serialVersionUID = -6022851996101826049L;

        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
        private int maxRetryCount = 8;
        private long delayMillis = 50L;

        /** Returns the configured backoff type; never null. */
        public FlushBackoffType getBackoffType() {
            return backoffType;
        }

        /** Returns the maximum number of retries; never negative. */
        public int getMaxRetryCount() {
            return maxRetryCount;
        }

        /** Returns the backoff delay in milliseconds; never negative. */
        public long getDelayMillis() {
            return delayMillis;
        }

        /**
         * Sets the backoff type.
         *
         * @throws NullPointerException if {@code backoffType} is null
         */
        public void setBackoffType(FlushBackoffType backoffType) {
            this.backoffType = Preconditions.checkNotNull(backoffType);
        }

        /**
         * Sets the maximum number of retries.
         *
         * @throws IllegalArgumentException if {@code maxRetryCount} is negative
         */
        public void setMaxRetryCount(int maxRetryCount) {
            Preconditions.checkArgument(maxRetryCount >= 0);
            this.maxRetryCount = maxRetryCount;
        }

        /**
         * Sets the backoff delay in milliseconds.
         *
         * @throws IllegalArgumentException if {@code delayMillis} is negative
         */
        public void setDelayMillis(long delayMillis) {
            Preconditions.checkArgument(delayMillis >= 0L);
            this.delayMillis = delayMillis;
        }
    }

    /**
     * Backoff strategies selectable through {@link #CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE}. The
     * configured value must match a constant name exactly.
     */
    @PublicEvolving
    public enum FlushBackoffType {
        CONSTANT,
        EXPONENTIAL
    }
}

