package org.apache.tika.batch;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tika/batch/BatchProcess.class */
public class BatchProcess implements Callable<ParallelFileProcessingResult> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BatchProcess.class);
    // Upper bound (ms) for ConsumersManager init()/shutdown(); a negative value makes
    // both run synchronously on the calling thread with no time limit.
    private final long consumersManagerMaxMillis;
    // Submitted to the executor as one of the non-consumer tasks.
    private final FileResourceCrawler fileResourceCrawler;
    // Supplies the consumers and is initialised/shut down around the main loop.
    private final ConsumersManager consumersManager;
    // Optional (may be null); only submitted when non-null.
    private final StatusReporter reporter;
    // Optional (may be null); only submitted when non-null.
    private final Interrupter interrupter;
    // Consumers found to have exceeded timeoutThresholdMillis; capacity equals the
    // number of consumers at construction time.
    private final ArrayBlockingQueue<FileStarted> timedOuts;
    // Wraps the original System.err (captured in call() before System.err is pointed
    // at System.out); used to emit the "must restart" message.
    private PrintStream outputStreamWriter;
    // Threshold (ms) handed to FileResourceConsumer.checkForTimedOutMillis.
    private long timeoutThresholdMillis = 300000;
    // Sleep (ms) between two timeout checks in TimeoutChecker.
    private long timeoutCheckPulseMillis = 120000;
    // Maximum wait (ms) for active consumers after an early termination; a value of
    // -1 or lower removes the cap (see politelyAwaitTermination).
    private long pauseOnEarlyTerminationMillis = 30000;
    // Maximum lifetime of the main loop in seconds; negative disables the check.
    private int maxAliveTimeSeconds = -1;
    // Set to true at the start of mainLoop(); call() refuses to run when it is set.
    private boolean alreadyExecuted = false;

    /* loaded from: input_file:org/apache/tika/batch/BatchProcess$BATCH_CONSTANTS.class */
    /**
     * Marker strings emitted by this class.
     */
    public enum BATCH_CONSTANTS {
        /** Used as the restart message when the main loop exceeded the maximum alive time. */
        BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME,
        /** Prefix of the line written to the captured original {@code System.err} when a restart message exists. */
        BATCH_PROCESS_FATAL_MUST_RESTART
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tika/batch/BatchProcess$CAUSE_FOR_TERMINATION.class */
    /**
     * Reason the main loop (or start-up) ended; its {@code toString()} is passed to
     * the {@link ParallelFileProcessingResult}.
     */
    public enum CAUSE_FOR_TERMINATION {
        /** As many consumer results were collected as there are consumers. */
        COMPLETED_NORMALLY,
        /** A throwable whose cause chain contains a {@code BatchNoRestartError} was caught in the main loop. */
        MAIN_LOOP_EXCEPTION_NO_RESTART,
        /** {@code startConsumersManager()} raised a {@code BatchNoRestartError}; reported by {@code call()}. */
        CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART,
        /** Any other throwable was caught in the main loop. */
        MAIN_LOOP_EXCEPTION,
        /** The crawler's result arrived and the crawler reports it was timed out. */
        CRAWLER_TIMED_OUT,
        /** The timeout checker's result arrived. */
        TIMED_OUT_CONSUMER,
        /** The interrupter's result arrived. */
        PARENT_SHUTDOWN,
        /** The main loop ran longer than the configured maximum alive time. */
        BATCH_PROCESS_ALIVE_TOO_LONG
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tika/batch/BatchProcess$State.class */
    /**
     * Mutable bookkeeping shared between {@code mainLoop} (which fills it) and
     * {@code shutdown} (which reads it).
     */
    public static class State {
        // Wall-clock start of the main loop (System.currentTimeMillis()); -1 until set.
        long start = -1;
        // Number of consumers submitted to the completion service.
        int numConsumers = 0;
        // NOTE(review): read in shutdown() but never assigned in this file, so it stays 0.
        int numNonConsumers = 0;
        // Count of futures of any kind taken from the completion service by the main loop.
        int removed = 0;
        // Count of consumer results seen by the main loop.
        int consumersRemoved = 0;
        // Count of crawler results seen by the main loop.
        int crawlersRemoved = 0;
        // Why the main loop stopped; null until the loop finishes.
        CAUSE_FOR_TERMINATION causeForTermination = null;

        private State() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tika/batch/BatchProcess$TimeoutChecker.class */
    /**
     * Background task that periodically asks every consumer whether it has been
     * working on a single resource for longer than {@code timeoutThresholdMillis}.
     * It finishes as soon as at least one timed-out consumer has been recorded,
     * no consumer is active any more, or its thread is interrupted.
     */
    public class TimeoutChecker implements Callable<IFileProcessorFutureResult> {
        private TimeoutChecker() {
        }

        /**
         * Sleeps for {@code timeoutCheckPulseMillis} between checks until one of the
         * stop conditions described on the class is met.
         *
         * @return a {@link TimeoutFutureResult} carrying the number of recorded timeouts
         */
        @Override // java.util.concurrent.Callable
        public IFileProcessorFutureResult call() throws Exception {
            while (BatchProcess.this.timedOuts.isEmpty()) {
                try {
                    Thread.sleep(BatchProcess.this.timeoutCheckPulseMillis);
                } catch (InterruptedException e) {
                    // An interrupt (e.g. from ExecutorService.shutdownNow()) is a request
                    // to stop: restore the flag and leave the loop instead of sleeping again.
                    LOG.debug("Thread interrupted exception in TimeoutChecker");
                    Thread.currentThread().interrupt();
                    break;
                }
                checkForTimedOutConsumers();
                if (BatchProcess.this.countActiveConsumers() == 0) {
                    LOG.info("No activeConsumers in TimeoutChecker");
                    break;
                }
            }
            int timedOutCount = BatchProcess.this.timedOuts.size();
            LOG.debug("TimeoutChecker quitting: {}", timedOutCount);
            return new TimeoutFutureResult(timedOutCount);
        }

        /**
         * Records every consumer that reports a timed-out resource in {@code timedOuts}.
         */
        private void checkForTimedOutConsumers() {
            for (FileResourceConsumer consumer : BatchProcess.this.consumersManager.getConsumers()) {
                FileStarted timedOut = consumer.checkForTimedOutMillis(BatchProcess.this.timeoutThresholdMillis);
                // offer() rather than add(): the queue is bounded by the number of consumers and
                // this method is called again during shutdown, so add() could throw
                // IllegalStateException("Queue full") if a consumer is reported twice.
                if (timedOut != null && !BatchProcess.this.timedOuts.offer(timedOut)) {
                    LOG.debug("timedOuts queue is full; not recording {}", timedOut.getResourceId());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tika/batch/BatchProcess$TimeoutFutureResult.class */
    /**
     * Result returned by the timeout checker; carries how many timed-out consumers
     * had been recorded when the checker stopped.
     */
    public static class TimeoutFutureResult implements IFileProcessorFutureResult {
        private final int timedOutCount;

        private TimeoutFutureResult(int timedOutCount) {
            this.timedOutCount = timedOutCount;
        }

        /**
         * @return the number of timed-out consumers recorded at creation time
         */
        protected int getTimedOutCount() {
            return timedOutCount;
        }
    }

    public BatchProcess(FileResourceCrawler fileResourceCrawler, ConsumersManager consumersManager, StatusReporter statusReporter, Interrupter interrupter) {
        this.fileResourceCrawler = fileResourceCrawler;
        this.consumersManager = consumersManager;
        this.reporter = statusReporter;
        this.interrupter = interrupter;
        this.timedOuts = new ArrayBlockingQueue<>(consumersManager.getConsumers().size());
        this.consumersManagerMaxMillis = consumersManager.getConsumersManagerMaxMillis();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Runs the batch once: redirects {@code System.err} to {@code System.out},
     * starts the consumers manager, drives the main loop, shuts everything down and
     * returns the summary.
     *
     * <p>The consumers manager is shut down exactly once, in a {@code finally}
     * block, whether the run succeeds, start-up times out, or an exception escapes.
     *
     * @return the processing summary; if the consumers manager fails to start in time
     *         a result with exit status 254 and cause
     *         {@code CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART} is returned
     * @throws IllegalStateException if this instance has already been executed
     * @throws RuntimeException      if the original {@code System.err} cannot be wrapped
     */
    @Override // java.util.concurrent.Callable
    public ParallelFileProcessingResult call() throws InterruptedException {
        if (this.alreadyExecuted) {
            throw new IllegalStateException("Can only execute BatchRunner once.");
        }
        // Keep a handle on the real stderr for the restart message, then send
        // everything else that is written to System.err to System.out.
        try {
            this.outputStreamWriter = new PrintStream(System.err, true, StandardCharsets.UTF_8.toString());
        } catch (IOException e) {
            throw new RuntimeException("Can't redirect streams", e);
        }
        System.setErr(System.out);

        ParallelFileProcessingResult result;
        try {
            try {
                startConsumersManager();
            } catch (BatchNoRestartError e) {
                // Only a start-up failure is reported as "didn't init in time".
                return new ParallelFileProcessingResult(0, 0, 0, 0, 0.0d, 254, CAUSE_FOR_TERMINATION.CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART.toString());
            }

            // One thread per consumer plus crawler and timeout checker, plus the
            // optional interrupter and reporter.
            int nonConsumerTasks = 2;
            if (this.interrupter != null) {
                nonConsumerTasks++;
            }
            if (this.reporter != null) {
                nonConsumerTasks++;
            }
            int consumerCount = this.consumersManager.getConsumers().size();
            // Created only after a successful start-up so a failed start leaves no executor behind.
            ExecutorService executor = Executors.newFixedThreadPool(consumerCount + nonConsumerTasks);
            CompletionService<IFileProcessorFutureResult> completionService = new ExecutorCompletionService<>(executor);
            TimeoutChecker timeoutChecker = new TimeoutChecker();

            State state = mainLoop(completionService, timeoutChecker);
            result = shutdown(executor, completionService, timeoutChecker, state);
        } finally {
            shutdownConsumersManager();
        }
        LOG.trace("finishing up");
        return result;
    }

    /**
     * Submits all tasks (optional interrupter and reporter, crawler, timeout checker,
     * every consumer) and then polls the completion service once per second until a
     * reason to stop is found.
     *
     * <p>The loop stops when: all consumers have returned; the crawler returned and
     * reports it was timed out; the interrupter or the timeout checker returned; the
     * maximum alive time was exceeded; or any throwable is raised while polling or
     * unwrapping a result.
     *
     * @param completionService service the tasks are submitted to and polled from
     * @param timeoutChecker    the timeout-checking task to submit
     * @return the bookkeeping state, with {@code causeForTermination} set
     */
    private State mainLoop(CompletionService<IFileProcessorFutureResult> completionService, TimeoutChecker timeoutChecker) {
        this.alreadyExecuted = true;
        State state = new State();
        LOG.info("BatchProcess starting up");
        state.start = System.currentTimeMillis();

        if (this.interrupter != null) {
            completionService.submit(this.interrupter);
        }
        if (this.reporter != null) {
            completionService.submit(this.reporter);
        }
        completionService.submit(this.fileResourceCrawler);
        completionService.submit(timeoutChecker);
        for (FileResourceConsumer consumer : this.consumersManager.getConsumers()) {
            completionService.submit(consumer);
        }
        state.numConsumers = this.consumersManager.getConsumers().size();

        CAUSE_FOR_TERMINATION cause;
        while (true) {
            try {
                Future<IFileProcessorFutureResult> future = completionService.poll(1L, TimeUnit.SECONDS);
                if (future != null) {
                    state.removed++;
                    IFileProcessorFutureResult result = future.get();
                    LOG.trace("result: {}", result);
                    if (result instanceof FileConsumerFutureResult) {
                        state.consumersRemoved++;
                    } else if (result instanceof FileResourceCrawlerFutureResult) {
                        state.crawlersRemoved++;
                        if (this.fileResourceCrawler.wasTimedOut()) {
                            cause = CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT;
                            break;
                        }
                    } else if (result instanceof InterrupterFutureResult) {
                        cause = CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN;
                        break;
                    } else if (result instanceof TimeoutFutureResult) {
                        cause = CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER;
                        break;
                    }
                    // Any other result type is counted in state.removed and otherwise ignored.
                }
                if (state.consumersRemoved >= state.numConsumers) {
                    cause = CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY;
                    break;
                }
                if (aliveTooLong(state.start)) {
                    cause = CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG;
                    break;
                }
            } catch (Throwable th) {
                cause = isNonRestart(th) ? CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART : CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION;
                LOG.error("Main loop execution exception: {}", th.getMessage(), th);
                // Stop here: without this break the cause computed above is discarded
                // and the loop keeps polling after a failure.
                break;
            }
        }
        state.causeForTermination = cause;
        return state;
    }

    /**
     * Stops all tasks, drains whatever results are still available, decides whether
     * a restart message is needed and assembles the final result.
     *
     * <p>Order of operations: flag the reporter (if any) as shutting down, read the
     * crawler counters, {@code shutdown()} the executor, ask every consumer and the
     * crawler to stop, wait politely for active consumers, {@code shutdownNow()},
     * poll remaining futures (10 ms each), compute the exit status, run one last
     * timeout check and log every recorded timeout, then sum the consumer counters.
     *
     * @param executorService   executor running the tasks
     * @param completionService completion service wrapping {@code executorService}
     * @param timeoutChecker    used for the final timeout check
     * @param state             bookkeeping produced by the main loop
     * @return the summary of this run
     */
    private ParallelFileProcessingResult shutdown(ExecutorService executorService, CompletionService<IFileProcessorFutureResult> completionService, TimeoutChecker timeoutChecker, State state) {
        if (this.reporter != null) {
            this.reporter.setIsShuttingDown(true);
        }
        int added = this.fileResourceCrawler.getAdded();
        int considered = this.fileResourceCrawler.getConsidered();

        LOG.trace("about to shutdown");
        executorService.shutdown();
        for (FileResourceConsumer consumer : this.consumersManager.getConsumers()) {
            consumer.pleaseShutdown();
        }
        this.fileResourceCrawler.shutDownNoPoison();
        politelyAwaitTermination(state.causeForTermination);

        LOG.trace("About to shutdownNow()");
        List<Runnable> neverStarted = executorService.shutdownNow();
        LOG.trace("TERMINATED {} : {} : {}", Boolean.valueOf(executorService.isTerminated()), Integer.valueOf(state.consumersRemoved), Integer.valueOf(state.crawlersRemoved));

        // Upper bound on how many futures may still be waiting in the completion service.
        int outstanding = state.numConsumers + state.numNonConsumers - state.removed - neverStarted.size();
        for (int attempt = 0; attempt < outstanding; attempt++) {
            Future<IFileProcessorFutureResult> future;
            try {
                future = completionService.poll(10L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.warn("thread interrupt while polling in final shutdown loop");
                continue;
            }
            LOG.trace("In while future==null loop in final shutdown loop");
            if (future == null) {
                break;
            }
            try {
                IFileProcessorFutureResult result = future.get();
                if (result instanceof FileConsumerFutureResult) {
                    FileStarted fileStarted = ((FileConsumerFutureResult) result).getFileStarted();
                    LOG.trace("file started " + fileStarted);
                    boolean exceededThreshold = fileStarted != null && fileStarted.getElapsedMillis() > this.timeoutThresholdMillis;
                    if (exceededThreshold) {
                        LOG.warn("{} caused a file processor to hang or crash. You may need to remove this file from your input set and rerun.", fileStarted.getResourceId());
                    }
                } else if (result instanceof FileResourceCrawlerFutureResult) {
                    FileResourceCrawlerFutureResult crawlerResult = (FileResourceCrawlerFutureResult) result;
                    considered += crawlerResult.getConsidered();
                    added += crawlerResult.getAdded();
                }
            } catch (InterruptedException e) {
                LOG.error("Interrupted exception trying to shutdown after shutdownNow", (Throwable) e);
            } catch (ExecutionException e) {
                LOG.error("Execution exception trying to shutdown after shutdownNow", (Throwable) e);
            }
        }

        String restartMsg = buildRestartMessage(state.causeForTermination);
        LOG.trace("restart msg: " + restartMsg);
        int exitStatus = getExitStatus(state.causeForTermination, restartMsg);

        // One last sweep so consumers that timed out during shutdown are reported too.
        timeoutChecker.checkForTimedOutConsumers();
        for (FileStarted timedOut : this.timedOuts) {
            LOG.warn("A parser was still working on >{}< for {} milliseconds after it started. This exceeds the maxTimeoutMillis parameter", timedOut.getResourceId(), Long.valueOf(timedOut.getElapsedMillis()));
        }

        double elapsedSeconds = (System.currentTimeMillis() - state.start) / 1000.0d;
        int consumed = 0;
        int handledExceptions = 0;
        for (FileResourceConsumer consumer : this.consumersManager.getConsumers()) {
            consumed += consumer.getNumResourcesConsumed();
            handledExceptions += consumer.getNumHandledExceptions();
        }
        LOG.trace("returning " + state.causeForTermination);
        return new ParallelFileProcessingResult(considered, added, consumed, handledExceptions, elapsedSeconds, exitStatus, state.causeForTermination.toString());
    }

    /**
     * Chooses the restart message for a termination cause.
     *
     * @param cause why the main loop stopped
     * @return the message, or {@code null} when no restart message applies
     */
    private String buildRestartMessage(CAUSE_FOR_TERMINATION cause) {
        if (cause == CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN || cause == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
            return null;
        }
        if (cause == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
            return "Uncaught consumer throwable";
        }
        if (cause == CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER) {
            return areResourcesPotentiallyRemaining() ? "Consumer timed out with resources remaining" : null;
        }
        if (cause == CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG) {
            return BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString();
        }
        if (cause == CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT) {
            return "Crawler timed out.";
        }
        // Remaining causes: inspect the crawler to see whether work may be left.
        if (this.fileResourceCrawler.wasTimedOut()) {
            return "Crawler was timed out.";
        }
        if (this.fileResourceCrawler.isActive()) {
            return "Crawler is still active.";
        }
        if (!this.fileResourceCrawler.isQueueEmpty()) {
            return "Resources still exist for processing";
        }
        return null;
    }

    /**
     * Initialises the consumers manager.
     *
     * <p>With a negative {@code consumersManagerMaxMillis} the call to
     * {@code init()} runs on the current thread without any time limit. Otherwise
     * {@code init()} runs on a daemon thread and this method waits for it with
     * {@code Thread.join(consumersManagerMaxMillis)}.
     *
     * @throws BatchNoRestartError if the init thread is still alive after the wait
     */
    private void startConsumersManager() {
        if (this.consumersManagerMaxMillis < 0) {
            this.consumersManager.init();
            return;
        }
        Thread thread = new Thread(() -> {
            LOG.trace("about to start consumers manager");
            this.consumersManager.init();
            LOG.trace("finished starting consumers manager");
        });
        // Daemon so a hung init() cannot keep the JVM alive.
        thread.setDaemon(true);
        thread.start();
        try {
            thread.join(this.consumersManagerMaxMillis);
        } catch (InterruptedException e) {
            LOG.warn("interruption exception during consumers manager startup");
        }
        if (thread.isAlive()) {
            LOG.error("ConsumersManager did not start within {}ms", Long.valueOf(this.consumersManagerMaxMillis));
            throw new BatchNoRestartError("ConsumersManager did not start within " + this.consumersManagerMaxMillis + "ms");
        }
    }

    /**
     * Shuts the consumers manager down.
     *
     * <p>With a negative {@code consumersManagerMaxMillis} the call to
     * {@code shutdown()} runs on the current thread without any time limit.
     * Otherwise it runs on a daemon thread and this method waits for that thread
     * with {@code Thread.join(consumersManagerMaxMillis)}.
     *
     * @throws BatchNoRestartError if the shutdown thread is still alive after the wait
     */
    private void shutdownConsumersManager() {
        final long maxMillis = this.consumersManagerMaxMillis;
        if (maxMillis < 0) {
            this.consumersManager.shutdown();
            return;
        }

        Runnable shutdownTask = () -> {
            LOG.trace("starting to shutdown consumers manager");
            this.consumersManager.shutdown();
            LOG.trace("finished shutting down consumers manager");
        };
        Thread worker = new Thread(shutdownTask);
        worker.setDaemon(true);
        worker.start();

        try {
            worker.join(maxMillis);
        } catch (InterruptedException e) {
            LOG.warn("interruption exception during consumers manager shutdown");
        }

        if (!worker.isAlive()) {
            return;
        }
        LOG.error("ConsumersManager was still alive during shutdown!");
        throw new BatchNoRestartError("ConsumersManager did not shutdown within: " + maxMillis + "ms");
    }

    /**
     * After an early (non-normal) termination, gives still-active consumers a
     * chance to finish: checks every 500 ms until no consumer is active, the wait
     * exceeds {@code pauseOnEarlyTerminationMillis} (only when that value is greater
     * than -1), or the thread is interrupted. Returns immediately for
     * {@code COMPLETED_NORMALLY}.
     *
     * @param cause_for_termination why the main loop stopped
     */
    private void politelyAwaitTermination(CAUSE_FOR_TERMINATION cause_for_termination) {
        if (cause_for_termination == CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY) {
            return;
        }
        final long waitStarted = System.currentTimeMillis();
        try {
            while (countActiveConsumers() > 0) {
                Thread.sleep(500L);
                long waited = System.currentTimeMillis() - waitStarted;
                boolean capEnabled = this.pauseOnEarlyTerminationMillis > -1;
                if (capEnabled && waited > this.pauseOnEarlyTerminationMillis) {
                    LOG.warn("Waited after an early termination for {}ms, but there was at least one active consumer", Long.valueOf(waited));
                    return;
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted while trying to politelyAwaitTermination");
        }
    }

    /**
     * Tells whether the throwable, or any throwable in its cause chain, is a
     * {@code BatchNoRestartError}.
     *
     * @param th throwable to inspect; must not be {@code null}
     * @return {@code true} if a {@code BatchNoRestartError} is found in the chain
     */
    private boolean isNonRestart(Throwable th) {
        Throwable current = th;
        do {
            if (current instanceof BatchNoRestartError) {
                return true;
            }
            current = current.getCause();
        } while (current != null);
        return false;
    }

    /**
     * Maps the termination cause and restart message to an exit status.
     *
     * <ul>
     *   <li>254 when the cause is {@code MAIN_LOOP_EXCEPTION_NO_RESTART};</li>
     *   <li>0 when there is no restart message;</li>
     *   <li>253 otherwise, after logging the message and writing
     *       {@code BATCH_PROCESS_FATAL_MUST_RESTART >> message} to the captured
     *       original {@code System.err}.</li>
     * </ul>
     *
     * @param cause_for_termination why the main loop stopped
     * @param str                   restart message, or {@code null} if none
     * @return the exit status
     */
    private int getExitStatus(CAUSE_FOR_TERMINATION cause_for_termination, String str) {
        final int noRestartStatus = 254;
        final int restartStatus = 253;

        if (cause_for_termination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
            LOG.info(CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART.name());
            return noRestartStatus;
        }
        if (str == null) {
            return 0;
        }

        // Exceeding the max alive time is expected behaviour, so it is only a warning.
        String maxAliveMarker = BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString();
        if (maxAliveMarker.equals(str)) {
            LOG.warn(str);
        } else {
            LOG.error(str);
        }

        String restartLine = BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString() + " >> " + str;
        this.outputStreamWriter.println(restartLine);
        this.outputStreamWriter.flush();
        return restartStatus;
    }

    /**
     * @return {@code true} if the crawler is still active or its queue is not empty
     */
    private boolean areResourcesPotentiallyRemaining() {
        if (this.fileResourceCrawler.isActive()) {
            return true;
        }
        return !this.fileResourceCrawler.isQueueEmpty();
    }

    /**
     * Checks the maximum-alive-time limit.
     *
     * @param j start time in milliseconds, as returned by {@code System.currentTimeMillis()}
     * @return {@code true} if {@code maxAliveTimeSeconds} is non-negative and more than
     *         that many seconds have elapsed since {@code j}
     */
    private boolean aliveTooLong(long j) {
        if (this.maxAliveTimeSeconds < 0) {
            // A negative limit disables the check.
            return false;
        }
        double elapsedSeconds = ((double) (System.currentTimeMillis() - j)) / 1000.0d;
        return elapsedSeconds > (double) this.maxAliveTimeSeconds;
    }

    /**
     * @return how many consumers currently report {@code isStillActive()}
     */
    private int countActiveConsumers() {
        int active = 0;
        for (FileResourceConsumer consumer : this.consumersManager.getConsumers()) {
            if (consumer.isStillActive()) {
                active++;
            }
        }
        return active;
    }

    /**
     * Sets how long, after an early termination, to wait for still-active consumers
     * before giving up. The cap is only applied when the value is greater than -1.
     * Default: 30000.
     *
     * @param j maximum wait in milliseconds
     */
    public void setPauseOnEarlyTerminationMillis(long j) {
        this.pauseOnEarlyTerminationMillis = j;
    }

    /**
     * Sets the threshold passed to each consumer's {@code checkForTimedOutMillis};
     * it is also compared against a file's elapsed time during shutdown to decide
     * whether to warn about it. Default: 300000.
     *
     * @param j threshold in milliseconds
     */
    public void setTimeoutThresholdMillis(long j) {
        this.timeoutThresholdMillis = j;
    }

    /**
     * Sets how long the timeout checker sleeps between two checks. Default: 120000.
     *
     * @param j sleep interval in milliseconds
     */
    public void setTimeoutCheckPulseMillis(long j) {
        this.timeoutCheckPulseMillis = j;
    }

    /**
     * Sets the maximum time the main loop may run before it stops with
     * {@code BATCH_PROCESS_ALIVE_TOO_LONG}. A negative value disables the limit.
     * Default: -1.
     *
     * @param i limit in seconds
     */
    public void setMaxAliveTimeSeconds(int i) {
        this.maxAliveTimeSeconds = i;
    }
}
