package com.ibm.fhir.bucket.scanner;

import com.ibm.fhir.bucket.api.BucketLoaderJob;
import com.ibm.fhir.bucket.api.BucketPath;
import com.ibm.fhir.bucket.api.FileType;
import com.ibm.fhir.bucket.api.ResourceBundleError;
import com.ibm.fhir.bucket.api.ResourceEntry;
import com.ibm.fhir.bucket.api.ResourceRef;
import com.ibm.fhir.database.utils.thread.ThreadHandler;
import com.ibm.fhir.exception.FHIROperationException;
import com.ibm.fhir.model.format.Format;
import com.ibm.fhir.model.parser.FHIRParser;
import com.ibm.fhir.model.parser.exception.FHIRParserException;
import com.ibm.fhir.model.resource.Bundle;
import com.ibm.fhir.model.resource.OperationOutcome;
import com.ibm.fhir.model.resource.Resource;
import com.ibm.fhir.model.util.FHIRUtil;
import com.ibm.fhir.validation.FHIRValidator;
import com.ibm.fhir.validation.exception.FHIRValidationException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:com/ibm/fhir/bucket/scanner/BaseFileReader.class */
/**
 * Base class for readers which allocate {@link BucketLoaderJob}s through
 * {@link DataAccess}, hand each job to a thread pool for reading, and pass the
 * parsed and validated resources to a {@link Consumer} of {@link ResourceEntry}.
 *
 * <p>A dedicated allocation thread (see {@link #init()}) keeps the number of
 * in-flight jobs between {@code rescanThreshold} and {@code maxInflight}. The
 * {@code inflight} counter is only read or written while holding {@code lock}.
 * Subclasses implement {@link #processThr(BucketLoaderJob)} to open the job's
 * payload and delegate to {@link #processJSON} or {@link #processNDJSON}.
 */
public abstract class BaseFileReader {
    private static final Logger logger = Logger.getLogger(BaseFileReader.class.getName());

    /** How long {@link #waitForStop()} waits for the allocation thread to exit. */
    private static final long STOP_TIMEOUT_MS = 5000L;

    /** Back-off applied after an unexpected failure in the allocation loop. */
    private static final long ERROR_BACKOFF_MS = 60000L;

    /** Nap taken when fewer jobs were allocated than there were free slots. */
    private static final long IDLE_NAP_MS = 10000L;

    private final Consumer<ResourceEntry> resourceHandler;
    private final ExecutorService pool;
    private final FileType fileType;
    private final DataAccess dataAccess;
    private Thread mainLoopThread;
    private final int maxInflight;
    private final int rescanThreshold;
    private final boolean incremental;
    private final boolean incrementalExact;
    private final int recycleSeconds;
    private final double bundleCostFactor;
    private final List<BucketPath> bucketPaths;

    // Guards inflight; resourceLimit is signalled when inflight drops below rescanThreshold
    private final Lock lock = new ReentrantLock();
    private final Condition resourceLimit = this.lock.newCondition();
    private int inflight = 0;
    private volatile boolean running = true;

    /**
     * Creates a reader. The allocation thread is not started until {@link #init()} is called.
     *
     * @param executorService pool on which {@link #processThr(BucketLoaderJob)} is run for each job
     * @param fileType the file type passed to {@link DataAccess} when allocating jobs
     * @param consumer receives each resource which passes validation
     * @param maxInflight upper bound on the number of allocated-but-not-completed jobs
     * @param dataAccess used to allocate jobs, mark them done and record errors
     * @param incremental when true (and {@code incrementalExact} is false) NDJSON processing
     *        skips the lines up to and including the last processed line number
     * @param recycleSeconds passed through unchanged to the job allocation call
     * @param incrementalExact when true, each NDJSON line is skipped if resource references
     *        are already recorded for that line
     * @param bundleCostFactor multiplier applied to a Bundle's entry count to compute its cost
     * @param bucketPaths passed to the job allocation call; copied defensively
     */
    public BaseFileReader(ExecutorService executorService, FileType fileType, Consumer<ResourceEntry> consumer, int maxInflight, DataAccess dataAccess, boolean incremental, int recycleSeconds, boolean incrementalExact, double bundleCostFactor, Collection<BucketPath> bucketPaths) {
        this.pool = executorService;
        this.fileType = fileType;
        this.resourceHandler = consumer;
        this.dataAccess = dataAccess;
        this.maxInflight = maxInflight;
        // Start looking for more work once half of the slots are free (but never below 1)
        this.rescanThreshold = Math.max(1, maxInflight / 2);
        this.incremental = incremental;
        this.recycleSeconds = recycleSeconds;
        this.incrementalExact = incrementalExact;
        this.bundleCostFactor = bundleCostFactor;
        this.bucketPaths = new ArrayList<>(bucketPaths);
    }

    /**
     * Asks the allocation loop to stop. Clears the running flag, then interrupts the
     * allocation thread and signals the condition so that a blocked wait wakes up.
     * Does not wait for the thread to exit.
     */
    public void signalStop() {
        if (this.running) {
            logger.info("Stopping local file reader");
            this.running = false;
        }
        Thread loopThread = this.mainLoopThread;
        if (loopThread != null) {
            loopThread.interrupt();
            this.lock.lock();
            try {
                this.resourceLimit.signalAll();
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Signals the allocation loop to stop and waits up to {@value #STOP_TIMEOUT_MS} ms
     * for its thread to terminate. If the calling thread is interrupted while waiting,
     * its interrupt status is restored before returning.
     */
    public void waitForStop() {
        signalStop();
        logger.info("Waiting for reader to stop");
        Thread loopThread = this.mainLoopThread;
        if (loopThread != null) {
            try {
                loopThread.join(STOP_TIMEOUT_MS);
            } catch (InterruptedException e) {
                // The caller was interrupted; this says nothing about the reader thread
                Thread.currentThread().interrupt();
                logger.warning("Interrupted while waiting for reader to stop");
                return;
            }
            // join(timeout) returns silently on timeout, so the thread state must be checked
            if (loopThread.isAlive()) {
                logger.warning("CosReader loop did not terminate in " + STOP_TIMEOUT_MS + "ms");
                return;
            }
        }
        logger.info("Reader stopped");
    }

    /**
     * Starts the thread which runs {@link #mainAllocationLoop()}.
     */
    public void init() {
        this.mainLoopThread = new Thread(this::mainAllocationLoop);
        this.mainLoopThread.start();
    }

    /**
     * Body of the allocation thread. Until stopped, waits for the in-flight count to
     * drop below the rescan threshold, then allocates enough jobs to fill the free
     * slots. Sleeps briefly when less work was found than there was capacity for,
     * and for longer after an unexpected error.
     */
    public void mainAllocationLoop() {
        while (this.running) {
            int free = 0;
            int allocated = 0;
            boolean failed = false;

            this.lock.lock();
            try {
                while (this.running && this.inflight >= this.rescanThreshold) {
                    this.resourceLimit.await();
                }
                if (this.running) {
                    free = this.maxInflight - this.inflight;
                    if (free > 0) {
                        allocated = allocateJobs(free);
                        this.inflight += allocated;
                        logger.info("Jobs inflight[" + this.fileType.name() + "] " + this.inflight + ", just allocated: " + allocated);
                    }
                }
            } catch (InterruptedException ignored) {
                // signalStop() interrupts this thread purely to wake it up; the loop
                // condition re-checks the running flag, so nothing else to do here.
            } catch (Exception e) {
                failed = true;
                logger.log(Level.SEVERE, "Error in main allocation loop. Sleeping before retry", e);
            } finally {
                this.lock.unlock();
            }

            if (!this.running) {
                break;
            }

            // Sleep only after the lock is released so completing jobs and
            // signalStop() are never blocked behind a sleeping thread.
            if (failed) {
                ThreadHandler.safeSleep(ERROR_BACKOFF_MS);
            } else if (allocated < free) {
                logger.fine("No work. Napping");
                ThreadHandler.safeSleep(IDLE_NAP_MS);
            }
        }
    }

    /**
     * Allocates up to {@code free} jobs, registers the completion callback on each
     * and submits them to the pool.
     *
     * @param free the number of free in-flight slots
     * @return the number of jobs actually allocated
     */
    private int allocateJobs(int free) {
        logger.info("allocateJobs[" + this.fileType.name() + "]: free = " + free);
        List<BucketLoaderJob> jobs = new ArrayList<>();
        this.dataAccess.allocateJobs(jobs, this.fileType, free, this.recycleSeconds, this.bucketPaths);
        logger.info("Allocated job count[" + this.fileType.name() + "]: " + jobs.size());

        // Register every callback before any job is submitted
        jobs.forEach(job -> job.registerCallback(this::markJobDone));
        jobs.forEach(this::process);
        return jobs.size();
    }

    /**
     * Completion callback for a job. Logs timing statistics, marks the job done via
     * {@link DataAccess} and, whether or not that succeeds, releases the job's
     * in-flight slot.
     *
     * @param job the job which has completed
     */
    protected void markJobDone(BucketLoaderJob job) {
        double elapsedSecs = (job.getProcessingEndTime() - job.getProcessingStartTime()) / 1.0E9d;
        double lastCallSecs = job.getLastCallResponseTime() / 1000.0d;
        int resourceCount = job.getTotalResourceCount();
        double rate = lastCallSecs > 0.0d ? resourceCount / lastCallSecs : Double.NaN;
        try {
            logger.info(String.format("Completed entry: %s [took %.3f secs, lastCall: %.3f secs, resources: %d, rate: %.1f resources/sec]", job.toString(), elapsedSecs, lastCallSecs, resourceCount, rate));
            this.dataAccess.markJobDone(job);
        } finally {
            releaseInflightSlot();
        }
    }

    /**
     * Decrements the in-flight count and wakes the allocation loop when the count
     * has fallen below the rescan threshold.
     */
    private void releaseInflightSlot() {
        this.lock.lock();
        try {
            this.inflight--;
            logger.info("Job completed[" + this.fileType.name() + "] inflight count now: " + this.inflight);
            if (this.inflight < this.rescanThreshold) {
                logger.info("triggering job fetch[" + this.fileType.name() + "]");
                this.resourceLimit.signal();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Submits the job to the pool, where {@link #processThr(BucketLoaderJob)} is invoked.
     */
    private void process(BucketLoaderJob job) {
        this.pool.submit(() -> processThr(job));
    }

    /**
     * Reads the payload for the given job. Invoked on a pool thread.
     *
     * @param bucketLoaderJob the job to process
     */
    protected abstract void processThr(BucketLoaderJob bucketLoaderJob);

    /**
     * Parses a single JSON resource from the reader and processes it as line 0.
     * A parse failure is recorded as an error against the job. The job is always
     * told that file processing is complete, even if an exception propagates.
     *
     * @param job the job the payload belongs to
     * @param reader source of the JSON text
     */
    protected void processJSON(BucketLoaderJob job, Reader reader) {
        try {
            Resource resource = FHIRParser.parser(Format.JSON).parse(reader);
            process(job, resource, 0, "");
        } catch (FHIRParserException e) {
            recordError(job, 0, "Parse error: " + e.getMessage());
        } finally {
            job.fileProcessingComplete();
        }
    }

    /**
     * Computes the cost passed to the {@link ResourceEntry}: 1 for a non-Bundle
     * resource, otherwise the entry count scaled by the bundle cost factor
     * (truncated to an int, minimum 1).
     */
    private int costForResource(Resource resource) {
        if (!resource.is(Bundle.class)) {
            return 1;
        }
        int entryCount = resource.as(Bundle.class).getEntry().size();
        return Math.max(1, (int) (this.bundleCostFactor * entryCount));
    }

    /**
     * Reads newline-delimited JSON, one resource per line, numbering lines from 0.
     * In incremental (non-exact) mode the lines up to and including the last
     * processed line are skipped; in incremental-exact mode each line is skipped
     * when resource references are already recorded for it. Lines which fail to
     * parse are logged and recorded as errors. The job is always told that file
     * processing is complete once reading starts, even if an exception propagates.
     *
     * @param job the job the payload belongs to
     * @param bufferedReader source of the NDJSON text
     * @throws IllegalStateException if reading from {@code bufferedReader} fails
     */
    protected void processNDJSON(BucketLoaderJob job, BufferedReader bufferedReader) {
        int skipLines = 0;
        if (this.incremental && !this.incrementalExact) {
            Integer lastLine = getLastProcessedLineNumber(job);
            if (lastLine != null) {
                skipLines = lastLine + 1;
                logger.info(job.toString() + "; previously processed, so skipping lines: " + skipLines);
            }
        }

        try {
            int lineNumber = 0;

            // Fast-forward past lines handled by a previous run (stops early at end of input)
            while (lineNumber < skipLines && bufferedReader.readLine() != null) {
                lineNumber++;
            }

            String line;
            while ((line = bufferedReader.readLine()) != null) {
                // Every physical line consumes exactly one line number, whether it is
                // skipped, fails to parse or is processed, so numbers never drift.
                final int currentLine = lineNumber++;

                if (this.incrementalExact && !getLogicalIdsForLine(job, currentLine).isEmpty()) {
                    continue;
                }

                try {
                    Resource resource = FHIRParser.parser(Format.JSON).parse(new StringReader(line));
                    process(job, resource, currentLine, line);
                } catch (FHIRParserException e) {
                    logger.log(Level.WARNING, line, e);
                    recordError(job, currentLine, "Parse error: " + e.getMessage());
                }
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            job.fileProcessingComplete();
        }
    }

    /**
     * Validates the resource and, if valid, hands it to the resource handler.
     * Validation failures and handler exceptions are recorded as errors against
     * the job rather than propagated.
     *
     * @param job the job the resource came from
     * @param resource the parsed resource
     * @param lineNumber the line the resource was read from
     * @param line the raw text of the line, used only for FINE logging
     * @return true if the resource was valid and accepted by the handler
     */
    private boolean process(BucketLoaderJob job, Resource resource, int lineNumber, String line) {
        boolean valid = false;
        try {
            validateInput(resource);
            valid = true;
        } catch (FHIROperationException e) {
            logger.warning("Resource validation failed: " + e.getMessage());
            String issueText = describeIssues(e.getIssues());
            if (logger.isLoggable(Level.FINER)) {
                logger.finer("Validation warnings for input resource: [" + issueText + "]");
            }
            recordError(job, lineNumber, issueText);
        } catch (FHIRValidationException e) {
            logger.warning("Resource validation exception: " + e.getMessage());
            recordError(job, lineNumber, e.getMessage());
        }

        if (!valid) {
            return false;
        }

        try {
            this.resourceHandler.accept(new ResourceEntry(job, resource, lineNumber, costForResource(resource)));
            return true;
        } catch (Exception e) {
            recordError(job, lineNumber, e.getMessage());
            if (logger.isLoggable(Level.FINE)) {
                logger.fine(job.getObjectKey() + "[" + lineNumber + "]: " + line);
                logger.fine(e.getMessage());
            }
            return false;
        }
    }

    /**
     * Validates the resource with the default {@link FHIRValidator}.
     *
     * @param resource the resource to validate
     * @return the issues reported by the validator (possibly empty); none of them is a failure
     * @throws FHIRValidationException if the validator itself fails
     * @throws FHIROperationException if any issue has a failure severity; the
     *         exception carries the full list of issues
     */
    public List<OperationOutcome.Issue> validateInput(Resource resource) throws FHIRValidationException, FHIROperationException {
        List<OperationOutcome.Issue> issues = FHIRValidator.validator().validate(resource);
        if (issues.isEmpty()) {
            return issues;
        }

        boolean failure = issues.stream().anyMatch(issue -> FHIRUtil.isFailure(issue.getSeverity()));
        if (failure) {
            throw new FHIROperationException("Input resource failed validation.").withIssue(issues);
        }

        if (logger.isLoggable(Level.FINER)) {
            logger.finer("Validation warnings for input resource: [" + describeIssues(issues) + "]");
        }
        return issues;
    }

    /**
     * Joins the details text of each issue with ", ". Issues without details,
     * text or a text value are left out instead of causing a NullPointerException.
     */
    private static String describeIssues(Collection<OperationOutcome.Issue> issues) {
        if (issues == null) {
            return "";
        }
        return issues.stream()
                .filter(issue -> issue != null)
                .map(issue -> issue.getDetails())
                .filter(details -> details != null)
                .map(details -> details.getText())
                .filter(text -> text != null)
                .map(text -> text.getValue())
                .filter(value -> value != null)
                .collect(Collectors.joining(", "));
    }

    /**
     * Records a single error message against the given line of the job's bundle load.
     */
    private void recordError(BucketLoaderJob job, int lineNumber, String message) {
        this.dataAccess.recordErrors(job.getResourceBundleLoadId(), lineNumber, Collections.singletonList(new ResourceBundleError(lineNumber, message)));
    }

    /**
     * Looks up the last processed line number for the job's bundle and version.
     *
     * @return the line number, or null when the lookup returns none
     */
    private Integer getLastProcessedLineNumber(BucketLoaderJob job) {
        return this.dataAccess.getLastProcessedLineNumber(job.getResourceBundleId(), job.getVersion());
    }

    /**
     * Looks up the resource references already recorded for the given line of the
     * job's bundle and version.
     */
    private List<ResourceRef> getLogicalIdsForLine(BucketLoaderJob job, int lineNumber) {
        return this.dataAccess.getResourceRefsForLine(job.getResourceBundleId(), job.getVersion(), lineNumber);
    }
}
