/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader;

import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;

@Internal
public final class PulsarSourceReaderFactory {
    private PulsarSourceReaderFactory() {
    }

    public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(SourceReaderContext readerContext, PulsarDeserializationSchema<OUT> deserializationSchema, SourceConfiguration sourceConfiguration, @Nullable CryptoKeyReader cryptoKeyReader) {
        BytesSchema schema;
        PulsarClient pulsarClient = PulsarClientFactory.createClient(sourceConfiguration);
        PulsarAdmin pulsarAdmin = PulsarClientFactory.createAdmin(sourceConfiguration);
        if (sourceConfiguration.isEnableSchemaEvolution()) {
            PulsarSchema<?> pulsarSchema = ((PulsarSchemaWrapper)deserializationSchema).pulsarSchema();
            schema = new BytesSchema(pulsarSchema);
        } else {
            schema = Schema.BYTES;
        }
        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue = new FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>>(queueCapacity);
        PulsarRecordEmitter<OUT> recordEmitter = new PulsarRecordEmitter<OUT>(deserializationSchema);
        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
        if (subscriptionType == SubscriptionType.Failover || subscriptionType == SubscriptionType.Exclusive) {
            Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier = () -> new PulsarOrderedPartitionSplitReader(pulsarClient, pulsarAdmin, sourceConfiguration, schema, cryptoKeyReader);
            return new PulsarOrderedSourceReader<OUT>(elementsQueue, splitReaderSupplier, recordEmitter, readerContext, sourceConfiguration, pulsarClient, pulsarAdmin);
        }
        if (subscriptionType == SubscriptionType.Shared || subscriptionType == SubscriptionType.Key_Shared) {
            TransactionCoordinatorClientImpl coordinatorClient = ((PulsarClientImpl)pulsarClient).getTcClient();
            if (coordinatorClient == null && !sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
                throw new IllegalStateException("Transaction is required but didn't enabled");
            }
            Supplier<PulsarUnorderedPartitionSplitReader> splitReaderSupplier = () -> new PulsarUnorderedPartitionSplitReader(pulsarClient, pulsarAdmin, sourceConfiguration, schema, cryptoKeyReader, coordinatorClient);
            return new PulsarUnorderedSourceReader<OUT>(elementsQueue, splitReaderSupplier, recordEmitter, readerContext, sourceConfiguration, pulsarClient, pulsarAdmin, coordinatorClient);
        }
        throw new UnsupportedOperationException("This subscription type is not " + (Object)((Object)subscriptionType) + " supported currently.");
    }
}

