/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.slf4j.Logger;

public class TestStreamWriteOperatorCoordinator {
    private StreamWriteOperatorCoordinator coordinator;
    @TempDir
    File tempFile;

    @BeforeEach
    public void before() throws Exception {
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
        this.coordinator = new StreamWriteOperatorCoordinator(TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath()), (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)0));
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)1));
    }

    @AfterEach
    public void after() throws Exception {
        this.coordinator.close();
    }

    @Test
    void testInstantState() {
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        WriteMetadataEvent event0 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", true, 0.1);
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(1, instant, "par2", false, 0.2);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event1);
        this.coordinator.notifyCheckpointComplete(1L);
        String inflight = TestUtils.getLastPendingInstant(this.tempFile.getAbsolutePath());
        String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        MatcherAssert.assertThat((String)"Instant should be complete", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
        Assertions.assertNotEquals((Object)"", (Object)inflight, (String)"Should start a new instant");
        Assertions.assertNotEquals((Object)instant, (Object)inflight, (String)"Should start a new instant");
    }

    @Test
    public void testTableInitialized() throws IOException {
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf((Configuration)new Configuration());
        String basePath = this.tempFile.getAbsolutePath();
        try (FileSystem fs = FSUtils.getFs((String)basePath, (org.apache.hadoop.conf.Configuration)hadoopConf);){
            Assertions.assertTrue((boolean)fs.exists(new Path(basePath, ".hoodie")));
        }
    }

    @Test
    public void testCheckpointAndRestore() throws Exception {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        this.coordinator.resetToCheckpoint(1L, (byte[])future.get());
    }

    @Test
    public void testReceiveInvalidEvent() {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(0).instantTime("abc").writeStatus(Collections.emptyList()).build();
        this.assertError(() -> this.lambda$testReceiveInvalidEvent$0((OperatorEvent)event), "Receive an unexpected event for instant abc from task 0");
    }

    @Test
    public void testCheckpointCompleteWithPartialEvents() {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        String instant = this.coordinator.getInstant();
        WriteMetadataEvent event = WriteMetadataEvent.builder().taskID(0).instantTime(instant).writeStatus(Collections.emptyList()).build();
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event);
        Assertions.assertDoesNotThrow(() -> this.coordinator.notifyCheckpointComplete(1L), (String)"Returns early for empty write results");
        String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        Assertions.assertNull((Object)lastCompleted, (String)"Returns early for empty write results");
        Assertions.assertNull((Object)this.coordinator.getEventBuffer()[0]);
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(1, instant, "par2", false, 0.2);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event1);
        Assertions.assertDoesNotThrow(() -> this.coordinator.notifyCheckpointComplete(2L), (String)"Commits the instant with partial events anyway");
        lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        MatcherAssert.assertThat((String)"Commits the instant with partial events anyway", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testRecommitWithPartialUncommittedEvents() {
        CompletableFuture future = new CompletableFuture();
        this.coordinator.checkpointCoordinator(1L, future);
        String instant = this.coordinator.getInstant();
        String lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        Assertions.assertNull((Object)lastCompleted, (String)"Returns early for empty write results");
        WriteMetadataEvent event1 = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", false, 0.2);
        event1.setBootstrap(true);
        WriteMetadataEvent event2 = WriteMetadataEvent.emptyBootstrap((int)1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event1);
        this.coordinator.handleEventFromOperator(1, (OperatorEvent)event2);
        lastCompleted = TestUtils.getLastCompleteInstant(this.tempFile.getAbsolutePath());
        MatcherAssert.assertThat((String)"Recommits the instant with partial uncommitted events", (Object)lastCompleted, (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testHiveSyncInvoked() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.mockWriteWithMetadata();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        Assertions.assertDoesNotThrow(() -> this.coordinator.notifyCheckpointComplete(1L));
    }

    @Test
    void testSyncMetadataTable() throws Exception {
        int i;
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient((String)metadataTableBasePath, (org.apache.hadoop.conf.Configuration)HadoopConfigurations.getHadoopConf((Configuration)conf));
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)"00000000000000"));
        for (i = 1; i < 5; ++i) {
            instant = this.mockWriteWithMetadata();
            metadataTableMetaClient.reloadActiveTimeline();
            completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
            MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(i + 1)));
            MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)instant));
        }
        this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)7));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)(instant + "001")));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).getAction(), (Matcher)CoreMatchers.is((Object)"commit"));
        for (i = 7; i < 8; ++i) {
            instant = this.mockWriteWithMetadata();
            metadataTableMetaClient.reloadActiveTimeline();
            completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
            MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)(i + 1)));
            MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)instant));
        }
        instant = this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)10));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)(instant + "002")));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getAction(), (Matcher)CoreMatchers.is((Object)"clean"));
        this.mockWriteWithMetadata();
        instant = this.mockWriteWithMetadata();
        this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)14));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)(instant + "001")));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.nthFromLastInstant(1).get()).getAction(), (Matcher)CoreMatchers.is((Object)"commit"));
    }

    @Test
    void testSyncMetadataTableWithReusedInstant() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient((String)metadataTableBasePath, (org.apache.hadoop.conf.Configuration)HadoopConfigurations.getHadoopConf((Configuration)conf));
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)"00000000000000"));
        this.mockWriteWithMetadata();
        instant = this.coordinator.getInstant();
        metadataTableMetaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, "deltacommit", instant));
        metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight("deltacommit", instant);
        metadataTableMetaClient.reloadActiveTimeline();
        instant = this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)3));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)instant));
    }

    @Test
    public void testEndInputIsTheLastEvent() throws Exception {
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        Logger logger = (Logger)Mockito.mock(Logger.class);
        NonThrownExecutor executor = NonThrownExecutor.builder((Logger)logger).waitForTasksFinish(true).build();
        try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);){
            coordinator.start();
            coordinator.setExecutor(executor);
            coordinator.handleEventFromOperator(0, (OperatorEvent)WriteMetadataEvent.emptyBootstrap((int)0));
            TimeUnit.SECONDS.sleep(5L);
            int eventCount = 20000;
            for (int i = 0; i < eventCount; ++i) {
                coordinator.handleEventFromOperator(0, (OperatorEvent)TestStreamWriteOperatorCoordinator.createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
            }
            WriteMetadataEvent endInput = WriteMetadataEvent.builder().taskID(0).instantTime(coordinator.getInstant()).writeStatus(Collections.emptyList()).endInput(true).build();
            coordinator.handleEventFromOperator(0, (OperatorEvent)endInput);
            executor.close();
            Assertions.assertNull((Object)coordinator.getEventBuffer()[0]);
        }
    }

    @Test
    void testLockForMetadataTable() throws Exception {
        this.reset();
        Configuration conf = TestConfigurations.getDefaultConf(this.tempFile.getAbsolutePath());
        conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), "optimistic_concurrency_control");
        conf.setInteger("hoodie.write.lock.client.num_retries", 1);
        MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, (OperatorCoordinator.Context)context);
        this.coordinator.start();
        this.coordinator.setExecutor((NonThrownExecutor)new MockCoordinatorExecutor((OperatorCoordinator.Context)context));
        WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap((int)0);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event0);
        String instant = this.coordinator.getInstant();
        Assertions.assertNotEquals((Object)"", (Object)instant);
        String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath((String)this.tempFile.getAbsolutePath());
        HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient((String)metadataTableBasePath, (org.apache.hadoop.conf.Configuration)HadoopConfigurations.getHadoopConf((Configuration)conf));
        HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)"00000000000000"));
        instant = this.mockWriteWithMetadata();
        metadataTableMetaClient.reloadActiveTimeline();
        completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
        MatcherAssert.assertThat((String)"One instant need to sync to metadata table", (Object)completedTimeline.countInstants(), (Matcher)CoreMatchers.is((Object)2));
        MatcherAssert.assertThat((Object)((HoodieInstant)completedTimeline.lastInstant().get()).getTimestamp(), (Matcher)CoreMatchers.is((Object)instant));
    }

    private String mockWriteWithMetadata() {
        String instant = this.coordinator.getInstant();
        WriteMetadataEvent event = TestStreamWriteOperatorCoordinator.createOperatorEvent(0, instant, "par1", true, 0.1);
        this.coordinator.handleEventFromOperator(0, (OperatorEvent)event);
        this.coordinator.notifyCheckpointComplete(0L);
        return instant;
    }

    private static WriteMetadataEvent createOperatorEvent(int taskId, String instant, String partitionPath, boolean trackSuccessRecords, double failureFraction) {
        WriteStatus writeStatus = new WriteStatus(Boolean.valueOf(trackSuccessRecords), Double.valueOf(failureFraction));
        writeStatus.setPartitionPath(partitionPath);
        HoodieWriteStat writeStat = new HoodieWriteStat();
        writeStat.setPartitionPath(partitionPath);
        writeStat.setFileId("fileId123");
        writeStat.setPath("path123");
        writeStat.setFileSizeInBytes(123L);
        writeStat.setTotalWriteBytes(123L);
        writeStat.setNumWrites(1L);
        writeStatus.setStat(writeStat);
        return WriteMetadataEvent.builder().taskID(taskId).instantTime(instant).writeStatus(Collections.singletonList(writeStatus)).lastBatch(true).build();
    }

    private void reset() throws Exception {
        FileUtils.cleanDirectory((File)this.tempFile);
    }

    private void assertError(Runnable runnable, String message) {
        runnable.run();
        MatcherAssert.assertThat((Object)this.coordinator.getContext(), (Matcher)CoreMatchers.instanceOf(MockOperatorCoordinatorContext.class));
        MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext)this.coordinator.getContext();
        Assertions.assertTrue((boolean)context.isJobFailed(), (String)message);
    }

    private /* synthetic */ void lambda$testReceiveInvalidEvent$0(OperatorEvent event) {
        this.coordinator.handleEventFromOperator(0, event);
    }
}

