/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.KafkaTestRecord;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;

/**
 * A {@link BeamKafkaTable} for tests that reads from an in-memory {@link MockConsumer} instead of a
 * real Kafka cluster.
 *
 * <p>Records registered through {@link #addRecord(KafkaTestRecord)} are distributed over
 * {@code partitionsPerTopic} partitions of their topic (chosen from the hash code of the record
 * key) and are handed to the mock consumer from a poll task that re-schedules itself.
 */
public class KafkaTestTable extends BeamKafkaTable {
    private final int partitionsPerTopic;
    private final List<KafkaTestRecord> records;
    /** Consumer config key selecting the {@link TimestampType} stamped on the generated records. */
    private static final String TIMESTAMP_TYPE_CONFIG = "test.timestamp.type";

    /**
     * Creates a test table with no records.
     *
     * @param beamSchema schema passed through to {@link BeamKafkaTable}
     * @param topics topics served by this table
     * @param partitionsPerTopic number of partitions created for every topic
     */
    public KafkaTestTable(Schema beamSchema, List<String> topics, int partitionsPerTopic) {
        super(beamSchema, "server:123", topics);
        this.partitionsPerTopic = partitionsPerTopic;
        this.records = new ArrayList<KafkaTestRecord>();
    }

    /** Returns the superclass read transform with its consumer factory replaced by the mock consumer. */
    KafkaIO.Read<byte[], byte[]> createKafkaRead() {
        return super.createKafkaRead().withConsumerFactoryFn(this::mkMockConsumer);
    }

    /**
     * Registers a record to be served by mock consumers created after this call.
     *
     * @param record the record to add
     */
    public void addRecord(KafkaTestRecord record) {
        this.records.add(record);
    }

    /**
     * Computes the rate by delegating to the superclass with a mock consumer built from an empty
     * configuration.
     *
     * @param numberOfRecords forwarded unchanged to the superclass computation
     * @throws BeamKafkaTable.NoEstimationException propagated from the superclass computation
     */
    double computeRate(int numberOfRecords) throws BeamKafkaTable.NoEstimationException {
        return super.computeRate(this.mkMockConsumer(new HashMap<String, Object>()), numberOfRecords);
    }

    /**
     * Overwrites the inherited {@code numberOfRecordsForRate} field.
     *
     * @param numberOfRecordsForRate the new value
     */
    public void setNumberOfRecordsForRate(int numberOfRecordsForRate) {
        this.numberOfRecordsForRate = numberOfRecordsForRate;
    }

    /**
     * Builds a {@link MockConsumer} pre-configured with this table's topics, partitions and records.
     *
     * <p>Recognised config entries: {@code "test.timestamp.type"} (name of the {@link TimestampType}
     * to stamp on records, defaulting to {@code LOG_APPEND_TIME}) and {@code "inject.error.at.eof"}
     * (when present, a {@link KafkaException} is set on the consumer whenever a poll task finds no
     * record to enqueue).
     *
     * @param config consumer configuration
     * @return the configured mock consumer with its record-enqueueing poll task already scheduled
     */
    private MockConsumer<byte[], byte[]> mkMockConsumer(final Map<String, Object> config) {
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.EARLIEST;
        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> kafkaRecords =
            new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        Map<String, List<PartitionInfo>> partitionInfoMap = new HashMap<String, List<PartitionInfo>>();
        final Map<String, List<TopicPartition>> partitionMap = new HashMap<String, List<TopicPartition>>();

        // Create the partitions of every topic, each starting with an empty record list.
        for (String topic : this.getTopics()) {
            List<PartitionInfo> partIds = new ArrayList<PartitionInfo>(this.partitionsPerTopic);
            List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>(this.partitionsPerTopic);
            for (int i = 0; i < this.partitionsPerTopic; ++i) {
                TopicPartition tp = new TopicPartition(topic, i);
                topicPartitions.add(tp);
                partIds.add(new PartitionInfo(topic, i, null, null, null));
                kafkaRecords.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
            }
            partitionInfoMap.put(topic, partIds);
            partitionMap.put(topic, topicPartitions);
        }

        TimestampType timestampType = TimestampType.forName(
            (String) config.getOrDefault(TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString()));

        for (KafkaTestRecord record : this.records) {
            // floorMod keeps the index in [0, partitionsPerTopic) even when the key's hash code is
            // negative; the plain % operator would yield a negative index and fail in List.get.
            int partitionIndex = Math.floorMod(record.getKey().hashCode(), this.partitionsPerTopic);
            TopicPartition tp = partitionMap.get(record.getTopic()).get(partitionIndex);
            byte[] key = record.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = record.getValue().toByteArray();
            List<ConsumerRecord<byte[], byte[]>> partitionRecords = kafkaRecords.get(tp);
            // The offset of a record is its position within its partition.
            long offset = partitionRecords.size();
            partitionRecords.add(new ConsumerRecord<byte[], byte[]>(
                tp.topic(), tp.partition(), offset, record.getTimeStamp(), timestampType,
                0L, key.length, value.length, key, value));
        }

        // Partitions currently assigned to the consumer; read by the poll task below.
        final AtomicReference<List<TopicPartition>> assignedPartitions =
            new AtomicReference<List<TopicPartition>>(Collections.<TopicPartition>emptyList());

        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(offsetResetStrategy) {

            /**
             * Assigns the partitions (resolved to the instances stored in {@code partitionMap}) and
             * sets their beginning offset to 0 and their end offset to the number of stored records.
             */
            public synchronized void assign(Collection<TopicPartition> assigned) {
                List<TopicPartition> realPartitions = assigned.stream()
                    .map(part -> partitionMap.get(part.topic()).get(part.partition()))
                    .collect(Collectors.toList());
                super.assign(realPartitions);
                assignedPartitions.set(ImmutableList.copyOf(realPartitions));
                for (TopicPartition tp : realPartitions) {
                    this.updateBeginningOffsets(ImmutableMap.of(tp, 0L));
                    // Offsets are Long values; widen explicitly so the map does not hold an Integer.
                    long endOffset = kafkaRecords.get(tp).size();
                    this.updateEndOffsets(ImmutableMap.of(tp, endOffset));
                }
            }

            /**
             * Resolves each requested timestamp as if it were an offset: a value below the number of
             * records stored for the partition maps to an {@link OffsetAndTimestamp} carrying that
             * value as both offset and timestamp, anything else maps to {@code null}.
             */
            public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
                // Filled by hand rather than with Collectors.toMap, which throws
                // NullPointerException for the null values this method has to return.
                Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<TopicPartition, OffsetAndTimestamp>();
                for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
                    long maxOffset = kafkaRecords.get(entry.getKey()).size();
                    long offset = entry.getValue();
                    OffsetAndTimestamp value = offset >= maxOffset ? null : new OffsetAndTimestamp(offset, offset);
                    result.put(entry.getKey(), value);
                }
                return result;
            }
        };

        for (String topic : this.getTopics()) {
            consumer.updatePartitions(topic, partitionInfoMap.get(topic));
        }

        // Anonymous class rather than a lambda because the task re-schedules itself through "this".
        Runnable recordEnqueueTask = new Runnable() {
            @Override
            public void run() {
                // Enqueue every stored record at or after the consumer's current position.
                int recordsAdded = 0;
                for (TopicPartition tp : assignedPartitions.get()) {
                    long curPos = consumer.position(tp);
                    for (ConsumerRecord<byte[], byte[]> r : kafkaRecords.get(tp)) {
                        if (r.offset() < curPos) {
                            continue;
                        }
                        consumer.addRecord(r);
                        ++recordsAdded;
                    }
                }
                if (recordsAdded == 0) {
                    if (config.get("inject.error.at.eof") != null) {
                        consumer.setException(new KafkaException("Injected error in consumer.poll()"));
                    }
                    // Short pause when nothing was enqueued, presumably to keep the reader from
                    // busy-looping on an empty consumer.
                    Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
                }
                consumer.schedulePollTask(this);
            }
        };
        consumer.schedulePollTask(recordEnqueueTask);
        return consumer;
    }

    /**
     * Not supported by this test table.
     *
     * @throws RuntimeException always
     */
    public PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
        throw new RuntimeException("KafkaTestTable does not implement getPTransformForInput method.");
    }

    /**
     * Not supported by this test table.
     *
     * @throws RuntimeException always
     */
    public PTransform<PCollection<Row>, PCollection<ProducerRecord<byte[], byte[]>>> getPTransformForOutput() {
        throw new RuntimeException("KafkaTestTable does not implement getPTransformForOutput method.");
    }
}

