package org.apache.beam.sdk.io.elasticsearch;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.class */
public class ElasticsearchIO {

    /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write.class */
    public static class Write {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$AssignToShard.class */
        /**
         * Pairs every incoming element with a randomly chosen shard key so that a later
         * grouping step can batch elements per shard.
         *
         * @param <T> type of the elements being keyed
         */
        public static class AssignToShard<T> extends DoFn<T, KV<Long, T>> {
            /** Exclusive upper bound for the random shard key. */
            private final long numOfShard;

            /**
             * @param j number of shards; keys are drawn from {@code [0, j)}
             */
            public AssignToShard(long j) {
                this.numOfShard = j;
            }

            /**
             * Emits the current element keyed by a shard number drawn from the calling
             * thread's {@link ThreadLocalRandom}.
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<T, KV<Long, T>>.ProcessContext processContext) throws Exception {
                // Draw the key first, then read the element (same order as a single nested call).
                final Long shardKey = Long.valueOf(ThreadLocalRandom.current().nextLong(this.numOfShard));
                final T payload = processContext.element();
                processContext.output(KV.of(shardKey, payload));
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$Bound.class */
        /**
         * Immutable, fluent configuration of the Elasticsearch write transform. Every
         * {@code withX} method returns a new instance with one setting replaced; the
         * settings are validated when the transform is expanded.
         *
         * @param <T> type of the elements in the input collection
         */
        public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
            /** Default maximum number of action requests sent in one bulk request. */
            private static final int CHUNK_SIZE = 3000;
            /** Default maximum number of retries handed to the writer's backoff. */
            private static final int DEFAULT_RETRIES = 3;
            /** Default initial backoff handed to the writer. */
            private static final Duration DEFAULT_RETRY_PAUSE = Duration.millis(35000);

            /** Value of the {@code cluster.name} client setting. */
            private final String clusterName;
            /** Addresses registered with the transport client. */
            private final InetSocketAddress[] servers;
            /** Processing-time delay after the first element of a pane before the pane fires. */
            private final Duration flushInterval;
            /** Converts one input element into the action requests to send. */
            private final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests;
            /** Number of random shard keys elements are spread over before grouping. */
            private final long numOfShard;
            /** Maximum number of action requests per bulk request. */
            private final int maxBulkRequestSize;
            /** Maximum number of retries after a failed bulk request. */
            private final int maxRetries;
            /** Initial backoff between retries. */
            private final Duration retryPause;
            /** Invoked with the failure details when a bulk response reports failures. */
            private final ThrowingConsumer<BulkExecutionException> error;

            private Bound(String str, InetSocketAddress[] inetSocketAddressArr, Duration duration, SerializableFunction<T, Iterable<ActionRequest<?>>> serializableFunction, long j, int i, int i2, Duration duration2, ThrowingConsumer<BulkExecutionException> throwingConsumer) {
                this.clusterName = str;
                this.servers = inetSocketAddressArr;
                this.flushInterval = duration;
                this.toActionRequests = serializableFunction;
                this.numOfShard = j;
                this.maxBulkRequestSize = i;
                this.maxRetries = i2;
                this.retryPause = duration2;
                this.error = throwingConsumer;
            }

            /**
             * Creates an unconfigured transform carrying only the defaults for bulk size,
             * retries, retry pause and error handling; the mandatory settings must be
             * supplied through the {@code withX} methods before use.
             */
            Bound() {
                // Call the handler factory directly rather than through the synthetic accessor.
                this(null, null, null, null, 0L, CHUNK_SIZE, DEFAULT_RETRIES, DEFAULT_RETRY_PAUSE, Write.defaultErrorHandler());
            }

            /** Returns a copy of this transform that uses the given cluster name. */
            public Bound<T> withClusterName(String str) {
                return new Bound<>(str, this.servers, this.flushInterval, this.toActionRequests, this.numOfShard, this.maxBulkRequestSize, this.maxRetries, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that connects to the given addresses. */
            public Bound<T> withServers(InetSocketAddress[] inetSocketAddressArr) {
                return new Bound<>(this.clusterName, inetSocketAddressArr, this.flushInterval, this.toActionRequests, this.numOfShard, this.maxBulkRequestSize, this.maxRetries, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that uses the given flush interval. */
            public Bound<T> withFlushInterval(Duration duration) {
                return new Bound<>(this.clusterName, this.servers, duration, this.toActionRequests, this.numOfShard, this.maxBulkRequestSize, this.maxRetries, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that uses the given element-to-requests function. */
            public Bound<T> withFunction(SerializableFunction<T, Iterable<ActionRequest<?>>> serializableFunction) {
                return new Bound<>(this.clusterName, this.servers, this.flushInterval, serializableFunction, this.numOfShard, this.maxBulkRequestSize, this.maxRetries, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that spreads elements over the given number of shards. */
            public Bound<T> withNumOfShard(long j) {
                return new Bound<>(this.clusterName, this.servers, this.flushInterval, this.toActionRequests, j, this.maxBulkRequestSize, this.maxRetries, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that reports bulk failures to the given handler. */
            public Bound<T> withError(ThrowingConsumer<BulkExecutionException> throwingConsumer) {
                return new Bound<>(this.clusterName, this.servers, this.flushInterval, this.toActionRequests, this.numOfShard, this.maxBulkRequestSize, this.maxRetries, this.retryPause, throwingConsumer);
            }

            /** Returns a copy of this transform that sends at most the given number of requests per bulk. */
            public Bound<T> withMaxBulkRequestSize(int i) {
                return new Bound<>(this.clusterName, this.servers, this.flushInterval, this.toActionRequests, this.numOfShard, i, this.maxRetries, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that retries a failed bulk at most the given number of times. */
            public Bound<T> withMaxRetries(int i) {
                return new Bound<>(this.clusterName, this.servers, this.flushInterval, this.toActionRequests, this.numOfShard, this.maxBulkRequestSize, i, this.retryPause, this.error);
            }

            /** Returns a copy of this transform that uses the given initial pause between retries. */
            public Bound<T> withRetryPause(Duration duration) {
                return new Bound<>(this.clusterName, this.servers, this.flushInterval, this.toActionRequests, this.numOfShard, this.maxBulkRequestSize, this.maxRetries, duration, this.error);
            }

            /**
             * Validates the configuration and wires the write pipeline: assign a random
             * shard key, re-window into the global window with a repeated processing-time
             * trigger, group by shard key, and write each group.
             *
             * @param pCollection the elements to write
             * @return a {@link PDone} in the pipeline of {@code pCollection}
             * @throws NullPointerException if a mandatory setting, the retry pause or the
             *     error handler is {@code null}
             * @throws IllegalArgumentException if a numeric setting is out of range or no
             *     server address was supplied
             */
            @Override
            public PDone expand(PCollection<T> pCollection) {
                Preconditions.checkNotNull(this.clusterName, "clusterName must be set");
                Preconditions.checkNotNull(this.servers, "servers must be set");
                Preconditions.checkNotNull(this.toActionRequests, "function must be set");
                Preconditions.checkNotNull(this.flushInterval, "flushInterval must be set");
                // Both can be nulled through their withX methods; fail here instead of on the worker.
                Preconditions.checkNotNull(this.retryPause, "retryPause must not be null");
                Preconditions.checkNotNull(this.error, "error handler must not be null");
                // Without any address the transport client would have no node to talk to.
                Preconditions.checkArgument(this.servers.length > 0, "servers must contain at least one address");
                Preconditions.checkArgument(this.numOfShard > 0, "numOfShard must be positive, got %s", this.numOfShard);
                Preconditions.checkArgument(this.maxBulkRequestSize > 0, "maxBulkRequestSize must be positive, got %s", this.maxBulkRequestSize);
                Preconditions.checkArgument(this.maxRetries >= 0, "maxRetries must not be negative, got %s", this.maxRetries);
                Preconditions.checkArgument(this.retryPause.getMillis() >= 0, "retryPause must not be negative, got %s ms", this.retryPause.getMillis());

                // Explicit type arguments keep the chain typed; with raw transforms the result of
                // apply() is erased to POutput and the next apply() cannot be resolved.
                PCollection<KV<Long, T>> sharded =
                        pCollection.apply("Assign To Shard", ParDo.of(new AssignToShard<T>(this.numOfShard)));
                PCollection<KV<Long, T>> rewindowed = sharded.apply(
                        "Re-Window to Global Window",
                        Window.<KV<Long, T>>into(new GlobalWindows())
                                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(this.flushInterval)))
                                .discardingFiredPanes()
                                .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW));
                rewindowed
                        .apply(GroupByKey.<Long, T>create())
                        .apply("Write to Elasticesearch", ParDo.of(new ElasticsearchWriter<T>(this.clusterName, this.servers, this.maxBulkRequestSize, this.toActionRequests, this.error, this.maxRetries, this.retryPause)));
                return PDone.in(pCollection.getPipeline());
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$BulkExecutionException.class */
        /**
         * Raised for a bulk response that reports failures. The message is the response's
         * own failure summary, and the causes of the individual failed items are exposed
         * through {@link #getFailures()}.
         */
        public static class BulkExecutionException extends IOException {
            /** Causes of the failed items of the response. */
            private final Iterable<Throwable> failures;

            /**
             * @param bulkResponse the response whose failed items are collected
             */
            BulkExecutionException(BulkResponse bulkResponse) {
                super(bulkResponse.buildFailureMessage());
                // Items without a failure are skipped; only the causes are kept.
                final List<Throwable> causes = Arrays.stream(bulkResponse.getItems())
                        .map(item -> item.getFailure())
                        .filter(failure -> failure != null)
                        .map(failure -> failure.getCause())
                        .collect(Collectors.toList());
                this.failures = causes;
            }

            /**
             * @return the causes of the failed items, one per failed item
             */
            public Iterable<Throwable> getFailures() {
                return this.failures;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$ClientSupplier.class */
        /**
         * Serializable supplier that builds the transport client on first use and hands
         * out that same instance on every later call.
         */
        public static class ClientSupplier implements Supplier<Client>, Serializable {
            /** Holds the client once it has been created; also used as the creation lock. */
            private final AtomicReference<Client> CLIENT = new AtomicReference<>();
            private final String clusterName;
            private final InetSocketAddress[] addresses;

            /**
             * @param str cluster name put into the client settings
             * @param inetSocketAddressArr addresses registered with the client
             */
            public ClientSupplier(String str, InetSocketAddress[] inetSocketAddressArr) {
                this.clusterName = str;
                this.addresses = inetSocketAddressArr;
            }

            /**
             * Returns the client, creating it under a lock on the holder if it does not
             * exist yet.
             */
            @Override // java.util.function.Supplier
            public Client get() {
                // Fast path: the client already exists.
                if (this.CLIENT.get() != null) {
                    return this.CLIENT.get();
                }
                synchronized (this.CLIENT) {
                    // Re-check inside the lock so only one caller creates the client.
                    if (this.CLIENT.get() == null) {
                        final TransportClient created = create(this.clusterName, this.addresses);
                        this.CLIENT.set(created);
                    }
                }
                return this.CLIENT.get();
            }

            /** Builds a transport client for the given cluster name and registers the addresses. */
            private TransportClient create(String str, InetSocketAddress[] inetSocketAddressArr) {
                final Settings clientSettings = Settings.settingsBuilder().put("cluster.name", str).build();
                final TransportClient transportClient = TransportClient.builder().settings(clientSettings).build();
                final InetSocketTransportAddress[] transportAddresses = Arrays.stream(inetSocketAddressArr)
                        .map(InetSocketTransportAddress::new)
                        .toArray(InetSocketTransportAddress[]::new);
                return transportClient.addTransportAddresses(transportAddresses);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$ElasticsearchWriter.class */
        /**
         * Writes each group of elements to Elasticsearch: the elements are converted to
         * action requests, split into bulk requests of at most {@code maxBulkRequestSize}
         * requests, and every bulk is sent once and then retried with backoff on failure.
         *
         * @param <T> type of the grouped elements
         */
        public static class ElasticsearchWriter<T> extends DoFn<KV<Long, Iterable<T>>, Void> {
            private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
            private static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. Retry attempt[%d]";
            private static final String RETRY_FAILED_LOG = "Error writing to ES after %d attempt(s). No more attempts allowed";
            private final ClientSupplier clientSupplier;
            private final SerializableFunction<T, Iterable<ActionRequest<?>>> toActionRequests;
            private final ThrowingConsumer<BulkExecutionException> error;
            /** Built in {@link #setup()}, so it does not need to travel with the serialized function. */
            private transient FluentBackoff backoffConfig;
            private final int maxBulkRequestSize;
            private final int maxRetries;
            private final Duration retryPause;

            /**
             * @param str cluster name for the client
             * @param inetSocketAddressArr addresses the client connects to
             * @param i maximum number of action requests per bulk request
             * @param serializableFunction converts one element into its action requests
             * @param throwingConsumer invoked when a bulk response reports failures
             * @param i2 maximum number of retries for a failed bulk request
             * @param duration initial backoff between retries
             */
            public ElasticsearchWriter(String str, InetSocketAddress[] inetSocketAddressArr, int i, SerializableFunction<T, Iterable<ActionRequest<?>>> serializableFunction, ThrowingConsumer<BulkExecutionException> throwingConsumer, int i2, Duration duration) {
                this.maxBulkRequestSize = i;
                this.clientSupplier = new ClientSupplier(str, inetSocketAddressArr);
                this.toActionRequests = serializableFunction;
                this.error = throwingConsumer;
                this.maxRetries = i2;
                this.retryPause = duration;
            }

            /** Builds the backoff configuration from the configured retry count and pause. */
            @DoFn.Setup
            public void setup() throws Exception {
                this.backoffConfig = FluentBackoff.DEFAULT.withMaxRetries(this.maxRetries).withInitialBackoff(this.retryPause);
            }

            /**
             * Closes the client if one was created, so its resources are released when
             * this function instance is discarded.
             */
            @DoFn.Teardown
            public void teardown() {
                // The supplier keeps the client in its CLIENT holder; clear it so it is closed once.
                final Client client = this.clientSupplier.CLIENT.getAndSet(null);
                if (client != null) {
                    client.close();
                }
            }

            /**
             * Sends the action requests of one group in bulks.
             *
             * @throws Exception if a bulk still fails after the permitted retries, or the
             *     thread is interrupted
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<Long, Iterable<T>>, Void>.ProcessContext processContext) throws Exception {
                final Iterable<T> elements = processContext.element().getValue();
                if (!elements.iterator().hasNext()) {
                    LOG.info("ElasticsearchWriter: no requests to send");
                    return;
                }
                // Lazily flatten element -> requests; the stream is consumed once by the partitioning below.
                final Stream<ActionRequest> requests = StreamSupport.stream(elements.spliterator(), false)
                        .map(this.toActionRequests::apply)
                        .<ActionRequest>flatMap(perElement -> StreamSupport.stream(perElement.spliterator(), false));
                final Iterable<List<ActionRequest>> batches = Iterables.partition(requests::iterator, this.maxBulkRequestSize);
                final ProcessFunction<List<ActionRequest>, BulkResponse> request = request(this.clientSupplier, this.error);
                final ProcessFunction<List<ActionRequest>, BulkResponse> retry = retry(request, this.backoffConfig);
                for (List<ActionRequest> batch : batches) {
                    writeBatch(batch, request, retry);
                }
            }

            /** Sends one bulk; on failure hands it to the retry function, keeping the first failure attached. */
            private static void writeBatch(List<ActionRequest> batch, ProcessFunction<List<ActionRequest>, BulkResponse> request, ProcessFunction<List<ActionRequest>, BulkResponse> retry) throws Exception {
                try {
                    request.apply(batch);
                } catch (InterruptedException interrupted) {
                    // Interruption is not a write failure: restore the flag and stop.
                    Thread.currentThread().interrupt();
                    throw interrupted;
                } catch (Exception initialFailure) {
                    try {
                        retry.apply(batch);
                    } catch (Exception retryFailure) {
                        // Keep the first failure visible next to the last one.
                        if (retryFailure != initialFailure) {
                            retryFailure.addSuppressed(initialFailure);
                        }
                        throw retryFailure;
                    }
                }
            }

            /**
             * Returns a function that sends a list of requests as one bulk request and
             * passes a {@link BulkExecutionException} to the error consumer when the
             * response reports failures.
             */
            private static ProcessFunction<List<ActionRequest>, BulkResponse> request(ClientSupplier clientSupplier, ThrowingConsumer<BulkExecutionException> throwingConsumer) {
                return list -> {
                    BulkResponse bulkResponse = clientSupplier.get().bulk(new BulkRequest().add(list)).get();
                    if (bulkResponse.hasFailures()) {
                        throwingConsumer.accept(new BulkExecutionException(bulkResponse));
                    }
                    return bulkResponse;
                };
            }

            /**
             * Returns a function that repeats {@code processFunction} under the given
             * backoff until it yields a response. It throws when no response was obtained,
             * including the case where the backoff permits no attempt at all, so a failed
             * bulk is never reported as success.
             */
            private static ProcessFunction<List<ActionRequest>, BulkResponse> retry(ProcessFunction<List<ActionRequest>, BulkResponse> processFunction, FluentBackoff fluentBackoff) {
                return list -> {
                    BackOff backoff = fluentBackoff.backoff();
                    int attempts = 0;
                    BulkResponse bulkResponse = null;
                    Exception lastFailure = null;
                    while (bulkResponse == null && BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
                        attempts++;
                        LOG.warn(String.format(RETRY_ATTEMPT_LOG, Integer.valueOf(attempts)));
                        try {
                            bulkResponse = processFunction.apply(list);
                            lastFailure = null;
                        } catch (InterruptedException interrupted) {
                            // Do not keep retrying an interrupted thread.
                            Thread.currentThread().interrupt();
                            throw interrupted;
                        } catch (Exception e) {
                            lastFailure = e;
                        }
                    }
                    if (bulkResponse == null) {
                        // lastFailure is null when the backoff allowed no attempt.
                        throw new Exception(String.format(RETRY_FAILED_LOG, Integer.valueOf(attempts)), lastFailure);
                    }
                    return bulkResponse;
                };
            }

            /**
             * Deserialization hook for the serializable lambdas created by
             * {@link #request} and {@link #retry}: recognises each by its implementation
             * method name and signature and rebuilds it from the captured arguments.
             *
             * NOTE(review): hooks with this name are normally synthesized by the compiler
             * for serializable lambdas; this hand-written copy and the synthetic lambda
             * names it matches come from decompiled output and are presumably redundant
             * when building from source - confirm before relying on it.
             *
             * @throws IllegalArgumentException if the description matches neither lambda
             */
            @SuppressWarnings("unchecked")
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                // Kind 6 is a static-method handle (MethodHandleInfo.REF_invokeStatic).
                final boolean writerProcessFunction = serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/ProcessFunction")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$ElasticsearchWriter");
                if (writerProcessFunction) {
                    final String implMethodName = serializedLambda.getImplMethodName();
                    final String implMethodSignature = serializedLambda.getImplMethodSignature();
                    if (implMethodName.equals("lambda$retry$7f860212$1")
                            && implMethodSignature.equals("(Lorg/apache/beam/sdk/util/FluentBackoff;Lorg/apache/beam/sdk/transforms/ProcessFunction;Ljava/util/List;)Lorg/elasticsearch/action/bulk/BulkResponse;")) {
                        final FluentBackoff fluentBackoff = (FluentBackoff) serializedLambda.getCapturedArg(0);
                        final ProcessFunction<List<ActionRequest>, BulkResponse> processFunction =
                                (ProcessFunction<List<ActionRequest>, BulkResponse>) serializedLambda.getCapturedArg(1);
                        return retry(processFunction, fluentBackoff);
                    }
                    if (implMethodName.equals("lambda$request$4e15d502$1")
                            && implMethodSignature.equals("(Lorg/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$ClientSupplier;Lorg/apache/beam/sdk/io/elasticsearch/ThrowingConsumer;Ljava/util/List;)Lorg/elasticsearch/action/bulk/BulkResponse;")) {
                        final ClientSupplier clientSupplier = (ClientSupplier) serializedLambda.getCapturedArg(0);
                        final ThrowingConsumer<BulkExecutionException> throwingConsumer =
                                (ThrowingConsumer<BulkExecutionException>) serializedLambda.getCapturedArg(1);
                        return request(clientSupplier, throwingConsumer);
                    }
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        }

        /** Starts a write configuration with the given cluster name. */
        public static <T> Bound<T> withClusterName(String str) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withClusterName(str);
        }

        /** Starts a write configuration with the given server addresses. */
        public static <T> Bound<T> withServers(InetSocketAddress[] inetSocketAddressArr) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withServers(inetSocketAddressArr);
        }

        /** Starts a write configuration with the given flush interval. */
        public static <T> Bound withFlushInterval(Duration duration) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withFlushInterval(duration);
        }

        /** Starts a write configuration with the given element-to-requests function. */
        public static <T> Bound withFunction(SerializableFunction<T, Iterable<ActionRequest<?>>> serializableFunction) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withFunction(serializableFunction);
        }

        /** Starts a write configuration with the given number of shards. */
        public static <T> Bound withNumOfShard(long j) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withNumOfShard(j);
        }

        /** Starts a write configuration with the given bulk-failure handler. */
        public static <T> Bound withError(ThrowingConsumer<BulkExecutionException> throwingConsumer) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withError(throwingConsumer);
        }

        /** Starts a write configuration with the given maximum bulk request size. */
        public static <T> Bound withMaxBulkRequestSize(int i) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withMaxBulkRequestSize(i);
        }

        /** Starts a write configuration with the given maximum number of retries. */
        public static <T> Bound withMaxRetries(int i) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withMaxRetries(i);
        }

        /** Starts a write configuration with the given pause between retries. */
        public static <T> Bound withRetryPause(Duration duration) {
            final Bound<T> unconfigured = new Bound<>();
            return unconfigured.withRetryPause(duration);
        }

        /**
         * Returns an error handler that rethrows the {@link BulkExecutionException} it is
         * given, unchanged.
         *
         * NOTE(review): the {@code $deserializeLambda$} hook of this class matches this
         * lambda by a synthetic method name; that name presumably depends on the exact
         * shape of this method, so the code is left as is - confirm before restructuring.
         */
        private static ThrowingConsumer<BulkExecutionException> defaultErrorHandler() {
            return bulkExecutionException -> {
                // Propagate the failure to the caller of accept().
                throw bulkExecutionException;
            };
        }

        /**
         * Deserialization hook for the serializable lambda created by
         * {@link #defaultErrorHandler()}: when the serialized form describes that lambda,
         * an equivalent handler is returned.
         *
         * NOTE(review): hooks with this name are normally synthesized by the compiler for
         * serializable lambdas; this hand-written copy and the synthetic lambda name it
         * matches come from decompiled output and are presumably redundant when building
         * from source - confirm before relying on it.
         *
         * @param serializedLambda description of the lambda being deserialized
         * @return the default error handler
         * @throws IllegalArgumentException if the description does not match that lambda
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            // Kind 6 is a static-method handle (MethodHandleInfo.REF_invokeStatic).
            final boolean describesDefaultErrorHandler =
                    "lambda$defaultErrorHandler$520fb24c$1".equals(serializedLambda.getImplMethodName())
                    && serializedLambda.getImplMethodKind() == 6
                    && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/elasticsearch/ThrowingConsumer")
                    && serializedLambda.getFunctionalInterfaceMethodName().equals("accept")
                    && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V")
                    && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write")
                    && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/elasticsearch/ElasticsearchIO$Write$BulkExecutionException;)V");
            if (describesDefaultErrorHandler) {
                // The lambda captures nothing, so a fresh handler is equivalent.
                return defaultErrorHandler();
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }

        /** Package-visible bridge that hands out the handler built by {@code defaultErrorHandler()}. */
        static /* synthetic */ ThrowingConsumer access$000() {
            final ThrowingConsumer handler = defaultErrorHandler();
            return handler;
        }
    }
}
