package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobClientTest.class */
public class BlobClientTest extends TestLogger {
    private static final int TEST_BUFFER_SIZE = 17000;
    static TestBlobServer blobServer;
    static Configuration clientConfig;

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/blob/BlobClientTest$TestBlobServer.class */
    public static class TestBlobServer extends BlobServer {
        private volatile long blockingMillis;

        /* JADX INFO: Access modifiers changed from: package-private */
        public TestBlobServer(Configuration configuration, BlobStore blobStore) throws IOException {
            super(configuration, blobStore);
            this.blockingMillis = 0L;
        }

        void getFileInternal(@Nullable JobID jobID, BlobKey blobKey, File file) throws IOException {
            if (this.blockingMillis > 0) {
                try {
                    Thread.sleep(this.blockingMillis);
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
            super.getFileInternal(jobID, blobKey, file);
        }

        void setBlockingMillis(long j) {
            this.blockingMillis = j;
        }
    }

    @BeforeClass
    public static void startServer() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
        blobServer = new TestBlobServer(configuration, new VoidBlobStore());
        blobServer.start();
        clientConfig = new Configuration();
    }

    @AfterClass
    public static void stopServer() throws IOException {
        if (blobServer != null) {
            blobServer.close();
        }
    }

    private static byte[] createTestBuffer() {
        byte[] bArr = new byte[TEST_BUFFER_SIZE];
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = (byte) (i % 128);
        }
        return bArr;
    }

    private static byte[] prepareTestFile(File file) throws IOException {
        MessageDigest createMessageDigest = BlobUtils.createMessageDigest();
        byte[] bArr = new byte[TEST_BUFFER_SIZE];
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = (byte) (i % 128);
        }
        FileOutputStream fileOutputStream = null;
        try {
            fileOutputStream = new FileOutputStream(file);
            for (int i2 = 0; i2 < 20; i2++) {
                fileOutputStream.write(bArr);
                createMessageDigest.update(bArr);
            }
            if (fileOutputStream != null) {
                fileOutputStream.close();
            }
            return createMessageDigest.digest();
        } catch (Throwable th) {
            if (fileOutputStream != null) {
                fileOutputStream.close();
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void validateGetAndClose(InputStream inputStream, byte[] bArr) throws IOException {
        try {
            byte[] bArr2 = new byte[bArr.length];
            int i = 0;
            do {
                int read = inputStream.read(bArr2, i, bArr2.length - i);
                if (read < 0) {
                    throw new EOFException();
                }
                i += read;
            } while (i != bArr2.length);
            Assert.assertEquals(-1L, inputStream.read());
            Assert.assertArrayEquals(bArr, bArr2);
            inputStream.close();
        } catch (Throwable th) {
            inputStream.close();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void validateGetAndClose(InputStream inputStream, InputStream inputStream2) throws IOException {
        int read;
        do {
            try {
                read = inputStream.read();
                Assert.assertEquals(inputStream2.read(), read);
            } finally {
                inputStream.close();
                inputStream2.close();
            }
        } while (read >= 0);
    }

    static void validateGetAndClose(InputStream inputStream, File file) throws IOException {
        validateGetAndClose(inputStream, new FileInputStream(file));
    }

    @Test
    public void testContentAddressableBufferTransientBlob() throws IOException, InterruptedException {
        testContentAddressableBuffer(BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    public void testContentAddressableBufferPermantBlob() throws IOException, InterruptedException {
        testContentAddressableBuffer(BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testContentAddressableBuffer(BlobKey.BlobType blobType) throws IOException, InterruptedException {
        InputStream internal;
        Throwable th;
        BlobClient blobClient = null;
        try {
            byte[] createTestBuffer = createTestBuffer();
            MessageDigest createMessageDigest = BlobUtils.createMessageDigest();
            createMessageDigest.update(createTestBuffer);
            byte[] digest = createMessageDigest.digest();
            InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
            BlobClient blobClient2 = new BlobClient(inetSocketAddress, getBlobClientConfig());
            JobID jobID = new JobID();
            BlobKey blobKey = null;
            if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                blobKey = blobClient2.putBuffer((JobID) null, createTestBuffer, 0, createTestBuffer.length, blobType);
                Assert.assertArrayEquals(digest, blobKey.getHash());
            }
            BlobKey putBuffer = blobClient2.putBuffer(jobID, createTestBuffer, 0, createTestBuffer.length, blobType);
            Assert.assertArrayEquals(digest, putBuffer.getHash());
            if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                BlobKeyTest.verifyKeyDifferentHashEquals(blobKey, putBuffer);
            }
            if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                validateGetAndClose(blobClient2.getInternal((JobID) null, blobKey), createTestBuffer);
                BlobCachePutTest.verifyDeletedEventually(getBlobServer(), null, blobKey);
            }
            validateGetAndClose(blobClient2.getInternal(jobID, putBuffer), createTestBuffer);
            if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                BlobCachePutTest.verifyDeletedEventually(getBlobServer(), jobID, putBuffer);
            }
            try {
                InputStream internal2 = blobClient2.getInternal((JobID) null, BlobKey.createKey(blobType));
                Throwable th2 = null;
                try {
                    Assert.fail("Expected IOException did not occur");
                    if (internal2 != null) {
                        if (0 != 0) {
                            try {
                                internal2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            internal2.close();
                        }
                    }
                } catch (Throwable th4) {
                    if (internal2 != null) {
                        if (0 != 0) {
                            try {
                                internal2.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            internal2.close();
                        }
                    }
                    throw th4;
                }
            } catch (IOException e) {
            }
            BlobClient blobClient3 = new BlobClient(inetSocketAddress, getBlobClientConfig());
            try {
                internal = blobClient3.getInternal(jobID, BlobKey.createKey(blobType));
                th = null;
            } catch (IOException e2) {
            }
            try {
                try {
                    Assert.fail("Expected IOException did not occur");
                    if (internal != null) {
                        if (0 != 0) {
                            try {
                                internal.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        } else {
                            internal.close();
                        }
                    }
                    if (blobClient3 != null) {
                        try {
                            blobClient3.close();
                        } catch (Throwable th7) {
                        }
                    }
                } catch (Throwable th8) {
                    th = th8;
                    throw th8;
                }
            } catch (Throwable th9) {
                if (internal != null) {
                    if (th != null) {
                        try {
                            internal.close();
                        } catch (Throwable th10) {
                            th.addSuppressed(th10);
                        }
                    } else {
                        internal.close();
                    }
                }
                throw th9;
            }
        } catch (Throwable th11) {
            if (0 != 0) {
                try {
                    blobClient.close();
                } catch (Throwable th12) {
                }
            }
            throw th11;
        }
    }

    protected Configuration getBlobClientConfig() {
        return clientConfig;
    }

    protected TestBlobServer getBlobServer() {
        return blobServer;
    }

    @Test
    public void testContentAddressableStreamTransientBlob() throws IOException, InterruptedException {
        testContentAddressableStream(BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    public void testContentAddressableStreamPermanentBlob() throws IOException, InterruptedException {
        testContentAddressableStream(BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testContentAddressableStream(BlobKey.BlobType blobType) throws IOException, InterruptedException {
        File newFile = temporaryFolder.newFile();
        byte[] prepareTestFile = prepareTestFile(newFile);
        InputStream inputStream = null;
        try {
            BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig());
            Throwable th = null;
            try {
                try {
                    JobID jobID = new JobID();
                    BlobKey blobKey = null;
                    if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                        blobKey = blobClient.putInputStream((JobID) null, new FileInputStream(newFile), blobType);
                        Assert.assertArrayEquals(prepareTestFile, blobKey.getHash());
                    }
                    FileInputStream fileInputStream = new FileInputStream(newFile);
                    BlobKey putInputStream = blobClient.putInputStream(jobID, fileInputStream, blobType);
                    fileInputStream.close();
                    inputStream = null;
                    if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                        BlobKeyTest.verifyKeyDifferentHashEquals(blobKey, putInputStream);
                        validateGetAndClose(blobClient.getInternal((JobID) null, blobKey), newFile);
                        BlobCachePutTest.verifyDeletedEventually(getBlobServer(), null, blobKey);
                    }
                    validateGetAndClose(blobClient.getInternal(jobID, putInputStream), newFile);
                    if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                        BlobCachePutTest.verifyDeletedEventually(getBlobServer(), jobID, putInputStream);
                    }
                    if (blobClient != null) {
                        if (0 != 0) {
                            try {
                                blobClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            blobClient.close();
                        }
                    }
                    if (0 != 0) {
                        try {
                            inputStream.close();
                        } catch (Throwable th3) {
                        }
                    }
                } catch (Throwable th4) {
                    th = th4;
                    throw th4;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (Throwable th6) {
                }
            }
            throw th5;
        }
    }

    @Test
    public void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
        testGetFailsDuringStreaming(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    public void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
        testGetFailsDuringStreaming(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    public void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
        testGetFailsDuringStreaming(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testGetFailsDuringStreaming(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException {
        BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), getBlobClientConfig());
        Throwable th = null;
        try {
            try {
                byte[] bArr = new byte[5000000];
                new Random().nextBytes(bArr);
                BlobKey putBuffer = blobClient.putBuffer(jobID, bArr, 0, bArr.length, blobType);
                Assert.assertNotNull(putBuffer);
                InputStream internal = blobClient.getInternal(jobID, putBuffer);
                byte[] bArr2 = new byte[bArr.length];
                BlobUtils.readFully(internal, bArr2, 0, 50000, (String) null);
                BlobUtils.readFully(internal, bArr2, 50000, 50000, (String) null);
                Iterator it = getBlobServer().getCurrentActiveConnections().iterator();
                while (it.hasNext()) {
                    ((BlobServerConnection) it.next()).close();
                }
                try {
                    BlobUtils.readFully(internal, bArr2, 2 * 50000, bArr.length - (2 * 50000), (String) null);
                    Assert.assertArrayEquals(bArr, bArr2);
                } catch (IOException e) {
                }
                if (blobClient != null) {
                    if (0 == 0) {
                        blobClient.close();
                        return;
                    }
                    try {
                        blobClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (blobClient != null) {
                if (th != null) {
                    try {
                        blobClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    blobClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testUploadJarFilesHelper() throws Exception {
        uploadJarFile(getBlobServer(), getBlobClientConfig());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void uploadJarFile(BlobServer blobServer2, Configuration configuration) throws Exception {
        File createTempFile = File.createTempFile("testfile", ".dat");
        createTempFile.deleteOnExit();
        prepareTestFile(createTempFile);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", blobServer2.getPort());
        uploadJarFile(inetSocketAddress, configuration, createTempFile);
        uploadJarFile(inetSocketAddress, configuration, createTempFile);
    }

    private static void uploadJarFile(InetSocketAddress inetSocketAddress, Configuration configuration, File file) throws IOException {
        JobID jobID = new JobID();
        List uploadFiles = BlobClient.uploadFiles(inetSocketAddress, configuration, jobID, Collections.singletonList(new Path(file.toURI())));
        Assert.assertEquals(1L, uploadFiles.size());
        BlobClient blobClient = new BlobClient(inetSocketAddress, configuration);
        Throwable th = null;
        try {
            try {
                validateGetAndClose(blobClient.getInternal(jobID, (BlobKey) uploadFiles.get(0)), file);
                if (blobClient != null) {
                    if (0 == 0) {
                        blobClient.close();
                        return;
                    }
                    try {
                        blobClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (blobClient != null) {
                if (th != null) {
                    try {
                        blobClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    blobClient.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSocketTimeout() {
        Configuration blobClientConfig = getBlobClientConfig();
        int integer = blobClientConfig.getInteger(BlobServerOptions.SO_TIMEOUT);
        blobClientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, 50);
        getBlobServer().setBlockingMillis(10000L);
        try {
            try {
                BlobClient blobClient = new BlobClient(new InetSocketAddress("localhost", getBlobServer().getPort()), blobClientConfig);
                Throwable th = null;
                try {
                    try {
                        blobClient.getInternal(new JobID(), BlobKey.createKey(BlobKey.BlobType.TRANSIENT_BLOB));
                        Assert.fail("Should throw an exception.");
                        if (blobClient != null) {
                            if (0 != 0) {
                                try {
                                    blobClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                blobClient.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (blobClient != null) {
                        if (th != null) {
                            try {
                                blobClient.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            blobClient.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                MatcherAssert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(th5, SocketTimeoutException.class).isPresent()), Is.is(true));
            }
        } finally {
            blobClientConfig.setInteger(BlobServerOptions.SO_TIMEOUT, integer);
            getBlobServer().setBlockingMillis(0L);
        }
    }
}
