/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.TestJobExecutor;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.operators.lifecycle.graph.OneInputTestStreamOperatorFactory;
import org.apache.flink.runtime.operators.lifecycle.graph.TestDataElement;
import org.apache.flink.runtime.operators.lifecycle.graph.TestEventSource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration test that runs a small keyed pipeline (source, two chained keyed transforms,
 * discarding sink) with the changelog state backend enabled on a single-slot mini cluster, and
 * drives it through a snapshot failure followed by a restart.
 *
 * <p>NOTE(review): presumably the goal is to verify that incremental/changelog state remains
 * usable after a failed checkpoint triggers a restart - confirm against the original test's
 * documentation.
 *
 * <p>Raw types and the casts on Flink types are kept exactly as the decompiler emitted them,
 * because the generic signatures of the referenced classes are not visible in this file.
 */
public class IncrementalStateReuseAfterFailureITCase {
    /** UID hashes used both as operator ids and as {@code setUidHash} values. */
    private static final String UID_SRC = asUidHash(0);
    private static final String UID_OP1 = asUidHash(1);
    private static final String UID_OP2 = asUidHash(2);

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /** Created in {@link #before()}; stays {@code null} if set-up fails before assignment. */
    private MiniClusterWithClientResource miniClusterResource;

    /**
     * Runs the job, waits for a completed checkpoint, then sends {@code DELAY_SNAPSHOT} and
     * {@code FAIL_SNAPSHOT} to a single subtask of the first transform. Afterwards it waits for
     * an operator to start again, for all tasks to be running and for another completed
     * checkpoint, and finally finishes the sources and asserts successful termination.
     *
     * @throws Exception if any step of the job execution or the assertions fails
     */
    @Test
    public void testChangelogStateReuse() throws Exception {
        TestJobExecutor.execute(createJob(), miniClusterResource)
                .waitForAllRunning()
                .waitForEvent(CheckpointCompletedEvent.class)
                .sendOperatorCommand(
                        UID_OP1,
                        TestCommand.DELAY_SNAPSHOT,
                        TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK)
                .sendOperatorCommand(
                        UID_OP1,
                        TestCommand.FAIL_SNAPSHOT,
                        TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK)
                // presumably signals that the job was restarted after the failed snapshot
                .waitForEvent(OperatorStartedEvent.class)
                .waitForAllRunning()
                .waitForEvent(CheckpointCompletedEvent.class)
                .sendBroadcastCommand(
                        TestCommand.FINISH_SOURCES,
                        TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS)
                .waitForTermination()
                .assertFinishedSuccessfully();
    }

    /**
     * Builds the job under test: a {@link TestEventSource} feeding two keyed test operators
     * (the second one attached via {@code reinterpretAsKeyedStream}) and a discarding sink.
     * Checkpointing, the changelog state backend and a fixed-delay restart strategy are enabled;
     * parallelism and max parallelism are both 1.
     *
     * @return the job graph together with the shared event and command queues
     */
    private TestJobWithDescription createJob() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(200L);
        env.enableChangelogStateBackend(true);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 1L);
        env.setMaxParallelism(1);
        env.setParallelism(1);

        TestEventQueue evQueue = TestEventQueue.createShared(sharedObjects);
        TestCommandDispatcher cmdQueue = TestCommandDispatcher.createShared(sharedObjects);

        SingleOutputStreamOperator src =
                env.addSource((SourceFunction) new TestEventSource(UID_SRC, evQueue, cmdQueue))
                        .setUidHash(UID_SRC);

        SingleOutputStreamOperator transform1 =
                src.keyBy((KeySelector & Serializable) x -> x)
                        .transform(
                                "transform-1",
                                TypeInformation.of(TestDataElement.class),
                                (OneInputStreamOperatorFactory)
                                        new OneInputTestStreamOperatorFactory(
                                                UID_OP1, evQueue, cmdQueue))
                        .setUidHash(UID_OP1);

        // The second operator re-uses the existing keying instead of calling keyBy again.
        SingleOutputStreamOperator transform2 =
                DataStreamUtils.reinterpretAsKeyedStream(
                                (DataStream) transform1, (KeySelector & Serializable) x -> x)
                        .transform(
                                "transform-2",
                                TypeInformation.of(TestDataElement.class),
                                (OneInputStreamOperatorFactory)
                                        new OneInputTestStreamOperatorFactory(
                                                UID_OP2, evQueue, cmdQueue))
                        .setUidHash(UID_OP2);

        transform2.sinkTo((Sink) new DiscardingSink());

        return new TestJobWithDescription(
                env.getStreamGraph().getJobGraph(),
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptyMap(),
                evQueue,
                cmdQueue);
    }

    /**
     * Configures file-system changelog storage in a fresh temporary folder and starts a mini
     * cluster with one task manager and one slot.
     *
     * @throws Exception if the temporary folder cannot be created or the cluster fails to start
     */
    @Before
    public void before() throws Exception {
        Configuration configuration = new Configuration();
        FsStateChangelogStorageFactory.configure(
                configuration, temporaryFolder.newFolder(), Duration.ofMinutes(1L), 10);
        miniClusterResource =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(configuration)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(1)
                                .build());
        miniClusterResource.before();
    }

    /**
     * Shuts the mini cluster down. JUnit invokes this even when {@link #before()} threw, so the
     * resource may never have been created; the guard avoids piling a
     * {@link NullPointerException} on top of the real set-up failure.
     */
    @After
    public void after() {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    }

    /**
     * Formats {@code num} as a 32-character, zero-padded, upper-case hexadecimal string.
     *
     * @param num number to encode
     * @return the zero-padded hexadecimal representation of {@code num}
     */
    private static String asUidHash(int num) {
        return String.format("%032X", num);
    }
}

