package org.apache.iceberg.flink.sink.shuffle;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.class */
public class TestDataStatisticsOperator {
    private final RowType rowType = RowType.of(new LogicalType[]{new VarCharType()});
    private final TypeSerializer<RowData> rowSerializer = new RowDataSerializer(this.rowType);
    private DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> operator;

    private Environment getTestingEnvironment() {
        return new StreamMockEnvironment(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, new TestTaskStateManager());
    }

    @Before
    public void before() throws Exception {
        this.operator = createOperator();
        this.operator.setup(new OneInputStreamTask(getTestingEnvironment()), new MockStreamConfig(new Configuration(), 1), new MockOutput(Lists.newArrayList()));
    }

    private DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> createOperator() {
        return new DataStatisticsOperator<>(new KeySelector<RowData, RowData>() { // from class: org.apache.iceberg.flink.sink.shuffle.TestDataStatisticsOperator.1
            private static final long serialVersionUID = 7662520075515707428L;

            public RowData getKey(RowData rowData) {
                return rowData;
            }
        }, new MockOperatorEventGateway(), MapDataStatisticsSerializer.fromKeySerializer(this.rowSerializer));
    }

    @After
    public void clean() throws Exception {
        this.operator.close();
    }

    @Test
    public void testProcessElement() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> createHarness = createHarness(this.operator);
        try {
            this.operator.initializeState(getStateContext());
            this.operator.processElement(new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("a")})));
            this.operator.processElement(new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("a")})));
            this.operator.processElement(new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("b")})));
            Assert.assertTrue(this.operator.localDataStatistics() instanceof MapDataStatistics);
            Map statistics = this.operator.localDataStatistics().statistics();
            Assertions.assertThat(statistics).hasSize(2);
            Assertions.assertThat(statistics).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of(GenericRowData.of(new Object[]{StringData.fromString("a")}), 2L, GenericRowData.of(new Object[]{StringData.fromString("b")}), 1L));
            createHarness.endInput();
            if (createHarness != null) {
                $closeResource(null, createHarness);
            }
        } catch (Throwable th) {
            if (createHarness != null) {
                $closeResource(null, createHarness);
            }
            throw th;
        }
    }

    @Test
    public void testOperatorOutput() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> createHarness = createHarness(this.operator);
        try {
            createHarness.processElement(new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("a")})));
            createHarness.processElement(new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("b")})));
            createHarness.processElement(new StreamRecord(GenericRowData.of(new Object[]{StringData.fromString("b")})));
            Assertions.assertThat((List) createHarness.extractOutputValues().stream().filter((v0) -> {
                return v0.hasRecord();
            }).map((v0) -> {
                return v0.record();
            }).collect(Collectors.toList())).containsExactlyInAnyOrderElementsOf(ImmutableList.of(GenericRowData.of(new Object[]{StringData.fromString("a")}), GenericRowData.of(new Object[]{StringData.fromString("b")}), GenericRowData.of(new Object[]{StringData.fromString("b")})));
            if (createHarness != null) {
                $closeResource(null, createHarness);
            }
        } catch (Throwable th) {
            if (createHarness != null) {
                $closeResource(null, createHarness);
            }
            throw th;
        }
    }

    @Test
    public void testRestoreState() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> createHarness = createHarness(this.operator);
        Throwable th = null;
        try {
            try {
                MapDataStatistics mapDataStatistics = new MapDataStatistics();
                mapDataStatistics.add(GenericRowData.of(new Object[]{StringData.fromString("a")}));
                mapDataStatistics.add(GenericRowData.of(new Object[]{StringData.fromString("a")}));
                mapDataStatistics.add(GenericRowData.of(new Object[]{StringData.fromString("b")}));
                mapDataStatistics.add(GenericRowData.of(new Object[]{StringData.fromString("c")}));
                this.operator.handleOperatorEvent(new DataStatisticsEvent(0L, mapDataStatistics));
                Assertions.assertThat(this.operator.globalDataStatistics()).isInstanceOf(MapDataStatistics.class);
                Assertions.assertThat(this.operator.globalDataStatistics().statistics()).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of(GenericRowData.of(new Object[]{StringData.fromString("a")}), 2L, GenericRowData.of(new Object[]{StringData.fromString("b")}), 1L, GenericRowData.of(new Object[]{StringData.fromString("c")}), 1L));
                OperatorSubtaskState snapshot = createHarness.snapshot(1L, 0L);
                if (createHarness != null) {
                    $closeResource(null, createHarness);
                }
                DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> createOperator = createOperator();
                OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(createOperator, 2, 2, 1);
                Throwable th2 = null;
                try {
                    try {
                        oneInputStreamOperatorTestHarness.setup();
                        oneInputStreamOperatorTestHarness.initializeState(snapshot);
                        Assertions.assertThat(createOperator.globalDataStatistics()).isInstanceOf(MapDataStatistics.class);
                        HashMap newHashMap = Maps.newHashMap();
                        createOperator.globalDataStatistics().statistics().forEach((rowData, l) -> {
                        });
                        Assertions.assertThat(newHashMap).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of(GenericRowData.of(new Object[]{StringData.fromString("a")}), 2L, GenericRowData.of(new Object[]{StringData.fromString("b")}), 1L, GenericRowData.of(new Object[]{StringData.fromString("c")}), 1L));
                        $closeResource(null, oneInputStreamOperatorTestHarness);
                    } finally {
                    }
                } catch (Throwable th3) {
                    $closeResource(th2, oneInputStreamOperatorTestHarness);
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th4) {
            if (createHarness != null) {
                $closeResource(th, createHarness);
            }
            throw th4;
        }
    }

    private StateInitializationContext getStateContext() throws Exception {
        MockEnvironment build = new MockEnvironmentBuilder().build();
        return new StateInitializationContextImpl((Long) null, new HashMapStateBackend().createOperatorStateBackend(build, "test-operator", Collections.emptyList(), new CloseableRegistry()), (KeyedStateStore) null, (Iterable) null, (Iterable) null);
    }

    private OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> createHarness(DataStatisticsOperator<MapDataStatistics, Map<RowData, Long>> dataStatisticsOperator) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, DataStatisticsOrRecord<MapDataStatistics, Map<RowData, Long>>> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(dataStatisticsOperator, 1, 1, 0);
        oneInputStreamOperatorTestHarness.setup(new DataStatisticsOrRecordSerializer(MapDataStatisticsSerializer.fromKeySerializer(this.rowSerializer), this.rowSerializer));
        oneInputStreamOperatorTestHarness.open();
        return oneInputStreamOperatorTestHarness;
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
