/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tiered.storage;

import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import kafka.log.UnifiedLog;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.TieredStorageTestReport;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public final class TieredStorageTestContext
implements AutoCloseable {
    private final TieredStorageTestHarness harness;
    private final Serializer<String> ser = new StringSerializer();
    private final Deserializer<String> de = new StringDeserializer();
    private final Map<String, TopicSpec> topicSpecs = new HashMap<String, TopicSpec>();
    private final TieredStorageTestReport testReport;
    private volatile KafkaProducer<String, String> producer;
    private volatile Consumer<String, String> consumer;
    private volatile Admin admin;
    private volatile List<LocalTieredStorage> remoteStorageManagers;
    private volatile List<BrokerLocalStorage> localStorages;

    public TieredStorageTestContext(TieredStorageTestHarness harness) {
        this.harness = harness;
        this.testReport = new TieredStorageTestReport(this);
        this.initClients();
        this.initContext();
    }

    private void initClients() {
        ListenerName listenerName = this.harness.listenerName();
        Properties commonOverrideProps = new Properties();
        commonOverrideProps.put("bootstrap.servers", this.harness.bootstrapServers(listenerName));
        Properties producerOverrideProps = new Properties();
        producerOverrideProps.put("linger.ms", String.valueOf(TimeUnit.SECONDS.toMillis(60L)));
        producerOverrideProps.putAll((Map<?, ?>)commonOverrideProps);
        this.producer = this.harness.createProducer(this.ser, this.ser, producerOverrideProps);
        this.consumer = this.harness.createConsumer(this.de, this.de, commonOverrideProps, JavaConverters.asScalaBuffer(Collections.emptyList()).toList());
        this.admin = this.harness.createAdminClient(listenerName, commonOverrideProps);
    }

    private void initContext() {
        this.remoteStorageManagers = TieredStorageTestHarness.remoteStorageManagers((Seq<KafkaBroker>)this.harness.aliveBrokers());
        this.localStorages = TieredStorageTestHarness.localStorages((Seq<KafkaBroker>)this.harness.aliveBrokers());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void createTopic(TopicSpec spec) throws ExecutionException, InterruptedException {
        NewTopic newTopic;
        if (spec.getAssignment() == null || spec.getAssignment().isEmpty()) {
            newTopic = new NewTopic(spec.getTopicName(), spec.getPartitionCount(), (short)spec.getReplicationFactor());
        } else {
            Map<Integer, List<Integer>> replicasAssignments = spec.getAssignment();
            newTopic = new NewTopic(spec.getTopicName(), replicasAssignments);
        }
        newTopic.configs(spec.getProperties());
        this.admin.createTopics(Collections.singletonList(newTopic)).all().get();
        TestUtils.waitForAllPartitionsMetadata((Seq)this.harness.brokers(), (String)spec.getTopicName(), (int)spec.getPartitionCount());
        TieredStorageTestContext tieredStorageTestContext = this;
        synchronized (tieredStorageTestContext) {
            this.topicSpecs.put(spec.getTopicName(), spec);
        }
    }

    public void createPartitions(ExpandPartitionCountSpec spec) throws ExecutionException, InterruptedException {
        NewPartitions newPartitions;
        if (spec.getAssignment() == null || spec.getAssignment().isEmpty()) {
            newPartitions = NewPartitions.increaseTo((int)spec.getPartitionCount());
        } else {
            Map<Integer, List<Integer>> assignment = spec.getAssignment();
            List newAssignments = assignment.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue).collect(Collectors.toList());
            newPartitions = NewPartitions.increaseTo((int)spec.getPartitionCount(), newAssignments);
        }
        Map<String, NewPartitions> partitionsMap = Collections.singletonMap(spec.getTopicName(), newPartitions);
        this.admin.createPartitions(partitionsMap).all().get();
        TestUtils.waitForAllPartitionsMetadata((Seq)this.harness.brokers(), (String)spec.getTopicName(), (int)spec.getPartitionCount());
    }

    public void updateTopicConfig(String topic, Map<String, String> configsToBeAdded, List<String> configsToBeDeleted) throws ExecutionException, InterruptedException, TimeoutException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        this.updateResource(configResource, configsToBeAdded, configsToBeDeleted);
    }

    public void updateBrokerConfig(Integer brokerId, Map<String, String> configsToBeAdded, List<String> configsToBeDeleted) throws ExecutionException, InterruptedException, TimeoutException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
        this.updateResource(configResource, configsToBeAdded, configsToBeDeleted);
    }

    private void updateResource(ConfigResource configResource, Map<String, String> configsToBeAdded, List<String> configsToBeDeleted) throws ExecutionException, InterruptedException, TimeoutException {
        ArrayList alterEntries = new ArrayList();
        configsToBeDeleted.forEach(k -> alterEntries.add(new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)));
        configsToBeAdded.forEach((k, v) -> alterEntries.add(new AlterConfigOp(new ConfigEntry(k, v), AlterConfigOp.OpType.SET)));
        AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(Integer.valueOf(30000));
        Map configsMap = Collections.singletonMap(configResource, alterEntries);
        this.admin.incrementalAlterConfigs(configsMap, alterOptions).all().get(30L, TimeUnit.SECONDS);
    }

    public void deleteTopic(String topic) {
        TestUtils.deleteTopicWithAdmin((Admin)this.admin, (String)topic, (Seq)this.harness.brokers(), (Seq)this.harness.controllerServers());
    }

    public void produce(List<ProducerRecord<String, String>> recordsToProduce, Integer batchSize) {
        int counter = 1;
        for (ProducerRecord<String, String> record : recordsToProduce) {
            this.producer.send(record);
            if (counter++ % batchSize != 0) continue;
            this.producer.flush();
        }
        this.producer.flush();
    }

    public List<ConsumerRecord<String, String>> consume(TopicPartition topicPartition, Integer expectedTotalCount, Long fetchOffset) {
        this.consumer.assign(Collections.singletonList(topicPartition));
        this.consumer.seek(topicPartition, fetchOffset.longValue());
        long timeoutMs = 60000L;
        long pollTimeoutMs = 100L;
        String sep = System.lineSeparator();
        ArrayList<ConsumerRecord<String, String>> records = new ArrayList<ConsumerRecord<String, String>>();
        Function1 pollAction = polledRecords -> {
            polledRecords.forEach(records::add);
            return records.size() >= expectedTotalCount;
        };
        Function0 messageSupplier = () -> String.format("Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s", expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep, records.stream().map(Object::toString).collect(Collectors.joining(sep)));
        TestUtils.pollRecordsUntilTrue(this.consumer, (Function1)pollAction, (Function0)messageSupplier, (long)timeoutMs, (long)pollTimeoutMs);
        return records;
    }

    public Long nextOffset(TopicPartition topicPartition) {
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        this.consumer.assign(partitions);
        this.consumer.seekToEnd(partitions);
        return this.consumer.position(topicPartition);
    }

    public Long beginOffset(TopicPartition topicPartition) {
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        this.consumer.assign(partitions);
        this.consumer.seekToBeginning(partitions);
        return this.consumer.position(topicPartition);
    }

    public void bounce(int brokerId) {
        this.harness.killBroker(brokerId);
        boolean allBrokersDead = this.harness.aliveBrokers().isEmpty();
        this.harness.startBroker(brokerId);
        if (allBrokersDead) {
            this.reinitClients();
        }
        this.initContext();
    }

    public void stop(int brokerId) {
        this.harness.killBroker(brokerId);
        this.initContext();
    }

    public void start(int brokerId) {
        boolean allBrokersDead = this.harness.aliveBrokers().isEmpty();
        this.harness.startBroker(brokerId);
        if (allBrokersDead) {
            this.reinitClients();
        }
        this.initContext();
    }

    public void eraseBrokerStorage(int brokerId, FilenameFilter filter, boolean isStopped) throws IOException {
        BrokerLocalStorage brokerLocalStorage = isStopped ? TieredStorageTestHarness.localStorages((Seq<KafkaBroker>)this.harness.brokers()).stream().filter(bls -> bls.getBrokerId() == brokerId).findFirst().orElseThrow(() -> new IllegalArgumentException("No local storage found for broker " + brokerId)) : this.localStorages.get(brokerId);
        brokerLocalStorage.eraseStorage(filter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TopicSpec topicSpec(String topicName) {
        Map<String, TopicSpec> map = this.topicSpecs;
        synchronized (map) {
            return this.topicSpecs.get(topicName);
        }
    }

    public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
        int aliveBrokerId = ((KafkaBroker)this.harness.aliveBrokers().head()).config().brokerId();
        return LocalTieredStorageSnapshot.takeSnapshot(this.remoteStorageManager(aliveBrokerId));
    }

    public LocalTieredStorageHistory tieredStorageHistory(int brokerId) {
        return this.remoteStorageManager(brokerId).getHistory();
    }

    public LocalTieredStorage remoteStorageManager(int brokerId) {
        return this.remoteStorageManagers.stream().filter(rsm -> rsm.brokerId() == brokerId).findFirst().orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId));
    }

    public List<LocalTieredStorage> remoteStorageManagers() {
        return this.remoteStorageManagers;
    }

    public List<BrokerLocalStorage> localStorages() {
        return this.localStorages;
    }

    public Deserializer<String> de() {
        return this.de;
    }

    public Admin admin() {
        return this.admin;
    }

    public boolean isActive(Integer brokerId) {
        return this.harness.aliveBrokers().exists(b -> b.config().brokerId() == brokerId.intValue());
    }

    public boolean isAssignedReplica(TopicPartition topicPartition, Integer replicaId) throws ExecutionException, InterruptedException {
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();
        TopicDescription description = (TopicDescription)((Map)this.admin.describeTopics(Collections.singletonList(topicPartition.topic())).allTopicNames().get()).get(topic);
        TopicPartitionInfo partitionInfo = (TopicPartitionInfo)description.partitions().get(partition);
        return partitionInfo.replicas().stream().anyMatch(node -> node.id() == replicaId.intValue());
    }

    public Optional<UnifiedLog> log(Integer brokerId, TopicPartition partition) {
        Option log = ((KafkaBroker)this.harness.brokers().apply((Object)brokerId)).logManager().getLog(partition, false);
        return log.isDefined() ? Optional.of(log.get()) : Optional.empty();
    }

    public void succeed(TieredStorageTestAction action) {
        this.testReport.addSucceeded(action);
    }

    public void fail(TieredStorageTestAction action) {
        this.testReport.addFailed(action);
    }

    public void printReport(PrintStream output) {
        this.testReport.print(output);
    }

    @Override
    public void close() throws IOException {
    }

    private void reinitClients() {
        Utils.closeQuietly(this.producer, (String)"Producer client");
        Utils.closeQuietly(this.consumer, (String)"Consumer client");
        Utils.closeQuietly((AutoCloseable)this.admin, (String)"Admin client");
        this.initClients();
    }
}

