/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkAddressEvent;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link CollectSinkOperatorCoordinator}.
 *
 * <p>The tests drive the coordinator with {@link CollectCoordinationRequest}s and check the
 * {@link CollectCoordinationResponse}s it hands back, using a small in-process socket server
 * ({@link ServerThread}) as the endpoint the coordinator is told to connect to.
 */
public class CollectSinkOperatorCoordinatorTest {
    /** Timeout value passed to every coordinator created by these tests. */
    private static final int SOCKET_TIMEOUT_MILLIS = 1000;

    /** Serializer for the (int, String) rows exchanged between the fake server and the tests. */
    private static final TypeSerializer<Row> serializer = new RowTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}).createSerializer(new ExecutionConfig());

    /**
     * Sends a request to a coordinator that was never given a server address and expects a
     * response with offset {@code -1} and no results.
     */
    @Test
    public void testNoAddress() throws Exception {
        CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
        coordinator.start();
        try {
            CollectCoordinationRequest request = new CollectCoordinationRequest("version", 123L);
            CollectCoordinationResponse response = (CollectCoordinationResponse) coordinator.handleCoordinationRequest((CoordinationRequest) request).get();
            assertResponseEquals(request, response, -1L, Collections.emptyList());
        } finally {
            // Release the coordinator even when an assertion above fails.
            coordinator.close();
        }
    }

    /**
     * Exercises the coordinator against a server that hangs up on its third request, then
     * reports the subtask as failed, registers a replacement server and checks that the
     * in-flight request yields an empty response while the next request is served by the
     * replacement.
     */
    @Test
    public void testServerFailure() throws Exception {
        CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
        coordinator.start();
        ServerThread server = null;
        try {
            List<List<Row>> expected = Arrays.asList(
                    Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb")),
                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), Row.of(5, "eee")));
            // This server answers two requests and closes the connection on the third.
            server = new ServerThread(expected, 3);
            server.start();
            coordinator.handleEventFromOperator(0, (OperatorEvent) new CollectSinkAddressEvent(server.getServerAddress()));

            CollectCoordinationRequest request = new CollectCoordinationRequest("version1", 123L);
            CollectCoordinationResponse response = (CollectCoordinationResponse) coordinator.handleCoordinationRequest((CoordinationRequest) request).get();
            assertResponseEquals(request, response, 0L, expected.get(0));

            request = new CollectCoordinationRequest("version2", 456L);
            response = (CollectCoordinationResponse) coordinator.handleCoordinationRequest((CoordinationRequest) request).get();
            assertResponseEquals(request, response, 0L, expected.get(1));

            // Third request: the server hangs up instead of answering.
            request = new CollectCoordinationRequest("version3", 789L);
            CompletableFuture<?> responseFuture = coordinator.handleCoordinationRequest((CoordinationRequest) request);
            coordinator.subtaskFailed(0, null);

            // Bring up a replacement server and announce its address to the coordinator.
            expected = Collections.singletonList(Arrays.asList(Row.of(6, "fff"), Row.of(7, "ggg")));
            server = new ServerThread(expected, 2);
            server.start();
            coordinator.handleEventFromOperator(0, (OperatorEvent) new CollectSinkAddressEvent(server.getServerAddress()));

            response = (CollectCoordinationResponse) responseFuture.get();
            assertResponseEquals(request, response, -1L, Collections.emptyList());

            request = new CollectCoordinationRequest("version4", 101112L);
            response = (CollectCoordinationResponse) coordinator.handleCoordinationRequest((CoordinationRequest) request).get();
            assertResponseEquals(request, response, 0L, expected.get(0));
        } finally {
            // Same shutdown order as the happy path: server first, then coordinator.
            if (server != null) {
                server.close();
            }
            coordinator.close();
        }
    }

    /**
     * Asserts that {@code response} echoes the version of {@code request}, carries the expected
     * checkpointed offset and contains exactly {@code expectedResults}, compared field by field.
     *
     * @param request the request that was sent
     * @param response the response that came back
     * @param expectedLastCheckpointedOffset offset the response must report
     * @param expectedResults rows the response must contain, in order
     * @throws Exception if the results cannot be read from the response
     */
    private void assertResponseEquals(CollectCoordinationRequest request, CollectCoordinationResponse response, long expectedLastCheckpointedOffset, List<Row> expectedResults) throws Exception {
        Assert.assertEquals(request.getVersion(), response.getVersion());
        Assert.assertEquals(expectedLastCheckpointedOffset, response.getLastCheckpointedOffset());
        List<Row> results = response.getResults(serializer);
        Assert.assertEquals(expectedResults.size(), results.size());
        for (int i = 0; i < results.size(); ++i) {
            Row expectedRow = expectedResults.get(i);
            Row actualRow = results.get(i);
            Assert.assertEquals(expectedRow.getArity(), actualRow.getArity());
            for (int j = 0; j < actualRow.getArity(); ++j) {
                Assert.assertEquals(expectedRow.getField(j), actualRow.getField(j));
            }
        }
    }

    /**
     * Minimal socket server used by the tests. It accepts a single connection, answers each
     * incoming {@link CollectCoordinationRequest} with the next batch of rows, and drops the
     * connection without answering once {@code closeRequestNum} requests have been received.
     */
    private static class ServerThread
    extends Thread {
        /** Batches still to be served, one per answered request. */
        private final LinkedList<List<Row>> data;
        /** 1-based index of the request on which the server hangs up instead of answering. */
        private final int closeRequestNum;
        private final ServerSocket server;
        /**
         * Cleared by {@link #close()} from another thread, hence volatile. Initialised here
         * rather than in {@link #run()} so that a close issued before the thread starts is not
         * overwritten.
         */
        private volatile boolean running = true;

        /**
         * @param data batches to serve, in order
         * @param closeRequestNum number of the request that makes the server close the connection
         * @throws IOException if the listening socket cannot be opened
         */
        private ServerThread(List<List<Row>> data, int closeRequestNum) throws IOException {
            this.data = new LinkedList<List<Row>>(data);
            this.closeRequestNum = closeRequestNum;
            // Port 0: let the OS pick a free port; see getServerAddress().
            this.server = new ServerSocket(0);
        }

        @Override
        public void run() {
            // try-with-resources closes the connection first and the listening socket second,
            // on every exit path (normal end, hang-up, or I/O error).
            try (ServerSocket listener = this.server) {
                if (!this.running) {
                    return;
                }
                try (Socket socket = listener.accept()) {
                    DataInputViewStreamWrapper inStream = new DataInputViewStreamWrapper(socket.getInputStream());
                    DataOutputViewStreamWrapper outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
                    int requestNum = 0;
                    while (this.running) {
                        CollectCoordinationRequest request = new CollectCoordinationRequest(inStream);
                        if (++requestNum >= this.closeRequestNum) {
                            // Simulated failure: drop the connection without replying.
                            this.running = false;
                            break;
                        }
                        CollectCoordinationResponse response = new CollectCoordinationResponse(request.getVersion(), 0L, this.data.removeFirst(), serializer);
                        response.serialize((DataOutputView) outStream);
                    }
                }
            } catch (IOException ignored) {
                // Expected when the peer disconnects or close() shuts the listening socket
                // while this thread is blocked; the server simply stops.
            }
        }

        /**
         * Asks the server to stop. Also closes the listening socket so that a thread still
         * blocked in {@code accept()} is released.
         */
        public void close() {
            this.running = false;
            try {
                this.server.close();
            } catch (IOException ignored) {
                // Best-effort shutdown of a test helper; nothing useful to do on failure.
            }
        }

        /** Returns the loopback address and the port this server is listening on. */
        public InetSocketAddress getServerAddress() {
            return new InetSocketAddress(InetAddress.getLoopbackAddress(), this.server.getLocalPort());
        }
    }
}

