package org.apache.flink.table.store.connector.sink.global;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.class */
public class GlobalCommitterOperatorTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest$DefaultGlobalCommitter.class */
    /**
     * In-memory {@link GlobalCommitter} over {@code String} committables, used by the tests in
     * this file.
     *
     * <p>Every global committable handed to {@link #commit(List)} is recorded and can be read back
     * with {@link #getCommittedData()}; {@link #close()} only flips a flag that is exposed through
     * {@link #isClosed()}.
     */
    public static class DefaultGlobalCommitter implements GlobalCommitter<String, String> {
        /**
         * Builds one global committable by sorting the given committables and joining them with
         * {@code "+"}. Sorting is done on a copy so the caller's list is neither reordered nor
         * required to be modifiable.
         */
        private static final Function<List<String>, String> COMBINER = committables -> {
            List<String> sorted = new ArrayList<>(committables);
            Collections.sort(sorted);
            return String.join("+", sorted);
        };

        /** Everything passed to {@link #commit(List)} so far, in arrival order. */
        private final Queue<String> committedData = new ConcurrentLinkedQueue<>();

        /** Global committable dropped by {@link #filterRecoveredCommittables(List)}; may be null. */
        private final String committedSuccessData;

        /** Set to {@code true} by {@link #close()}. */
        private boolean isClosed;

        /** Creates a committer whose "already committed" marker is the empty string. */
        DefaultGlobalCommitter() {
            this("");
        }

        /**
         * @param committedSuccessData global committable to filter out on recovery, or
         *     {@code null} to disable filtering
         */
        DefaultGlobalCommitter(String committedSuccessData) {
            this.committedSuccessData = committedSuccessData;
        }

        /**
         * Removes every entry equal to the configured "already committed" value.
         *
         * @param list recovered global committables
         * @return {@code list} itself when no marker is configured, otherwise a new filtered list
         */
        public List<String> filterRecoveredCommittables(List<String> list) {
            if (committedSuccessData == null) {
                return list;
            }
            return list.stream()
                    .filter(candidate -> !committedSuccessData.equals(candidate))
                    .collect(Collectors.toList());
        }

        /**
         * Combines the committables into a single global committable via {@link #COMBINER}.
         *
         * @param j checkpoint-style identifier supplied by the caller; not used here
         * @param list committables to combine
         * @return the sorted committables joined with {@code "+"}
         */
        public String combine(long j, List<String> list) {
            return COMBINER.apply(list);
        }

        /** Records the given global committables as committed. */
        public void commit(List<String> list) {
            committedData.addAll(list);
        }

        /** @return a fresh copy of everything committed so far */
        public List<String> getCommittedData() {
            // committedData is final and always initialized, so no null guard is needed.
            return new ArrayList<>(committedData);
        }

        /** Marks this committer as closed. */
        public void close() {
            isClosed = true;
        }

        /** @return {@code true} once {@link #close()} has been called */
        public boolean isClosed() {
            return isClosed;
        }

        /* renamed from: combine, reason: collision with other method in class */
        // Bridge for combine(long, List<String>) as emitted by the decompiler; kept so the set of
        // declared members is unchanged. The raw parameter is part of that signature.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public /* bridge */ /* synthetic */ Object m5combine(long j, List list) throws IOException {
            return combine(j, (List<String>) list);
        }
    }

    /**
     * Opens and then closes a harness built around a fresh committer and checks that the committer
     * reports itself as closed afterwards.
     */
    @Test
    public void closeCommitter() throws Exception {
        final DefaultGlobalCommitter committer = new DefaultGlobalCommitter();
        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> harness =
                createTestHarness(committer);

        harness.initializeEmptyState();
        harness.open();
        harness.close();

        final boolean closed = committer.isClosed();
        Assertions.assertThat(closed).isTrue();
    }

    /**
     * Builds two independent subtask snapshots, repackages and repartitions them into a single
     * state, restores a new harness from it and checks that, after checkpoint 1 completes, the
     * committer has received one combined committable per original snapshot.
     */
    @Test
    public void restoredFromMergedState() throws Exception {
        final List<String> firstCommittables = Arrays.asList("host", "drop");
        final OperatorSubtaskState firstState =
                buildSubtaskState(createTestHarness(), firstCommittables);

        final List<String> secondCommittables = Arrays.asList("future", "evil", "how");
        final OperatorSubtaskState secondState =
                buildSubtaskState(createTestHarness(), secondCommittables);

        final DefaultGlobalCommitter committer = new DefaultGlobalCommitter();
        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> harness =
                createTestHarness(committer);

        harness.initializeState(
                OneInputStreamOperatorTestHarness.repartitionOperatorState(
                        OneInputStreamOperatorTestHarness.repackageState(
                                new OperatorSubtaskState[] {firstState, secondState}),
                        2,
                        2,
                        1,
                        0));
        harness.open();

        // One combined committable is expected for each of the two restored snapshots.
        final List<String> expected = new ArrayList<>();
        expected.add(DefaultGlobalCommitter.COMBINER.apply(firstCommittables));
        expected.add(DefaultGlobalCommitter.COMBINER.apply(secondCommittables));

        harness.snapshot(1L, 1L);
        harness.notifyOfCompletedCheckpoint(1L);
        harness.close();

        Assertions.assertThat(committer.getCommittedData())
                .containsExactlyInAnyOrder(expected.toArray(new String[0]));
    }

    /**
     * Feeds three batches of committables, taking a snapshot after each batch, then completes only
     * checkpoint 3 and checks that the combined committable of every batch has been committed.
     */
    @Test
    public void commitMultipleStagesTogether() throws Exception {
        final DefaultGlobalCommitter committer = new DefaultGlobalCommitter();

        final List<String> firstBatch = Arrays.asList("cautious", "nature");
        final List<String> secondBatch = Arrays.asList("count", "over");
        final List<String> thirdBatch = Arrays.asList("lawyer", "grammar");

        final List<String> expected = new ArrayList<>();
        expected.add(DefaultGlobalCommitter.COMBINER.apply(firstBatch));
        expected.add(DefaultGlobalCommitter.COMBINER.apply(secondBatch));
        expected.add(DefaultGlobalCommitter.COMBINER.apply(thirdBatch));

        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> harness =
                createTestHarness(committer);
        harness.initializeEmptyState();
        harness.open();

        harness.processElements(committableRecords(firstBatch));
        harness.snapshot(1L, 1L);
        harness.processElements(committableRecords(secondBatch));
        harness.snapshot(2L, 2L);
        harness.processElements(committableRecords(thirdBatch));
        harness.snapshot(3L, 3L);

        // Only the last checkpoint is acknowledged; the earlier ones are never notified.
        harness.notifyOfCompletedCheckpoint(3L);
        harness.close();

        Assertions.assertThat(committer.getCommittedData())
                .containsExactlyInAnyOrder(expected.toArray(new String[0]));
    }

    /**
     * Restores a harness from a snapshot whose combined committable equals the value the committer
     * is configured to filter out, and checks that nothing is committed once checkpoint 1
     * completes.
     */
    @Test
    public void filterRecoveredCommittables() throws Exception {
        final List<String> committables = Arrays.asList("silent", "elder", "patience");
        final String alreadyCommitted = DefaultGlobalCommitter.COMBINER.apply(committables);
        final OperatorSubtaskState restoredState =
                buildSubtaskState(createTestHarness(), committables);

        final DefaultGlobalCommitter committer = new DefaultGlobalCommitter(alreadyCommitted);
        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> harness =
                createTestHarness(committer);
        harness.initializeState(restoredState);
        harness.open();
        try {
            harness.snapshot(1L, 1L);
            harness.notifyOfCompletedCheckpoint(1L);

            Assertions.assertThat(committer.getCommittedData()).isEmpty();
        } finally {
            // Close the harness even when the assertion above fails.
            harness.close();
        }
    }

    /**
     * Processes three committables, signals end of input without taking any snapshot, and checks
     * that their combined form {@code "elder+patience+silent"} is among the committed data.
     */
    @Test
    public void endOfInput() throws Exception {
        final List<String> committables = Arrays.asList("silent", "elder", "patience");
        final DefaultGlobalCommitter committer = new DefaultGlobalCommitter();
        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> harness =
                createTestHarness(committer);

        harness.initializeEmptyState();
        harness.open();
        harness.processElements(committableRecords(committables));
        harness.endInput();
        harness.close();

        final List<String> committed = committer.getCommittedData();
        Assertions.assertThat(committed).contains("elder+patience+silent");
    }

    /** Creates a harness backed by a new {@link DefaultGlobalCommitter} with default settings. */
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness() throws Exception {
        final DefaultGlobalCommitter freshCommitter = new DefaultGlobalCommitter();
        return createTestHarness(freshCommitter);
    }

    /**
     * Creates a harness around a {@code GlobalCommitterOperator} that always hands out the given
     * committer and uses {@code TestSink.StringCommittableSerializer.INSTANCE} both for the
     * operator and for the input type serializer.
     *
     * @param globalCommitter committer instance returned by the operator's committer supplier
     * @return a new, not yet initialized or opened, test harness
     */
    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness(GlobalCommitter<String, String> globalCommitter) throws Exception {
        // NOTE(review): the operator is constructed as a raw type here; if GlobalCommitterOperator
        // is generic, a diamond would be preferable - confirm against its declaration.
        return new OneInputStreamOperatorTestHarness<>(
                new GlobalCommitterOperator(
                        () -> globalCommitter,
                        () -> TestSink.StringCommittableSerializer.INSTANCE),
                CommittableMessageTypeInfo.of(() -> TestSink.StringCommittableSerializer.INSTANCE)
                        .createSerializer(new ExecutionConfig()));
    }

    /**
     * Runs the given harness from empty state, feeds it the committables, and returns the snapshot
     * taken for checkpoint 1. The harness is closed before returning.
     *
     * @param oneInputStreamOperatorTestHarness fresh harness to drive; it is initialized, opened
     *     and closed by this method
     * @param list committables to send through the harness before snapshotting
     * @return the operator state captured by {@code snapshot(1L, 1L)}
     */
    public static OperatorSubtaskState buildSubtaskState(OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> oneInputStreamOperatorTestHarness, List<String> list) throws Exception {
        oneInputStreamOperatorTestHarness.initializeEmptyState();
        oneInputStreamOperatorTestHarness.open();
        // Reuse the shared conversion helper instead of duplicating the stream pipeline.
        oneInputStreamOperatorTestHarness.processElements(committableRecords(list));
        oneInputStreamOperatorTestHarness.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(1L, 1L);
        oneInputStreamOperatorTestHarness.close();
        return snapshot;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps every string as a committable message (see {@code toCommittableMessage}) inside a
     * {@link StreamRecord}.
     *
     * @param collection committable payloads, in the order they should be emitted
     * @return one stream record per input element, in encounter order
     */
    public static List<StreamRecord<CommittableMessage<String>>> committableRecords(Collection<String> collection) {
        return collection.stream()
                .map(GlobalCommitterOperatorTest::toCommittableMessage)
                .map(StreamRecord::new)
                .collect(Collectors.toList());
    }

    /**
     * Wraps a string in a {@link CommittableWithLineage} constructed with a {@code null} second
     * argument and {@code -1} as the third.
     *
     * @param committable payload to wrap
     * @return the committable message carrying {@code committable}
     */
    private static CommittableMessage<String> toCommittableMessage(String committable) {
        // Typed local keeps the null argument unambiguous without a cast at the call site.
        final Long noCheckpointId = null;
        return new CommittableWithLineage<>(committable, noCheckpointId, -1);
    }

    /*
     * Synthetic lambda-deserialization hook as reconstructed by the decompiler.
     *
     * Given a SerializedLambda, it selects a branch by the lambda's implementation method name,
     * re-checks the recorded functional interface / implementation signature, and returns an
     * equivalent lambda for the three suppliers created in createTestHarness(GlobalCommitter).
     * Throws IllegalArgumentException when nothing matches.
     *
     * NOTE(review): the selector below is declared `boolean` but assigned -1 and 2, and is then
     * switched on with `case false` / two `case true` labels. That is not valid Java source and
     * looks like a decompilation artifact of an int selector - confirm before trying to compile.
     * NOTE(review): presumably this method is normally generated by the compiler for serializable
     * lambdas rather than hand-written; verify whether it should exist in source form at all.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name (hash, then equals) to a selector.
        switch (implMethodName.hashCode()) {
            case -1245123709:
                if (implMethodName.equals("lambda$createTestHarness$97e4975a$1")) {
                    z = true;
                    break;
                }
                break;
            case 2122989825:
                if (implMethodName.equals("lambda$createTestHarness$fb568d6b$1")) {
                    z = 2;
                    break;
                }
                break;
            case 2122989826:
                if (implMethodName.equals("lambda$createTestHarness$fb568d6b$2")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the serialized form and rebuild the matching lambda.
        switch (z) {
            case false:
                // Supplier returning the string committable serializer singleton.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    return () -> {
                        return TestSink.StringCommittableSerializer.INSTANCE;
                    };
                }
                break;
            case true:
                // Supplier returning the captured GlobalCommitter (captured argument 0).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/table/store/connector/sink/global/GlobalCommitter;)Lorg/apache/flink/table/store/connector/sink/global/GlobalCommitter;")) {
                    GlobalCommitter globalCommitter = (GlobalCommitter) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return globalCommitter;
                    };
                }
                break;
            case true:
                // Second serializer supplier; same checks and result as the `case false` branch.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    return () -> {
                        return TestSink.StringCommittableSerializer.INSTANCE;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
