package org.apache.hudi.execution;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.ExceptionUtil;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.HoodieLazyInsertIterable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/hudi/execution/TestDisruptorMessageQueue.class */
public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness {
    private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withExecutorType(ExecutorType.DISRUPTOR.name()).withWriteExecutorDisruptorWriteBufferLimitBytes(16).build(false);

    @BeforeEach
    public void setUp() throws Exception {
        initTestDataGenerator();
        initExecutorServiceWithFixedThreadPool(2);
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    private Runnable getPreExecuteRunnable() {
        TaskContext taskContext = TaskContext.get();
        return () -> {
            TaskContext$.MODULE$.setTaskContext(taskContext);
        };
    }

    @Timeout(60)
    @Test
    public void testRecordReading() throws Exception {
        List generateInserts = this.dataGen.generateInserts(this.instantTime, 100);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        final ArrayList arrayList3 = new ArrayList();
        final ArrayList arrayList4 = new ArrayList();
        generateInserts.forEach(hoodieRecord -> {
            HoodieAvroRecord hoodieAvroRecord = (HoodieAvroRecord) hoodieRecord;
            arrayList.add(hoodieAvroRecord);
            try {
                arrayList2.add(hoodieAvroRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
            } catch (IOException e) {
            }
        });
        DisruptorExecutor disruptorExecutor = null;
        try {
            disruptorExecutor = new DisruptorExecutor(this.writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), generateInserts.iterator(), new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { // from class: org.apache.hudi.execution.TestDisruptorMessageQueue.1
                private int count = 0;

                public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> hoodieInsertValueGenResult) {
                    this.count++;
                    arrayList3.add(hoodieInsertValueGenResult.getResult());
                    try {
                        arrayList4.add((IndexedRecord) hoodieInsertValueGenResult.getResult().getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get());
                    } catch (IOException e) {
                    }
                }

                /* renamed from: finish, reason: merged with bridge method [inline-methods] */
                public Integer m18finish() {
                    return Integer.valueOf(this.count);
                }
            }, HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
            Assertions.assertEquals(100, ((Integer) disruptorExecutor.execute()).intValue());
            Assertions.assertFalse(disruptorExecutor.isRunning());
            Assertions.assertEquals(arrayList, arrayList3);
            Assertions.assertEquals(arrayList2, arrayList4);
            if (disruptorExecutor != null) {
                disruptorExecutor.shutdownNow();
            }
        } catch (Throwable th) {
            if (disruptorExecutor != null) {
                disruptorExecutor.shutdownNow();
            }
            throw th;
        }
    }

    @Timeout(60)
    @Test
    public void testCompositeProducerRecordReading() throws Exception {
        ArrayList arrayList = new ArrayList();
        DisruptorMessageQueue disruptorMessageQueue = new DisruptorMessageQueue(1024, HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig), "BLOCKING_WAIT", 40, new Runnable() { // from class: org.apache.hudi.execution.TestDisruptorMessageQueue.2
            @Override // java.lang.Runnable
            public void run() {
            }
        });
        final HashMap hashMap = new HashMap();
        for (int i = 0; i < 40; i++) {
            List<HoodieRecord> generateInserts = this.dataGen.generateInserts(this.instantTime, 1000);
            int i2 = 0;
            for (HoodieRecord hoodieRecord : generateInserts) {
                Assertions.assertFalse(hashMap.containsKey(hoodieRecord.getRecordKey()));
                hashMap.put(hoodieRecord.getRecordKey(), Pair.of(Integer.valueOf(i), Integer.valueOf(i2)));
                i2++;
            }
            arrayList.add(generateInserts);
        }
        ArrayList arrayList2 = new ArrayList();
        for (int i3 = 0; i3 < arrayList.size(); i3++) {
            List list = (List) arrayList.get(i3);
            if (i3 % 2 == 0) {
                arrayList2.add(new IteratorBasedQueueProducer(list.iterator()));
            } else {
                arrayList2.add(new FunctionBasedQueueProducer(hoodieMessageQueue -> {
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        try {
                            hoodieMessageQueue.insertRecord(it.next());
                        } catch (Exception e) {
                            throw new HoodieException(e);
                        }
                    }
                    return true;
                }));
            }
        }
        final Map map = (Map) IntStream.range(0, 40).boxed().collect(Collectors.toMap(Function.identity(), num -> {
            return -1;
        }));
        final Map map2 = (Map) IntStream.range(0, 40).boxed().collect(Collectors.toMap(Function.identity(), num2 -> {
            return 0;
        }));
        HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> hoodieConsumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { // from class: org.apache.hudi.execution.TestDisruptorMessageQueue.3
            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> hoodieInsertValueGenResult) {
                Pair pair = (Pair) hashMap.get(hoodieInsertValueGenResult.getResult().getRecordKey());
                Integer num3 = (Integer) map.get(pair.getLeft());
                map2.put(pair.getLeft(), Integer.valueOf(((Integer) map2.get(pair.getLeft())).intValue() + 1));
                map.put(pair.getLeft(), Integer.valueOf(num3.intValue() + 1));
                Assertions.assertEquals(num3.intValue() + 1, ((Integer) pair.getRight()).intValue());
            }

            /* renamed from: finish, reason: merged with bridge method [inline-methods] */
            public Integer m19finish() {
                return 0;
            }
        };
        Method declaredMethod = disruptorMessageQueue.getClass().getDeclaredMethod("setHandlers", HoodieConsumer.class);
        declaredMethod.setAccessible(true);
        declaredMethod.invoke(disruptorMessageQueue, hoodieConsumer);
        Method declaredMethod2 = disruptorMessageQueue.getClass().getDeclaredMethod("start", new Class[0]);
        declaredMethod2.setAccessible(true);
        declaredMethod2.invoke(disruptorMessageQueue, new Object[0]);
        CompletableFuture.allOf((CompletableFuture[]) arrayList2.stream().map(hoodieProducer -> {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    hoodieProducer.produce(disruptorMessageQueue);
                    return true;
                } catch (Throwable th) {
                    throw new HoodieException("Error producing records in disruptor executor", th);
                }
            }, this.executorService);
        }).toArray(i4 -> {
            return new CompletableFuture[i4];
        })).get();
        disruptorMessageQueue.close();
        for (int i5 = 0; i5 < 40; i5++) {
            Assertions.assertEquals(1000, (Integer) map2.get(Integer.valueOf(i5)));
        }
    }

    @Timeout(60)
    @Test
    public void testException() throws Exception {
        List generateInserts = this.dataGen.generateInserts(this.instantTime, 1000);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            if (i % 2 == 0) {
                arrayList.add(new IteratorBasedQueueProducer(generateInserts.iterator()));
            } else {
                arrayList.add(new FunctionBasedQueueProducer(hoodieMessageQueue -> {
                    throw new HoodieException("Exception when produce records!!!");
                }));
            }
        }
        DisruptorExecutor disruptorExecutor = new DisruptorExecutor(1024, arrayList, new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { // from class: org.apache.hudi.execution.TestDisruptorMessageQueue.4
            int count = 0;

            public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> hoodieInsertValueGenResult) {
                hoodieInsertValueGenResult.getResult();
                this.count++;
            }

            /* renamed from: finish, reason: merged with bridge method [inline-methods] */
            public Integer m20finish() {
                return Integer.valueOf(this.count);
            }
        }, HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
        disruptorExecutor.getClass();
        Assertions.assertEquals("Exception when produce records!!!", ExceptionUtil.getRootCause(Assertions.assertThrows(HoodieException.class, disruptorExecutor::execute, "exception is expected")).getMessage());
    }
}
