package org.apache.beam.sdk.io.solace;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.Topic;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration;
import org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Write;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
import org.apache.beam.sdk.io.solace.write.SolaceOutput;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/sdk/io/solace/SolaceIO.class */
public class SolaceIO {
    /** Deduplication flag default ({@code false}). */
    private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;

    /** Default value handed to {@code Write.Builder#setMaxNumOfUsedWorkers}. */
    public static final int DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS = 20;

    /** Default value handed to {@code Write.Builder#setNumberOfClientsPerWorker}. */
    public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4;

    /**
     * Timestamp function used by the no-argument {@link #read()}: returns the record's sender
     * timestamp (epoch milliseconds) as an {@link Instant}, or {@link Instant#now()} when the
     * record itself or its sender timestamp is {@code null}.
     */
    public static final SerializableFunction<Solace.Record, Instant> SENDER_TIMESTAMP_FUNCTION = message -> {
        if (message == null) {
            return Instant.now();
        }
        Long senderMillis = message.getSenderTimestamp();
        if (senderMillis == null) {
            return Instant.now();
        }
        return Instant.ofEpochMilli(senderMillis.longValue());
    };

    /** Watermark idle threshold applied by the {@code read(...)} factories: 30 seconds. */
    private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD = Duration.standardSeconds(30);

    /** Latency metrics are not published unless {@code Write#publishLatencyMetrics()} is called. */
    public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false;

    /** Submission mode used by a writer unless overridden. */
    public static final SubmissionMode DEFAULT_WRITER_SUBMISSION_MODE = SubmissionMode.HIGHER_THROUGHPUT;

    /** Delivery mode used by a writer unless overridden. */
    public static final DeliveryMode DEFAULT_WRITER_DELIVERY_MODE = DeliveryMode.DIRECT;

    /** Writer type used by a writer unless overridden. */
    public static final WriterType DEFAULT_WRITER_TYPE = WriterType.BATCHED;

    /* loaded from: input_file:org/apache/beam/sdk/io/solace/SolaceIO$Read.class */
    /**
     * Root transform that produces a {@code PCollection<T>} from a Solace queue or topic by
     * wrapping an {@link UnboundedSolaceSource}.
     *
     * <p>The fluent {@code from}/{@code with...} methods write into a single mutable
     * {@link Configuration.Builder} and return {@code this}; the configuration is only
     * materialised in {@link #validate} and {@link #expand}.
     *
     * @param <T> element type produced by the configured parse function
     */
    public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
        private static final Logger LOG = LoggerFactory.getLogger(Read.class);

        /** Accumulates every setting applied through the fluent methods of this transform. */
        @VisibleForTesting
        final Configuration.Builder<T> configurationBuilder;

        /**
         * Value class holding the read settings.
         *
         * <p>NOTE(review): the decompiler reports that the accessors below were originally
         * package-private and were widened to {@code public}; confirm the intended visibility
         * against the upstream source.
         */
        @AutoValue
        public static abstract class Configuration<T> {

            /** AutoValue builder for {@link Configuration}; one setter per property. */
            @AutoValue.Builder
            public static abstract class Builder<T> {
                abstract Builder<T> setQueue(Queue queue);

                abstract Builder<T> setTopic(Topic topic);

                abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> serializableFunction);

                abstract Builder<T> setMaxNumConnections(Integer num);

                abstract Builder<T> setDeduplicateRecords(boolean z);

                abstract Builder<T> setParseFn(SerializableFunction<BytesXMLMessage, T> serializableFunction);

                abstract Builder<T> setSempClientFactory(SempClientFactory sempClientFactory);

                abstract Builder<T> setSessionServiceFactory(SessionServiceFactory sessionServiceFactory);

                abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor);

                abstract Builder<T> setWatermarkIdleDurationThreshold(Duration duration);

                abstract Configuration<T> build();
            }

            /** Queue to read from; {@code null} when a topic was configured instead. */
            public abstract Queue getQueue();

            /** Topic to read from; {@code null} when a queue was configured instead. */
            public abstract Topic getTopic();

            /** Function extracting the event timestamp from a parsed element. */
            public abstract SerializableFunction<T, Instant> getTimestampFn();

            /** Value passed through to {@link UnboundedSolaceSource} as its connection limit. */
            public abstract Integer getMaxNumConnections();

            /** Whether the source is asked to deduplicate records. */
            public abstract boolean getDeduplicateRecords();

            /** Function converting a raw {@link BytesXMLMessage} into an element of type {@code T}. */
            public abstract SerializableFunction<BytesXMLMessage, T> getParseFn();

            /** Factory for the SEMP client; used in {@code expand} to create a queue for a topic. */
            public abstract SempClientFactory getSempClientFactory();

            /** Factory for the session service handed to the source. */
            public abstract SessionServiceFactory getSessionServiceFactory();

            /** Type descriptor used to infer the output coder. */
            public abstract TypeDescriptor<T> getTypeDescriptor();

            /** Idle duration threshold passed through to the source for watermark handling. */
            public abstract Duration getWatermarkIdleDurationThreshold();

            /** Returns a fresh builder with no properties set. */
            public static <T> Builder<T> builder() {
                // Diamond instead of the raw generated type: keeps the builder chain type-checked.
                return new AutoValue_SolaceIO_Read_Configuration.Builder<>();
            }
        }

        private Read(Configuration.Builder<T> builder) {
            this.configurationBuilder = builder;
        }

        /** Reads from the given queue. Mutates this transform and returns it. */
        public Read<T> from(Solace.Queue queue) {
            this.configurationBuilder.setQueue(SolaceIO.queueFromName(queue.getName()));
            return this;
        }

        /**
         * Reads from the given topic. Mutates this transform and returns it. When only a topic
         * is set, {@link #expand} creates a queue for it through the SEMP client.
         */
        public Read<T> from(Solace.Topic topic) {
            this.configurationBuilder.setTopic(SolaceIO.topicFromName(topic.getName()));
            return this;
        }

        /**
         * Overrides the timestamp function.
         *
         * @throws IllegalStateException if {@code serializableFunction} is {@code null}
         */
        public Read<T> withTimestampFn(SerializableFunction<T, Instant> serializableFunction) {
            Preconditions.checkState(serializableFunction != null, "SolaceIO.Read: timestampFn must not be null. This function must be set or use the no-argument `Read.read()` method");
            this.configurationBuilder.setTimestampFn(serializableFunction);
            return this;
        }

        /** Sets the connection limit that is forwarded to the source. */
        public Read<T> withMaxNumConnections(Integer num) {
            this.configurationBuilder.setMaxNumConnections(num);
            return this;
        }

        /** Sets the watermark idle duration threshold that is forwarded to the source. */
        public Read<T> withWatermarkIdleDurationThreshold(Duration duration) {
            this.configurationBuilder.setWatermarkIdleDurationThreshold(duration);
            return this;
        }

        /** Enables or disables record deduplication in the source. */
        public Read<T> withDeduplicateRecords(boolean z) {
            this.configurationBuilder.setDeduplicateRecords(z);
            return this;
        }

        /**
         * Sets the SEMP client factory.
         *
         * @throws IllegalStateException if {@code sempClientFactory} is {@code null}
         */
        public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
            Preconditions.checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory must not be null.");
            this.configurationBuilder.setSempClientFactory(sempClientFactory);
            return this;
        }

        /**
         * Sets the session service factory.
         *
         * @throws IllegalStateException if {@code sessionServiceFactory} is {@code null}
         */
        public Read<T> withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) {
            Preconditions.checkState(sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory must not be null.");
            this.configurationBuilder.setSessionServiceFactory(sessionServiceFactory);
            return this;
        }

        /**
         * Checks that exactly one of queue and topic has been configured.
         *
         * @throws IllegalStateException if both or neither are set
         */
        @Override
        public void validate(PipelineOptions pipelineOptions) {
            Configuration<T> configuration = this.configurationBuilder.build();
            boolean queueMissing = configuration.getQueue() == null;
            boolean topicMissing = configuration.getTopic() == null;
            // XOR: true only when exactly one of the two destinations is present.
            Preconditions.checkState(queueMissing ^ topicMissing, "SolaceIO.Read: One of the Solace {Queue, Topic} must be set.");
        }

        /**
         * Builds the configuration, resolves the queue to read from (creating one for the topic
         * if no queue was given), registers that queue on the session service factory, infers the
         * output coder and applies a Beam unbounded read over {@link UnboundedSolaceSource}.
         */
        @Override
        public PCollection<T> expand(PBegin pBegin) {
            Configuration<T> configuration = this.configurationBuilder.build();
            SempClientFactory sempClientFactory = configuration.getSempClientFactory();
            String jobName = pBegin.getPipeline().getOptions().getJobName();
            Queue sourceQueue = initializeQueueForTopicIfNeeded(configuration.getQueue(), configuration.getTopic(), jobName, sempClientFactory);

            SessionServiceFactory sessionServiceFactory = configuration.getSessionServiceFactory();
            sessionServiceFactory.setQueue(sourceQueue);

            Coder<T> coder = inferCoder(pBegin.getPipeline(), configuration.getTypeDescriptor());

            // The diamond matters: a raw source would make apply() return the erased POutput type.
            return pBegin.apply(
                    org.apache.beam.sdk.io.Read.from(
                            new UnboundedSolaceSource<>(
                                    sourceQueue,
                                    sempClientFactory,
                                    sessionServiceFactory,
                                    configuration.getMaxNumConnections(),
                                    configuration.getDeduplicateRecords(),
                                    coder,
                                    configuration.getTimestampFn(),
                                    configuration.getWatermarkIdleDurationThreshold(),
                                    configuration.getParseFn())));
        }

        /**
         * Resolves a coder for {@code typeDescriptor}: the pipeline's coder registry is tried
         * first, then its schema registry.
         *
         * @throws RuntimeException if neither registry can provide a coder
         */
        @VisibleForTesting
        Coder<T> inferCoder(Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
            Coder<T> registryCoder = getFromCoderRegistry(pipeline, typeDescriptor);
            if (registryCoder != null) {
                return registryCoder;
            }
            Coder<T> schemaCoder = getFromSchemaRegistry(pipeline, typeDescriptor);
            if (schemaCoder != null) {
                return schemaCoder;
            }
            throw new RuntimeException("SolaceIO.Read: Cannot infer a coder for the TypeDescriptor. Annotate your output class with @DefaultSchema annotation or create a coder manually and register it in the CoderRegistry.");
        }

        /** Returns the schema coder for the type, or {@code null} when no schema is registered. */
        private Coder<T> getFromSchemaRegistry(Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
            try {
                return pipeline.getSchemaRegistry().getSchemaCoder(typeDescriptor);
            } catch (NoSuchSchemaException ignored) {
                // Deliberately swallowed: inferCoder reports the failure once all lookups miss.
                return null;
            }
        }

        /** Returns the registered coder for the type, or {@code null} when none can be provided. */
        private Coder<T> getFromCoderRegistry(Pipeline pipeline, TypeDescriptor<T> typeDescriptor) {
            try {
                return pipeline.getCoderRegistry().getCoder(typeDescriptor);
            } catch (CannotProvideCoderException ignored) {
                // Deliberately swallowed: inferCoder falls back to the schema registry.
                return null;
            }
        }

        /**
         * Returns {@code queue} when it is set; otherwise creates a queue named
         * {@code queue-<topic>-<str>} for {@code topic} through the SEMP client and logs a
         * warning that the queue is not removed automatically.
         *
         * @param str suffix for the generated queue name ({@code expand} passes the job name)
         * @throws NullPointerException if both {@code queue} and {@code topic} are {@code null}
         * @throws RuntimeException wrapping any {@link IOException} raised while creating the queue
         */
        private Queue initializeQueueForTopicIfNeeded(Queue queue, Topic topic, String str, SempClientFactory sempClientFactory) {
            if (queue != null) {
                return queue;
            }
            // Reject a missing topic before a SEMP client is created or a "queue-null-..." name is built.
            Topic sourceTopic = Preconditions.checkNotNull(topic, "SolaceIO.Read: a topic must be set when no queue is provided.");
            String queueName = String.format("queue-%s-%s", sourceTopic, str);
            try {
                Queue createdQueue = sempClientFactory.create().createQueueForTopic(queueName, sourceTopic.getName());
                LOG.warn("SolaceIO.Read: A new queue {} was created. The Queue will not be deleted when this job finishes. Make sure to remove it yourself when not needed.", createdQueue.getName());
                return createdQueue;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/solace/SolaceIO$SubmissionMode.class */
    /**
     * Submission mode of a {@code Write}, chosen with {@code Write#withSubmissionMode} and stored
     * through the builder's {@code setDispatchMode}. The default is {@link #HIGHER_THROUGHPUT}
     * (see {@code DEFAULT_WRITER_SUBMISSION_MODE}).
     *
     * <p>NOTE(review): how each mode affects publishing is not visible in this file; the names
     * suggest a throughput-versus-latency trade-off — confirm against the writer implementation.
     */
    public enum SubmissionMode {
        HIGHER_THROUGHPUT,
        LOWER_LATENCY
    }

    /**
     * Transform that takes a {@code PCollection<T>} and returns a {@link SolaceOutput}.
     *
     * <p>Instances are immutable AutoValue objects: every {@code to}/{@code with...} method
     * copies the current settings through {@link #toBuilder()} and returns a new instance.
     *
     * <p>NOTE(review): the decompiler reports that the abstract accessors were originally
     * package-private and were widened to {@code public}; confirm against the upstream source.
     *
     * @param <T> element type of the input collection
     */
    @AutoValue
    public static abstract class Write<T> extends PTransform<PCollection<T>, SolaceOutput> {
        // Both tags are anonymous subclasses (presumably the usual TupleTag idiom for capturing
        // the element type at runtime — confirm against TupleTag documentation).
        /** Tag for publish results of failed publishes. */
        public static final TupleTag<Solace.PublishResult> FAILED_PUBLISH_TAG = new TupleTag<Solace.PublishResult>() {
        };
        /** Tag for publish results of successful publishes. */
        public static final TupleTag<Solace.PublishResult> SUCCESSFUL_PUBLISH_TAG = new TupleTag<Solace.PublishResult>() {
        };

        /** AutoValue builder for {@link Write}; one setter per property. */
        @AutoValue.Builder
        public static abstract class Builder<T> {
            abstract Builder<T> setMaxNumOfUsedWorkers(int i);

            abstract Builder<T> setNumberOfClientsPerWorker(int i);

            abstract Builder<T> setDestination(Destination destination);

            abstract Builder<T> setDeliveryMode(DeliveryMode deliveryMode);

            abstract Builder<T> setPublishLatencyMetrics(Boolean bool);

            abstract Builder<T> setDispatchMode(SubmissionMode submissionMode);

            abstract Builder<T> setWriterType(WriterType writerType);

            abstract Builder<T> setFormatFunction(SerializableFunction<T, Solace.Record> serializableFunction);

            abstract Builder<T> setSessionServiceFactory(SessionServiceFactory sessionServiceFactory);

            abstract Write<T> build();
        }

        /** Returns a copy that writes to the given topic. */
        public Write<T> to(Solace.Topic topic) {
            return toBuilder().setDestination(SolaceIO.topicFromName(topic.getName())).build();
        }

        /** Returns a copy that writes to the given queue. */
        public Write<T> to(Solace.Queue queue) {
            return toBuilder().setDestination(SolaceIO.queueFromName(queue.getName())).build();
        }

        /** Returns a copy with the given maximum number of used workers. */
        public Write<T> withMaxNumOfUsedWorkers(int i) {
            return toBuilder().setMaxNumOfUsedWorkers(i).build();
        }

        /** Returns a copy with the given number of clients per worker. */
        public Write<T> withNumberOfClientsPerWorker(int i) {
            return toBuilder().setNumberOfClientsPerWorker(i).build();
        }

        /** Returns a copy with the given JCSMP delivery mode. */
        public Write<T> withDeliveryMode(DeliveryMode deliveryMode) {
            return toBuilder().setDeliveryMode(deliveryMode).build();
        }

        /** Returns a copy with publish latency metrics switched on. */
        public Write<T> publishLatencyMetrics() {
            return toBuilder().setPublishLatencyMetrics(true).build();
        }

        /** Returns a copy with the given submission mode (stored as the dispatch mode). */
        public Write<T> withSubmissionMode(SubmissionMode submissionMode) {
            return toBuilder().setDispatchMode(submissionMode).build();
        }

        /** Returns a copy with the given writer type. */
        public Write<T> withWriterType(WriterType writerType) {
            return toBuilder().setWriterType(writerType).build();
        }

        /** Returns a copy with the given session service factory. */
        public Write<T> withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) {
            return toBuilder().setSessionServiceFactory(sessionServiceFactory).build();
        }

        /** Maximum number of used workers; defaults to {@code DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS}. */
        public abstract int getMaxNumOfUsedWorkers();

        /** Number of clients per worker; defaults to {@code DEFAULT_WRITER_CLIENTS_PER_WORKER}. */
        public abstract int getNumberOfClientsPerWorker();

        /** Destination set through {@code to(...)}; {@link #builder()} assigns no default. */
        public abstract Destination getDestination();

        /** Delivery mode; defaults to {@code DEFAULT_WRITER_DELIVERY_MODE}. */
        public abstract DeliveryMode getDeliveryMode();

        /** Whether publish latency metrics are on; defaults to {@code DEFAULT_WRITER_PUBLISH_LATENCY_METRICS}. */
        public abstract boolean getPublishLatencyMetrics();

        /** Submission mode; defaults to {@code DEFAULT_WRITER_SUBMISSION_MODE}. */
        public abstract SubmissionMode getDispatchMode();

        /** Writer type; defaults to {@code DEFAULT_WRITER_TYPE}. */
        public abstract WriterType getWriterType();

        /** Function mapping an input element to a {@code Solace.Record}; {@link #builder()} assigns no default. */
        public abstract SerializableFunction<T, Solace.Record> getFormatFunction();

        /** Session service factory; {@link #builder()} assigns no default. */
        public abstract SessionServiceFactory getSessionServiceFactory();

        /** Returns a builder pre-populated with the {@code DEFAULT_WRITER_*} values of {@link SolaceIO}. */
        static <T> Builder<T> builder() {
            // Explicit <T> instead of the raw generated type, and named constants instead of 20 / 4.
            return new AutoValue_SolaceIO_Write.Builder<T>()
                    .setDeliveryMode(SolaceIO.DEFAULT_WRITER_DELIVERY_MODE)
                    .setMaxNumOfUsedWorkers(SolaceIO.DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS)
                    .setNumberOfClientsPerWorker(SolaceIO.DEFAULT_WRITER_CLIENTS_PER_WORKER)
                    .setPublishLatencyMetrics(SolaceIO.DEFAULT_WRITER_PUBLISH_LATENCY_METRICS)
                    .setDispatchMode(SolaceIO.DEFAULT_WRITER_SUBMISSION_MODE)
                    .setWriterType(SolaceIO.DEFAULT_WRITER_TYPE);
        }

        /** Returns a builder initialised with this instance's values (generated by AutoValue). */
        abstract Builder<T> toBuilder();

        /**
         * Returns a {@link SolaceOutput} bound to the input's pipeline.
         *
         * <p>NOTE(review): as written, the elements of {@code pCollection} are not consumed and
         * none of the configured settings are used; both trailing arguments are {@code null}.
         * This looks like a placeholder — confirm before relying on this transform to publish.
         */
        @Override
        public SolaceOutput expand(PCollection<T> pCollection) {
            return SolaceOutput.in(pCollection.getPipeline(), null, null);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/solace/SolaceIO$WriterType.class */
    /**
     * Writer implementation selector of a {@code Write}, chosen with {@code Write#withWriterType}.
     * The default is {@link #BATCHED} (see {@code DEFAULT_WRITER_TYPE}).
     *
     * <p>NOTE(review): the behavioural difference between the two writers is not visible in
     * this file — confirm against the writer implementation.
     */
    public enum WriterType {
        STREAMING,
        BATCHED
    }

    /** Creates a JCSMP {@link Topic} for {@code str} using {@code JCSMPFactory.onlyInstance()}. */
    static Topic topicFromName(String str) {
        final JCSMPFactory factory = JCSMPFactory.onlyInstance();
        return factory.createTopic(str);
    }

    /** Creates a JCSMP {@link Queue} for {@code str} using {@code JCSMPFactory.onlyInstance()}. */
    static Queue queueFromName(String str) {
        final JCSMPFactory factory = JCSMPFactory.onlyInstance();
        return factory.createQueue(str);
    }

    /**
     * Converts a {@code Solace.Destination} into the matching JCSMP {@link Destination}.
     *
     * @param destination destination whose type is {@code TOPIC} or {@code QUEUE}
     * @return a JCSMP topic or queue carrying the destination's name
     * @throws IllegalArgumentException if the type is neither {@code TOPIC} nor {@code QUEUE}
     * @throws NullPointerException if the destination name is {@code null}
     */
    public static Destination convertToJcsmpDestination(Solace.Destination destination) {
        final boolean isTopic = destination.getType().equals(Solace.DestinationType.TOPIC);
        // The queue comparison is only evaluated when the destination is not a topic.
        if (!isTopic && !destination.getType().equals(Solace.DestinationType.QUEUE)) {
            throw new IllegalArgumentException("SolaceIO.Write: Unknown destination type: " + destination.getType());
        }
        final String name = Preconditions.checkNotNull(destination.getName());
        if (isTopic) {
            return topicFromName(name);
        }
        return queueFromName(name);
    }

    /**
     * Creates a {@link Read} that emits {@code Solace.Record} elements, parsed with
     * {@code Solace.SolaceRecordMapper::map} and timestamped with
     * {@link #SENDER_TIMESTAMP_FUNCTION}. Deduplication is off and the watermark idle threshold
     * is the class default.
     */
    public static Read<Solace.Record> read() {
        // The explicit type witness is required: without it the builder chain infers Object.
        return new Read<>(
                Read.Configuration.<Solace.Record>builder()
                        .setTypeDescriptor(TypeDescriptor.of(Solace.Record.class))
                        .setParseFn(Solace.SolaceRecordMapper::map)
                        .setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
                        .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
                        .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
    }

    /**
     * Creates a {@link Read} that emits elements of a caller-defined type.
     *
     * @param typeDescriptor descriptor of the output type, used to infer the coder
     * @param serializableFunction parse function turning a {@link BytesXMLMessage} into {@code T}
     * @param serializableFunction2 timestamp function extracting an {@link Instant} from {@code T}
     * @throws IllegalStateException if any argument is {@code null}
     */
    public static <T> Read<T> read(TypeDescriptor<T> typeDescriptor, SerializableFunction<BytesXMLMessage, T> serializableFunction, SerializableFunction<T, Instant> serializableFunction2) {
        Preconditions.checkState(typeDescriptor != null, "SolaceIO.Read: typeDescriptor must not be null");
        Preconditions.checkState(serializableFunction != null, "SolaceIO.Read: parseFn must not be null");
        Preconditions.checkState(serializableFunction2 != null, "SolaceIO.Read: timestampFn must not be null");
        // The explicit type witness is required: without it the builder chain infers Object.
        return new Read<>(
                Read.Configuration.<T>builder()
                        .setTypeDescriptor(typeDescriptor)
                        .setParseFn(serializableFunction)
                        .setTimestampFn(serializableFunction2)
                        .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
                        .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
    }

    /**
     * Creates a {@link Write} for elements of type {@code T} with the default writer settings.
     *
     * @param serializableFunction function mapping each input element to a {@code Solace.Record}
     */
    public static <T> Write<T> write(SerializableFunction<T, Solace.Record> serializableFunction) {
        // The explicit type witness is required: without it the builder chain infers Object.
        return Write.<T>builder().setFormatFunction(serializableFunction).build();
    }

    /**
     * Creates a {@link Write} for {@code Solace.Record} elements with the default writer
     * settings and no format function.
     */
    public static Write<Solace.Record> write() {
        // The explicit type witness is required: without it build() yields Write<Object>.
        return Write.<Solace.Record>builder().build();
    }

    /**
     * Deserialization hook for the two serializable lambdas of this class: the sender-timestamp
     * lambda and the {@code Solace.SolaceRecordMapper::map} method reference. The serialized
     * form is matched on its implementation method name and then on kind, functional interface
     * and signatures before a fresh instance is returned.
     *
     * <p>NOTE(review): javac normally synthesizes this method itself; this copy comes from
     * decompiled output and may have to be removed if the file is recompiled from source.
     *
     * @throws IllegalArgumentException if the serialized form matches neither lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // The decompiler typed the selector as boolean (with "case true"), which is not valid
        // Java; switching on the method name directly is equivalent.
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$static$7f547836$1":
                // 6 is MethodHandleInfo.REF_invokeStatic.
                if (serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/solace/SolaceIO")
                        && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/solace/data/Solace$Record;)Lorg/joda/time/Instant;")) {
                    // The cast supplies the target type a lambda needs when returned as Object.
                    return (SerializableFunction<Solace.Record, Instant>) record -> {
                        Long senderTimestamp = record != null ? record.getSenderTimestamp() : null;
                        return senderTimestamp != null ? Instant.ofEpochMilli(senderTimestamp.longValue()) : Instant.now();
                    };
                }
                break;
            case "map":
                if (serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/solace/data/Solace$SolaceRecordMapper")
                        && serializedLambda.getImplMethodSignature().equals("(Lcom/solacesystems/jcsmp/BytesXMLMessage;)Lorg/apache/beam/sdk/io/solace/data/Solace$Record;")) {
                    return (SerializableFunction<BytesXMLMessage, Solace.Record>) Solace.SolaceRecordMapper::map;
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
