/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source;

import java.util.Collections;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.MessageDeserializer;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.SplitSchedulingStrategy;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.flink.connector.pulsar.source.StopCondition;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumeratorState;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumeratorStateSerializer;
import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.reader.PulsarRecordEmitter;
import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.shade.com.google.common.io.Closer;

@Internal
public class PulsarSource<OUT>
implements Source<OUT, PulsarPartitionSplit, PulsarSourceEnumeratorState>,
ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = -8755372893283732098L;
    private final PulsarSubscriber subscriber;
    private final StartOffsetInitializer startOffsetInitializer;
    private final StopCondition stopCondition;
    private final Boundedness boundedness;
    private final MessageDeserializer<OUT> messageDeserializer;
    private final Configuration configuration;
    private final ClientConfigurationData pulsarConfiguration;
    private final ConsumerConfigurationData<byte[]> consumerConfigurationData;
    private final SplitSchedulingStrategy splitSchedulingStrategy;
    private final String adminUrl;
    private transient PulsarAdmin pulsarAdmin;
    private transient PulsarClient pulsarClient;

    public PulsarSource(PulsarSubscriber subscriber, StartOffsetInitializer startOffsetInitializer, StopCondition stopCondition, Boundedness boundedness, MessageDeserializer<OUT> messageDeserializer, Configuration configuration, ClientConfigurationData pulsarConfiguration, ConsumerConfigurationData<byte[]> consumerConfigurationData, SplitSchedulingStrategy splitSchedulingStrategy) {
        this.subscriber = (PulsarSubscriber)Preconditions.checkNotNull((Object)subscriber);
        this.startOffsetInitializer = (StartOffsetInitializer)Preconditions.checkNotNull((Object)startOffsetInitializer);
        this.stopCondition = (StopCondition)Preconditions.checkNotNull((Object)stopCondition);
        this.boundedness = boundedness;
        this.messageDeserializer = (MessageDeserializer)Preconditions.checkNotNull(messageDeserializer);
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.pulsarConfiguration = (ClientConfigurationData)Preconditions.checkNotNull((Object)pulsarConfiguration);
        this.adminUrl = (String)configuration.get(PulsarSourceOptions.ADMIN_URL);
        this.consumerConfigurationData = consumerConfigurationData;
        this.splitSchedulingStrategy = splitSchedulingStrategy;
    }

    public static <OUT> PulsarSourceBuilder<OUT> builder() {
        return new PulsarSourceBuilder();
    }

    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    public TypeInformation<OUT> getProducedType() {
        return this.messageDeserializer.getProducedType();
    }

    public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext) {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        ExecutorProvider listenerExecutor = new ExecutorProvider(1, "Pulsar listener executor");
        Closer splitCloser = Closer.create();
        splitCloser.register(() -> ((ExecutorProvider)listenerExecutor).shutdownNow());
        Supplier splitReaderSupplier = () -> {
            PulsarPartitionSplitReader<OUT> reader = new PulsarPartitionSplitReader<OUT>(this.configuration, this.consumerConfigurationData, this.getClient(), this.getPulsarAdmin(), this.messageDeserializer, listenerExecutor);
            splitCloser.register(reader);
            return reader;
        };
        PulsarRecordEmitter recordEmitter = new PulsarRecordEmitter();
        return new PulsarSourceReader(elementsQueue, splitReaderSupplier, recordEmitter, this.configuration, readerContext, () -> ((Closer)splitCloser).close());
    }

    @Nonnull
    public PulsarAdmin getPulsarAdmin() {
        if (this.pulsarAdmin == null) {
            try {
                this.pulsarAdmin = PulsarAdminUtils.newAdminFromConf(this.adminUrl, this.pulsarConfiguration);
            }
            catch (PulsarClientException e) {
                throw new IllegalStateException("Cannot initialize pulsar admin", e);
            }
        }
        return this.pulsarAdmin;
    }

    @Nonnull
    public PulsarClient getClient() {
        if (this.pulsarClient == null) {
            try {
                this.pulsarClient = CachedPulsarClient.getOrCreate(this.pulsarConfiguration);
            }
            catch (PulsarClientException e) {
                throw new IllegalStateException("Cannot initialize pulsar client", e);
            }
        }
        return this.pulsarClient;
    }

    public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumeratorState> createEnumerator(SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
        return new PulsarSourceEnumerator(this.subscriber, this.startOffsetInitializer, this.stopCondition, this.getPulsarAdmin(), this.configuration, enumContext, Collections.emptyMap(), this.splitSchedulingStrategy);
    }

    public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumeratorState> restoreEnumerator(SplitEnumeratorContext<PulsarPartitionSplit> enumContext, PulsarSourceEnumeratorState checkpoint) {
        return new PulsarSourceEnumerator(this.subscriber, this.startOffsetInitializer, this.stopCondition, this.getPulsarAdmin(), this.configuration, enumContext, checkpoint.getCurrentAssignment(), this.splitSchedulingStrategy);
    }

    public SimpleVersionedSerializer<PulsarPartitionSplit> getSplitSerializer() {
        return new PulsarPartitionSplitSerializer();
    }

    public SimpleVersionedSerializer<PulsarSourceEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new PulsarSourceEnumeratorStateSerializer();
    }
}

