package org.apache.beam.sdk.io.solr;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.solr.AutoValue_SolrIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.solr.AutoValue_SolrIO_Read;
import org.apache.beam.sdk.io.solr.AutoValue_SolrIO_RetryConfiguration;
import org.apache.beam.sdk.io.solr.AutoValue_SolrIO_Write;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO.class */
public class SolrIO {
    private static final Logger LOG = LoggerFactory.getLogger(SolrIO.class);

    /**
     * Serializable description of how to reach Solr: a ZooKeeper host string plus optional
     * basic-auth credentials. Instances are immutable; every {@code with*} call returns a copy.
     */
    @AutoValue
    public static abstract class ConnectionConfiguration implements Serializable {

        /** Returns the ZooKeeper host string supplied to {@link #create(String)}. */
        public abstract String getZkHost();

        /** Returns the basic-auth user name, or {@code null} when none was configured. */
        @Nullable
        public abstract String getUsername();

        /** Returns the basic-auth password, or {@code null} when none was configured. */
        @Nullable
        public abstract String getPassword();

        /** Returns a builder used by the {@code with*} methods to derive a modified copy. */
        abstract Builder builder();

        /**
         * Creates a configuration that only carries the ZooKeeper host.
         *
         * @param str the ZooKeeper host string; must not be {@code null}
         * @return a new configuration without credentials
         * @throws IllegalArgumentException if {@code str} is {@code null}
         */
        public static ConnectionConfiguration create(String str) {
            Preconditions.checkArgument(str != null, "zkHost can not be null");
            Builder withHost = new AutoValue_SolrIO_ConnectionConfiguration.Builder().setZkHost(str);
            return withHost.build();
        }

        /**
         * Returns a copy of this configuration that carries basic-auth credentials.
         *
         * @param str the user name; must be non-null and non-empty
         * @param str2 the password; must be non-null and non-empty
         * @return the derived configuration
         * @throws IllegalArgumentException if either argument is {@code null} or empty
         */
        public ConnectionConfiguration withBasicCredentials(String str, String str2) {
            // Validation order matters for the reported message: user name first, then password.
            Preconditions.checkArgument(str != null, "username can not be null");
            Preconditions.checkArgument(!str.isEmpty(), "username can not be empty");
            Preconditions.checkArgument(str2 != null, "password can not be null");
            Preconditions.checkArgument(!str2.isEmpty(), "password can not be empty");
            Builder withUser = builder().setUsername(str);
            Builder withCredentials = withUser.setPassword(str2);
            return withCredentials.build();
        }

        /**
         * Registers the ZooKeeper host and, when present, the user name as display data.
         * The password is never added.
         *
         * @param builder the display-data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            String zkHost = getZkHost();
            builder.add(DisplayData.item("zkHost", zkHost));
            String username = getUsername();
            builder.addIfNotNull(DisplayData.item("username", username));
        }

        /** Builds an HTTP client whose parameters carry the configured basic-auth user and password. */
        private HttpClient createHttpClient() {
            ModifiableSolrParams authParams = new ModifiableSolrParams();
            authParams.set("httpBasicAuthUser", getUsername());
            authParams.set("httpBasicAuthPassword", getPassword());
            return HttpClientUtil.createClient(authParams);
        }

        /** Creates a client that talks to the cluster through the configured ZooKeeper host. */
        AuthorizedSolrClient<CloudSolrClient> createClient() {
            CloudSolrClient cloudClient = new CloudSolrClient(getZkHost(), createHttpClient());
            return new AuthorizedSolrClient<>(cloudClient, this);
        }

        /**
         * Creates a client bound to a single Solr node.
         *
         * @param str the URL handed to {@link HttpSolrClient}
         */
        AuthorizedSolrClient<HttpSolrClient> createClient(String str) {
            HttpSolrClient nodeClient = new HttpSolrClient(str, createHttpClient());
            return new AuthorizedSolrClient<>(nodeClient, this);
        }

        /** AutoValue builder for {@link ConnectionConfiguration}. */
        @AutoValue.Builder
        public static abstract class Builder {
            abstract Builder setZkHost(String str);

            abstract Builder setUsername(String str);

            abstract Builder setPassword(String str);

            abstract ConnectionConfiguration build();
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO$Read.class */
    public static abstract class Read extends PTransform<PBegin, PCollection<SolrDocument>> {
        private static final long MAX_BATCH_SIZE = 10000;

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO$Read$Builder.class */
        public static abstract class Builder {
            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setQuery(String str);

            abstract Builder setBatchSize(int i);

            abstract Builder setCollection(String str);

            abstract Read build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract ConnectionConfiguration getConnectionConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getCollection();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getQuery();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract int getBatchSize();

        abstract Builder builder();

        public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
            return builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public Read from(String str) {
            Preconditions.checkArgument(str != null, "collection can not be null");
            return builder().setCollection(str).build();
        }

        public Read withQuery(String str) {
            Preconditions.checkArgument(str != null, "query can not be null");
            Preconditions.checkArgument(!str.isEmpty(), "query can not be empty");
            return builder().setQuery(str).build();
        }

        @VisibleForTesting
        Read withBatchSize(int i) {
            Preconditions.checkArgument(i > 0 && ((long) i) < MAX_BATCH_SIZE, "Valid values for batchSize are 1 (inclusize) to %s (exclusive), but was: %s ", MAX_BATCH_SIZE, i);
            return builder().setBatchSize(i).build();
        }

        public PCollection<SolrDocument> expand(PBegin pBegin) {
            Preconditions.checkArgument(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
            Preconditions.checkArgument(getCollection() != null, "from() is required");
            return pBegin.apply("Create", Create.of(this, new Read[0])).apply("Split", ParDo.of(new SplitFn())).apply("Reshuffle", Reshuffle.viaRandomKey()).apply("Read", ParDo.of(new ReadFn()));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("query", getQuery()));
            getConnectionConfiguration().populateDisplayData(builder);
        }
    }

    /**
     * Reads every document matching a {@link Read} spec from one replica, paging with Solr's
     * {@code cursorMark} parameter and sorting on the collection's unique key.
     */
    public static class ReadFn extends DoFn<KV<Read, ReplicaInfo>, SolrDocument> {
        ReadFn() {
        }

        /**
         * Queries the replica page by page and emits each returned document.
         *
         * @param kv the read spec paired with the replica to query
         * @param outputReceiver receives every document returned by the replica
         * @throws IOException if the unique key cannot be fetched, a query fails, or the client
         *     cannot be closed
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<Read, ReplicaInfo> kv, DoFn.OutputReceiver<SolrDocument> outputReceiver) throws IOException {
            Read read = kv.getKey();
            ReplicaInfo replicaInfo = kv.getValue();

            String query = read.getQuery();
            if (query == null) {
                query = "*:*";
            }
            SolrQuery solrQuery = new SolrQuery(query);
            solrQuery.setRows(read.getBatchSize());
            // Only the addressed core is queried; the request is not fanned out to other shards.
            solrQuery.setDistrib(false);

            // try-with-resources closes the client on every path and attaches a close() failure
            // as a suppressed exception instead of letting it replace the primary error.
            try (AuthorizedSolrClient<HttpSolrClient> client =
                    read.getConnectionConfiguration().createClient(replicaInfo.baseUrl())) {
                String uniqueKey;
                try {
                    uniqueKey = client.process(read.getCollection(), new SchemaRequest.UniqueKey()).getUniqueKey();
                } catch (SolrServerException e) {
                    throw new IOException("Can not get unique key from solr", e);
                }
                solrQuery.addSort(uniqueKey, SolrQuery.ORDER.asc);

                String cursorMark = "*";
                while (true) {
                    solrQuery.set("cursorMark", cursorMark);
                    QueryResponse response;
                    try {
                        response = client.query(replicaInfo.coreName(), solrQuery);
                    } catch (SolrServerException e) {
                        throw new IOException(e);
                    }
                    String nextCursorMark = response.getNextCursorMark();
                    // The server echoing back the cursor we sent ends the iteration.
                    if (cursorMark.equals(nextCursorMark)) {
                        break;
                    }
                    cursorMark = nextCursorMark;
                    for (SolrDocument document : response.getResults()) {
                        outputReceiver.output(document);
                    }
                }
            }
        }
    }

    /** Serializable snapshot of the three replica properties the read path needs. */
    @AutoValue
    public static abstract class ReplicaInfo implements Serializable {
        /** Returns the value of the replica's {@code "core"} property. */
        public abstract String coreName();

        /** Returns the replica's core URL as reported by {@link Replica#getCoreUrl()}. */
        public abstract String coreUrl();

        /** Returns the value of the replica's {@code "base_url"} property. */
        public abstract String baseUrl();

        /**
         * Copies the relevant properties out of a {@link Replica}.
         *
         * @param replica the replica to describe
         * @return a new {@link ReplicaInfo} holding the core name, core URL and base URL
         */
        static ReplicaInfo create(Replica replica) {
            String core = replica.getStr("core");
            String coreUrl = replica.getCoreUrl();
            String base = replica.getStr("base_url");
            return new AutoValue_SolrIO_ReplicaInfo(core, coreUrl, base);
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO$RetryConfiguration.class */
    public static abstract class RetryConfiguration implements Serializable {

        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO$RetryConfiguration$Builder.class */
        static abstract class Builder {
            abstract Builder setMaxAttempts(int i);

            abstract Builder setMaxDuration(Duration duration);

            abstract Builder setRetryPredicate(RetryPredicate retryPredicate);

            abstract RetryConfiguration build();
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO$RetryConfiguration$DefaultRetryPredicate.class */
        private static class DefaultRetryPredicate implements RetryPredicate {
            private static final ImmutableSet<Integer> ELIGIBLE_CODES = ImmutableSet.of(Integer.valueOf(SolrException.ErrorCode.CONFLICT.code), Integer.valueOf(SolrException.ErrorCode.SERVER_ERROR.code), Integer.valueOf(SolrException.ErrorCode.SERVICE_UNAVAILABLE.code), Integer.valueOf(SolrException.ErrorCode.INVALID_STATE.code), Integer.valueOf(SolrException.ErrorCode.UNKNOWN.code));

            private DefaultRetryPredicate() {
            }

            @Override // java.util.function.Predicate
            public boolean test(Throwable th) {
                return (th instanceof IOException) || (th instanceof SolrServerException) || ((th instanceof SolrException) && ELIGIBLE_CODES.contains(Integer.valueOf(((SolrException) th).code())));
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @FunctionalInterface
        /* loaded from: input_file:org/apache/beam/sdk/io/solr/SolrIO$RetryConfiguration$RetryPredicate.class */
        public interface RetryPredicate extends Predicate<Throwable>, Serializable {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract int getMaxAttempts();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getMaxDuration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract RetryPredicate getRetryPredicate();

        abstract Builder builder();

        public static RetryConfiguration create(int i, Duration duration) {
            Preconditions.checkArgument(i > 0, "maxAttempts must be greater than 0");
            Preconditions.checkArgument(duration != null && duration.isLongerThan(Duration.ZERO), "maxDuration must be greater than 0");
            return new AutoValue_SolrIO_RetryConfiguration.Builder().setMaxAttempts(i).setMaxDuration(duration).setRetryPredicate(DEFAULT_RETRY_PREDICATE).build();
        }

        @VisibleForTesting
        RetryConfiguration withRetryPredicate(RetryPredicate retryPredicate) {
            Preconditions.checkArgument(retryPredicate != null, "predicate must be provided");
            return builder().setRetryPredicate(retryPredicate).build();
        }
    }

    /**
     * Splits a {@link Read} spec into one element per slice of the target collection, each paired
     * with a randomly chosen active replica of that slice.
     */
    public static class SplitFn extends DoFn<Read, KV<Read, ReplicaInfo>> {
        SplitFn() {
        }

        /**
         * Looks up the cluster state and emits one {@code (spec, replica)} pair per slice.
         *
         * @param read the read spec to split
         * @param outputReceiver receives one pair per slice of the collection
         * @throws IOException if closing the client fails, or if a call made on the client
         *     throws it
         * @throws IllegalStateException if a slice has no active replica on a live node
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element Read read, DoFn.OutputReceiver<KV<Read, ReplicaInfo>> outputReceiver) throws IOException {
            try (AuthorizedSolrClient<CloudSolrClient> client = read.getConnectionConfiguration().createClient()) {
                String collection = read.getCollection();
                ClusterState clusterState = AuthorizedSolrClient.getClusterState(client);
                for (Slice slice : clusterState.getCollection(collection).getSlices()) {
                    Replica chosen = pickRandomActiveReplica(slice, clusterState);
                    Preconditions.checkState(chosen != null, "Can not found an active replica for slice %s", slice.getName());
                    outputReceiver.output(KV.of(read, ReplicaInfo.create(chosen)));
                }
            }
        }

        /**
         * Shuffles the slice's replicas and returns the first one that is in the ACTIVE state and
         * whose node is listed among the cluster's live nodes, or {@code null} if there is none.
         */
        @Nullable
        private static Replica pickRandomActiveReplica(Slice slice, ClusterState clusterState) {
            ArrayList<Replica> candidates = new ArrayList<>(slice.getReplicas());
            // Shuffling makes the choice among equally suitable replicas random.
            Collections.shuffle(candidates);
            for (Replica candidate : candidates) {
                if (candidate.getState() == Replica.State.ACTIVE
                        && clusterState.getLiveNodes().contains(candidate.getNodeName())) {
                    return candidate;
                }
            }
            return null;
        }
    }

    /**
     * A {@link PTransform} that writes {@link SolrInputDocument}s to a Solr collection in
     * batches, optionally retrying failed batches according to a {@link RetryConfiguration}.
     */
    @AutoValue
    public static abstract class Write extends PTransform<PCollection<SolrInputDocument>, PDone> {

        /** AutoValue builder for {@link Write}. */
        @AutoValue.Builder
        static abstract class Builder {
            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setCollection(String str);

            abstract Builder setMaxBatchSize(int i);

            abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);

            abstract Write build();
        }

        /**
         * Buffers incoming documents and sends them to Solr as a single update request whenever
         * the buffer reaches the configured batch size and at the end of every bundle.
         */
        @VisibleForTesting
        public static class WriteFn extends DoFn<SolrInputDocument, Void> {

            /** Format of the warning logged before each retry; takes the attempt number. */
            @VisibleForTesting
            static final String RETRY_ATTEMPT_LOG = "Error writing to Solr. Retry attempt[%d]";
            private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5);
            private transient FluentBackoff retryBackoff;
            private final Write spec;
            private transient AuthorizedSolrClient<CloudSolrClient> solrClient;
            private Collection<SolrInputDocument> batch;

            /**
             * @param write the write spec supplying connection, collection, batch size and retry
             *     settings
             */
            WriteFn(Write write) {
                this.spec = write;
            }

            /**
             * Creates the Solr client and the backoff policy. Without a retry configuration the
             * policy allows zero retries; with one it allows {@code maxAttempts - 1} retries and
             * caps the cumulative backoff at the configured maximum duration.
             */
            @DoFn.Setup
            public void setup() throws Exception {
                this.solrClient = this.spec.getConnectionConfiguration().createClient();
                this.retryBackoff = FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
                RetryConfiguration retryConfiguration = this.spec.getRetryConfiguration();
                if (retryConfiguration != null) {
                    this.retryBackoff = this.retryBackoff
                            .withMaxRetries(retryConfiguration.getMaxAttempts() - 1)
                            .withMaxCumulativeBackoff(retryConfiguration.getMaxDuration());
                }
            }

            /** Starts every bundle with an empty batch. */
            @DoFn.StartBundle
            public void startBundle(DoFn<SolrInputDocument, Void>.StartBundleContext startBundleContext) {
                this.batch = new ArrayList<>();
            }

            /** Buffers the element and flushes once the batch reaches the maximum batch size. */
            @DoFn.ProcessElement
            public void processElement(DoFn<SolrInputDocument, Void>.ProcessContext processContext) throws Exception {
                this.batch.add(processContext.element());
                if (this.batch.size() >= this.spec.getMaxBatchSize()) {
                    flushBatch();
                }
            }

            /** Flushes whatever is still buffered when the bundle ends. */
            @DoFn.FinishBundle
            public void finishBundle(DoFn<SolrInputDocument, Void>.FinishBundleContext finishBundleContext) throws Exception {
                flushBatch();
            }

            /**
             * Sends the buffered documents as one update request. A failure is retried only when
             * a retry configuration is present and its predicate accepts the exception, and only
             * while the backoff policy still grants another attempt. The batch is cleared when
             * this method exits, whether it succeeded or threw.
             *
             * @throws IOException if the write fails and is not, or can no longer be, retried
             * @throws InterruptedException if interrupted while waiting between attempts
             */
            private void flushBatch() throws IOException, InterruptedException {
                if (this.batch.isEmpty()) {
                    return;
                }
                try {
                    UpdateRequest updateRequest = new UpdateRequest();
                    updateRequest.add(this.batch);
                    Sleeper sleeper = Sleeper.DEFAULT;
                    BackOff backoff = this.retryBackoff.backoff();
                    int attempt = 0;
                    // Every iteration either returns on success or throws; the loop only comes
                    // around again after a retryable failure with backoff budget left.
                    while (true) {
                        attempt++;
                        try {
                            this.solrClient.process(this.spec.getCollection(), updateRequest);
                            return;
                        } catch (Exception e) {
                            RetryConfiguration retryConfiguration = this.spec.getRetryConfiguration();
                            if (retryConfiguration == null || !retryConfiguration.getRetryPredicate().test(e)) {
                                throw new IOException("Error writing to Solr (no attempt made to retry)", e);
                            }
                            if (!BackOffUtils.next(sleeper, backoff)) {
                                throw new IOException(String.format("Error writing to Solr after %d attempt(s). No more attempts allowed", attempt), e);
                            }
                            SolrIO.LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), e);
                        }
                    }
                } finally {
                    this.batch.clear();
                }
            }

            /** Closes the Solr client if {@link #setup()} created one. */
            @DoFn.Teardown
            public void closeClient() throws IOException {
                if (this.solrClient != null) {
                    this.solrClient.close();
                }
            }
        }

        /** Returns the connection settings, or {@code null} if none have been set yet. */
        @Nullable
        public abstract ConnectionConfiguration getConnectionConfiguration();

        /** Returns the target collection, or {@code null} if none has been set yet. */
        @Nullable
        public abstract String getCollection();

        /** Returns the number of buffered documents that triggers a flush. */
        public abstract int getMaxBatchSize();

        /** Returns a builder used by the {@code with*} methods to derive a modified copy. */
        abstract Builder builder();

        /** Returns the retry policy, or {@code null} if failed writes should not be retried. */
        @Nullable
        public abstract RetryConfiguration getRetryConfiguration();

        /**
         * Returns a copy of this transform that uses the given connection settings.
         *
         * @param connectionConfiguration the connection settings; must not be {@code null}
         * @throws IllegalArgumentException if {@code connectionConfiguration} is {@code null}
         */
        public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null");
            return builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        /**
         * Returns a copy of this transform that writes to the given collection.
         *
         * @param str the collection name; must not be {@code null}
         * @throws IllegalArgumentException if {@code str} is {@code null}
         */
        public Write to(String str) {
            Preconditions.checkArgument(str != null, "collection can not be null");
            return builder().setCollection(str).build();
        }

        /**
         * Returns a copy of this transform with the given maximum batch size.
         *
         * @param i the batch size; must be greater than 0
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        public Write withMaxBatchSize(int i) {
            Preconditions.checkArgument(i > 0, "batchSize must be larger than 0, but was: %s", i);
            return builder().setMaxBatchSize(i).build();
        }

        /**
         * Returns a copy of this transform that retries failed writes per the given policy.
         *
         * @param retryConfiguration the retry policy; must not be {@code null}
         * @throws IllegalArgumentException if {@code retryConfiguration} is {@code null}
         */
        public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
            Preconditions.checkArgument(retryConfiguration != null, "retryConfiguration is required");
            return builder().setRetryConfiguration(retryConfiguration).build();
        }

        /**
         * Applies a {@link WriteFn} configured with this spec to the input.
         *
         * @throws IllegalStateException if the connection configuration or the collection is
         *     missing
         */
        @Override
        public PDone expand(PCollection<SolrInputDocument> pCollection) {
            Preconditions.checkState(getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
            Preconditions.checkState(getCollection() != null, "to() is required");
            pCollection.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /**
     * Returns a {@link Read} transform preset with a batch size of 1000 and the query
     * {@code *:*}; a connection configuration and collection still have to be supplied.
     */
    public static Read read() {
        Read.Builder defaults = new AutoValue_SolrIO_Read.Builder()
                .setBatchSize(1000)
                .setQuery("*:*");
        return defaults.build();
    }

    /**
     * Returns a {@link Write} transform preset with a maximum batch size of 1000; a connection
     * configuration and collection still have to be supplied.
     */
    public static Write write() {
        Write.Builder defaults = new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000);
        return defaults.build();
    }

    /** Private so that this class cannot be instantiated from outside. */
    private SolrIO() {
    }
}
