/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source;

import java.io.IOException;
import java.util.Properties;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class KafkaSource<OUT>
implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = -8755372893283732098L;
    private final KafkaSubscriber subscriber;
    private final OffsetsInitializer startingOffsetsInitializer;
    private final OffsetsInitializer stoppingOffsetsInitializer;
    private final Boundedness boundedness;
    private final KafkaRecordDeserializer<OUT> deserializationSchema;
    private final Properties props;

    KafkaSource(KafkaSubscriber subscriber, OffsetsInitializer startingOffsetsInitializer, @Nullable OffsetsInitializer stoppingOffsetsInitializer, Boundedness boundedness, KafkaRecordDeserializer<OUT> deserializationSchema, Properties props) {
        this.subscriber = subscriber;
        this.startingOffsetsInitializer = startingOffsetsInitializer;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        this.boundedness = boundedness;
        this.deserializationSchema = deserializationSchema;
        this.props = props;
    }

    public static <OUT> KafkaSourceBuilder<OUT> builder() {
        return new KafkaSourceBuilder();
    }

    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext) {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        Supplier splitReaderSupplier = () -> new KafkaPartitionSplitReader<OUT>(this.props, this.deserializationSchema, readerContext.getIndexOfSubtask());
        KafkaRecordEmitter recordEmitter = new KafkaRecordEmitter();
        return new KafkaSourceReader(elementsQueue, splitReaderSupplier, recordEmitter, this.toConfiguration(this.props), readerContext);
    }

    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
        return new KafkaSourceEnumerator(this.subscriber, this.startingOffsetsInitializer, this.stoppingOffsetsInitializer, this.props, enumContext);
    }

    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(SplitEnumeratorContext<KafkaPartitionSplit> enumContext, KafkaSourceEnumState checkpoint) throws IOException {
        return new KafkaSourceEnumerator(this.subscriber, this.startingOffsetsInitializer, this.stoppingOffsetsInitializer, this.props, enumContext, checkpoint.getCurrentAssignment());
    }

    public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
        return new KafkaPartitionSplitSerializer();
    }

    public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
        return new KafkaSourceEnumStateSerializer();
    }

    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    private Configuration toConfiguration(Properties props) {
        Configuration config = new Configuration();
        props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty((String)key)));
        return config;
    }
}

