/*
 * Decompiled with CFR 0.152.
 */
package org.modellwerkstatt.objectflow.batchjob;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;
import org.modellwerkstatt.manmap.runtime.MMShutdownRequestException;
import org.modellwerkstatt.objectflow.batchjob.ConsWorkCanceledMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsWorkDoneMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsWorkExMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsumerFinallyDownMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsumerThread;
import org.modellwerkstatt.objectflow.batchjob.IOFXCommandImplConsumer;
import org.modellwerkstatt.objectflow.batchjob.IOFXCommandImplProducer;
import org.modellwerkstatt.objectflow.batchjob.IOFXTimerMasterController;
import org.modellwerkstatt.objectflow.batchjob.JobReporter;
import org.modellwerkstatt.objectflow.batchjob.Message;
import org.modellwerkstatt.objectflow.batchjob.OFXConsumerRunnable;
import org.modellwerkstatt.objectflow.batchjob.OFXExceptionStrategy;
import org.modellwerkstatt.objectflow.batchjob.PCPairReporter;
import org.modellwerkstatt.objectflow.batchjob.ProcessWorkMsg;
import org.modellwerkstatt.objectflow.batchjob.RunProducerMsg;
import org.modellwerkstatt.objectflow.batchjob.SchedInfo;
import org.modellwerkstatt.objectflow.batchjob.ShutdownMsg;
import org.modellwerkstatt.objectflow.batchjob.ShutdownWhenInboxEmptyMsg;
import org.modellwerkstatt.objectflow.batchjob.ToggleEnbldProdMsg;
import org.modellwerkstatt.objectflow.batchjob.WakeupPairCrtlMsg;
import org.modellwerkstatt.objectflow.runtime.IOFXCoreReporter;
import org.modellwerkstatt.objectflow.runtime.IOFXUserServices;
import org.modellwerkstatt.objectflow.runtime.IPrintingServiceImpl;
import org.modellwerkstatt.objectflow.runtime.MoVersion;
import org.modellwerkstatt.objectflow.runtime.OFXJobWorkCanceledException;
import org.modellwerkstatt.objectflow.runtime.StaticJmxAccess;

public abstract class OFXPCPairController<Entity>
extends PCPairReporter
implements Runnable {
    public static final int PRODUCER_QUEUE_CAPACITY = 50;
    public static final int PRODUCER_EX_MIN_RERUNTIME_INMS = 300000;
    public static final int GRACEFULL_WAITING_TIME_SEC = 5;
    public static final boolean START_NEW_CONSUMER_ON_UNEXPECTED_SHTUDOWN = false;
    public static final String VERSION = "MoWare 11 PairCrtl 2021";
    private int thisPCPairID;
    private String thisPCPairName;
    private IOFXTimerMasterController timerContoller;
    private IOFXCommandImplProducer<Entity> producerCommandImplStatefull;
    private List<ConsumerThread<Entity>> allConsumers;
    private BlockingQueue<Message> messageQueue;
    private Queue<Entity> inbox;
    private List<Message> dbg_processedMsg;
    private volatile boolean shutdownWhenInboxEmptyAndConsumersParked;
    private volatile boolean shuttingDown;
    private volatile boolean jmxUnregisterDone;
    private boolean checkInboxForRescheduling;
    private boolean manuallyInboxFilled;
    private volatile boolean producerRunsEnabled;
    private int consumerWaitTimeDueToEXinMS;
    public IOFXUserServices __userServices;
    private OFXExceptionStrategy exceptionStrategy;
    private OFXExceptionStrategy.Strategy stratRespForShutdown;

    public boolean inboxEmtpy() {
        return this.inbox.size() == 0;
    }

    public boolean consumerExWaitReqeusted() {
        return this.consumerWaitTimeDueToEXinMS >= 0;
    }

    private OFXPCPairController(int id, String name, IOFXTimerMasterController masterController, OFXExceptionStrategy strat) {
        super(name, masterController.getJobProperties());
        this.timerContoller = masterController;
        this.messageQueue = new LinkedBlockingQueue<Message>(50);
        this.allConsumers = new ArrayList<ConsumerThread<Entity>>();
        this.inbox = new LinkedList<Entity>();
        this.shuttingDown = false;
        this.jmxUnregisterDone = false;
        this.shutdownWhenInboxEmptyAndConsumersParked = false;
        this.stratRespForShutdown = null;
        this.thisPCPairID = id;
        this.thisPCPairName = name;
        if (this.timerContoller.getJobProperties().envMode != MODE.TOMMY_MODE) {
            this.dbg_processedMsg = new ArrayList<Message>();
        }
        this.exceptionStrategy = strat;
        this.__userServices = this;
    }

    public OFXPCPairController(int id, String name, IOFXTimerMasterController masterController, OFXExceptionStrategy strat, IPrintingServiceImpl print, IOFXCoreReporter reporter) {
        this(id, name, masterController, strat);
        this.initUserServices(print, reporter);
    }

    public void receive(Message message) {
        this.messageQueue.add(message);
    }

    @Override
    public String runProducerNow() {
        this.receive(new RunProducerMsg(this.thisPCPairID, RunProducerMsg.Source.MANUAL));
        return "Sent RunProducerMsg:Manual - clear inbox, reload @ " + this.asExactDateTimeFormatOrEmpty(new DateTime());
    }

    @Override
    public String startJobTimer() {
        boolean currentState = this.timerContoller.enableTimer(true);
        return "Started timer for job, timer enabled now " + currentState;
    }

    @Override
    public String stopJobTimer() {
        boolean currentState = this.timerContoller.enableTimer(false);
        return "Stopped timer for job, timer enabled now " + currentState;
    }

    @Override
    public String clearJobTimerState() {
        int newversion = this.timerContoller.clearJobTimerState();
        return "Cleared timer state for whole job (timer version now " + newversion + ")";
    }

    @Override
    public int getinbox_CurrentInboxSize() {
        return this.inbox.size();
    }

    @Override
    public String getbatchjob_PairSchedExpressions() {
        if (this.timerContoller == null) {
            return "timerControlelr=null (gcClean() ?)";
        }
        return this.timerContoller.getSchedSetting(this);
    }

    @Override
    public String fullStatusReportTimerController() {
        if (this.timerContoller == null) {
            return "timerControlelr=null (gcClean() ?)";
        }
        return this.timerContoller.getFullStatusReport(this);
    }

    @Override
    public String toggleProducerEnabled() {
        this.receive(new ToggleEnbldProdMsg(this.thisPCPairID));
        return "Enabled was " + this.producerRunsEnabled + " but sent ToggleProducerEnabled message now.";
    }

    @Override
    public String getbatchjob_PCPairNameAndID() {
        return this.thisPCPairID + "_" + this.thisPCPairName;
    }

    public OFXExceptionStrategy.Strategy exStratFor(Throwable t) {
        return this.exceptionStrategy.strategyFor(t);
    }

    public abstract IOFXCommandImplConsumer<Entity>[] createNewConsumerImplementations();

    public abstract IOFXCommandImplProducer<Entity> createNewProducerImplementation();

    public void setupPairController(int numConsumers) {
        this.setupPairController(numConsumers, true);
    }

    public synchronized void setupPairController(int numConsumers, boolean consoleMode) {
        if (this.producerCommandImplStatefull != null) {
            throw new IllegalStateException("OFXPCPairController already initialized. setup already called.");
        }
        this.producerCommandImplStatefull = this.createNewProducerImplementation();
        if (!consoleMode) {
            this.consoleModeOff();
            StaticJmxAccess.register(this, this.assembleJMXPrefix(this.getbatchjob_PCPairNameAndID(), true, 0));
        }
        for (int i = 0; i < numConsumers; ++i) {
            this.addAndStartConsumer();
        }
    }

    private int addAndStartConsumer() {
        int newId = this.allConsumers.size();
        OFXConsumerRunnable runnable = new OFXConsumerRunnable(this.thisPCPairName, newId, this.timerContoller.getJobProperties(), this.createNewConsumerImplementations());
        runnable.init(this, this.userPrintService, this.coreReporter);
        String shorJobName = MoVersion.getShortNameFromFQ(this.getbatchjob_Name());
        ConsumerThread t = new ConsumerThread(shorJobName, this.getbatchjob_PCPairNameAndID(), runnable, newId);
        t.setStatus(ConsumerThread.Status.WAITING);
        this.allConsumers.add(t);
        if (!this.isConsoleMode()) {
            runnable.consoleModeOff();
            StaticJmxAccess.register(runnable, this.assembleJMXPrefix(this.getbatchjob_PCPairNameAndID(), false, newId));
        }
        t.start();
        this.logFrmwrkTrace("Started consumer with id " + newId);
        return newId;
    }

    @Override
    public void run() {
        ConsumerThread<Entity> consumerSenderThread = null;
        this.logFrmwrkTrace("Starting into event loop (mode=" + this.timerContoller.getJobProperties().envMode + ")");
        Thread.currentThread().setName(MoVersion.getShortNameFromFQ(this.timerContoller.getJobProperties().swJobFqName) + "_" + this.thisPCPairName + " Producer");
        this.checkInboxForRescheduling = false;
        this.consumerWaitTimeDueToEXinMS = -1;
        this.manuallyInboxFilled = false;
        this.producerRunsEnabled = true;
        try {
            while (!this.shuttingDown || this.messageQueue.size() != 0) {
                consumerSenderThread = null;
                this.setInternalState("Waiting for messages");
                long before = System.currentTimeMillis();
                Message currentMessageProcessing = this.messageQueue.take();
                if (this.dbg_processedMsg != null) {
                    this.dbg_processedMsg.add(currentMessageProcessing);
                }
                this.addIdleSample(System.currentTimeMillis() - before);
                if (currentMessageProcessing.getConsumerSenderId() >= 0) {
                    consumerSenderThread = this.allConsumers.get(currentMessageProcessing.getConsumerSenderId());
                }
                if (currentMessageProcessing.getPCReceiverId() >= 0 && currentMessageProcessing.getPCReceiverId() != this.thisPCPairID) {
                    throw new IllegalArgumentException("Received Message '" + currentMessageProcessing + "' at pcPare with ID " + this.thisPCPairID + "  - what is not correct ...");
                }
                this.logFrmwrkTrace("Processing Message " + currentMessageProcessing + " from " + consumerSenderThread);
                this.setInternalState("Processing Message " + currentMessageProcessing);
                if (currentMessageProcessing instanceof ShutdownMsg) {
                    this.shuttingDown = true;
                } else if (currentMessageProcessing instanceof ToggleEnbldProdMsg) {
                    this.producerRunsEnabled = !this.producerRunsEnabled;
                    this.logFrmwrkTrace("ToggleEnbldProducer Message processed: Producer enabled now " + this.producerRunsEnabled);
                } else if (currentMessageProcessing instanceof ShutdownWhenInboxEmptyMsg) {
                    this.shutdownWhenInboxEmptyAndConsumersParked = true;
                } else if (currentMessageProcessing instanceof RunProducerMsg) {
                    if (this.shutdownWhenInboxEmptyAndConsumersParked || this.shuttingDown) {
                        this.logFrmwrkError("Reqeust to run producer, but waiting for a shutdown. (IGNORING !!, shutdown=" + this.shuttingDown + " shtWhenInboxEmpty=" + this.shutdownWhenInboxEmptyAndConsumersParked, null);
                    } else {
                        this.singleProducerRun(((RunProducerMsg)currentMessageProcessing).fromMan());
                    }
                } else if (currentMessageProcessing instanceof WakeupPairCrtlMsg) {
                    if (this.manuallyInboxFilled) {
                        this.logFrmwrkError("Received a WakeupPairCrtlMsg but inbox (" + this.inbox.size() + ") was filled by a manual run....", null);
                    } else if (this.inboxEmtpy()) {
                        this.logFrmwrkTrace("Received a WakeupPairCrtlMsg but inbox has size " + this.inbox.size() + ". Presumably ex on last consumer item.");
                    }
                    this.checkInboxForRescheduling = true;
                    if (!this.shuttingDown) {
                        this.wakeUpWaitingAndSendWork();
                    }
                } else if (currentMessageProcessing instanceof ConsWorkDoneMsg || currentMessageProcessing instanceof ConsWorkCanceledMsg) {
                    consumerSenderThread.setStatus(ConsumerThread.Status.WAITING);
                    consumerSenderThread.setProcessingKey(null);
                    if (currentMessageProcessing instanceof ConsWorkCanceledMsg) {
                        this.incConsumerCanceledProcessing();
                        this.logFrmwrkTrace("Got ConsWorkCanceledMsg - " + ((ConsWorkCanceledMsg)currentMessageProcessing).getMessage());
                    } else {
                        this.incConsumerOkProcessings();
                    }
                    if (!this.shuttingDown) {
                        this.checkInboxAndSendWork(consumerSenderThread);
                    }
                } else if (currentMessageProcessing instanceof ConsumerFinallyDownMsg) {
                    boolean wakeup = false;
                    consumerSenderThread.setStatus(ConsumerThread.Status.SHUTDOWN);
                    if (consumerSenderThread.getProcessingKey() != null) {
                        this.inbox.add(consumerSenderThread.getProcessingKey());
                        consumerSenderThread.setProcessingKey(null);
                        wakeup = true;
                    }
                    if (!this.shuttingDown) {
                        // empty if block
                    }
                    if (wakeup) {
                        this.wakeUpWaitingAndSendWork();
                    }
                } else if (currentMessageProcessing instanceof ConsWorkExMsg) {
                    boolean wakeup = false;
                    ConsWorkExMsg msg = (ConsWorkExMsg)currentMessageProcessing;
                    OFXExceptionStrategy.Strategy toFollow = this.exStratFor(msg.getThrowable());
                    if (toFollow.isSilentNoLog()) {
                        this.skipReportingEx();
                    } else {
                        this.logJobProblem(true, msg.getThrowable().getClass().getSimpleName() + " in consumer " + consumerSenderThread.toString() + " while processing '" + consumerSenderThread.getProcessingKey() + "': handling with " + toFollow, msg.getThrowable(), this.convertGuardMsg(msg.getThrowable()));
                    }
                    this.incConsumerEx();
                    if (!this.shuttingDown) {
                        if (toFollow.isReaddToInbox()) {
                            this.inbox.add(consumerSenderThread.getProcessingKey());
                            wakeup = true;
                        }
                        consumerSenderThread.setProcessingKey(null);
                        if (msg.wasEvtLoopStopped()) {
                            consumerSenderThread.setStatus(ConsumerThread.Status.SHUTDOWN);
                        } else {
                            consumerSenderThread.setStatus(ConsumerThread.Status.WAITING);
                            wakeup = true;
                        }
                        if (!this.manuallyInboxFilled && (toFollow.isJobRestart() || toFollow.isJobShutdown() || toFollow.isVMRestart() || toFollow.isVMShutdown())) {
                            this.stratRespForShutdown = toFollow;
                            wakeup = false;
                            this.shuttingDown = true;
                        } else if (toFollow.isConsumerRestart()) {
                            consumerSenderThread.setStatus(ConsumerThread.Status.SHUTDOWN);
                            if (!this.ensureConsumerShutdown(currentMessageProcessing.getConsumerSenderId(), 5)) {
                                this.logFrmwrkError("Can not stop " + consumerSenderThread + " (status set to SHUTDOWN now)", null);
                            }
                            int newId = this.addAndStartConsumer();
                            wakeup = true;
                        }
                        if (toFollow.isClearInbox()) {
                            this.inbox.clear();
                            wakeup = false;
                            if (!this.consumerExWaitReqeusted()) {
                                this.consumerWaitTimeDueToEXinMS = 0;
                            }
                        }
                        if (toFollow.getDelayTimeInMsOrZero() > 0) {
                            wakeup = false;
                            if (toFollow.getDelayTimeInMsOrZero() > this.consumerWaitTimeDueToEXinMS) {
                                this.consumerWaitTimeDueToEXinMS = toFollow.getDelayTimeInMsOrZero();
                            }
                        } else if (!this.manuallyInboxFilled && wakeup) {
                            this.wakeUpWaitingAndSendWork();
                        }
                    }
                }
                if (this.manuallyInboxFilled && this.consumerExWaitReqeusted() && this.isNoConsumerWorking()) {
                    this.inboxLoadProblem("Inbox manually filled but exception occured. Inbox with " + this.inbox.size() + " items cleared!");
                    this.inbox.clear();
                    this.consumerWaitTimeDueToEXinMS = -1;
                    this.manuallyInboxFilled = false;
                    this.checkInboxForRescheduling = false;
                } else if (this.consumerExWaitReqeusted() && this.isNoConsumerWorking()) {
                    SchedInfo info = this.timerContoller.runNotCompletedDueEXResched(this, this.consumerWaitTimeDueToEXinMS, false, this.inboxEmtpy());
                    this.logJobProblem(false, "In MainLoop scheduled: " + info.msg + " @ " + JobReporter.EXACT_TIME_ONLY_FORMATTER.print((ReadableInstant)info.when), null, null);
                    this.consumerWaitTimeDueToEXinMS = -1;
                    if (this.inboxEmtpy()) {
                        this.checkInboxForRescheduling = false;
                    }
                }
                if (this.checkInboxForRescheduling && !this.manuallyInboxFilled && !this.shutdownWhenInboxEmptyAndConsumersParked && !this.shuttingDown && this.inboxEmtpy() && this.isNoConsumerWorking()) {
                    this.logFrmwrkTrace("Successfully completed all work, inbox now 0 and no consumer working.");
                    this.reportConsumerWorkTotal();
                    this.timerContoller.runCompletedResched(this);
                    this.checkInboxForRescheduling = false;
                } else if (this.manuallyInboxFilled && this.inboxEmtpy() && this.isNoConsumerWorking()) {
                    this.manuallyInboxFilled = false;
                }
                if (!this.shutdownWhenInboxEmptyAndConsumersParked || !this.inboxEmtpy() || !this.isNoConsumerWorking()) continue;
                this.shutdownWhenInboxEmptyAndConsumersParked = false;
                this.shuttingDown = true;
                this.shutdownConsumersGraceFullyAndWait();
            }
        }
        catch (InterruptedException ex) {
            this.logFrmwrkError("Interrupted in main loop - shutting down", ex);
            Thread.currentThread().interrupt();
        }
        catch (Throwable t) {
            this.logFrmwrkError("Exception in main loop - shutting down", t);
        }
        if (!this.inboxEmtpy()) {
            this.inboxLoadProblem("Shutting down producer, but inbox is not empty right now :\n" + this.dumpInbox());
        }
        this.setInternalState("Exited eventloop, informing timerController");
        this.logFrmwrkTrace("Eventloop exited, sending shuttingDown(this) to timerCrtl " + this.timerContoller);
        this.timerContoller.shuttingDown(this);
        this.setInternalState("Existed eventloop, shutting down consumers");
        this.logFrmwrkTrace("next call shutdownConsumersGraceFullyAndWait()");
        this.shutdownConsumersGraceFullyAndWait();
        if (!this.isNoConsumerAlive()) {
            this.logFrmwrkTrace("Consumers still alive.. :(  waitForAllThreadsStopped() next");
            this.waitForAllThreadsStopped(5, true);
        }
        if (!this.isNoConsumerAlive()) {
            this.logFrmwrkError("Producer exiting, but not all consumer threads are !isAlive(), undeploy might lead to mem leaks.", null);
        }
        this.setInternalState("Existed eventloop, unregistring JMX");
        this.logFrmwrkTrace("next call ensureJMXUnregistered()");
        this.jmxUnregister();
        this.logFrmwrkTrace("next call gcClean(), good by");
        this.gcClean();
        this.setInternalState("No longer running, gcClean() done.");
        if (this.stratRespForShutdown == null) {
            // empty if block
        }
    }

    private void singleProducerRun(boolean manualRun) {
        block22: {
            if (!this.producerRunsEnabled && !manualRun) {
                this.logFrmwrkTrace("Producer run issued, but producerRuns are disabled, resched as successful run.. ");
                this.timerContoller.runCompletedResched(this);
            } else if (!this.isNoConsumerWorking()) {
                this.logFrmwrkTrace("Requested a producer-run while still processing isNoConsumerWorking()=false, inbox size=" + this.inbox.size() + " => rescheduling? " + !manualRun);
                if (!manualRun) {
                    SchedInfo info = this.timerContoller.runNotCompletedOutOfCronWindowResched(this, true);
                    this.logFrmwrkTrace("ProducerRun req. but consumers working, scheduled " + info.msg + " @ " + info.when);
                }
            } else {
                this.manuallyInboxFilled = false;
                try {
                    this.logFrmwrkTrace("singleProducerRun() requested @ " + this.asExactDateTimeFormatOrEmpty(new DateTime()) + " manually=" + manualRun);
                    if (!manualRun && this.timerContoller.outOfCronWindow(this)) {
                        SchedInfo info = this.timerContoller.runNotCompletedOutOfCronWindowResched(this, true);
                        this.logFrmwrkTrace("ProducerRun req. but out of cron window, scheduled " + info.msg + " @ " + info.when);
                        this.checkInboxForRescheduling = false;
                    } else {
                        this.inboxLoadStart(this.inbox.size());
                        this.inbox.clear();
                        ArrayList<Entity> listForInbox = this.producerCommandImplStatefull.process(this);
                        Object lastElem = null;
                        for (Entity elem : listForInbox) {
                            if (elem == null) {
                                this.inboxLoadProblem("Trying to add a <null> element to inbox after " + lastElem + " - what was prevented.");
                                continue;
                            }
                            lastElem = elem;
                            this.inbox.add(elem);
                        }
                        String lastAction = this.producerCommandImplStatefull.getLastAction();
                        this.inboxLoadStop(this.inbox.size(), lastAction, this.allConsumers.size() == 0);
                        if (manualRun) {
                            this.manuallyInboxFilled = true;
                        } else {
                            this.checkInboxForRescheduling = true;
                        }
                        if (this.waitingConsumersAvailable() == 0 && this.inbox.size() > 0) {
                            this.inboxLoadProblem("Inbox size was loaded with " + this.inbox.size() + " items, but there are no consumers around! CLEARING INBOX !!");
                            this.inbox.clear();
                        }
                        this.wakeUpWaitingAndSendWork();
                    }
                }
                catch (MMShutdownRequestException ex) {
                    this.logFrmwrkTrace("M3ShutdownRequestException received while in a singleProducerRun, waiting for shutdown message. ");
                }
                catch (InterruptedException ex) {
                    this.logFrmwrkTrace("InterruptedException received while in a singleProducerRun, waiting for shutdown message.");
                }
                catch (Throwable t) {
                    OFXExceptionStrategy.Strategy toFollow = this.exStratFor(t);
                    if (toFollow.isSilentNoLog()) {
                        this.skipReportingEx();
                    } else if (t instanceof OFXJobWorkCanceledException) {
                        OFXJobWorkCanceledException cancel = (OFXJobWorkCanceledException)t;
                        this.inboxLoadProblem("Inbox loading canceled - " + cancel.getFirstProblem());
                        this.skipReportingEx();
                    } else {
                        this.logJobProblem(false, t.getClass().getSimpleName() + " during producer-run: handling with " + toFollow, t, this.convertGuardMsg(t));
                    }
                    if (!manualRun && (toFollow.isJobRestart() || toFollow.isJobShutdown() || toFollow.isVMRestart() || toFollow.isVMShutdown())) {
                        this.stratRespForShutdown = toFollow;
                        this.shuttingDown = true;
                    }
                    if (manualRun) break block22;
                    int waitMS = toFollow.getDelayTimeInMsOrZero();
                    if (waitMS > 0 && waitMS < 300000) {
                        waitMS = 300000;
                    }
                    this.inbox.clear();
                    SchedInfo info = this.timerContoller.runNotCompletedDueEXResched(this, waitMS, true, true);
                    this.logJobProblem(false, "In ProducerRun scheduled: " + info.msg + " @ " + EXACT_TIME_ONLY_FORMATTER.print((ReadableInstant)info.when), null, null);
                    this.checkInboxForRescheduling = false;
                }
            }
        }
    }

    private void checkInboxAndSendWork(ConsumerThread t) {
        if (!this.shuttingDown && !this.consumerExWaitReqeusted() && this.inbox.size() > 0) {
            if (!this.manuallyInboxFilled && this.timerContoller.outOfCronWindow(this)) {
                Object logMessage = "Work for consumer req. but out of cron window";
                if (this.isNoConsumerWorking()) {
                    SchedInfo info = this.timerContoller.runNotCompletedOutOfCronWindowResched(this, false);
                    logMessage = ", scheduled " + info.msg + " @ " + info.when;
                }
                this.logFrmwrkTrace((String)logMessage);
            } else {
                String entityDescription;
                Entity ent = this.inbox.poll();
                if (ent == null) {
                    throw new IllegalStateException("Programming error, no head elem to remove from inbox. (size " + this.inbox.size() + ")");
                }
                t.setStatus(ConsumerThread.Status.WORKING);
                t.setProcessingKey(ent);
                t.receive(new ProcessWorkMsg<Entity>(ent));
                try {
                    entityDescription = "'" + ent.toString() + "'";
                }
                catch (Exception e) {
                    entityDescription = "'" + ent.getClass().getSimpleName() + " ????'";
                    this.logJobProblem(false, "Exception in toString() of " + ent.getClass().getSimpleName() + " while logging", e, null);
                }
                this.logFrmwrkTrace(entityDescription + " to consumer " + t.getConsumerId() + " for processing");
            }
        }
    }

    private void wakeUpWaitingAndSendWork() {
        for (ConsumerThread<Entity> curCon : this.allConsumers) {
            if (curCon.getStatus() != ConsumerThread.Status.WAITING) continue;
            this.checkInboxAndSendWork(curCon);
        }
    }

    private void shutdownConsumersGraceFullyAndWait() {
        this.logFrmwrkTrace("Trying to shut down consumers gracefully");
        for (ConsumerThread<Entity> curCon : this.allConsumers) {
            if (curCon.getStatus() == ConsumerThread.Status.SHUTDOWN) continue;
            curCon.receive(new ShutdownMsg());
        }
        boolean stopped = this.waitForAllThreadsStopped(5, false);
        if (!stopped) {
            this.logFrmwrkError("Not able to stop all consumer gracefully via ShutdownMsg within 5 secs.", null);
        }
        if (!(stopped = this.waitForAllThreadsStopped(10, true))) {
            this.logFrmwrkError("Not able to stop all consumers with interrupted within 10 secs.", null);
        }
    }

    private boolean isNoConsumerWorking() {
        int shutdownCons = 0;
        int workingCons = 0;
        for (ConsumerThread<Entity> cons : this.allConsumers) {
            if (cons.getStatus().equals((Object)ConsumerThread.Status.WORKING)) {
                ++workingCons;
                continue;
            }
            if (!cons.getStatus().equals((Object)ConsumerThread.Status.SHUTDOWN)) continue;
            ++shutdownCons;
        }
        if (shutdownCons == this.allConsumers.size() && this.allConsumers.size() != 0) {
            throw new IllegalStateException("All consumer are in SHUTDOWN status, no more consumers available");
        }
        return workingCons == 0;
    }

    private int waitingConsumersAvailable() {
        int waitingCons = 0;
        for (ConsumerThread<Entity> cons : this.allConsumers) {
            if (!cons.getStatus().equals((Object)ConsumerThread.Status.WAITING)) continue;
            ++waitingCons;
        }
        return waitingCons;
    }

    private boolean isNoConsumerAlive() {
        boolean noneAlive = true;
        for (ConsumerThread<Entity> t : this.allConsumers) {
            if (!t.isAlive()) continue;
            noneAlive = false;
        }
        return noneAlive;
    }

    public boolean ensureConsumerShutdown(int id, int sec) {
        ConsumerThread<Entity> t = this.allConsumers.get(id);
        t.receive(new ShutdownMsg());
        try {
            for (int cntDown = sec; cntDown > 0; --cntDown) {
                if (!t.isAlive()) {
                    return true;
                }
                t.interrupt();
                Thread.sleep(1000L);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    private boolean waitForAllThreadsStopped(int sec, boolean interruptFirst) {
        boolean noneAlive = false;
        if (interruptFirst) {
            for (ConsumerThread<Entity> t : this.allConsumers) {
                if (!t.isAlive()) continue;
                t.interrupt();
            }
        }
        try {
            for (int cntDown = sec; cntDown > 0; --cntDown) {
                noneAlive = this.isNoConsumerAlive();
                if (noneAlive) {
                    return true;
                }
                Thread.sleep(1000L);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return this.isNoConsumerAlive();
    }

    public List<Message> dbg_getProcessedMessages() {
        return this.dbg_processedMsg;
    }

    public List<Message> dbg_getRemainingMessages() {
        ArrayList<Message> msgs = new ArrayList<Message>();
        for (Object msg : this.messageQueue.toArray()) {
            msgs.add((Message)msg);
        }
        return msgs;
    }

    public List<ConsumerThread<Entity>> dbg_getConsumerThreads() {
        return this.allConsumers;
    }

    public int dbg_inboxSize() {
        return this.inbox.size();
    }

    public String dbg_dumpState() {
        String s = "OFXProducerCrtl shutdown=" + this.shuttingDown + " shutdonInboxEmpty=" + this.shutdownWhenInboxEmptyAndConsumersParked;
        s = s + " msgQSize=" + this.messageQueue.size() + " inbox=" + this.inbox.size() + "\nOFXConsumerRunnable ";
        for (ConsumerThread<Entity> t : this.allConsumers) {
            s = s + t.getStatus() + ", ";
        }
        return s;
    }

    private String dumpInbox() {
        Object s = "";
        Object[] inboxState = this.inbox.toArray();
        for (int i = 0; i < inboxState.length; ++i) {
            if (i < 5 || i >= inboxState.length - 5) {
                s = (String)s + inboxState[i] + " ";
                continue;
            }
            if (i != 5) continue;
            s = (String)s + " ... ";
        }
        return s;
    }

    @Override
    public boolean getproducer6_ProducerEnabled() {
        return this.producerRunsEnabled;
    }

    public int getPCPairID() {
        return this.thisPCPairID;
    }

    public String getPCPairName() {
        return this.thisPCPairName;
    }

    public int getNumberOfConsumers() {
        return this.allConsumers.size();
    }

    public boolean needsShutdownMsg() {
        return !this.shuttingDown || !this.shutdownWhenInboxEmptyAndConsumersParked;
    }

    public void jmxUnregister() {
        if (!this.isConsoleMode() && !this.jmxUnregisterDone) {
            for (int i = 0; i < this.allConsumers.size(); ++i) {
                StaticJmxAccess.unregister(this.assembleJMXPrefix(this.getbatchjob_PCPairNameAndID(), false, this.allConsumers.get(i).getConsumerId()));
            }
            StaticJmxAccess.unregister(this.assembleJMXPrefix(this.getbatchjob_PCPairNameAndID(), true, 0));
        }
        this.jmxUnregisterDone = true;
    }

    public void gcClean() {
        this.producerCommandImplStatefull = null;
        this.exceptionStrategy = null;
        for (int i = 0; i < this.allConsumers.size(); ++i) {
            this.allConsumers.get(i).gcClean();
        }
        if (this.timerContoller.getJobProperties().envMode == MODE.TOMMY_MODE) {
            this.messageQueue.clear();
            this.inbox.clear();
            this.allConsumers.clear();
        }
        this.timerContoller = null;
    }

    public static enum MODE {
        TOMMY_MODE,
        CONSOLE_MODE,
        TEST_MODE_WITH_CRON;

    }
}

