package org.apache.hudi.execution;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/hudi/execution/TestDisruptorExecutionInSpark.class */
public class TestDisruptorExecutionInSpark extends HoodieSparkClientTestHarness {
    private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withExecutorType(ExecutorType.DISRUPTOR.name()).withWriteExecutorDisruptorWriteBufferLimitBytes(8).build(false);

    @BeforeEach
    public void setUp() throws Exception {
        initTestDataGenerator();
        initExecutorServiceWithFixedThreadPool(2);
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    private Runnable getPreExecuteRunnable() {
        TaskContext taskContext = TaskContext.get();
        return () -> {
            TaskContext$.MODULE$.setTaskContext(taskContext);
        };
    }

    @Test
    public void testExecutor() {
        List generateInserts = this.dataGen.generateInserts(this.instantTime, 128);
        final ArrayList arrayList = new ArrayList();
        DisruptorExecutor disruptorExecutor = null;
        try {
            disruptorExecutor = new DisruptorExecutor(this.writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), generateInserts.iterator(), new HoodieConsumer<HoodieRecord, Integer>() { // from class: org.apache.hudi.execution.TestDisruptorExecutionInSpark.1
                private int count = 0;

                public void consume(HoodieRecord hoodieRecord) {
                    arrayList.add(hoodieRecord);
                    this.count++;
                }

                /* renamed from: finish, reason: merged with bridge method [inline-methods] */
                public Integer m15finish() {
                    return Integer.valueOf(this.count);
                }
            }, Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
            Assertions.assertEquals(128, ((Integer) disruptorExecutor.execute()).intValue());
            Assertions.assertFalse(disruptorExecutor.isRunning());
            Assertions.assertEquals(generateInserts, arrayList);
            for (int i = 0; i < generateInserts.size(); i++) {
                Assertions.assertEquals(generateInserts.get(i), arrayList.get(i));
            }
            if (disruptorExecutor != null) {
                disruptorExecutor.shutdownNow();
            }
        } catch (Throwable th) {
            if (disruptorExecutor != null) {
                disruptorExecutor.shutdownNow();
            }
            throw th;
        }
    }

    @Timeout(60)
    @Test
    public void testInterruptExecutor() {
        List generateInserts = this.dataGen.generateInserts(this.instantTime, 100);
        DisruptorExecutor disruptorExecutor = new DisruptorExecutor(1024, generateInserts.iterator(), new HoodieConsumer<HoodieRecord, Integer>() { // from class: org.apache.hudi.execution.TestDisruptorExecutionInSpark.2
            public void consume(HoodieRecord hoodieRecord) {
                try {
                    synchronized (this) {
                        wait();
                    }
                } catch (InterruptedException e) {
                }
            }

            /* renamed from: finish, reason: merged with bridge method [inline-methods] */
            public Integer m16finish() {
                return 0;
            }
        }, Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
        try {
            Thread.currentThread().interrupt();
            disruptorExecutor.getClass();
            Assertions.assertThrows(HoodieException.class, disruptorExecutor::execute);
            Assertions.assertTrue(Thread.interrupted());
        } catch (Exception e) {
        }
    }
}
