/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceImpl;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class StreamOperatorWrapperTest
extends TestLogger {
    private static SystemProcessingTimeService timerService;
    private static final int numOperators = 3;
    private List<StreamOperatorWrapper<?, ?>> operatorWrappers;
    private ConcurrentLinkedQueue<Object> output;
    private volatile StreamTask<?, ?> containingTask;

    @BeforeClass
    public static void startTimeService() {
        CompletableFuture errorFuture = new CompletableFuture();
        timerService = new SystemProcessingTimeService(errorFuture::complete);
    }

    @AfterClass
    public static void shutdownTimeService() {
        timerService.shutdownService();
    }

    @Before
    public void setup() throws Exception {
        this.operatorWrappers = new ArrayList();
        this.output = new ConcurrentLinkedQueue();
        try (MockEnvironment env = MockEnvironment.builder().build();){
            this.containingTask = new MockStreamTaskBuilder((Environment)env).build();
            for (int i = 0; i < 3; ++i) {
                MailboxExecutor mailboxExecutor = this.containingTask.getMailboxExecutorFactory().createExecutor(i);
                TimerMailController timerMailController = new TimerMailController(this.containingTask, mailboxExecutor);
                ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl((TimerService)timerService, timerMailController::wrapCallback);
                TestOneInputStreamOperator streamOperator = new TestOneInputStreamOperator("Operator" + i, this.output, (ProcessingTimeService)processingTimeService, mailboxExecutor, timerMailController);
                streamOperator.setProcessingTimeService((ProcessingTimeService)processingTimeService);
                StreamOperatorWrapper operatorWrapper = new StreamOperatorWrapper((StreamOperator)streamOperator, Optional.ofNullable(streamOperator.getProcessingTimeService()), mailboxExecutor);
                this.operatorWrappers.add(operatorWrapper);
            }
            StreamOperatorWrapper<?, ?> previous = null;
            for (StreamOperatorWrapper<?, ?> current : this.operatorWrappers) {
                if (previous != null) {
                    previous.setNext(current);
                }
                current.setPrevious(previous);
                previous = current;
            }
        }
    }

    @After
    public void teardown() throws Exception {
        this.containingTask.cleanup();
    }

    @Test
    public void testClose() throws Exception {
        this.output.clear();
        this.operatorWrappers.get(0).close(this.containingTask.getActionExecutor());
        ArrayList expected = new ArrayList();
        for (int i = 0; i < this.operatorWrappers.size(); ++i) {
            String prefix = "[Operator" + i + "]";
            Collections.addAll(expected, prefix + ": End of input", prefix + ": Timer that was in mailbox before closing operator", prefix + ": Bye", prefix + ": Mail to put in mailbox when closing operator");
        }
        Assert.assertArrayEquals((String)"Output was not correct.", (Object[])expected.subList(2, expected.size()).toArray(), (Object[])this.output.toArray());
    }

    @Test
    public void testClosingOperatorWithException() {
        AbstractStreamOperator<Void> streamOperator = new AbstractStreamOperator<Void>(){

            public void close() throws Exception {
                throw new Exception("test exception at closing");
            }
        };
        StreamOperatorWrapper operatorWrapper = new StreamOperatorWrapper((StreamOperator)streamOperator, Optional.ofNullable(streamOperator.getProcessingTimeService()), this.containingTask.getMailboxExecutorFactory().createExecutor(0x7FFFFFFE));
        try {
            operatorWrapper.close(this.containingTask.getActionExecutor());
            Assert.fail((String)"should throw an exception");
        }
        catch (Throwable t) {
            Optional optional = ExceptionUtils.findThrowableWithMessage((Throwable)t, (String)"test exception at closing");
            Assert.assertTrue((boolean)optional.isPresent());
        }
    }

    @Test
    public void testReadIterator() {
        TestOneInputStreamOperator operator;
        StreamOperatorWrapper next;
        int i;
        StreamOperatorWrapper.ReadIterator it = new StreamOperatorWrapper.ReadIterator(this.operatorWrappers.get(0), false);
        for (i = 0; i < this.operatorWrappers.size(); ++i) {
            Assert.assertTrue((boolean)it.hasNext());
            next = (StreamOperatorWrapper)it.next();
            Assert.assertNotNull((Object)next);
            operator = this.getStreamOperatorFromWrapper(next);
            Assert.assertEquals((Object)("Operator" + i), (Object)operator.getName());
        }
        Assert.assertFalse((boolean)it.hasNext());
        it = new StreamOperatorWrapper.ReadIterator(this.operatorWrappers.get(this.operatorWrappers.size() - 1), true);
        for (i = this.operatorWrappers.size() - 1; i >= 0; --i) {
            Assert.assertTrue((boolean)it.hasNext());
            next = (StreamOperatorWrapper)it.next();
            Assert.assertNotNull((Object)next);
            operator = this.getStreamOperatorFromWrapper(next);
            Assert.assertEquals((Object)("Operator" + i), (Object)operator.getName());
        }
        Assert.assertFalse((boolean)it.hasNext());
    }

    private TestOneInputStreamOperator getStreamOperatorFromWrapper(StreamOperatorWrapper<?, ?> operatorWrapper) {
        return (TestOneInputStreamOperator)Objects.requireNonNull(operatorWrapper.getStreamOperator());
    }

    private static class TestOneInputStreamOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>,
    BoundedOneInput {
        private static final long serialVersionUID = 1L;
        private final String name;
        private final ConcurrentLinkedQueue<Object> output;
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final TimerMailController timerMailController;

        TestOneInputStreamOperator(String name, ConcurrentLinkedQueue<Object> output, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, TimerMailController timerMailController) {
            this.name = name;
            this.output = output;
            this.processingTimeService = processingTimeService;
            this.mailboxExecutor = mailboxExecutor;
            this.timerMailController = timerMailController;
            processingTimeService.registerTimer(Long.MAX_VALUE, t2 -> output.add("[" + name + "]: Timer not triggered"));
        }

        public String getName() {
            return this.name;
        }

        public void processElement(StreamRecord<String> element) {
        }

        public void endInput() throws InterruptedException {
            this.output.add("[" + this.name + "]: End of input");
            ProcessingTimeCallback callback = t1 -> this.output.add("[" + this.name + "]: Timer that was in mailbox before closing operator");
            this.processingTimeService.registerTimer(0L, callback);
            this.timerMailController.getInMailboxLatch(callback).await();
        }

        public void close() throws Exception {
            ProcessingTimeCallback callback = t1 -> this.output.add("[" + this.name + "]: Timer to put in mailbox when closing operator");
            Assert.assertNotNull((Object)this.processingTimeService.registerTimer(0L, callback));
            Assert.assertNull((Object)this.timerMailController.getPuttingLatch(callback));
            this.mailboxExecutor.submit(() -> this.output.add("[" + this.name + "]: Mail to put in mailbox when closing operator"), "");
            this.output.add("[" + this.name + "]: Bye");
        }
    }

    private static class TimerMailController {
        private final StreamTask<?, ?> containingTask;
        private final MailboxExecutor mailboxExecutor;
        private final ConcurrentHashMap<ProcessingTimeCallback, OneShotLatch> puttingLatches;
        private final ConcurrentHashMap<ProcessingTimeCallback, OneShotLatch> inMailboxLatches;

        TimerMailController(StreamTask<?, ?> containingTask, MailboxExecutor mailboxExecutor) {
            this.containingTask = containingTask;
            this.mailboxExecutor = mailboxExecutor;
            this.puttingLatches = new ConcurrentHashMap();
            this.inMailboxLatches = new ConcurrentHashMap();
        }

        OneShotLatch getPuttingLatch(ProcessingTimeCallback callback) {
            return this.puttingLatches.get(callback);
        }

        OneShotLatch getInMailboxLatch(ProcessingTimeCallback callback) {
            return this.inMailboxLatches.get(callback);
        }

        ProcessingTimeCallback wrapCallback(ProcessingTimeCallback callback) {
            this.puttingLatches.put(callback, new OneShotLatch());
            this.inMailboxLatches.put(callback, new OneShotLatch());
            return timestamp -> {
                this.puttingLatches.get(callback).trigger();
                this.containingTask.deferCallbackToMailbox(this.mailboxExecutor, callback).onProcessingTime(timestamp);
                this.inMailboxLatches.get(callback).trigger();
            };
        }
    }
}

