package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/TestAggregatedStatisticsTracker.class */
public class TestAggregatedStatisticsTracker {
    private static final int NUM_SUBTASKS = 2;
    private final RowType rowType = RowType.of(new LogicalType[]{new VarCharType()});
    private final BinaryRowData binaryRowDataA = new RowDataSerializer(this.rowType).toBinaryRow(GenericRowData.of(new Object[]{StringData.fromString("a")}));
    private final BinaryRowData binaryRowDataB = new RowDataSerializer(this.rowType).toBinaryRow(GenericRowData.of(new Object[]{StringData.fromString("b")}));
    private final TypeSerializer<RowData> rowSerializer = new RowDataSerializer(this.rowType);
    private final TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer(this.rowSerializer);
    private AggregatedStatisticsTracker<MapDataStatistics, Map<RowData, Long>> aggregatedStatisticsTracker;

    @Before
    public void before() throws Exception {
        this.aggregatedStatisticsTracker = new AggregatedStatisticsTracker<>("testOperator", this.statisticsSerializer, NUM_SUBTASKS);
    }

    @Test
    public void receiveNewerDataStatisticEvent() {
        MapDataStatistics mapDataStatistics = new MapDataStatistics();
        mapDataStatistics.add(this.binaryRowDataA);
        Assertions.assertThat(this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, DataStatisticsEvent.create(1L, mapDataStatistics, this.statisticsSerializer))).isNull();
        Assertions.assertThat(this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(1L);
        MapDataStatistics mapDataStatistics2 = new MapDataStatistics();
        mapDataStatistics2.add(this.binaryRowDataA);
        Assertions.assertThat(this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, DataStatisticsEvent.create(2L, mapDataStatistics2, this.statisticsSerializer))).isNull();
        Assertions.assertThat(this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(2L);
    }

    @Test
    public void receiveOlderDataStatisticEventTest() {
        MapDataStatistics mapDataStatistics = new MapDataStatistics();
        mapDataStatistics.add(this.binaryRowDataA);
        mapDataStatistics.add(this.binaryRowDataB);
        mapDataStatistics.add(this.binaryRowDataB);
        Assertions.assertThat(this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, DataStatisticsEvent.create(2L, mapDataStatistics, this.statisticsSerializer))).isNull();
        MapDataStatistics mapDataStatistics2 = new MapDataStatistics();
        mapDataStatistics2.add(this.binaryRowDataB);
        Assertions.assertThat(this.aggregatedStatisticsTracker.updateAndCheckCompletion(1, DataStatisticsEvent.create(1L, mapDataStatistics2, this.statisticsSerializer))).isNull();
        Assertions.assertThat(this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(2L);
    }

    @Test
    public void receiveCompletedDataStatisticEvent() {
        MapDataStatistics mapDataStatistics = new MapDataStatistics();
        mapDataStatistics.add(this.binaryRowDataA);
        mapDataStatistics.add(this.binaryRowDataB);
        mapDataStatistics.add(this.binaryRowDataB);
        Assertions.assertThat(this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, DataStatisticsEvent.create(1L, mapDataStatistics, this.statisticsSerializer))).isNull();
        MapDataStatistics mapDataStatistics2 = new MapDataStatistics();
        mapDataStatistics2.add(this.binaryRowDataA);
        mapDataStatistics2.add(this.binaryRowDataA);
        mapDataStatistics2.add(this.binaryRowDataB);
        AggregatedStatistics updateAndCheckCompletion = this.aggregatedStatisticsTracker.updateAndCheckCompletion(1, DataStatisticsEvent.create(1L, mapDataStatistics2, this.statisticsSerializer));
        Assertions.assertThat(updateAndCheckCompletion).isNotNull();
        Assertions.assertThat(updateAndCheckCompletion.checkpointId()).isEqualTo(1L);
        MapDataStatistics dataStatistics = updateAndCheckCompletion.dataStatistics();
        Assertions.assertThat(((Long) dataStatistics.statistics().get(this.binaryRowDataA)).longValue()).isEqualTo(((Long) mapDataStatistics.statistics().get(this.binaryRowDataA)).longValue() + ((Long) mapDataStatistics2.statistics().get(this.binaryRowDataA)).longValue());
        Assertions.assertThat(((Long) dataStatistics.statistics().get(this.binaryRowDataB)).longValue()).isEqualTo(((Long) mapDataStatistics.statistics().get(this.binaryRowDataB)).longValue() + ((Long) mapDataStatistics2.statistics().get(this.binaryRowDataB)).longValue());
        Assertions.assertThat(this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(updateAndCheckCompletion.checkpointId() + 1);
        MapDataStatistics mapDataStatistics3 = new MapDataStatistics();
        mapDataStatistics3.add(this.binaryRowDataA);
        Assertions.assertThat(this.aggregatedStatisticsTracker.updateAndCheckCompletion(0, DataStatisticsEvent.create(2L, mapDataStatistics3, this.statisticsSerializer))).isNull();
        Assertions.assertThat(updateAndCheckCompletion.checkpointId()).isEqualTo(1L);
        MapDataStatistics mapDataStatistics4 = new MapDataStatistics();
        mapDataStatistics4.add(this.binaryRowDataB);
        AggregatedStatistics updateAndCheckCompletion2 = this.aggregatedStatisticsTracker.updateAndCheckCompletion(1, DataStatisticsEvent.create(2L, mapDataStatistics4, this.statisticsSerializer));
        Assertions.assertThat(updateAndCheckCompletion2).isNotNull();
        Assertions.assertThat(updateAndCheckCompletion2.checkpointId()).isEqualTo(2L);
        Assertions.assertThat(this.aggregatedStatisticsTracker.inProgressStatistics().checkpointId()).isEqualTo(updateAndCheckCompletion2.checkpointId() + 1);
    }
}
