/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.extractor;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeRealtimeExtractTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class);
    private final String dataRegion1 = "1";
    private final String dataRegion2 = "2";
    private final String pattern1 = "root.sg.d";
    private final String pattern2 = "root.sg.d.a";
    private final String[] device = new String[]{"root", "sg", "d"};
    private final AtomicBoolean alive = new AtomicBoolean();
    private File tmpDir;
    private File tsFileDir;
    private ExecutorService writeService;
    private ExecutorService listenerService;

    @Before
    public void setUp() throws IOException {
        this.writeService = Executors.newFixedThreadPool(2);
        this.listenerService = Executors.newFixedThreadPool(4);
        this.tmpDir = new File(Files.createTempDirectory("pipeRealtimeExtractor", new FileAttribute[0]).toString());
        this.tsFileDir = new File(this.tmpDir.getPath() + File.separator + "sequence" + File.separator + "root.sg");
    }

    @After
    public void tearDown() {
        this.writeService.shutdownNow();
        this.listenerService.shutdownNow();
        FileUtils.deleteFileOrDirectory((File)this.tmpDir);
    }

    @Test
    public void testRealtimeExtractProcess() {
        try (PipeRealtimeDataRegionLogExtractor extractor0 = new PipeRealtimeDataRegionLogExtractor();
             PipeRealtimeDataRegionHybridExtractor extractor1 = new PipeRealtimeDataRegionHybridExtractor();
             PipeRealtimeDataRegionTsFileExtractor extractor2 = new PipeRealtimeDataRegionTsFileExtractor();
             PipeRealtimeDataRegionHybridExtractor extractor3 = new PipeRealtimeDataRegionHybridExtractor();){
            PipeParameters parameters0 = new PipeParameters((Map)new HashMap<String, String>(){
                {
                    this.put("extractor.pattern", "root.sg.d");
                }
            });
            PipeParameters parameters1 = new PipeParameters((Map)new HashMap<String, String>(){
                {
                    this.put("extractor.pattern", "root.sg.d.a");
                }
            });
            PipeParameters parameters2 = new PipeParameters((Map)new HashMap<String, String>(){
                {
                    this.put("extractor.pattern", "root.sg.d");
                }
            });
            PipeParameters parameters3 = new PipeParameters((Map)new HashMap<String, String>(){
                {
                    this.put("extractor.pattern", "root.sg.d.a");
                }
            });
            PipeTaskRuntimeConfiguration configuration0 = new PipeTaskRuntimeConfiguration((PipeRuntimeEnvironment)new PipeTaskExtractorRuntimeEnvironment("1", 1L, Integer.parseInt("1"), null));
            PipeTaskRuntimeConfiguration configuration1 = new PipeTaskRuntimeConfiguration((PipeRuntimeEnvironment)new PipeTaskExtractorRuntimeEnvironment("1", 1L, Integer.parseInt("1"), null));
            PipeTaskRuntimeConfiguration configuration2 = new PipeTaskRuntimeConfiguration((PipeRuntimeEnvironment)new PipeTaskExtractorRuntimeEnvironment("1", 1L, Integer.parseInt("2"), null));
            PipeTaskRuntimeConfiguration configuration3 = new PipeTaskRuntimeConfiguration((PipeRuntimeEnvironment)new PipeTaskExtractorRuntimeEnvironment("1", 1L, Integer.parseInt("2"), null));
            extractor0.validate(new PipeParameterValidator(parameters0));
            extractor0.customize(parameters0, (PipeExtractorRuntimeConfiguration)configuration0);
            extractor1.validate(new PipeParameterValidator(parameters1));
            extractor1.customize(parameters1, (PipeExtractorRuntimeConfiguration)configuration1);
            extractor2.validate(new PipeParameterValidator(parameters2));
            extractor2.customize(parameters2, (PipeExtractorRuntimeConfiguration)configuration2);
            extractor3.validate(new PipeParameterValidator(parameters3));
            extractor3.customize(parameters3, (PipeExtractorRuntimeConfiguration)configuration3);
            PipeRealtimeDataRegionExtractor[] extractors = new PipeRealtimeDataRegionExtractor[]{extractor0, extractor1, extractor2, extractor3};
            extractors[0].start();
            extractors[1].start();
            int writeNum = 10;
            List<Future> writeFutures = Arrays.asList(this.write2DataRegion(writeNum, "1", 0), this.write2DataRegion(writeNum, "2", 0));
            this.alive.set(true);
            List<Future> listenFutures = Arrays.asList(this.listen(extractors[0], event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), this.listen(extractors[1], event -> 1, writeNum));
            try {
                listenFutures.get(0).get(10L, TimeUnit.MINUTES);
                listenFutures.get(1).get(10L, TimeUnit.MINUTES);
            }
            catch (TimeoutException e) {
                LOGGER.warn("Time out when listening extractor", (Throwable)e);
                this.alive.set(false);
                Assert.fail();
            }
            writeFutures.forEach(future -> {
                try {
                    future.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            extractors[2].start();
            extractors[3].start();
            writeFutures = Arrays.asList(this.write2DataRegion(writeNum, "1", writeNum), this.write2DataRegion(writeNum, "2", writeNum));
            this.alive.set(true);
            listenFutures = Arrays.asList(this.listen(extractors[0], event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), this.listen(extractors[1], event -> 1, writeNum), this.listen(extractors[2], event -> event instanceof TabletInsertionEvent ? 1 : 2, writeNum << 1), this.listen(extractors[3], event -> 1, writeNum));
            try {
                listenFutures.get(0).get(10L, TimeUnit.MINUTES);
                listenFutures.get(1).get(10L, TimeUnit.MINUTES);
                listenFutures.get(2).get(10L, TimeUnit.MINUTES);
                listenFutures.get(3).get(10L, TimeUnit.MINUTES);
            }
            catch (TimeoutException e) {
                LOGGER.warn("Time out when listening extractor", (Throwable)e);
                this.alive.set(false);
                Assert.fail();
            }
            writeFutures.forEach(future -> {
                try {
                    future.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
        File dataRegionDir = new File(this.tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
        boolean ignored = dataRegionDir.mkdirs();
        return this.writeService.submit(() -> {
            for (int i = startNum; i < startNum + writeNum; ++i) {
                File tsFile = new File(dataRegionDir, String.format("%s-%s-0-0.tsfile", i, i));
                try {
                    boolean bl = tsFile.createNewFile();
                }
                catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
                TsFileResource resource = new TsFileResource(tsFile);
                resource.updateStartTime((IDeviceID)new PlainDeviceID(String.join((CharSequence)".", this.device)), 0L);
                PipeInsertionDataNodeListener.getInstance().listenToInsertNode(dataRegionId, (WALEntryHandler)Mockito.mock(WALEntryHandler.class), (InsertNode)new InsertRowNode(new PlanNodeId(String.valueOf(i)), new PartialPath(this.device), false, new String[]{"a"}, null, 0L, null, false), resource);
                PipeInsertionDataNodeListener.getInstance().listenToInsertNode(dataRegionId, (WALEntryHandler)Mockito.mock(WALEntryHandler.class), (InsertNode)new InsertRowNode(new PlanNodeId(String.valueOf(i)), new PartialPath(this.device), false, new String[]{"b"}, null, 0L, null, false), resource);
                PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, resource, false, false);
            }
        });
    }

    private Future<?> listen(PipeRealtimeDataRegionExtractor extractor, Function<Event, Integer> weight, int expectNum) {
        return this.listenerService.submit(() -> {
            int eventNum = 0;
            try {
                while (this.alive.get() && eventNum < expectNum) {
                    Event event;
                    try {
                        event = extractor.supply();
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    if (event == null) continue;
                    eventNum += ((Integer)weight.apply(event)).intValue();
                }
            }
            finally {
                Assert.assertEquals((long)expectNum, (long)eventNum);
            }
        });
    }
}

