/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.history;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.HistoryServer;
import org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class HistoryServerTest
extends TestLogger {
    // JSON factory used by createLegacyArchive to hand-write job-overview documents.
    // AUTO_CLOSE_TARGET is enabled and AUTO_CLOSE_JSON_CONTENT is disabled, so every
    // object/array that is started has to be ended explicitly by the writing code.
    private static final JsonFactory JACKSON_FACTORY = new JsonFactory().enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET).disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
    // Mapper used to deserialize the REST responses fetched from the history server;
    // it is configured to fail when a creator property is missing from the payload.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
    // Per-test temporary folder that hosts the archive and web directories.
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    // Mini cluster created in setUp and shut down in tearDown.
    private MiniClusterWithClientResource cluster;
    // Directory the cluster archives jobs into; also the history server's archive directory.
    private File jmDirectory;
    // Web directory of the history server.
    private File hsDirectory;
    // Test parameter: selects which task-count fields createLegacyArchive writes
    // ("pending" when true; "created"/"deploying"/"scheduled" when false).
    @Parameterized.Parameter
    public static boolean versionLessThan14;

    /**
     * Supplies the values for {@link #versionLessThan14}: every test runs once with
     * {@code true} and once with {@code false}.
     *
     * @return the two parameter values, {@code true} first
     */
    @Parameterized.Parameters(name="Flink version less than 1.4: {0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * Creates the archive ("jm_*") and web ("hs_*") directories for this run and starts a
     * mini cluster whose {@code ARCHIVE_DIR} points at the archive directory.
     *
     * @throws Exception if the folders cannot be created or the cluster fails to start
     */
    @Before
    public void setUp() throws Exception {
        // The parameter value is part of the folder name so both parameterizations get distinct names.
        jmDirectory = tmpFolder.newFolder("jm_" + versionLessThan14);
        hsDirectory = tmpFolder.newFolder("hs_" + versionLessThan14);

        final Configuration miniClusterSettings = new Configuration();
        miniClusterSettings.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());

        final MiniClusterResourceConfiguration resourceConfiguration =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(miniClusterSettings)
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(2)
                        .build();

        cluster = new MiniClusterWithClientResource(resourceConfiguration);
        cluster.before();
    }

    /**
     * Shuts the mini cluster down, if {@link #setUp()} got far enough to create it.
     */
    @After
    public void tearDown() {
        if (cluster == null) {
            return;
        }
        cluster.after();
    }

    /**
     * End-to-end check of the history server: it must start with an empty job overview,
     * pick up the archives of jobs executed on the mini cluster as well as a hand-written
     * legacy archive, and serve a dashboard configuration that can be deserialized.
     *
     * @throws Exception if the server, a job, or an HTTP request fails
     */
    @Test
    public void testHistoryServerIntegration() throws Exception {
        final int numJobs = 2;
        final int numLegacyJobs = 1;
        final int numArchives = numJobs + numLegacyJobs;

        // Counted down once per CREATED event reported by the archive fetcher.
        final CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numArchives);

        final Configuration historyServerConfig = createTestConfiguration(false);
        final HistoryServer hs = new HistoryServer(historyServerConfig, event -> {
            if (event.getType() == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
                numExpectedArchivedJobs.countDown();
            }
        });
        try {
            hs.start();
            final String baseUrl = "http://localhost:" + hs.getWebPort();

            // Nothing has been archived yet.
            Assert.assertEquals(0, getJobsOverview(baseUrl).getJobs().size());

            for (int x = 0; x < numJobs; x++) {
                runJob();
            }
            for (int x = 0; x < numLegacyJobs; x++) {
                createLegacyArchive(jmDirectory.toPath());
            }

            // First wait for the archive files to exist, then for the server to process them.
            waitForArchivesCreation(numArchives);
            Assert.assertTrue(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals(numArchives, getJobsOverview(baseUrl).getJobs().size());

            // The result is not inspected; the call fails if the response cannot be deserialized.
            getDashboardConfiguration(baseUrl);
        } finally {
            hs.stop();
        }
    }

    /**
     * Verifies that, with a retained-jobs limit configured, the history server keeps only the
     * most recently modified archives: first for archives that exist before the server starts,
     * then for archives added while it is running.
     *
     * @throws Exception if the server or an HTTP request fails
     */
    @Test
    public void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit() throws Exception {
        final int numArchivesToKeepInHistory = 2;
        final int numArchivesBeforeHsStarted = 4;
        final int numArchivesAfterHsStarted = 2;
        final int numArchivesToRemoveUponHsStart =
                numArchivesBeforeHsStarted - numArchivesToKeepInHistory;
        final long oneMinuteSinceEpoch = 60L * 1000L;

        // Archive j gets modification time j minutes after the epoch, so a higher j means newer.
        final List<String> expectedJobIdsToKeep = new LinkedList<>();
        for (int j = 0; j < numArchivesBeforeHsStarted; j++) {
            final String jobId = createLegacyArchive(jmDirectory.toPath(), j * oneMinuteSinceEpoch);
            if (j >= numArchivesToRemoveUponHsStart) {
                expectedJobIdsToKeep.add(jobId);
            }
        }

        final CountDownLatch numArchivesCreatedInitially =
                new CountDownLatch(numArchivesToKeepInHistory);
        final CountDownLatch numArchivesDeletedInitially =
                new CountDownLatch(numArchivesToRemoveUponHsStart);
        final CountDownLatch numArchivesCreatedTotal =
                new CountDownLatch(
                        numArchivesBeforeHsStarted
                                - numArchivesToRemoveUponHsStart
                                + numArchivesAfterHsStarted);
        final CountDownLatch numArchivesDeletedTotal =
                new CountDownLatch(numArchivesToRemoveUponHsStart + numArchivesAfterHsStarted);

        final Configuration historyServerConfig =
                createTestConfiguration(
                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
        historyServerConfig.set(
                HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS, numArchivesToKeepInHistory);

        final HistoryServer hs = new HistoryServer(historyServerConfig, event -> {
            switch (event.getType()) {
                case CREATED: {
                    numArchivesCreatedInitially.countDown();
                    numArchivesCreatedTotal.countDown();
                    break;
                }
                case DELETED: {
                    numArchivesDeletedInitially.countDown();
                    numArchivesDeletedTotal.countDown();
                    break;
                }
            }
        });
        try {
            hs.start();
            final String baseUrl = "http://localhost:" + hs.getWebPort();

            Assert.assertTrue(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS));
            Assert.assertTrue(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals(new HashSet<>(expectedJobIdsToKeep), getIdsFromJobOverview(baseUrl));

            // Each newer archive is expected to push the oldest retained one out of the history.
            for (int j = numArchivesBeforeHsStarted;
                    j < numArchivesBeforeHsStarted + numArchivesAfterHsStarted;
                    j++) {
                expectedJobIdsToKeep.remove(0);
                expectedJobIdsToKeep.add(
                        createLegacyArchive(jmDirectory.toPath(), j * oneMinuteSinceEpoch));
            }

            Assert.assertTrue(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS));
            Assert.assertTrue(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals(new HashSet<>(expectedJobIdsToKeep), getIdsFromJobOverview(baseUrl));
        } finally {
            hs.stop();
        }
    }

    /**
     * Fetches the job overview from the history server and returns the ids of all listed jobs.
     *
     * @param baseUrl base URL of the history server, without a trailing path
     * @return string form of every job id in the overview
     * @throws Exception if the overview cannot be fetched or deserialized
     */
    private Set<String> getIdsFromJobOverview(String baseUrl) throws Exception {
        final Collection<JobDetails> listedJobs = getJobsOverview(baseUrl).getJobs();
        return listedJobs.stream()
                .map(details -> details.getJobId().toString())
                .collect(Collectors.toSet());
    }

    /**
     * A retained-jobs limit of {@code 0} must be rejected with an
     * {@link IllegalConfigurationException}.
     */
    @Test(expected=IllegalConfigurationException.class)
    public void testFailIfHistorySizeLimitIsZero() throws Exception {
        final int invalidLimit = 0;
        startHistoryServerWithSizeLimit(invalidLimit);
    }

    /**
     * A retained-jobs limit below {@code -1} must be rejected with an
     * {@link IllegalConfigurationException}.
     */
    @Test(expected=IllegalConfigurationException.class)
    public void testFailIfHistorySizeLimitIsLessThanMinusOne() throws Exception {
        final int invalidLimit = -2;
        startHistoryServerWithSizeLimit(invalidLimit);
    }

    /**
     * Builds a test configuration with the given retained-jobs limit, then constructs and
     * starts a history server from it. The server is not stopped here; the callers in this
     * class expect construction or startup to throw.
     *
     * @param maxHistorySize value for {@code HISTORY_SERVER_RETAINED_JOBS}
     */
    private void startHistoryServerWithSizeLimit(int maxHistorySize) throws IOException, FlinkException, InterruptedException {
        final boolean defaultCleanup =
                (Boolean) HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue();
        final Configuration limitedConfig = createTestConfiguration(defaultCleanup);
        limitedConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS, maxHistorySize);

        final HistoryServer server = new HistoryServer(limitedConfig);
        server.start();
    }

    /**
     * Runs the archive expiration scenario with cleanup of expired jobs enabled.
     */
    @Test
    public void testCleanExpiredJob() throws Exception {
        final boolean cleanupExpiredJobs = true;
        runArchiveExpirationTest(cleanupExpiredJobs);
    }

    /**
     * Runs the archive expiration scenario with cleanup of expired jobs disabled.
     */
    @Test
    public void testRemainExpiredJob() throws Exception {
        final boolean cleanupExpiredJobs = false;
        runArchiveExpirationTest(cleanupExpiredJobs);
    }

    /**
     * Shared scenario for the expiration tests: archives three jobs, starts a history server,
     * then deletes the archives from the archive directory (one first, then the rest) and
     * checks what the server still reports and which files remain in its web directory.
     *
     * <p>With cleanup disabled, both "expired" latches start at zero, so their awaits return
     * immediately and all jobs are expected to stay listed.
     *
     * @param cleanupExpiredJobs whether the server is configured to clean up expired jobs
     * @throws Exception if a job, the server, or an HTTP request fails
     */
    private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Exception {
        final int numExpiredJobs = cleanupExpiredJobs ? 1 : 0;
        final int numJobs = 3;
        for (int x = 0; x < numJobs; x++) {
            runJob();
        }
        waitForArchivesCreation(numJobs);

        final CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs);
        final CountDownLatch firstArchiveExpiredLatch = new CountDownLatch(numExpiredJobs);
        final CountDownLatch allArchivesExpiredLatch =
                new CountDownLatch(cleanupExpiredJobs ? numJobs : 0);

        final Configuration historyServerConfig = createTestConfiguration(cleanupExpiredJobs);
        final HistoryServer hs = new HistoryServer(historyServerConfig, event -> {
            switch (event.getType()) {
                case CREATED: {
                    numExpectedArchivedJobs.countDown();
                    break;
                }
                case DELETED: {
                    firstArchiveExpiredLatch.countDown();
                    allArchivesExpiredLatch.countDown();
                    break;
                }
            }
        });
        try {
            hs.start();
            final String baseUrl = "http://localhost:" + hs.getWebPort();
            Assert.assertTrue(numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS));

            final Collection<JobDetails> jobs = getJobsOverview(baseUrl).getJobs();
            Assert.assertEquals(numJobs, jobs.size());

            final String jobIdToDelete =
                    jobs.stream()
                            .findFirst()
                            .map(JobDetails::getJobId)
                            .map(AbstractID::toString)
                            .orElseThrow(
                                    () -> new IllegalStateException("Expected at least one existing job"));

            // Remove a single archive; the archive file is named after the job id.
            Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
            Assert.assertTrue(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS));

            final Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
            Assert.assertEquals(numJobs - numExpiredJobs, jobsAfterDeletion.size());
            // The deleted job is listed zero times with cleanup, exactly once without.
            Assert.assertEquals(
                    1 - numExpiredJobs,
                    jobsAfterDeletion.stream()
                            .map(JobDetails::getJobId)
                            .map(AbstractID::toString)
                            .filter(jobId -> jobId.equals(jobIdToDelete))
                            .count());

            // Remove every archive that is still listed.
            final List<String> remainingJobIds =
                    jobsAfterDeletion.stream()
                            .map(JobDetails::getJobId)
                            .map(AbstractID::toString)
                            .collect(Collectors.toList());
            for (String remainingJobId : remainingJobIds) {
                Files.deleteIfExists(jmDirectory.toPath().resolve(remainingJobId));
            }
            Assert.assertTrue(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS));

            assertJobFilesCleanedUp(cleanupExpiredJobs);
        } finally {
            hs.stop();
        }
    }

    /**
     * Walks the history server's web directory and asserts on the entries that remain once the
     * fixed entries ({@code config.json}, {@code jobs}, {@code jobs/overview.json},
     * {@code overviews}) and the directory itself are ignored.
     *
     * @param jobFilesShouldBeDeleted {@code true} to assert that no other entries exist,
     *     {@code false} to assert that at least one does
     * @throws IOException if the directory cannot be walked
     */
    private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
        final java.nio.file.Path webDir = hsDirectory.toPath();
        // Relative paths that are ignored regardless of job state.
        final Set<java.nio.file.Path> ignoredEntries =
                new HashSet<>(
                        Arrays.asList(
                                Paths.get("config.json"),
                                Paths.get("jobs"),
                                Paths.get("jobs", "overview.json"),
                                Paths.get("overviews")));

        try (Stream<java.nio.file.Path> paths = Files.walk(webDir)) {
            final List<java.nio.file.Path> jobFiles =
                    paths.filter(path -> !path.equals(webDir))
                            .map(webDir::relativize)
                            .filter(relative -> !ignoredEntries.contains(relative))
                            .collect(Collectors.toList());

            if (jobFilesShouldBeDeleted) {
                MatcherAssert.assertThat(jobFiles, Matchers.empty());
            } else {
                MatcherAssert.assertThat(jobFiles, Matchers.not(Matchers.empty()));
            }
        }
    }

    /**
     * Polls the archive directory every 50 ms until it contains exactly {@code numJobs}
     * entries. There is no timeout: the method keeps polling until the count matches.
     *
     * @param numJobs number of directory entries to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void waitForArchivesCreation(int numJobs) throws InterruptedException {
        while (true) {
            // listFiles() may return null; that is treated like "not there yet".
            final File[] listing = jmDirectory.listFiles();
            if (listing != null && listing.length == numJobs) {
                return;
            }
            Thread.sleep(50L);
        }
    }

    /**
     * Builds the history server configuration used by the tests: the archive directory is the
     * cluster's archive directory, the web directory is the "hs" temp folder, the archive
     * refresh interval is 100 and the web port is 0 (the tests read the actual port from
     * {@code getWebPort()}).
     *
     * @param cleanupExpiredJobs value for {@code HISTORY_SERVER_CLEANUP_EXPIRED_JOBS}
     * @return a fresh configuration
     */
    private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
        final String archiveDirUri = jmDirectory.toURI().toString();
        final String webDirPath = hsDirectory.getAbsolutePath();

        final Configuration config = new Configuration();
        config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, archiveDirUri);
        config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, webDirPath);
        config.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 100L);
        config.setBoolean(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs);
        config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
        return config;
    }

    /**
     * Requests the dashboard configuration endpoint and deserializes the response body.
     *
     * @param baseUrl base URL of the history server, without a trailing path
     * @return the deserialized dashboard configuration
     * @throws Exception if the request or the deserialization fails
     */
    private static DashboardConfiguration getDashboardConfiguration(String baseUrl) throws Exception {
        final String endpoint = DashboardConfigurationHeaders.INSTANCE.getTargetRestEndpointURL();
        final String body = getFromHTTP(baseUrl + endpoint).f1;
        return OBJECT_MAPPER.readValue(body, DashboardConfiguration.class);
    }

    /**
     * Requests {@code /jobs/overview} and deserializes the response body.
     *
     * @param baseUrl base URL of the history server, without a trailing path
     * @return the deserialized job overview
     * @throws Exception if the request or the deserialization fails
     */
    private static MultipleJobsDetails getJobsOverview(String baseUrl) throws Exception {
        final String body = getFromHTTP(baseUrl + "/jobs/overview").f1;
        return OBJECT_MAPPER.readValue(body, MultipleJobsDetails.class);
    }

    /**
     * Executes a minimal streaming job (three integers into a discarding sink) on the
     * environment returned by {@code StreamExecutionEnvironment.getExecutionEnvironment()}
     * and waits for {@code execute()} to return.
     *
     * @throws Exception if job execution fails
     */
    private static void runJob() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Typed pipeline: no raw sink and no Object[] cast on the varargs.
        env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
        env.execute();
    }

    /**
     * Performs an HTTP GET and returns the status code together with the response body.
     * For status codes of 400 and above the body is read from the error stream.
     *
     * @param url absolute URL to request
     * @return tuple of (HTTP status code, response body); the body is empty when the
     *     connection provides no stream to read from
     * @throws Exception if the URL is malformed or the request fails
     */
    public static Tuple2<Integer, String> getFromHTTP(String url) throws Exception {
        final HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setConnectTimeout(100000);
        connection.connect();

        final int responseCode = connection.getResponseCode();

        // NOTE(review): the Content-Encoding header is used as the charset name, as before.
        // That header normally carries a compression scheme rather than a charset; this
        // presumably only works because the server sends none - confirm before relying on it.
        final String contentEncoding = connection.getContentEncoding();
        final String charsetName = contentEncoding != null ? contentEncoding : "UTF-8";

        // try-with-resources closes the response stream; a null resource is skipped on close.
        try (InputStream is =
                responseCode >= 400 ? connection.getErrorStream() : connection.getInputStream()) {
            if (is == null) {
                // getErrorStream() returns null when the server sent no error body.
                return Tuple2.of(responseCode, "");
            }
            return Tuple2.of(responseCode, IOUtils.toString(is, charsetName));
        }
    }

    /**
     * Writes a legacy archive into {@code directory} and sets the archive file's
     * last-modified time.
     *
     * @param directory directory to create the archive in
     * @param fileModifiedDate last-modified time to apply, in milliseconds since the epoch
     * @return the id of the archived job, which is also the archive's file name
     * @throws IOException if the archive cannot be written or its timestamp cannot be changed
     */
    private static String createLegacyArchive(java.nio.file.Path directory, long fileModifiedDate) throws IOException {
        final String jobId = createLegacyArchive(directory);
        final File jobArchive = directory.resolve(jobId).toFile();
        // setLastModified reports failure through its return value only; callers depend on the
        // timestamp ordering, so a silent failure would surface as an unrelated assertion error.
        if (!jobArchive.setLastModified(fileModifiedDate)) {
            throw new IOException(
                    "Could not set last-modified time " + fileModifiedDate + " on " + jobArchive);
        }
        return jobId;
    }

    /**
     * Hand-writes a job-overview JSON document for a freshly generated job id and archives it
     * under the path {@code /joboverview} into {@code directory}.
     *
     * <p>The document has the shape {@code {"finished": [ {job fields..., "tasks": {...}} ]}}.
     * The task counters depend on {@link #versionLessThan14}: {@code "pending"} when it is set,
     * otherwise {@code "created"}, {@code "deploying"} and {@code "scheduled"}.
     *
     * @param directory directory to create the archive in
     * @return the generated job id as a string
     * @throws IOException if the JSON cannot be generated or the archive cannot be written
     */
    private static String createLegacyArchive(java.nio.file.Path directory) throws IOException {
        final JobID legacyJobId = JobID.generate();
        final StringWriter buffer = new StringWriter();

        // Each helper writes its start token on construction and its end token on close, so the
        // nesting of the try blocks mirrors the nesting of the JSON document.
        try (JsonGenerator json = JACKSON_FACTORY.createGenerator(buffer)) {
            try (JsonObject root = new JsonObject(json)) {
                try (JsonArray finished = new JsonArray(json, "finished")) {
                    try (JsonObject job = new JsonObject(json)) {
                        json.writeStringField("jid", legacyJobId.toString());
                        json.writeStringField("name", "testjob");
                        json.writeStringField("state", JobStatus.FINISHED.name());

                        json.writeNumberField("start-time", 0L);
                        json.writeNumberField("end-time", 1L);
                        json.writeNumberField("duration", 1L);
                        json.writeNumberField("last-modification", 1L);

                        try (JsonObject tasks = new JsonObject(json, "tasks")) {
                            json.writeNumberField("total", 0);

                            if (versionLessThan14) {
                                json.writeNumberField("pending", 0);
                            } else {
                                json.writeNumberField("created", 0);
                                json.writeNumberField("deploying", 0);
                                json.writeNumberField("scheduled", 0);
                            }

                            json.writeNumberField("running", 0);
                            json.writeNumberField("finished", 0);
                            json.writeNumberField("canceling", 0);
                            json.writeNumberField("canceled", 0);
                            json.writeNumberField("failed", 0);
                        }
                    }
                }
            }
        }

        final ArchivedJson overview = new ArchivedJson("/joboverview", buffer.toString());
        FsJobArchivist.archiveJob(
                new Path(directory.toUri()), legacyJobId, Collections.singleton(overview));
        return legacyJobId.toString();
    }

    /**
     * Scope helper for try-with-resources: starts a named JSON array field when constructed
     * and ends the array when closed.
     */
    private static final class JsonArray
    implements AutoCloseable {
        private final JsonGenerator gen;

        /**
         * @param gen generator to write to
         * @param name field name of the array
         * @throws IOException if the start token cannot be written
         */
        JsonArray(JsonGenerator gen, String name) throws IOException {
            this.gen = gen;
            this.gen.writeArrayFieldStart(name);
        }

        /** Writes the array's end token. */
        @Override
        public void close() throws IOException {
            gen.writeEndArray();
        }
    }

    /**
     * Scope helper for try-with-resources: starts a JSON object (anonymous or as a named
     * field) when constructed and ends the object when closed.
     */
    private static final class JsonObject
    implements AutoCloseable {
        private final JsonGenerator gen;

        /**
         * Starts an anonymous object.
         *
         * @param gen generator to write to
         * @throws IOException if the start token cannot be written
         */
        JsonObject(JsonGenerator gen) throws IOException {
            this.gen = gen;
            this.gen.writeStartObject();
        }

        /**
         * Starts an object as the value of the field {@code name}.
         *
         * @param gen generator to write to
         * @param name field name of the object
         * @throws IOException if the start token cannot be written
         */
        private JsonObject(JsonGenerator gen, String name) throws IOException {
            this.gen = gen;
            this.gen.writeObjectFieldStart(name);
        }

        /** Writes the object's end token. */
        @Override
        public void close() throws IOException {
            gen.writeEndObject();
        }
    }
}

