package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.class */
/**
 * Verifies that a coordinator created through {@link DataStatisticsCoordinatorProvider} can be
 * checkpointed and afterwards reset to an earlier checkpoint.
 */
public class TestDataStatisticsCoordinatorProvider {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final int NUM_SUBTASKS = 1;

    /** Logical type of the single column that forms the statistics key. */
    private static final LogicalType KEY_TYPE = new VarCharType();

    /** Row type of the statistics key: one {@link #KEY_TYPE} column. */
    private static final RowType KEY_ROW_TYPE = RowType.of(KEY_TYPE);

    private DataStatisticsCoordinatorProvider<MapDataStatistics, Map<RowData, Long>> provider;
    private EventReceivingTasks receivingTasks;
    private TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer;

    /** Builds a fresh statistics serializer, provider and task gateway set before every test. */
    @Before
    public void before() {
        this.statisticsSerializer =
            MapDataStatisticsSerializer.fromKeySerializer(new RowDataSerializer(KEY_ROW_TYPE));
        this.provider =
            new DataStatisticsCoordinatorProvider<>(
                "DataStatisticsCoordinatorProvider", OPERATOR_ID, this.statisticsSerializer);
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
    }

    /**
     * Sends two rounds of statistics to the coordinator, checkpointing after each round, then
     * resets to the first checkpoint and expects the first round's statistics to be visible again.
     *
     * @throws Exception if any coordinator interaction fails
     */
    @Test
    @SuppressWarnings("unchecked") // downcasts of the generic-less OperatorCoordinator handle
    public void testCheckpointAndReset() throws Exception {
        BinaryRowData rowA = binaryRowOf("a");
        BinaryRowData rowB = binaryRowOf("b");
        BinaryRowData rowC = binaryRowOf("c");
        BinaryRowData rowD = binaryRowOf("d");
        BinaryRowData rowE = binaryRowOf("e");

        RecreateOnResetOperatorCoordinator coordinator =
            (RecreateOnResetOperatorCoordinator)
                this.provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS));
        DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> dataStatisticsCoordinator =
            (DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)
                coordinator.getInternalCoordinator();

        coordinator.start();
        TestDataStatisticsCoordinator.setAllTasksReady(
            NUM_SUBTASKS, dataStatisticsCoordinator, this.receivingTasks);

        // Round 1: subtask 0 reports keys a, b, c for checkpoint 1.
        MapDataStatistics checkpoint1SubtaskStatistics = new MapDataStatistics();
        checkpoint1SubtaskStatistics.add(rowA);
        checkpoint1SubtaskStatistics.add(rowB);
        checkpoint1SubtaskStatistics.add(rowC);
        coordinator.handleEventFromOperator(
            0,
            0,
            DataStatisticsEvent.create(1L, checkpoint1SubtaskStatistics, this.statisticsSerializer));
        TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

        // Keep a handle on the completed statistics object itself; it is compared again after the
        // reset at the end of the test.
        DataStatistics<MapDataStatistics, Map<RowData, Long>> checkpoint1GlobalStatistics =
            dataStatisticsCoordinator.completedStatistics().dataStatistics();
        Assertions.assertThat(checkpoint1GlobalStatistics.statistics())
            .isEqualTo(checkpoint1SubtaskStatistics.statistics());
        byte[] checkpoint1Bytes = waitForCheckpoint(1L, dataStatisticsCoordinator);

        // Round 2: subtask 0 reports keys d, e, e for checkpoint 2.
        MapDataStatistics checkpoint2SubtaskStatistics = new MapDataStatistics();
        checkpoint2SubtaskStatistics.add(rowD);
        checkpoint2SubtaskStatistics.add(rowE);
        checkpoint2SubtaskStatistics.add(rowE);
        coordinator.handleEventFromOperator(
            0,
            0,
            DataStatisticsEvent.create(2L, checkpoint2SubtaskStatistics, this.statisticsSerializer));
        TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(dataStatisticsCoordinator);
        Assertions.assertThat(
                dataStatisticsCoordinator.completedStatistics().dataStatistics().statistics())
            .isEqualTo(checkpoint2SubtaskStatistics.statistics());
        waitForCheckpoint(2L, dataStatisticsCoordinator);

        // Roll back to checkpoint 1 although checkpoint 2 has been taken in the meantime.
        coordinator.resetToCheckpoint(1L, checkpoint1Bytes);
        DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> restoredCoordinator =
            (DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>>)
                coordinator.getInternalCoordinator();

        // The test expects the reset to expose a different internal coordinator whose completed
        // statistics match what was captured for checkpoint 1.
        Assertions.assertThat(dataStatisticsCoordinator).isNotEqualTo(restoredCoordinator);
        Assertions.assertThat(restoredCoordinator.completedStatistics().dataStatistics().statistics())
            .isEqualTo(checkpoint1GlobalStatistics.statistics());
    }

    /**
     * Converts a single string value into a one-column {@link BinaryRowData} key.
     *
     * <p>A new {@link RowDataSerializer} is created on every call, as the pre-refactoring code did.
     * NOTE(review): {@code RowDataSerializer#toBinaryRow} is understood to reuse an internal row
     * instance, so sharing one serializer across calls could make the returned rows alias each
     * other; confirm against the Flink sources before "optimizing" this.
     *
     * @param value the string stored in the row's only column
     * @return the binary row holding {@code value}
     */
    private static BinaryRowData binaryRowOf(String value) {
        return new RowDataSerializer(KEY_ROW_TYPE)
            .toBinaryRow(GenericRowData.of(StringData.fromString(value)));
    }

    /**
     * Triggers a checkpoint on the given coordinator and blocks until its result is available.
     *
     * @param checkpointId id passed to the coordinator for this checkpoint
     * @param coordinator coordinator to checkpoint
     * @return the bytes the coordinator completed the checkpoint future with
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException if the checkpoint future completes exceptionally
     */
    private byte[] waitForCheckpoint(
            long checkpointId,
            DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> coordinator)
            throws InterruptedException, ExecutionException {
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        coordinator.checkpointCoordinator(checkpointId, checkpointFuture);
        return checkpointFuture.get();
    }
}
