/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tiered.storage.actions;

import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher;
import org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils;
import org.hamcrest.MatcherAssert;

/**
 * Test action which produces records to a topic-partition and then checks the outcome.
 *
 * <p>On execution it:
 * <ol>
 *   <li>builds one {@code COPY_SEGMENT} condition per expected offloaded segment;</li>
 *   <li>produces the records and waits for all of those conditions to hold;</li>
 *   <li>waits for every active broker that is assigned a replica to reach the expected
 *       earliest local offset;</li>
 *   <li>consumes the records back and asserts they correspond to what was produced;</li>
 *   <li>reads the records from the tiered storage and asserts they correspond to the records
 *       declared by the offloaded segment specs.</li>
 * </ol>
 */
public final class ProduceAction implements TieredStorageTestAction {
    /** Maximum time, in seconds, to wait for the expected segment-copy events. */
    private static final int OFFLOAD_WAIT_TIMEOUT_SEC = 40;

    private final TopicPartition topicPartition;
    private final List<OffloadedSegmentSpec> offloadedSegmentSpecs;
    private final List<ProducerRecord<String, String>> recordsToProduce;
    private final Integer batchSize;
    private final Long expectedEarliestLocalOffset;
    private final Serde<String> serde = Serdes.String();

    /**
     * @param topicPartition              the topic-partition the records are produced to and verified on
     * @param offloadedSegmentSpecs       the segments expected to be copied to the tiered storage
     * @param recordsToProduce            the records handed to the producer
     * @param batchSize                   forwarded as-is to {@code TieredStorageTestContext#produce}
     * @param expectedEarliestLocalOffset the earliest local offset to wait for, or {@code -1} to have it
     *                                    derived from the produced records and the topic spec
     */
    public ProduceAction(TopicPartition topicPartition, List<OffloadedSegmentSpec> offloadedSegmentSpecs, List<ProducerRecord<String, String>> recordsToProduce, Integer batchSize, Long expectedEarliestLocalOffset) {
        this.topicPartition = topicPartition;
        this.offloadedSegmentSpecs = offloadedSegmentSpecs;
        this.recordsToProduce = recordsToProduce;
        this.batchSize = batchSize;
        this.expectedEarliestLocalOffset = expectedEarliestLocalOffset;
    }

    /**
     * Produces the records, waits for the expected offloads and local-storage state, then
     * asserts on both the consumed records and the records found in the tiered storage.
     *
     * @param context the test context giving access to the storages, producer and consumer
     * @throws InterruptedException declared by the interface; may be raised while waiting
     * @throws ExecutionException   declared by the interface; may be raised by the context calls
     * @throws TimeoutException     declared by the interface; may be raised while waiting
     */
    @Override
    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
        List<LocalTieredStorage> tieredStorages = context.remoteStorageManagers();
        List<BrokerLocalStorage> localStorages = context.localStorages();

        // The conditions are created before anything is produced.
        // NOTE(review): presumably so that copy events fired while producing are not missed - confirm.
        List<LocalTieredStorageCondition> tieredStorageConditions = this.offloadedSegmentSpecs.stream()
                .map(spec -> LocalTieredStorageCondition.expectEvent(
                        tieredStorages,
                        LocalTieredStorageEvent.EventType.COPY_SEGMENT,
                        spec.getSourceBrokerId(),
                        spec.getTopicPartition(),
                        spec.getBaseOffset(),
                        false))
                .collect(Collectors.toList());

        // Both offsets are captured before producing: startOffset is where consumption of the
        // new records begins, and (startOffset - beginOffset) is used further down to skip the
        // leading records of the tiered storage.
        long startOffset = context.nextOffset(this.topicPartition);
        long beginOffset = context.beginOffset(this.topicPartition);

        context.produce(this.recordsToProduce, this.batchSize);

        if (!tieredStorageConditions.isEmpty()) {
            // The list is non-empty, so the reduction always yields a value.
            tieredStorageConditions.stream()
                    .reduce(LocalTieredStorageCondition::and)
                    .get()
                    .waitUntilTrue(OFFLOAD_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
        }

        TopicSpec topicSpec = context.topicSpec(this.topicPartition.topic());
        long earliestLocalOffset;
        if (this.expectedEarliestLocalOffset != -1L) {
            // An explicit expectation was supplied by the caller.
            earliestLocalOffset = this.expectedEarliestLocalOffset;
        } else {
            // No explicit expectation: derive it from the number of produced records, leaving
            // out the remainder which does not fill a whole segment.
            // NOTE(review): assumes each batch holds a single record - confirm against the specs.
            int producedCount = this.recordsToProduce.size();
            earliestLocalOffset = startOffset + producedCount
                    - (producedCount % topicSpec.getMaxBatchCountPerSegment()) - 1L;
        }

        for (BrokerLocalStorage localStorage : localStorages) {
            // Only brokers which are assigned a replica of the partition are considered; the
            // activity check is evaluated only for those, as in a short-circuit.
            boolean isAssignedReplica = context.isAssignedReplica(this.topicPartition, localStorage.getBrokerId());
            if (isAssignedReplica && context.isActive(localStorage.getBrokerId())) {
                localStorage.waitForEarliestLocalOffset(this.topicPartition, earliestLocalOffset);
            }
        }

        // The records consumed from startOffset must correspond to the records produced.
        List<ConsumerRecord<String, String>> consumedRecords = context.consume(this.topicPartition, this.recordsToProduce.size(), startOffset);
        MatcherAssert.assertThat(consumedRecords, RecordsKeyValueMatcher.correspondTo(this.recordsToProduce, this.topicPartition, this.serde, this.serde));

        // The records in the tiered storage, past the ones preceding startOffset, must correspond
        // to the records declared by the offloaded segment specs.
        List<Record> tieredStorageRecords = TieredStorageTestUtils.tieredStorageRecords(context, this.topicPartition);
        List<Record> discoveredRecords = tieredStorageRecords.subList((int) (startOffset - beginOffset), tieredStorageRecords.size());
        List<ProducerRecord<String, String>> producerRecords = this.offloadedSegmentSpecs.stream()
                .flatMap(spec -> spec.getRecords().stream())
                .collect(Collectors.toList());
        this.compareRecords(discoveredRecords, producerRecords, this.topicPartition);
    }

    /**
     * Prints the topic-partition, followed by every record to produce and every expected
     * offloaded segment spec, one per indented line.
     *
     * @param output the stream the description is written to
     */
    @Override
    public void describe(PrintStream output) {
        output.println("produce-records: " + this.topicPartition);
        this.recordsToProduce.forEach(record -> output.println("    " + record));
        this.offloadedSegmentSpecs.forEach(spec -> output.println("    " + spec));
    }

    /**
     * Asserts that the records discovered in the tiered storage correspond to the given
     * producer records, using the string serde for both keys and values.
     */
    private void compareRecords(List<Record> discoveredRecords, List<ProducerRecord<String, String>> producerRecords, TopicPartition topicPartition) {
        MatcherAssert.assertThat(discoveredRecords, RecordsKeyValueMatcher.correspondTo(producerRecords, topicPartition, this.serde, this.serde));
    }
}

