/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaPartitionSplitReader;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceReader;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.shade.com.google.common.base.Supplier;

public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
SupportParallelism {
    private final ReadonlyConfig readonlyConfig;
    private JobContext jobContext;
    private final KafkaSourceConfig kafkaSourceConfig;

    public KafkaSource(ReadonlyConfig readonlyConfig) {
        this.readonlyConfig = readonlyConfig;
        this.kafkaSourceConfig = new KafkaSourceConfig(readonlyConfig);
    }

    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals((Object)this.jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    public String getPluginName() {
        return "Kafka";
    }

    public List<CatalogTable> getProducedCatalogTables() {
        return this.kafkaSourceConfig.getMapMetadata().values().stream().map(ConsumerMetadata::getCatalogTable).collect(Collectors.toList());
    }

    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) {
        LinkedBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue = new LinkedBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>(this.kafkaSourceConfig.getReaderCacheQueueSize());
        Supplier kafkaPartitionSplitReaderSupplier = () -> new KafkaPartitionSplitReader(this.kafkaSourceConfig, readerContext);
        KafkaSourceFetcherManager kafkaSourceFetcherManager = new KafkaSourceFetcherManager((BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>)elementsQueue, () -> ((Supplier)kafkaPartitionSplitReaderSupplier).get());
        KafkaRecordEmitter kafkaRecordEmitter = new KafkaRecordEmitter(this.kafkaSourceConfig.getMapMetadata(), this.kafkaSourceConfig.getMessageFormatErrorHandleWay());
        return new KafkaSourceReader(elementsQueue, kafkaSourceFetcherManager, kafkaRecordEmitter, new SourceReaderOptions(this.readonlyConfig), this.kafkaSourceConfig, readerContext);
    }

    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
        return new KafkaSourceSplitEnumerator(this.kafkaSourceConfig, enumeratorContext, null, this.getBoundedness() == Boundedness.UNBOUNDED);
    }

    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext, KafkaSourceState checkpointState) {
        return new KafkaSourceSplitEnumerator(this.kafkaSourceConfig, enumeratorContext, checkpointState, this.getBoundedness() == Boundedness.UNBOUNDED);
    }

    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }
}

