package org.apache.tajo.worker;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/tajo/worker/TestFetcher.class */
public class TestFetcher {
    private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
    private String INPUT_DIR = this.TEST_DATA + "/in/";
    private String OUTPUT_DIR = this.TEST_DATA + "/out/";
    private TajoConf conf = new TajoConf();
    private TajoPullServerService pullServerService;

    @Before
    public void setUp() throws Exception {
        CommonTestingUtil.getTestDir(this.TEST_DATA);
        CommonTestingUtil.getTestDir(this.INPUT_DIR);
        CommonTestingUtil.getTestDir(this.OUTPUT_DIR);
        this.conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, this.INPUT_DIR);
        this.conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
        this.conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
        this.pullServerService = new TajoPullServerService();
        this.pullServerService.init(this.conf);
        this.pullServerService.start();
    }

    @After
    public void tearDown() {
        this.pullServerService.stop();
    }

    @Test
    public void testGet() throws IOException {
        Random random = new Random();
        QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        String str = this.conf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR) + queryId.toString() + "/output/1/hash-shuffle/" + HashShuffleAppenderManager.getPartParentId(Integer.parseInt("1"), this.conf) + "/1";
        String format = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, "1", "1", "h");
        Path path = new Path(str);
        FSDataOutputStream create = FileSystem.getLocal(this.conf).create(path, true);
        for (int i = 0; i < 100; i++) {
            create.write(("" + random.nextInt()).getBytes());
        }
        create.flush();
        create.close();
        URI create2 = URI.create("http://127.0.0.1:" + this.pullServerService.getPort() + "/?" + format);
        FileChunk fileChunk = new FileChunk(new File(this.OUTPUT_DIR + "data"), 0L, 0L);
        fileChunk.setFromRemote(true);
        Fetcher fetcher = new Fetcher(this.conf, create2, fileChunk);
        FileChunk fileChunk2 = (FileChunk) fetcher.get().get(0);
        Assert.assertNotNull(fileChunk2);
        Assert.assertNotNull(fileChunk2.getFile());
        LocalFileSystem local = FileSystem.getLocal(new TajoConf());
        Assert.assertEquals(local.getFileStatus(path).getLen(), local.getFileStatus(new Path(this.OUTPUT_DIR, "data")).getLen());
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
    }

    @Test
    public void testAdjustFetchProcess() {
        Assert.assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0.0f);
        Assert.assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0.0f);
        Assert.assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0.0f);
        Assert.assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0.0f);
        Assert.assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0.0f);
        Assert.assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0.0f);
        Assert.assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0.0f);
    }

    @Test
    public void testStatus() throws Exception {
        Random random = new Random();
        QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        String str = this.INPUT_DIR + queryId.toString() + "/output/1/1_0/output/1";
        String format = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, "1", "1", "h", "1_0");
        FSDataOutputStream create = FileSystem.getLocal(this.conf).create(new Path(str), true);
        for (int i = 0; i < 100; i++) {
            create.write(("" + random.nextInt()).getBytes());
        }
        create.flush();
        create.close();
        URI create2 = URI.create("http://127.0.0.1:" + this.pullServerService.getPort() + "/?" + format);
        FileChunk fileChunk = new FileChunk(new File(this.OUTPUT_DIR + "data"), 0L, 0L);
        fileChunk.setFromRemote(true);
        Fetcher fetcher = new Fetcher(this.conf, create2, fileChunk);
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
        fetcher.get();
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
    }

    @Test
    public void testNoContentFetch() throws Exception {
        QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        String str = this.INPUT_DIR + queryId.toString() + "/output/1/1_0/output/1";
        String format = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, "1", "1", "h", "1_0");
        Path path = new Path(str);
        LocalFileSystem local = FileSystem.getLocal(this.conf);
        if (local.exists(path)) {
            local.delete(new Path(str), true);
        }
        FileSystem.getLocal(this.conf).create(new Path(str).getParent(), true).close();
        URI create = URI.create("http://127.0.0.1:" + this.pullServerService.getPort() + "/?" + format);
        FileChunk fileChunk = new FileChunk(new File(this.OUTPUT_DIR + "data"), 0L, 0L);
        fileChunk.setFromRemote(true);
        Fetcher fetcher = new Fetcher(this.conf, create, fileChunk);
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
        fetcher.get();
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
    }

    @Test
    public void testFailureStatus() throws Exception {
        Random random = new Random();
        QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        String str = this.INPUT_DIR + queryId.toString() + "/output/1/1_0/output/1";
        String format = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, "1", "1", "x", "1_0");
        FSDataOutputStream create = FileSystem.getLocal(this.conf).create(new Path(str), true);
        for (int i = 0; i < 100; i++) {
            create.write((format + random.nextInt()).getBytes());
        }
        create.flush();
        create.close();
        URI create2 = URI.create("http://127.0.0.1:" + this.pullServerService.getPort() + "/?" + format);
        FileChunk fileChunk = new FileChunk(new File(this.OUTPUT_DIR + "data"), 0L, 0L);
        fileChunk.setFromRemote(true);
        Fetcher fetcher = new Fetcher(this.conf, create2, fileChunk);
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
        fetcher.get();
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
    }

    @Test
    public void testServerFailure() throws Exception {
        QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
        String str = this.INPUT_DIR + queryId.toString() + "/output/1/1_0/output/1";
        URI create = URI.create("http://127.0.0.1:" + this.pullServerService.getPort() + "/?" + String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, "1", "1", "h", "1_0"));
        FileChunk fileChunk = new FileChunk(new File(this.OUTPUT_DIR + "data"), 0L, 0L);
        fileChunk.setFromRemote(true);
        Fetcher fetcher = new Fetcher(this.conf, create, fileChunk);
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
        this.pullServerService.stop();
        boolean z = false;
        try {
            fetcher.get();
        } catch (Throwable th) {
            z = true;
        }
        Assert.assertTrue(z);
        Assert.assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
    }
}
