package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.class */
public class TestDataStatisticsCoordinator {
    private static final String OPERATOR_NAME = "TestCoordinator";
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234, 5678);
    private static final int NUM_SUBTASKS = 2;
    private TypeSerializer<DataStatistics<MapDataStatistics, Map<RowData, Long>>> statisticsSerializer;
    private EventReceivingTasks receivingTasks;
    private DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> dataStatisticsCoordinator;

    @Before
    public void before() throws Exception {
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        this.statisticsSerializer = MapDataStatisticsSerializer.fromKeySerializer(new RowDataSerializer(RowType.of(new LogicalType[]{new VarCharType()})));
        this.dataStatisticsCoordinator = new DataStatisticsCoordinator<>(OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS), this.statisticsSerializer);
    }

    private void tasksReady() throws Exception {
        this.dataStatisticsCoordinator.start();
        setAllTasksReady(NUM_SUBTASKS, this.dataStatisticsCoordinator, this.receivingTasks);
    }

    @Test
    public void testThrowExceptionWhenNotStarted() {
        Assertions.assertThatThrownBy(() -> {
            this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, DataStatisticsEvent.create(0L, new MapDataStatistics(), this.statisticsSerializer));
        }).isInstanceOf(IllegalStateException.class).hasMessage("The coordinator of TestCoordinator has not started yet.");
        Assertions.assertThatThrownBy(() -> {
            this.dataStatisticsCoordinator.executionAttemptFailed(0, 0, (Throwable) null);
        }).isInstanceOf(IllegalStateException.class).hasMessage("The coordinator of TestCoordinator has not started yet.");
        Assertions.assertThatThrownBy(() -> {
            this.dataStatisticsCoordinator.checkpointCoordinator(0L, (CompletableFuture) null);
        }).isInstanceOf(IllegalStateException.class).hasMessage("The coordinator of TestCoordinator has not started yet.");
    }

    @Test
    public void testDataStatisticsEventHandling() throws Exception {
        tasksReady();
        RowType of = RowType.of(new LogicalType[]{new VarCharType()});
        BinaryRowData binaryRow = new RowDataSerializer(of).toBinaryRow(GenericRowData.of(new Object[]{StringData.fromString("a")}));
        BinaryRowData binaryRow2 = new RowDataSerializer(of).toBinaryRow(GenericRowData.of(new Object[]{StringData.fromString("b")}));
        BinaryRowData binaryRow3 = new RowDataSerializer(of).toBinaryRow(GenericRowData.of(new Object[]{StringData.fromString("c")}));
        MapDataStatistics mapDataStatistics = new MapDataStatistics();
        mapDataStatistics.add(binaryRow);
        mapDataStatistics.add(binaryRow2);
        mapDataStatistics.add(binaryRow2);
        mapDataStatistics.add(binaryRow3);
        mapDataStatistics.add(binaryRow3);
        mapDataStatistics.add(binaryRow3);
        DataStatisticsEvent create = DataStatisticsEvent.create(1L, mapDataStatistics, this.statisticsSerializer);
        MapDataStatistics mapDataStatistics2 = new MapDataStatistics();
        mapDataStatistics2.add(binaryRow);
        mapDataStatistics2.add(binaryRow2);
        mapDataStatistics2.add(binaryRow3);
        mapDataStatistics2.add(binaryRow3);
        DataStatisticsEvent create2 = DataStatisticsEvent.create(1L, mapDataStatistics2, this.statisticsSerializer);
        this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, create);
        this.dataStatisticsCoordinator.handleEventFromOperator(1, 0, create2);
        waitForCoordinatorToProcessActions(this.dataStatisticsCoordinator);
        Assertions.assertThat(this.dataStatisticsCoordinator.completedStatistics().dataStatistics().statistics()).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of(binaryRow, Long.valueOf(((Long) mapDataStatistics.statistics().get(binaryRow)).longValue() + ((Long) mapDataStatistics2.statistics().get(binaryRow)).longValue()), binaryRow2, Long.valueOf(((Long) mapDataStatistics.statistics().get(binaryRow2)).longValue() + ((Long) mapDataStatistics2.statistics().get(binaryRow2)).longValue()), binaryRow3, Long.valueOf(((Long) mapDataStatistics.statistics().get(binaryRow3)).longValue() + ((Long) mapDataStatistics2.statistics().get(binaryRow3)).longValue())));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void setAllTasksReady(int i, DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> dataStatisticsCoordinator, EventReceivingTasks eventReceivingTasks) {
        for (int i2 = 0; i2 < i; i2++) {
            dataStatisticsCoordinator.executionAttemptReady(i2, 0, eventReceivingTasks.createGatewayForSubtask(i2, 0));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void waitForCoordinatorToProcessActions(DataStatisticsCoordinator<MapDataStatistics, Map<RowData, Long>> dataStatisticsCoordinator) {
        CompletableFuture completableFuture = new CompletableFuture();
        dataStatisticsCoordinator.callInCoordinatorThread(() -> {
            completableFuture.complete(null);
            return null;
        }, "Coordinator fails to process action");
        try {
            completableFuture.get();
        } catch (InterruptedException e) {
            throw new AssertionError("test interrupted");
        } catch (ExecutionException e2) {
            ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e2));
        }
    }
}
