package org.apache.beam.sdk.io.splunk;

import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.auto.value.AutoValue;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.splunk.AutoValue_SplunkEventWriter;
import org.apache.beam.sdk.io.splunk.HttpEventPublisher;
import org.apache.beam.sdk.io.splunk.SplunkWriteError;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A stateful {@link DoFn} that buffers incoming {@link SplunkEvent}s and publishes them in batches
 * through an {@link HttpEventPublisher}.
 *
 * <p>Events are accumulated in a {@link BagState} together with a running count. A batch is
 * flushed either when the count reaches the configured batch size or when the processing-time
 * expiry timer fires, whichever happens first. Events belonging to a failed publish request are
 * emitted downstream as {@link SplunkWriteError}s; the buffer is cleared after every flush attempt
 * regardless of the outcome.
 */
@AutoValue
public abstract class SplunkEventWriter extends DoFn<KV<Integer, SplunkEvent>, SplunkWriteError> {
    /** Seconds after the most recent element at which a partially filled buffer is flushed. */
    private static final long DEFAULT_FLUSH_DELAY = 2;

    private static final String BUFFER_STATE_NAME = "buffer";
    private static final String COUNT_STATE_NAME = "count";
    private static final String TIME_ID_NAME = "expiry";

    // Defaults applied in setup() when the corresponding ValueProvider is absent or yields null.
    private static final Integer DEFAULT_BATCH_COUNT = 1;
    private static final Boolean DEFAULT_DISABLE_CERTIFICATE_VALIDATION = false;
    private static final Boolean DEFAULT_ENABLE_BATCH_LOGS = true;
    private static final Boolean DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION = true;

    /** Shared log format for every failed publish attempt. */
    private static final String WRITE_ERROR_LOG_FORMAT =
            "Error writing to Splunk. StatusCode: {}, content: {}, StatusMessage: {}";

    private static final Logger LOG = LoggerFactory.getLogger(SplunkEventWriter.class);

    private static final Counter INPUT_COUNTER = Metrics.counter(SplunkEventWriter.class, "inbound-events");
    private static final Counter SUCCESS_WRITES = Metrics.counter(SplunkEventWriter.class, "outbound-successful-events");
    private static final Counter FAILED_WRITES = Metrics.counter(SplunkEventWriter.class, "outbound-failed-events");
    private static final Counter INVALID_REQUESTS = Metrics.counter(SplunkEventWriter.class, "http-invalid-requests");
    private static final Counter SERVER_ERROR_REQUESTS = Metrics.counter(SplunkEventWriter.class, "http-server-error-requests");
    private static final Counter VALID_REQUESTS = Metrics.counter(SplunkEventWriter.class, "http-valid-requests");
    private static final Distribution SUCCESSFUL_WRITE_LATENCY_MS = Metrics.distribution(SplunkEventWriter.class, "successful_write_to_splunk_latency_ms");
    private static final Distribution UNSUCCESSFUL_WRITE_LATENCY_MS = Metrics.distribution(SplunkEventWriter.class, "unsuccessful_write_to_splunk_latency_ms");
    private static final Distribution SUCCESSFUL_WRITE_BATCH_SIZE = Metrics.distribution(SplunkEventWriter.class, "write_to_splunk_batch");

    /**
     * Serializer for failed-event payloads. Field names are lower-cased with {@link Locale#ROOT} so
     * the emitted JSON keys do not depend on the JVM's default locale.
     */
    private static final Gson GSON = new GsonBuilder()
            .setFieldNamingStrategy(field -> field.getName().toLowerCase(Locale.ROOT))
            .create();

    @DoFn.StateId(BUFFER_STATE_NAME)
    private final StateSpec<BagState<SplunkEvent>> buffer = StateSpecs.bag();

    @DoFn.StateId(COUNT_STATE_NAME)
    private final StateSpec<ValueState<Long>> count = StateSpecs.value();

    @DoFn.TimerId(TIME_ID_NAME)
    private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    // Effective settings resolved from the ValueProviders in setup().
    private Integer batchCount;
    private Boolean disableValidation;
    private HttpEventPublisher publisher;
    private Boolean enableBatchLogs;
    private Boolean enableGzipHttpCompression;

    /** Builder for {@link SplunkEventWriter}. */
    @AutoValue.Builder
    public abstract static class Builder {
        abstract Builder setUrl(ValueProvider<String> valueProvider);

        abstract ValueProvider<String> url();

        abstract Builder setToken(ValueProvider<String> valueProvider);

        abstract ValueProvider<String> token();

        abstract Builder setDisableCertificateValidation(ValueProvider<Boolean> valueProvider);

        abstract Builder setRootCaCertificatePath(ValueProvider<String> valueProvider);

        abstract Builder setEnableBatchLogs(ValueProvider<Boolean> valueProvider);

        abstract Builder setEnableGzipHttpCompression(ValueProvider<Boolean> valueProvider);

        abstract Builder setInputBatchCount(ValueProvider<Integer> valueProvider);

        abstract SplunkEventWriter autoBuild();

        /**
         * Sets the URL events are published to.
         *
         * @param valueProvider provider of the URL; must not be null
         * @return this builder
         * @throws IllegalArgumentException if {@code valueProvider} is null
         */
        public Builder withUrl(ValueProvider<String> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "withURL(url) called with null input.");
            return setUrl(valueProvider);
        }

        /**
         * Same as {@link #withUrl(ValueProvider)} but for a literal value.
         *
         * @param str the URL; must not be null
         * @return this builder
         * @throws IllegalArgumentException if {@code str} is null
         */
        Builder withUrl(String str) {
            Preconditions.checkArgument(str != null, "withURL(url) called with null input.");
            return setUrl(ValueProvider.StaticValueProvider.of(str));
        }

        /**
         * Sets the access token used when publishing.
         *
         * @param valueProvider provider of the token; must not be null
         * @return this builder
         * @throws IllegalArgumentException if {@code valueProvider} is null
         */
        public Builder withToken(ValueProvider<String> valueProvider) {
            Preconditions.checkArgument(valueProvider != null, "withToken(token) called with null input.");
            return setToken(valueProvider);
        }

        /**
         * Same as {@link #withToken(ValueProvider)} but for a literal value.
         *
         * @param str the token; must not be null
         * @return this builder
         * @throws IllegalArgumentException if {@code str} is null
         */
        Builder withToken(String str) {
            Preconditions.checkArgument(str != null, "withToken(token) called with null input.");
            return setToken(ValueProvider.StaticValueProvider.of(str));
        }

        /**
         * Sets the number of buffered events that triggers a flush.
         *
         * @param valueProvider provider of the batch size (not validated here)
         * @return this builder
         */
        public Builder withInputBatchCount(ValueProvider<Integer> valueProvider) {
            return setInputBatchCount(valueProvider);
        }

        /**
         * Sets whether certificate validation is disabled on the publisher.
         *
         * @param valueProvider provider of the flag (not validated here)
         * @return this builder
         */
        public Builder withDisableCertificateValidation(ValueProvider<Boolean> valueProvider) {
            return setDisableCertificateValidation(valueProvider);
        }

        /**
         * Sets the path from which the root CA certificate is read during setup.
         *
         * @param valueProvider provider of the certificate path (not validated here)
         * @return this builder
         */
        public Builder withRootCaCertificatePath(ValueProvider<String> valueProvider) {
            return setRootCaCertificatePath(valueProvider);
        }

        /**
         * Sets whether per-batch log statements are emitted.
         *
         * @param valueProvider provider of the flag (not validated here)
         * @return this builder
         */
        public Builder withEnableBatchLogs(ValueProvider<Boolean> valueProvider) {
            return setEnableBatchLogs(valueProvider);
        }

        /**
         * Sets whether gzip HTTP compression is enabled on the publisher.
         *
         * @param valueProvider provider of the flag (not validated here)
         * @return this builder
         */
        public Builder withEnableGzipHttpCompression(ValueProvider<Boolean> valueProvider) {
            return setEnableGzipHttpCompression(valueProvider);
        }

        /**
         * Verifies that both URL and token were supplied, then builds the writer.
         *
         * @return the configured {@link SplunkEventWriter}
         * @throws NullPointerException if the URL or the token is null
         */
        public SplunkEventWriter build() {
            Preconditions.checkNotNull(url(), "url needs to be provided.");
            Preconditions.checkNotNull(token(), "token needs to be provided.");
            return autoBuild();
        }
    }

    /** Returns a fresh {@link Builder}. */
    public static Builder newBuilder() {
        return new AutoValue_SplunkEventWriter.Builder();
    }

    /** Provider of the URL events are published to. */
    public abstract ValueProvider<String> url();

    /** Provider of the access token. */
    public abstract ValueProvider<String> token();

    /** Provider of the disable-certificate-validation flag; {@link #setup()} tolerates null. */
    public abstract ValueProvider<Boolean> disableCertificateValidation();

    /** Provider of the batch size; {@link #setup()} tolerates null. */
    public abstract ValueProvider<Integer> inputBatchCount();

    /** Provider of the root CA certificate path; {@link #setup()} tolerates null. */
    public abstract ValueProvider<String> rootCaCertificatePath();

    /** Provider of the batch-logging flag; {@link #setup()} tolerates null. */
    public abstract ValueProvider<Boolean> enableBatchLogs();

    /** Provider of the gzip-compression flag; {@link #setup()} tolerates null. */
    public abstract ValueProvider<Boolean> enableGzipHttpCompression();

    /**
     * Resolves the effective settings (falling back to the defaults for anything not provided) and
     * creates the {@link HttpEventPublisher}.
     *
     * @throws IllegalArgumentException if the URL or token provider is not accessible
     * @throws RuntimeException wrapping any failure to read the certificate or build the publisher
     */
    @DoFn.Setup
    public void setup() {
        Preconditions.checkArgument(url().isAccessible(), "url is required for writing events.");
        Preconditions.checkArgument(token().isAccessible(), "Access token is required for writing events.");

        // Each setting is resolved only while it is still unset on this instance.
        if (this.batchCount == null) {
            this.batchCount = valueOrDefault(inputBatchCount(), DEFAULT_BATCH_COUNT);
            LOG.info("Batch count set to: {}", this.batchCount);
        }
        if (this.disableValidation == null) {
            this.disableValidation = valueOrDefault(disableCertificateValidation(), DEFAULT_DISABLE_CERTIFICATE_VALIDATION);
            LOG.info("Disable certificate validation set to: {}", this.disableValidation);
        }
        if (this.enableBatchLogs == null) {
            this.enableBatchLogs = valueOrDefault(enableBatchLogs(), DEFAULT_ENABLE_BATCH_LOGS);
            LOG.info("Enable Batch logs set to: {}", this.enableBatchLogs);
        }
        if (this.enableGzipHttpCompression == null) {
            this.enableGzipHttpCompression = valueOrDefault(enableGzipHttpCompression(), DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION);
            LOG.info("Enable gzip http compression set to: {}", this.enableGzipHttpCompression);
        }

        try {
            HttpEventPublisher.Builder publisherBuilder = HttpEventPublisher.newBuilder()
                    .withUrl(url().get())
                    .withToken(token().get())
                    .withDisableCertificateValidation(this.disableValidation)
                    .withEnableGzipHttpCompression(this.enableGzipHttpCompression);
            if (rootCaCertificatePath() != null && rootCaCertificatePath().get() != null) {
                publisherBuilder.withRootCaCertificate(getCertFromGcsAsBytes(rootCaCertificatePath().get()));
            }
            this.publisher = publisherBuilder.build();
            LOG.info("Successfully created HttpEventPublisher");
        } catch (IOException | KeyManagementException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
            LOG.error("Error creating HttpEventPublisher: {}", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    /**
     * Buffers one event, re-arms the expiry timer, and flushes once the buffered count reaches the
     * configured batch size.
     *
     * @param kv the keyed input element; only the value is used
     * @param outputReceiver receives a {@link SplunkWriteError} for each event of a failed flush
     * @param boundedWindow the element's window (unused)
     * @param bagState buffer of events awaiting publication
     * @param valueState number of events currently in the buffer
     * @param timer processing-time timer that flushes a partially filled buffer
     * @throws IOException if disconnecting the HTTP response during a flush fails
     */
    @DoFn.ProcessElement
    public void processElement(@DoFn.Element KV<Integer, SplunkEvent> kv, DoFn.OutputReceiver<SplunkWriteError> outputReceiver, BoundedWindow boundedWindow, @DoFn.StateId(BUFFER_STATE_NAME) BagState<SplunkEvent> bagState, @DoFn.StateId(COUNT_STATE_NAME) ValueState<Long> valueState, @DoFn.TimerId(TIME_ID_NAME) Timer timer) throws IOException {
        long previousCount = bufferedCount(valueState);
        INPUT_COUNTER.inc();
        bagState.add(kv.getValue());
        long updatedCount = previousCount + 1;
        valueState.write(updatedCount);

        // Every element pushes the flush deadline out to DEFAULT_FLUSH_DELAY seconds from now.
        timer.offset(Duration.standardSeconds(DEFAULT_FLUSH_DELAY)).setRelative();

        if (updatedCount >= this.batchCount) {
            if (this.enableBatchLogs) {
                LOG.info("Flushing batch of {} events", updatedCount);
            }
            flush(outputReceiver, bagState, valueState);
        }
    }

    /**
     * Timer callback: flushes whatever is buffered, if anything.
     *
     * @param outputReceiver receives a {@link SplunkWriteError} for each event of a failed flush
     * @param bagState buffer of events awaiting publication
     * @param valueState number of events currently in the buffer
     * @throws IOException if disconnecting the HTTP response during the flush fails
     */
    @DoFn.OnTimer(TIME_ID_NAME)
    public void onExpiry(DoFn.OutputReceiver<SplunkWriteError> outputReceiver, @DoFn.StateId(BUFFER_STATE_NAME) BagState<SplunkEvent> bagState, @DoFn.StateId(COUNT_STATE_NAME) ValueState<Long> valueState) throws IOException {
        long pending = bufferedCount(valueState);
        if (pending > 0) {
            if (this.enableBatchLogs) {
                LOG.info("Flushing window with {} events", pending);
            }
            flush(outputReceiver, bagState, valueState);
        }
    }

    /** Closes the publisher if one was created; a failure to close is logged, not rethrown. */
    @DoFn.Teardown
    public void tearDown() {
        if (this.publisher == null) {
            return;
        }
        try {
            this.publisher.close();
            LOG.info("Successfully closed HttpEventPublisher");
        } catch (IOException e) {
            LOG.warn("Received exception while closing HttpEventPublisher: {}", e.getMessage());
        }
    }

    /**
     * Publishes every buffered event in a single request and records the outcome in the metrics.
     *
     * <p>On any failure (non-success status, {@link HttpResponseException}, or other
     * {@link IOException}) each event of the batch is emitted as a {@link SplunkWriteError}. The
     * buffer and count are always cleared afterwards and the response, if any, is disconnected.
     *
     * @param receiver receives the write errors for a failed batch
     * @param bufferState buffer of events to publish
     * @param countState number of events in the buffer
     * @throws IOException if disconnecting the HTTP response fails
     */
    private void flush(DoFn.OutputReceiver<SplunkWriteError> receiver, BagState<SplunkEvent> bufferState, ValueState<Long> countState) throws IOException {
        if (bufferState.isEmpty().read()) {
            return;
        }
        HttpResponse response = null;
        List<SplunkEvent> events = Lists.newArrayList(bufferState.read());
        long startNanos = System.nanoTime();
        try {
            response = this.publisher.execute(events);
            if (response.isSuccessStatusCode()) {
                long written = bufferedCount(countState);
                SUCCESSFUL_WRITE_LATENCY_MS.update(elapsedMillis(startNanos));
                SUCCESS_WRITES.inc(written);
                VALID_REQUESTS.inc();
                SUCCESSFUL_WRITE_BATCH_SIZE.update(written);
                if (this.enableBatchLogs) {
                    LOG.info("Successfully wrote {} events", written);
                }
            } else {
                UNSUCCESSFUL_WRITE_LATENCY_MS.update(elapsedMillis(startNanos));
                FAILED_WRITES.inc(bufferedCount(countState));
                int statusCode = response.getStatusCode();
                recordHttpErrorClass(statusCode);
                logWriteFailures(countState, statusCode, response.parseAsString(), response.getStatusMessage());
                flushWriteFailures(events, response.getStatusMessage(), statusCode, receiver);
            }
        } catch (HttpResponseException e) {
            // Must precede the IOException handler: HttpResponseException is an IOException
            // subtype, and only this handler preserves the HTTP status code.
            LOG.error(WRITE_ERROR_LOG_FORMAT, e.getStatusCode(), e.getContent(), e.getStatusMessage());
            UNSUCCESSFUL_WRITE_LATENCY_MS.update(elapsedMillis(startNanos));
            FAILED_WRITES.inc(bufferedCount(countState));
            recordHttpErrorClass(e.getStatusCode());
            logWriteFailures(countState, e.getStatusCode(), e.getContent(), e.getStatusMessage());
            flushWriteFailures(events, e.getStatusMessage(), e.getStatusCode(), receiver);
        } catch (IOException e) {
            LOG.error("Error writing to Splunk: {}", e.getMessage());
            UNSUCCESSFUL_WRITE_LATENCY_MS.update(elapsedMillis(startNanos));
            FAILED_WRITES.inc(bufferedCount(countState));
            INVALID_REQUESTS.inc();
            logWriteFailures(countState, 0, e.getMessage(), null);
            flushWriteFailures(events, e.getMessage(), null, receiver);
        } finally {
            // Events are never retried from state: the buffer is dropped whatever the outcome.
            bufferState.clear();
            countState.clear();
            if (response != null) {
                response.disconnect();
            }
        }
    }

    /** Returns the provider's value, or {@code defaultValue} when the provider or its value is null. */
    private static <T> T valueOrDefault(ValueProvider<T> provider, T defaultValue) {
        T provided = provider == null ? null : provider.get();
        return MoreObjects.firstNonNull(provided, defaultValue);
    }

    /** Reads the buffered-event count, treating an unset state as zero. */
    private static long bufferedCount(ValueState<Long> countState) {
        return MoreObjects.firstNonNull(countState.read(), 0L);
    }

    /** Milliseconds elapsed since the given {@link System#nanoTime()} reading. */
    private static long elapsedMillis(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /** Increments the 4xx or 5xx request counter matching the status code; other codes are ignored. */
    private static void recordHttpErrorClass(int statusCode) {
        if (statusCode >= 400 && statusCode < 500) {
            INVALID_REQUESTS.inc();
        } else if (statusCode >= 500 && statusCode < 600) {
            SERVER_ERROR_REQUESTS.inc();
        }
    }

    /**
     * Logs a failed publish attempt; the event count line is only logged when batch logs are on.
     *
     * @param countState number of events in the failed batch
     * @param statusCode HTTP status code, or 0 when none is available
     * @param content response content or exception message
     * @param statusMessage HTTP status message; may be null
     */
    private void logWriteFailures(ValueState<Long> countState, int statusCode, String content, String statusMessage) {
        if (this.enableBatchLogs) {
            LOG.error("Failed to write {} events", countState.read());
        }
        LOG.error(WRITE_ERROR_LOG_FORMAT, statusCode, content, statusMessage);
    }

    /**
     * Emits one {@link SplunkWriteError} per event, each carrying the event serialized as JSON plus
     * the status message and code when they are available.
     *
     * @param events events of the failed batch; must not be null
     * @param statusMessage status message to attach; skipped when null
     * @param statusCode status code to attach; skipped when null
     * @param receiver output the errors are written to
     */
    private static void flushWriteFailures(List<SplunkEvent> events, String statusMessage, Integer statusCode, DoFn.OutputReceiver<SplunkWriteError> receiver) {
        Preconditions.checkNotNull(events, "events cannot be null.");
        SplunkWriteError.Builder errorBuilder = SplunkWriteError.newBuilder();
        if (statusMessage != null) {
            errorBuilder.withStatusMessage(statusMessage);
        }
        if (statusCode != null) {
            errorBuilder.withStatusCode(statusCode);
        }
        for (SplunkEvent event : events) {
            receiver.output(errorBuilder.withPayload(GSON.toJson(event)).create());
        }
    }

    /**
     * Reads the whole file matched by the given spec into memory.
     *
     * @param str file spec of the certificate, resolved through {@link FileSystems}
     * @return the file's bytes
     * @throws IOException declared for compatibility with existing callers
     * @throws RuntimeException wrapping any {@link IOException} raised while matching, opening,
     *     reading, or closing the file
     */
    public static byte[] getCertFromGcsAsBytes(String str) throws IOException {
        // try-with-resources closes the stream on the failure path as well.
        try (InputStream certStream = Channels.newInputStream(FileSystems.open(FileSystems.matchSingleFileSpec(str).resourceId()))) {
            return IOUtils.toByteArray(certStream);
        } catch (IOException e) {
            throw new RuntimeException("Error when reading: " + str, e);
        }
    }
}
