/*
 * Decompiled with CFR 0.152.
 */
package org.modellwerkstatt.objectflow.batchjob;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.joda.time.DateTime;
import org.modellwerkstatt.manmap.runtime.MMShutdownRequestException;
import org.modellwerkstatt.objectflow.batchjob.ConsWorkCanceledMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsWorkDoneMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsWorkExMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsumerFinallyDownMsg;
import org.modellwerkstatt.objectflow.batchjob.ConsumerReporter;
import org.modellwerkstatt.objectflow.batchjob.IOFXCommandImplConsumer;
import org.modellwerkstatt.objectflow.batchjob.JobProperties;
import org.modellwerkstatt.objectflow.batchjob.Message;
import org.modellwerkstatt.objectflow.batchjob.OFXExceptionStrategy;
import org.modellwerkstatt.objectflow.batchjob.OFXPCPairController;
import org.modellwerkstatt.objectflow.batchjob.ProcessWorkMsg;
import org.modellwerkstatt.objectflow.batchjob.ShutdownMsg;
import org.modellwerkstatt.objectflow.runtime.IOFXCoreReporter;
import org.modellwerkstatt.objectflow.runtime.IPrintingServiceImpl;
import org.modellwerkstatt.objectflow.runtime.OFXJobWorkCanceledException;

public class OFXConsumerRunnable<EntityOrKey>
extends ConsumerReporter
implements Runnable {
    private static final int QUEUE_CAPACITY = 15;
    public static final String TEST_STOP_EVENT_LOOP_EX_MSG = "Stop OFXConsumerRunnable EventLoop now!";
    private BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(15);
    private volatile boolean eventLoopRunning = false;
    private Message currentMessageProcessing;
    private IOFXCommandImplConsumer[] consumerImplementations;
    private OFXPCPairController producer;
    private boolean dbg_ignoreGraceFullyShutdown = false;
    private boolean dbg_ignoreWorkDoneSend = false;

    public OFXConsumerRunnable(String pcPairName, int consId, JobProperties props, IOFXCommandImplConsumer[] implementation) {
        super(pcPairName, consId, props);
        this.consumerImplementations = implementation;
    }

    public void init(OFXPCPairController prod, IPrintingServiceImpl printServiceImp, IOFXCoreReporter reporter) {
        this.producer = prod;
        this.userPrintService = printServiceImp;
        this.coreReporter = reporter;
        this.eventLoopRunning = true;
    }

    public String printQueue() {
        Object s = "";
        for (Object obj : this.queue.toArray()) {
            s = (String)s + obj.toString() + ", ";
        }
        return s;
    }

    public void receive(Message message) {
        this.queue.add(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.logFrmwrkTrace("Starting into event loop.");
        try {
            while (this.eventLoopRunning) {
                this.setInternalStatus("Waiting for messages");
                this.currentMessageProcessing = this.queue.take();
                this.logFrmwrkTrace("Processing Message " + this.currentMessageProcessing);
                this.setInternalStatus("Processing Message " + this.currentMessageProcessing);
                if (this.currentMessageProcessing instanceof ProcessWorkMsg) {
                    Object ek = ((ProcessWorkMsg)this.currentMessageProcessing).getEntityKey();
                    boolean problem = false;
                    try {
                        String status = "-";
                        this.startProcessing(ek);
                        boolean handled = false;
                        for (IOFXCommandImplConsumer imp : this.consumerImplementations) {
                            if (!imp.toExecute(ek)) continue;
                            handled = true;
                            imp.process(this, ek);
                            status = imp.getLastAction();
                            this.setLastAction(status);
                            break;
                        }
                        if (!handled) {
                            throw new RuntimeException("Inbox item '" + ek + "' was not handled by one of the consumer commands. This is probably a problem.");
                        }
                        this.stopProcessing();
                        if (Thread.currentThread().isInterrupted()) {
                            this.logFrmwrkTrace("Thread isInterrupted() in main loop - shutting down");
                            this.eventLoopRunning = false;
                            problem = true;
                        }
                    }
                    catch (MMShutdownRequestException ex) {
                        this.eventLoopRunning = false;
                        problem = true;
                    }
                    catch (OFXJobWorkCanceledException canceEx) {
                        String msg = "Work on '" + ek + "' canceled - first Problem: " + canceEx.getFirstProblem();
                        OFXExceptionStrategy.Strategy cancelExStrat = this.producer.exStratFor(canceEx);
                        boolean silentLogCancel = cancelExStrat.isSilentNoLog();
                        problem = true;
                        this.producer.receive(new ConsWorkCanceledMsg(this.id, msg));
                        this.canceledProcessing(silentLogCancel, canceEx.getCmdNAme(), canceEx.getSessionProblems());
                    }
                    catch (Throwable t) {
                        if (t instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                            this.eventLoopRunning = false;
                        }
                        if (TEST_STOP_EVENT_LOOP_EX_MSG.equals(t.getMessage())) {
                            this.eventLoopRunning = false;
                        }
                        OFXExceptionStrategy.Strategy exStrategy = this.producer.exStratFor(t);
                        boolean silentLogCancel = exStrategy.isSilentNoLog();
                        problem = true;
                        if (!silentLogCancel) {
                            this.logJobProblem(false, t.getClass().getSimpleName() + " while consumer processing '" + ek + "': handling with " + exStrategy, t, this.convertGuardMsg(t));
                        } else {
                            this.skipReportingEx();
                        }
                        this.producer.receive(new ConsWorkExMsg(this.id, new DateTime(), t, false));
                    }
                    if (problem || this.dbg_ignoreWorkDoneSend) continue;
                    this.producer.receive(new ConsWorkDoneMsg(this.id));
                    continue;
                }
                if (this.currentMessageProcessing instanceof ShutdownMsg) {
                    if (this.dbg_ignoreGraceFullyShutdown) continue;
                    this.eventLoopRunning = false;
                    continue;
                }
                throw new RuntimeException("Unknown message " + this.currentMessageProcessing + " sent to consumer.");
            }
        }
        catch (InterruptedException ex) {
            this.logFrmwrkTrace("Interrupted Exception in main loop - shutting down");
            Thread.currentThread().interrupt();
        }
        catch (Throwable t) {
            this.logFrmwrkError("Exception in main loop - shutting down", t);
            this.producer.receive(new ConsWorkExMsg(this.id, new DateTime(), t, true));
        }
        finally {
            this.logFrmwrkTrace("consumer shut down, sending ConsumerDownMsg to producer.");
            this.setInternalStatus("Shutdown");
            this.producer.receive(new ConsumerFinallyDownMsg(this.id));
        }
    }

    public void gcClean() {
        this.queue.clear();
    }
}

