package org.apache.flink.runtime.blob;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobCacheRetriesTest.class */
public class BlobCacheRetriesTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testBlobFetchRetries() throws IOException {
        testBlobFetchRetries(new Configuration(), new VoidBlobStore());
    }

    @Test
    public void testBlobFetchRetriesHa() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.getRoot().getPath());
        BlobStore blobStore = null;
        try {
            blobStore = BlobUtils.createBlobStoreFromConfig(configuration);
            testBlobFetchRetries(configuration, blobStore);
            if (blobStore != null) {
                blobStore.closeAndCleanupAllData();
            }
        } catch (Throwable th) {
            if (blobStore != null) {
                blobStore.closeAndCleanupAllData();
            }
            throw th;
        }
    }

    private void testBlobFetchRetries(Configuration configuration, BlobStore blobStore) throws IOException {
        byte[] bArr = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
        TestingFailingBlobServer testingFailingBlobServer = null;
        BlobCache blobCache = null;
        try {
            testingFailingBlobServer = new TestingFailingBlobServer(configuration, blobStore, 2);
            InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", testingFailingBlobServer.getPort());
            BlobClient blobClient = null;
            try {
                blobClient = new BlobClient(inetSocketAddress, configuration);
                BlobKey put = blobClient.put(bArr);
                if (blobClient != null) {
                    blobClient.close();
                }
                blobCache = new BlobCache(inetSocketAddress, configuration, new VoidBlobStore());
                InputStream openStream = blobCache.getURL(put).openStream();
                try {
                    byte[] bArr2 = new byte[bArr.length];
                    Assert.assertEquals(bArr.length, openStream.read(bArr2));
                    Assert.assertArrayEquals(bArr, bArr2);
                    openStream.close();
                    if (blobCache != null) {
                        blobCache.close();
                    }
                    if (testingFailingBlobServer != null) {
                        testingFailingBlobServer.close();
                    }
                } catch (Throwable th) {
                    openStream.close();
                    throw th;
                }
            } catch (Throwable th2) {
                if (blobClient != null) {
                    blobClient.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (blobCache != null) {
                blobCache.close();
            }
            if (testingFailingBlobServer != null) {
                testingFailingBlobServer.close();
            }
            throw th3;
        }
    }

    @Test
    public void testBlobFetchWithTooManyFailures() throws IOException {
        testBlobFetchWithTooManyFailures(new Configuration(), new VoidBlobStore());
    }

    @Test
    public void testBlobFetchWithTooManyFailuresHa() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.getRoot().getPath());
        BlobStore blobStore = null;
        try {
            blobStore = BlobUtils.createBlobStoreFromConfig(configuration);
            testBlobFetchWithTooManyFailures(configuration, blobStore);
            if (blobStore != null) {
                blobStore.closeAndCleanupAllData();
            }
        } catch (Throwable th) {
            if (blobStore != null) {
                blobStore.closeAndCleanupAllData();
            }
            throw th;
        }
    }

    private void testBlobFetchWithTooManyFailures(Configuration configuration, BlobStore blobStore) throws IOException {
        byte[] bArr = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
        TestingFailingBlobServer testingFailingBlobServer = null;
        BlobCache blobCache = null;
        try {
            testingFailingBlobServer = new TestingFailingBlobServer(configuration, blobStore, 10);
            InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", testingFailingBlobServer.getPort());
            BlobClient blobClient = null;
            try {
                blobClient = new BlobClient(inetSocketAddress, configuration);
                BlobKey put = blobClient.put(bArr);
                if (blobClient != null) {
                    blobClient.close();
                }
                BlobCache blobCache2 = new BlobCache(inetSocketAddress, configuration, new VoidBlobStore());
                try {
                    blobCache2.getURL(put);
                    Assert.fail("This should fail");
                } catch (IOException e) {
                }
                if (blobCache2 != null) {
                    blobCache2.close();
                }
                if (testingFailingBlobServer != null) {
                    testingFailingBlobServer.close();
                }
            } catch (Throwable th) {
                if (blobClient != null) {
                    blobClient.close();
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (0 != 0) {
                blobCache.close();
            }
            if (testingFailingBlobServer != null) {
                testingFailingBlobServer.close();
            }
            throw th2;
        }
    }
}
