/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.source;

import java.util.Set;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarFetcherManagerBase;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

/**
 * Common base for the Pulsar source readers in this package.
 *
 * <p>It forwards the element queue, fetcher manager, record emitter, configuration and reader
 * context to {@link SourceReaderBase}, and keeps references to the {@link PulsarClient} and
 * {@link PulsarAdmin} handed to the constructor so that they can be released in {@link #close()}.
 *
 * @param <OUT> the type of the records emitted by the reader.
 */
abstract class PulsarSourceReaderBase<OUT>
extends SourceReaderBase<Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
    protected final SourceConfiguration sourceConfiguration;
    protected final PulsarClient pulsarClient;
    protected final PulsarAdmin pulsarAdmin;

    /**
     * Creates the reader and stores the Pulsar resources that are released in {@link #close()}.
     *
     * @param elementsQueue the queue passed on to {@link SourceReaderBase}.
     * @param splitFetcherManager the fetcher manager passed on to {@link SourceReaderBase}; it is
     *     also used by {@link #closeFinishedSplits(Set)}.
     * @param recordEmitter the emitter passed on to {@link SourceReaderBase}.
     * @param context the reader context passed on to {@link SourceReaderBase}.
     * @param sourceConfiguration the source configuration; also handed to the base class as its
     *     {@link Configuration}.
     * @param pulsarClient the Pulsar client that is shut down when this reader is closed.
     * @param pulsarAdmin the Pulsar admin that is closed when this reader is closed.
     */
    protected PulsarSourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue, PulsarFetcherManagerBase splitFetcherManager, PulsarRecordEmitter<OUT> recordEmitter, SourceReaderContext context, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) {
        super(elementsQueue, (SplitFetcherManager)splitFetcherManager, recordEmitter, (Configuration)sourceConfiguration, context);
        this.sourceConfiguration = sourceConfiguration;
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
    }

    /**
     * Wraps the given split into a new {@link PulsarPartitionSplitState}.
     *
     * @param split the split to wrap.
     * @return a new state object created from {@code split}.
     */
    @Override
    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
        return new PulsarPartitionSplitState(split);
    }

    /**
     * Converts a split state back into a split.
     *
     * @param splitId the split id; not used by this implementation.
     * @param splitState the state to convert.
     * @return the result of {@link PulsarPartitionSplitState#toPulsarPartitionSplit()}.
     */
    @Override
    protected PulsarPartitionSplit toSplitType(String splitId, PulsarPartitionSplitState splitState) {
        return splitState.toPulsarPartitionSplit();
    }

    /**
     * Asks the fetcher manager to close the fetcher of every given split.
     *
     * @param finishedSplitIds the ids of the splits whose fetchers should be closed.
     */
    protected void closeFinishedSplits(Set<String> finishedSplitIds) {
        // The constructor only accepts a PulsarFetcherManagerBase, so this downcast is safe.
        PulsarFetcherManagerBase fetcherManager = (PulsarFetcherManagerBase)this.splitFetcherManager;
        for (String splitId : finishedSplitIds) {
            fetcherManager.closeFetcher(splitId);
        }
    }

    /**
     * Closes the base reader first, then shuts down the Pulsar client and closes the Pulsar admin.
     *
     * <p>Every step is attempted even when an earlier one fails, so a failure while closing the
     * base reader or the client does not leave the remaining resources open.
     *
     * @throws Exception the first failure raised by any of the steps; failures of later steps are
     *     attached to it as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;

        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }

        try {
            pulsarClient.shutdown();
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }

        try {
            pulsarAdmin.close();
        } catch (Exception e) {
            failure = firstOrSuppressed(failure, e);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code previous} with {@code current} suppressed, or {@code current} if there is no previous failure. */
    private static Exception firstOrSuppressed(Exception previous, Exception current) {
        if (previous == null) {
            return current;
        }
        // Throwable.addSuppressed rejects self-suppression, so guard against the same instance.
        if (previous != current) {
            previous.addSuppressed(current);
        }
        return previous;
    }
}

