package org.jumpmind.symmetric.service.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SyntaxParsingException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.AuditTableDataRouter;
import org.jumpmind.symmetric.route.BshDataRouter;
import org.jumpmind.symmetric.route.ChannelRouterContext;
import org.jumpmind.symmetric.route.ColumnMatchDataRouter;
import org.jumpmind.symmetric.route.ConfigurationChangedDataRouter;
import org.jumpmind.symmetric.route.DataGapDetector;
import org.jumpmind.symmetric.route.DataGapRouteReader;
import org.jumpmind.symmetric.route.DefaultBatchAlgorithm;
import org.jumpmind.symmetric.route.DefaultDataRouter;
import org.jumpmind.symmetric.route.FileSyncDataRouter;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
import org.jumpmind.symmetric.route.IDataRouter;
import org.jumpmind.symmetric.route.IDataToRouteReader;
import org.jumpmind.symmetric.route.JavaDataRouter;
import org.jumpmind.symmetric.route.LookupTableDataRouter;
import org.jumpmind.symmetric.route.NonTransactionalBatchAlgorithm;
import org.jumpmind.symmetric.route.SimpleRouterContext;
import org.jumpmind.symmetric.route.SubSelectDataRouter;
import org.jumpmind.symmetric.route.TransactionalBatchAlgorithm;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IRouterService;

/* loaded from: classes.dex */
public class RouterService extends AbstractService implements IRouterService {
    // Batch algorithms keyed by name; the constructor registers "default",
    // "nontransactional" and "transactional", and addBatchAlgorithm adds more.
    protected Map<String, IBatchAlgorithm> batchAlgorithms;
    // Per channel id, the common-batch mode last reported by producesCommonBatches,
    // so the mode is only logged when it changes.
    protected Map<String, Boolean> defaultRouterOnlyLastState;
    // Engine used to reach the other services (node, data, outgoing batch, ...).
    protected ISymmetricEngine engine;
    // Initialised to null by the constructor; its use is not visible in this part of the file.
    protected transient ExecutorService readThread;
    // Data routers keyed by router type; looked up by getDataRouter.
    protected Map<String, IDataRouter> routers;
    // Initialised to false by the constructor; its use is not visible in this part of the file.
    protected boolean syncTriggersBeforeInitialLoadAttempted;

    public RouterService(ISymmetricEngine iSymmetricEngine) {
        super(iSymmetricEngine.getParameterService(), iSymmetricEngine.getSymmetricDialect());
        this.defaultRouterOnlyLastState = new HashMap();
        this.readThread = null;
        this.syncTriggersBeforeInitialLoadAttempted = false;
        this.engine = iSymmetricEngine;
        this.batchAlgorithms = new HashMap();
        this.batchAlgorithms.put("default", new DefaultBatchAlgorithm());
        this.batchAlgorithms.put("nontransactional", new NonTransactionalBatchAlgorithm());
        this.batchAlgorithms.put("transactional", new TransactionalBatchAlgorithm());
        this.routers = new HashMap();
        this.routers.put(ConfigurationChangedDataRouter.ROUTER_TYPE, new ConfigurationChangedDataRouter(iSymmetricEngine));
        this.routers.put("bsh", new BshDataRouter(iSymmetricEngine));
        this.routers.put(JavaColumnTransform.NAME, new JavaDataRouter(iSymmetricEngine));
        this.routers.put("subselect", new SubSelectDataRouter(this.symmetricDialect));
        this.routers.put("lookuptable", new LookupTableDataRouter(this.symmetricDialect));
        this.routers.put("default", new DefaultDataRouter());
        this.routers.put("audit", new AuditTableDataRouter(iSymmetricEngine));
        this.routers.put("column", new ColumnMatchDataRouter(iSymmetricEngine.getConfigurationService(), iSymmetricEngine.getSymmetricDialect()));
        this.routers.put("filesync", new FileSyncDataRouter(iSymmetricEngine));
        setSqlMap(new RouterServiceSqlMap(this.symmetricDialect.getPlatform(), createSqlReplacementTokens()));
    }

    /**
     * Registers a batch algorithm under the given name, replacing any
     * algorithm previously registered under that name.
     *
     * @param str             name the algorithm is registered under
     * @param iBatchAlgorithm algorithm to register
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public void addBatchAlgorithm(String str, IBatchAlgorithm iBatchAlgorithm) {
        batchAlgorithms.put(str, iBatchAlgorithm);
    }

    /**
     * Registers a data router under the given router type, replacing any
     * router previously registered under that type.
     *
     * @param str         router type the router is registered under
     * @param iDataRouter router to register
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public void addDataRouter(String str, IDataRouter iDataRouter) {
        routers.put(str, iDataRouter);
    }

    /**
     * Commits the routing context and finalises every batch it was tracking.
     *
     * <p>After the commit each batch gets its router time recorded, is passed
     * to every used router's {@code completeBatch}, is given its final status
     * (OK for the unrouted pseudo node, NE otherwise), is persisted and is
     * removed from the context. Finally every used router is told the context
     * was committed and the "needs committed" flag is cleared.
     *
     * @param channelRouterContext routing context whose batches are completed
     */
    protected void completeBatchesAndCommit(ChannelRouterContext channelRouterContext) {
        // Copies are taken before the commit; the batch copy also lets us remove
        // entries from the context's map while iterating.
        Set<IDataRouter> usedRouters = new HashSet<IDataRouter>(channelRouterContext.getUsedDataRouters());
        List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(channelRouterContext.getBatchesByNodes().values());
        channelRouterContext.commit();

        if (this.engine.getParameterService().is(ParameterConstants.ROUTING_LOG_STATS_ON_BATCH_ERROR)) {
            this.engine.getStatisticManager().addRouterStats(channelRouterContext.getStartDataId(), channelRouterContext.getEndDataId(), channelRouterContext.getDataReadCount(), channelRouterContext.getPeekAheadFillCount(), channelRouterContext.getDataGaps(), channelRouterContext.getTransactions(), batches);
        }

        for (OutgoingBatch batch : batches) {
            batch.setRouterMillis(System.currentTimeMillis() - batch.getCreateTime().getTime());
            for (IDataRouter usedRouter : usedRouters) {
                usedRouter.completeBatch(channelRouterContext, batch);
            }
            boolean unrouted = Constants.UNROUTED_NODE_ID.equals(batch.getNodeId());
            batch.setStatus(unrouted ? OutgoingBatch.Status.OK : OutgoingBatch.Status.NE);
            this.engine.getOutgoingBatchService().updateOutgoingBatch(batch);
            channelRouterContext.getBatchesByNodes().remove(batch.getNodeId());
        }

        for (IDataRouter usedRouter : usedRouters) {
            usedRouter.contextCommitted(channelRouterContext);
        }
        channelRouterContext.setNeedsCommitted(false);
    }

    /**
     * Returns the candidate target nodes for a trigger router.
     *
     * <p>The enabled nodes of the router's target node group are looked up once
     * per trigger router and cached in the context. When no node group link
     * exists for the router an error is logged and an empty set is cached.
     * The cached set is then passed through the grouplet service.
     *
     * @param triggerRouter        trigger router whose targets are wanted
     * @param channelRouterContext context holding the per-trigger-router cache
     * @return the nodes returned by the grouplet service for the cached set
     */
    protected Set<Node> findAvailableNodes(TriggerRouter triggerRouter, ChannelRouterContext channelRouterContext) {
        Set<Node> candidates = channelRouterContext.getAvailableNodes().get(triggerRouter);
        if (candidates == null) {
            candidates = new HashSet<>();
            Router router = triggerRouter.getRouter();
            String sourceGroupId = router.getNodeGroupLink().getSourceNodeGroupId();
            String targetGroupId = router.getNodeGroupLink().getTargetNodeGroupId();
            boolean linkConfigured = this.engine.getConfigurationService().getNodeGroupLinkFor(sourceGroupId, targetGroupId, false) != null;
            if (!linkConfigured) {
                this.log.error("The router {} has no node group link configured from {} to {}", new Object[]{router.getRouterId(), router.getNodeGroupLink().getSourceNodeGroupId(), router.getNodeGroupLink().getTargetNodeGroupId()});
            } else {
                candidates.addAll(this.engine.getNodeService().findEnabledNodesFromNodeGroup(router.getNodeGroupLink().getTargetNodeGroupId()));
            }
            channelRouterContext.getAvailableNodes().put(triggerRouter, candidates);
        }
        return this.engine.getGroupletService().getTargetEnabled(triggerRouter, candidates);
    }

    /**
     * Lists the names of all registered batch algorithms.
     *
     * @return a new list holding the registered names; changes to it do not
     *         affect the registry
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public List<String> getAvailableBatchAlgorithms() {
        List<String> algorithmNames = new ArrayList<>(batchAlgorithms.keySet());
        return algorithmNames;
    }

    /**
     * Resolves the data router implementation for a router configuration.
     *
     * @param router router configuration whose type selects the implementation
     * @return the router registered for the configured type, or the router
     *         registered as "default" when the type is blank or unknown (an
     *         unknown type is logged as a warning)
     */
    protected IDataRouter getDataRouter(Router router) {
        IDataRouter resolved = null;
        if (StringUtils.isNotBlank(router.getRouterType())) {
            resolved = this.routers.get(router.getRouterType());
            if (resolved == null) {
                this.log.warn("Could not find configured router type of {} with the id of {}. Defaulting the router", router.getRouterType(), router.getRouterId());
            }
        }
        if (resolved != null) {
            return resolved;
        }
        return this.routers.get("default");
    }

    /**
     * Returns the registry of data routers keyed by router type.
     *
     * @return the live internal map (not a copy)
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public Map<String, IDataRouter> getRouters() {
        return routers;
    }

    /**
     * Finds the trigger routers that apply to a captured data row.
     *
     * <p>The lookup first calls {@code getTriggerRoutersForCurrentNode(false)};
     * when that yields nothing for the trigger it is repeated with
     * {@code true}.
     *
     * @param data captured data row, may be {@code null}
     * @return the trigger routers for the row's trigger, or {@code null} when
     *         {@code data} is {@code null}, when it has no trigger history (a
     *         warning is logged), or when the second lookup has no entry
     */
    protected List<TriggerRouter> getTriggerRoutersForData(Data data) {
        List<TriggerRouter> found = null;
        if (data != null) {
            if (data.getTriggerHistory() == null) {
                this.log.warn("Could not find a trigger hist record for recorded data {}.  Was the trigger hist record deleted manually?", Long.valueOf(data.getDataId()));
            } else {
                found = this.engine.getTriggerRouterService().getTriggerRoutersForCurrentNode(false).get(data.getTriggerHistory().getTriggerId());
                if (found == null || found.size() == 0) {
                    found = this.engine.getTriggerRouterService().getTriggerRoutersForCurrentNode(true).get(data.getTriggerHistory().getTriggerId());
                }
            }
        }
        return found;
    }

    /**
     * Estimates how many captured data rows have not been routed yet.
     *
     * <p>The estimate is the maximum data id minus the last routed data id
     * (from {@code selectLastDataIdRoutedUsingDataGapSql}), plus the width
     * ({@code endId - startId}) of every data gap except the last two in the
     * list returned by the data service.
     *
     * @return the estimate, never negative
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public long getUnroutedDataCount() {
        long maxDataId = this.engine.getDataService().findMaxDataId();
        long lastRoutedDataId = this.sqlTemplate.queryForLong(getSql("selectLastDataIdRoutedUsingDataGapSql"), new Object[0]);
        long unrouted = maxDataId - lastRoutedDataId;

        List<DataGap> gaps = this.engine.getDataService().findDataGaps();
        // NOTE(review): the bound skips the final two gaps, not just the last one -
        // confirm this is intentional.
        int countedGaps = gaps.size() - 2;
        for (int idx = 0; idx < countedGaps; idx++) {
            DataGap gap = gaps.get(idx);
            unrouted += gap.getEndId() - gap.getStartId();
        }
        return unrouted > 0 ? unrouted : 0L;
    }

    /**
     * Adds the given data row to the open batch of every target node, creating
     * and inserting a batch for a node that does not have one yet.
     *
     * <p>When no target nodes are supplied the row is assigned to the unrouted
     * pseudo node. In common-batch mode newly created batches share the batch
     * id of the first batch created during this call, and only one data event
     * is queued for the row; otherwise one data event is queued per node.
     *
     * @param processInfo          process info whose batch count is incremented
     *                             for each newly created batch
     * @param channelRouterContext routing context holding the open batches
     * @param dataMetaData         data row plus its router and channel
     * @param collection           target node ids; {@code null} or empty means
     *                             "unrouted"; {@code null} elements are skipped
     * @return the number of data events queued by this call
     */
    protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext channelRouterContext, DataMetaData dataMetaData, Collection<String> collection) {
        int queuedEvents = 0;
        Collection<String> targetNodeIds = collection;
        if (targetNodeIds == null || targetNodeIds.size() == 0) {
            targetNodeIds = new HashSet<>(1);
            targetNodeIds.add(Constants.UNROUTED_NODE_ID);
        }
        final long startedAt = System.currentTimeMillis();
        long sharedBatchId = -1;
        boolean eventQueued = false;
        for (String nodeId : targetNodeIds) {
            if (nodeId == null) {
                continue;
            }
            OutgoingBatch batch = channelRouterContext.getBatchesByNodes().get(nodeId);
            if (batch == null) {
                batch = new OutgoingBatch(nodeId, dataMetaData.getNodeChannel().getChannelId(), OutgoingBatch.Status.RT);
                batch.setBatchId(sharedBatchId);
                batch.setCommonFlag(channelRouterContext.isProduceCommonBatches());
                this.log.debug("About to insert a new batch for node {} on the '{}' channel.  Batches in progress are: {}.", new Object[]{nodeId, batch.getChannelId(), channelRouterContext.getBatchesByNodes().values()});
                this.engine.getOutgoingBatchService().insertOutgoingBatch(batch);
                processInfo.incrementBatchCount();
                channelRouterContext.getBatchesByNodes().put(nodeId, batch);
                // In common-batch mode later batches reuse this batch's id.
                if (channelRouterContext.isProduceCommonBatches()) {
                    sharedBatchId = batch.getBatchId();
                }
            }

            if (dataMetaData.getData().getDataEventType() != DataEventType.RELOAD) {
                channelRouterContext.setLastLoadId(-1L);
            } else {
                // Consecutive reload events share one load id; a fresh one is drawn
                // from the sequence when none is active.
                long loadId = channelRouterContext.getLastLoadId();
                if (loadId < 0) {
                    loadId = this.engine.getSequenceService().nextVal(Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID);
                    channelRouterContext.setLastLoadId(loadId);
                }
                batch.setLoadId(loadId);
            }

            batch.incrementEventCount(dataMetaData.getData().getDataEventType());
            batch.incrementDataEventCount();

            // One data event per node normally; a single one per row in common-batch mode.
            if (!channelRouterContext.isProduceCommonBatches() || !eventQueued) {
                Router router = dataMetaData.getRouter();
                String routerId = router != null ? router.getRouterId() : Constants.UNKNOWN_ROUTER_ID;
                channelRouterContext.addDataEvent(dataMetaData.getData().getDataId(), batch.getBatchId(), routerId);
                queuedEvents++;
                eventQueued = true;
            }

            IBatchAlgorithm algorithm = this.batchAlgorithms.get(channelRouterContext.getChannel().getBatchAlgorithm());
            if (algorithm.isBatchComplete(batch, dataMetaData, channelRouterContext)) {
                channelRouterContext.setNeedsCommitted(true);
            }
        }
        channelRouterContext.incrementStat(System.currentTimeMillis() - startedAt, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
        return queuedEvents;
    }

    /* JADX WARN: Code restructure failed: missing block: B:56:0x0106, code lost:
    
        if (r13 == false) goto L93;
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:0x0108, code lost:
    
        if (r12 == false) goto L94;
     */
    /* JADX WARN: Code restructure failed: missing block: B:60:0x010a, code lost:
    
        if (r8 == false) goto L95;
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x010c, code lost:
    
        if (r14 == false) goto L79;
     */
    /* JADX WARN: Code restructure failed: missing block: B:63:0x010e, code lost:
    
        if (r15 != false) goto L96;
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x0110, code lost:
    
        r18 = java.lang.System.currentTimeMillis();
        r25.engine.getDataService().insertReloadEvents(r25.engine.getNodeService().findNode(r16.getNodeId()), false);
        r18 = java.lang.System.currentTimeMillis() - r18;
     */
    /* JADX WARN: Code restructure failed: missing block: B:66:0x0145, code lost:
    
        if (r18 <= org.jumpmind.symmetric.common.Constants.LONG_OPERATION_THRESHOLD) goto L81;
     */
    /* JADX WARN: Code restructure failed: missing block: B:68:0x0164, code lost:
    
        r25.log.info("Inserted reload events for node {} in {} ms", r16.getNodeId(), java.lang.Long.valueOf(r18));
     */
    /* JADX WARN: Code restructure failed: missing block: B:71:0x0147, code lost:
    
        r25.log.warn("Inserted reload events for node {} in {} ms", r16.getNodeId(), java.lang.Long.valueOf(r18));
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for a method whose body the decompiler could not recover.
     *
     * <p>As written, this method always throws, and {@code routeData(boolean)}
     * calls it, so routing cannot get past this point until the body is
     * restored from the original source or the bytecode dump.
     *
     * <p>NOTE(review): judging only from the recovered fragments above, the
     * lost body times a call to {@code insertReloadEvents} for a node and logs
     * "Inserted reload events for node {} in {} ms" at warn level when the
     * elapsed time exceeds {@code Constants.LONG_OPERATION_THRESHOLD}, and at
     * info level otherwise. The conditions guarding that call were not
     * recovered - confirm against the original implementation.
     *
     * @throws UnsupportedOperationException always
     */
    protected void insertInitialLoadEvents() {
        /*
            Method dump skipped, instructions count: 548
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.jumpmind.symmetric.service.impl.RouterService.insertInitialLoadEvents():void");
    }

    /**
     * Decides whether batches on a channel can be produced in common batch
     * mode, and logs the mode whenever it differs from the one last reported
     * for that channel.
     *
     * <p>Common batches are never produced for the config and heartbeat
     * channels, nor for file sync or reload channels. For any other channel,
     * looking only at trigger routers whose source node group is this node's
     * group, common batch mode is turned off when
     * <ul>
     * <li>a router is not a {@link DefaultDataRouter}, or</li>
     * <li>a trigger has sync-on-incoming-batch enabled and the same table is
     * also routed <em>to</em> this node's group by some trigger router in the
     * list.</li>
     * </ul>
     *
     * <p>The previous decompiled form scanned the list with a
     * {@code while (true)} loop that only exited on a match, so it never
     * returned when no trigger router matched; the scan is now bounded. The
     * scan also stops as soon as the answer is known to be {@code false}.
     *
     * @param channel channel being routed
     * @param list    all trigger routers for the channel, may be {@code null}
     * @return {@code true} when the channel is in common batch mode
     */
    protected boolean producesCommonBatches(Channel channel, List<TriggerRouter> list) {
        String channelId = channel.getChannelId();
        boolean commonBatches = !Constants.CHANNEL_CONFIG.equals(channelId)
                && !channel.isFileSyncFlag()
                && !channel.isReloadFlag()
                && !Constants.CHANNEL_HEARTBEAT.equals(channelId);
        String nodeGroupId = this.parameterService.getNodeGroupId();
        if (list != null) {
            for (TriggerRouter triggerRouter : list) {
                IDataRouter dataRouter = getDataRouter(triggerRouter.getRouter());
                if (!triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId().equals(nodeGroupId)) {
                    // Only routers that send data out of this node's group matter here.
                    continue;
                }
                if (!(dataRouter instanceof DefaultDataRouter)) {
                    commonBatches = false;
                    break;
                }
                if (triggerRouter.getTrigger().isSyncOnIncomingBatch()
                        && isTableRoutedToGroup(list, triggerRouter.getTrigger().getFullyQualifiedSourceTableName(), nodeGroupId)) {
                    commonBatches = false;
                    break;
                }
            }
        }

        Boolean lastReported = this.defaultRouterOnlyLastState.get(channelId);
        if (lastReported == null || lastReported.booleanValue() != commonBatches) {
            if (commonBatches) {
                this.log.info("The '{}' channel is in common batch mode", channelId);
            } else {
                this.log.info("The '{}' channel is NOT in common batch mode", channelId);
            }
            this.defaultRouterOnlyLastState.put(channelId, Boolean.valueOf(commonBatches));
        }
        return commonBatches;
    }

    /**
     * Tells whether any trigger router in the list routes the given table to
     * the given target node group.
     */
    private boolean isTableRoutedToGroup(List<TriggerRouter> triggerRouters, String fullyQualifiedTableName, String targetNodeGroupId) {
        for (TriggerRouter candidate : triggerRouters) {
            if (candidate.getTrigger().getFullyQualifiedSourceTableName().equals(fullyQualifiedTableName)
                    && candidate.getRouter().getNodeGroupLink().getTargetNodeGroupId().equals(targetNodeGroupId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Routes a single captured data row: works out the target nodes for each
     * applicable trigger router and queues the matching data events.
     *
     * <p>When no trigger routers apply, the row is logged and queued as
     * unrouted. Otherwise, for each trigger router that is routed for the
     * row's event type (and unless the channel has ignore enabled), targets
     * come either from the row's explicit node list intersected with the
     * available nodes, or from the configured data router. The source node
     * (unless ping back is enabled) and this node's own identity are removed
     * from the targets. A trigger router that yields no targets results in the
     * row being queued as unrouted by {@code insertDataEvents}.
     *
     * <p>The "no trigger routers" warning no longer dereferences a missing
     * trigger history, which {@code getTriggerRoutersForData} explicitly
     * allows for; the id is logged as {@code null} in that case.
     *
     * @param processInfo          process info passed through to
     *                             {@code insertDataEvents}
     * @param data                 captured data row to route
     * @param channelRouterContext routing context for the current channel
     * @return the number of data events queued for the row
     */
    protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext channelRouterContext) {
        int eventsInserted = 0;
        List<TriggerRouter> triggerRouters = getTriggerRoutersForData(data);
        Table table = this.symmetricDialect.getTable(data.getTriggerHistory(), true);
        if (triggerRouters == null || triggerRouters.size() <= 0) {
            // The trigger history itself may be missing, so guard before reading its id.
            Integer triggerHistoryId = data.getTriggerHistory() != null ? Integer.valueOf(data.getTriggerHistory().getTriggerHistoryId()) : null;
            this.log.warn("Could not find trigger routers for trigger history id of {}.  There is a good chance that data was captured and the trigger router link was removed before the data could be routed", triggerHistoryId);
            this.log.info("Not processing data with the data id of {}", Long.valueOf(data.getDataId()));
            eventsInserted += insertDataEvents(processInfo, channelRouterContext, new DataMetaData(data, table, null, channelRouterContext.getChannel()), new HashSet<String>());
        } else {
            for (TriggerRouter triggerRouter : triggerRouters) {
                DataMetaData dataMetaData = new DataMetaData(data, table, triggerRouter.getRouter(), channelRouterContext.getChannel());
                Collection<String> nodeIds = null;
                if (!channelRouterContext.getChannel().isIgnoreEnabled() && triggerRouter.isRouted(data.getDataEventType())) {
                    String nodeList = data.getNodeList();
                    if (StringUtils.isNotBlank(nodeList)) {
                        // An explicit node list on the row overrides the data router.
                        nodeIds = CollectionUtils.intersection(Arrays.asList(nodeList.split(",")), toNodeIds(findAvailableNodes(triggerRouter, channelRouterContext)));
                        if (nodeIds.size() == 0) {
                            this.log.warn("None of the target nodes specified in the data.node_list field ({}) were qualified nodes.  {} will not be routed", nodeList, Long.valueOf(data.getDataId()));
                        }
                    } else {
                        IDataRouter dataRouter = getDataRouter(triggerRouter.getRouter());
                        channelRouterContext.addUsedDataRouter(dataRouter);
                        long routerStart = System.currentTimeMillis();
                        nodeIds = dataRouter.routeToNodes(channelRouterContext, dataMetaData, findAvailableNodes(triggerRouter, channelRouterContext), false, false, triggerRouter);
                        channelRouterContext.incrementStat(System.currentTimeMillis() - routerStart, ChannelRouterContext.STAT_DATA_ROUTER_MS);
                    }
                    if (nodeIds != null) {
                        // Do not send a change back to where it came from unless ping back is on.
                        if (!triggerRouter.isPingBackEnabled() && data.getSourceNodeId() != null) {
                            nodeIds.remove(data.getSourceNodeId());
                        }
                        // Never route to this node itself.
                        nodeIds.remove(this.engine.getNodeService().findIdentityNodeId());
                    }
                }
                eventsInserted += insertDataEvents(processInfo, channelRouterContext, dataMetaData, nodeIds);
            }
        }
        channelRouterContext.incrementStat(eventsInserted, ChannelRouterContext.STAT_DATA_EVENTS_INSERTED);
        return eventsInserted;
    }

    /**
     * Runs one routing pass over all channels.
     *
     * <p>Nothing is done when this node has no identity yet, or when
     * {@code z} is {@code false} and the cluster ROUTE lock cannot be
     * obtained. The lock is released afterwards only when it was taken here.
     *
     * @param z {@code true} to route without taking the cluster ROUTE lock
     * @return the value returned by {@code routeDataForEachChannel}, or
     *         {@code -1} when routing did not run
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public synchronized long routeData(boolean z) {
        if (this.engine.getNodeService().findIdentity() == null) {
            return -1;
        }
        if (!z && !this.engine.getClusterService().lock(ClusterConstants.ROUTE)) {
            return -1;
        }
        try {
            this.engine.getOutgoingBatchService().updateAbandonedRoutingBatches();
            insertInitialLoadEvents();
            long startedAt = System.currentTimeMillis();
            DataGapDetector gapDetector = new DataGapDetector(this.engine.getDataService(), this.parameterService, this.symmetricDialect, this, this.engine.getStatisticManager(), this.engine.getNodeService());
            gapDetector.beforeRouting();
            long routedCount = routeDataForEachChannel(gapDetector);
            long elapsedMs = System.currentTimeMillis() - startedAt;
            // Stay quiet for fast passes that routed nothing.
            if (routedCount > 0 || elapsedMs > Constants.LONG_OPERATION_THRESHOLD) {
                this.log.info("Routed {} data events in {} ms", Long.valueOf(routedCount), Long.valueOf(elapsedMs));
            }
            return routedCount;
        } finally {
            if (!z) {
                this.engine.getClusterService().unlock(ClusterConstants.ROUTE);
            }
        }
    }

    protected int routeDataForChannel(ProcessInfo processInfo, NodeChannel nodeChannel, Node node, boolean z, DataGapDetector dataGapDetector) {
        ChannelRouterContext channelRouterContext;
        ChannelRouterContext channelRouterContext2 = null;
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                channelRouterContext = new ChannelRouterContext(node.getNodeId(), nodeChannel, this.symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction());
            } catch (Throwable th) {
                th = th;
            }
        } catch (InterruptedException e) {
        } catch (SyntaxParsingException e2) {
            e = e2;
        } catch (Throwable th2) {
            th = th2;
        }
        try {
            channelRouterContext.setProduceCommonBatches(z);
            int selectDataAndRoute = selectDataAndRoute(processInfo, channelRouterContext);
            try {
                if (selectDataAndRoute > 0) {
                    try {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        this.engine.getDataService().insertDataEvents(channelRouterContext.getSqlTransaction(), channelRouterContext.getDataEventList());
                        channelRouterContext.clearDataEventsList();
                        completeBatchesAndCommit(channelRouterContext);
                        channelRouterContext.incrementStat(System.currentTimeMillis() - currentTimeMillis2, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
                        Data lastDataProcessed = channelRouterContext.getLastDataProcessed();
                        if (lastDataProcessed != null && lastDataProcessed.getDataId() > 0) {
                            String channelId = nodeChannel.getChannelId();
                            long currentTimeMillis3 = System.currentTimeMillis();
                            long queryForInt = this.sqlTemplate.queryForInt(getSql("selectUnroutedCountForChannelSql"), channelId, Long.valueOf(lastDataProcessed.getDataId()));
                            long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
                            if (currentTimeMillis4 > Constants.LONG_OPERATION_THRESHOLD) {
                                this.log.warn("Unrouted query for channel {} took longer than expected", channelId, Long.valueOf(currentTimeMillis4));
                                this.log.info("The query took {} ms", Long.valueOf(currentTimeMillis4));
                            }
                            this.engine.getStatisticManager().setDataUnRouted(channelId, queryForInt);
                        }
                    } catch (Exception e3) {
                        if (channelRouterContext != null) {
                            channelRouterContext.rollback();
                        }
                        this.log.error(e3.getMessage(), (Throwable) e3);
                        long currentTimeMillis5 = System.currentTimeMillis() - currentTimeMillis;
                        channelRouterContext.incrementStat(currentTimeMillis5, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                        channelRouterContext.logStats(this.log, currentTimeMillis5);
                        boolean isRequestGapDetection = channelRouterContext.isRequestGapDetection();
                        channelRouterContext.cleanup();
                        if (isRequestGapDetection) {
                            dataGapDetector.beforeRouting();
                        }
                    }
                }
                return selectDataAndRoute;
            } finally {
                long currentTimeMillis6 = System.currentTimeMillis() - currentTimeMillis;
                channelRouterContext.incrementStat(currentTimeMillis6, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                channelRouterContext.logStats(this.log, currentTimeMillis6);
                boolean isRequestGapDetection2 = channelRouterContext.isRequestGapDetection();
                channelRouterContext.cleanup();
                if (isRequestGapDetection2) {
                    dataGapDetector.beforeRouting();
                }
            }
        } catch (InterruptedException e4) {
            channelRouterContext2 = channelRouterContext;
            this.log.warn("The routing process was interrupted.  Rolling back changes");
            if (channelRouterContext2 != null) {
                channelRouterContext2.rollback();
            }
            if (-1 > 0) {
                try {
                    try {
                        long currentTimeMillis7 = System.currentTimeMillis();
                        this.engine.getDataService().insertDataEvents(channelRouterContext2.getSqlTransaction(), channelRouterContext2.getDataEventList());
                        channelRouterContext2.clearDataEventsList();
                        completeBatchesAndCommit(channelRouterContext2);
                        channelRouterContext2.incrementStat(System.currentTimeMillis() - currentTimeMillis7, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
                        Data lastDataProcessed2 = channelRouterContext2.getLastDataProcessed();
                        if (lastDataProcessed2 != null && lastDataProcessed2.getDataId() > 0) {
                            String channelId2 = nodeChannel.getChannelId();
                            long currentTimeMillis8 = System.currentTimeMillis();
                            long queryForInt2 = this.sqlTemplate.queryForInt(getSql("selectUnroutedCountForChannelSql"), channelId2, Long.valueOf(lastDataProcessed2.getDataId()));
                            long currentTimeMillis9 = System.currentTimeMillis() - currentTimeMillis8;
                            if (currentTimeMillis9 > Constants.LONG_OPERATION_THRESHOLD) {
                                this.log.warn("Unrouted query for channel {} took longer than expected", channelId2, Long.valueOf(currentTimeMillis9));
                                this.log.info("The query took {} ms", Long.valueOf(currentTimeMillis9));
                            }
                            this.engine.getStatisticManager().setDataUnRouted(channelId2, queryForInt2);
                        }
                    } catch (Exception e5) {
                        if (channelRouterContext2 != null) {
                            channelRouterContext2.rollback();
                        }
                        this.log.error(e5.getMessage(), (Throwable) e5);
                        long currentTimeMillis10 = System.currentTimeMillis() - currentTimeMillis;
                        channelRouterContext2.incrementStat(currentTimeMillis10, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                        channelRouterContext2.logStats(this.log, currentTimeMillis10);
                        boolean isRequestGapDetection3 = channelRouterContext2.isRequestGapDetection();
                        channelRouterContext2.cleanup();
                        if (!isRequestGapDetection3) {
                            return 0;
                        }
                        dataGapDetector.beforeRouting();
                        return 0;
                    }
                } catch (Throwable th3) {
                    throw th3;
                }
            }
            long currentTimeMillis11 = System.currentTimeMillis() - currentTimeMillis;
            channelRouterContext2.incrementStat(currentTimeMillis11, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
            channelRouterContext2.logStats(this.log, currentTimeMillis11);
            boolean isRequestGapDetection4 = channelRouterContext2.isRequestGapDetection();
            channelRouterContext2.cleanup();
            if (!isRequestGapDetection4) {
                return 0;
            }
            dataGapDetector.beforeRouting();
            return 0;
        } catch (SyntaxParsingException e6) {
            e = e6;
            channelRouterContext2 = channelRouterContext;
            this.log.error(String.format("Failed to route and batch data on '%s' channel due to an invalid router expression", nodeChannel.getChannelId()), (Throwable) e);
            if (channelRouterContext2 != null) {
                channelRouterContext2.rollback();
            }
            if (-1 > 0) {
                try {
                    try {
                        long currentTimeMillis12 = System.currentTimeMillis();
                        this.engine.getDataService().insertDataEvents(channelRouterContext2.getSqlTransaction(), channelRouterContext2.getDataEventList());
                        channelRouterContext2.clearDataEventsList();
                        completeBatchesAndCommit(channelRouterContext2);
                        channelRouterContext2.incrementStat(System.currentTimeMillis() - currentTimeMillis12, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
                        Data lastDataProcessed3 = channelRouterContext2.getLastDataProcessed();
                        if (lastDataProcessed3 != null && lastDataProcessed3.getDataId() > 0) {
                            String channelId3 = nodeChannel.getChannelId();
                            long currentTimeMillis13 = System.currentTimeMillis();
                            long queryForInt3 = this.sqlTemplate.queryForInt(getSql("selectUnroutedCountForChannelSql"), channelId3, Long.valueOf(lastDataProcessed3.getDataId()));
                            long currentTimeMillis14 = System.currentTimeMillis() - currentTimeMillis13;
                            if (currentTimeMillis14 > Constants.LONG_OPERATION_THRESHOLD) {
                                this.log.warn("Unrouted query for channel {} took longer than expected", channelId3, Long.valueOf(currentTimeMillis14));
                                this.log.info("The query took {} ms", Long.valueOf(currentTimeMillis14));
                            }
                            this.engine.getStatisticManager().setDataUnRouted(channelId3, queryForInt3);
                        }
                    } catch (Exception e7) {
                        if (channelRouterContext2 != null) {
                            channelRouterContext2.rollback();
                        }
                        this.log.error(e7.getMessage(), (Throwable) e7);
                        long currentTimeMillis15 = System.currentTimeMillis() - currentTimeMillis;
                        channelRouterContext2.incrementStat(currentTimeMillis15, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                        channelRouterContext2.logStats(this.log, currentTimeMillis15);
                        boolean isRequestGapDetection5 = channelRouterContext2.isRequestGapDetection();
                        channelRouterContext2.cleanup();
                        if (!isRequestGapDetection5) {
                            return 0;
                        }
                        dataGapDetector.beforeRouting();
                        return 0;
                    }
                } catch (Throwable th4) {
                    long currentTimeMillis16 = System.currentTimeMillis() - currentTimeMillis;
                    channelRouterContext2.incrementStat(currentTimeMillis16, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                    channelRouterContext2.logStats(this.log, currentTimeMillis16);
                    boolean isRequestGapDetection6 = channelRouterContext2.isRequestGapDetection();
                    channelRouterContext2.cleanup();
                    if (isRequestGapDetection6) {
                        dataGapDetector.beforeRouting();
                    }
                    throw th4;
                }
            }
            long currentTimeMillis17 = System.currentTimeMillis() - currentTimeMillis;
            channelRouterContext2.incrementStat(currentTimeMillis17, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
            channelRouterContext2.logStats(this.log, currentTimeMillis17);
            boolean isRequestGapDetection7 = channelRouterContext2.isRequestGapDetection();
            channelRouterContext2.cleanup();
            if (!isRequestGapDetection7) {
                return 0;
            }
            dataGapDetector.beforeRouting();
            return 0;
        } catch (Throwable th5) {
            th = th5;
            channelRouterContext2 = channelRouterContext;
            if (-1 > 0) {
                try {
                    try {
                        long currentTimeMillis18 = System.currentTimeMillis();
                        this.engine.getDataService().insertDataEvents(channelRouterContext2.getSqlTransaction(), channelRouterContext2.getDataEventList());
                        channelRouterContext2.clearDataEventsList();
                        completeBatchesAndCommit(channelRouterContext2);
                        channelRouterContext2.incrementStat(System.currentTimeMillis() - currentTimeMillis18, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
                        Data lastDataProcessed4 = channelRouterContext2.getLastDataProcessed();
                        if (lastDataProcessed4 != null && lastDataProcessed4.getDataId() > 0) {
                            String channelId4 = nodeChannel.getChannelId();
                            long currentTimeMillis19 = System.currentTimeMillis();
                            long queryForInt4 = this.sqlTemplate.queryForInt(getSql("selectUnroutedCountForChannelSql"), channelId4, Long.valueOf(lastDataProcessed4.getDataId()));
                            long currentTimeMillis20 = System.currentTimeMillis() - currentTimeMillis19;
                            if (currentTimeMillis20 > Constants.LONG_OPERATION_THRESHOLD) {
                                this.log.warn("Unrouted query for channel {} took longer than expected", channelId4, Long.valueOf(currentTimeMillis20));
                                this.log.info("The query took {} ms", Long.valueOf(currentTimeMillis20));
                            }
                            this.engine.getStatisticManager().setDataUnRouted(channelId4, queryForInt4);
                        }
                    } catch (Exception e8) {
                        if (channelRouterContext2 != null) {
                            channelRouterContext2.rollback();
                        }
                        this.log.error(e8.getMessage(), (Throwable) e8);
                        long currentTimeMillis21 = System.currentTimeMillis() - currentTimeMillis;
                        channelRouterContext2.incrementStat(currentTimeMillis21, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                        channelRouterContext2.logStats(this.log, currentTimeMillis21);
                        boolean isRequestGapDetection8 = channelRouterContext2.isRequestGapDetection();
                        channelRouterContext2.cleanup();
                        if (isRequestGapDetection8) {
                            dataGapDetector.beforeRouting();
                        }
                        throw th;
                    }
                } finally {
                    long currentTimeMillis22 = System.currentTimeMillis() - currentTimeMillis;
                    channelRouterContext2.incrementStat(currentTimeMillis22, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
                    channelRouterContext2.logStats(this.log, currentTimeMillis22);
                    boolean isRequestGapDetection9 = channelRouterContext2.isRequestGapDetection();
                    channelRouterContext2.cleanup();
                    if (isRequestGapDetection9) {
                        dataGapDetector.beforeRouting();
                    }
                }
            }
            long currentTimeMillis23 = System.currentTimeMillis() - currentTimeMillis;
            channelRouterContext2.incrementStat(currentTimeMillis23, ChannelRouterContext.STAT_ROUTE_TOTAL_TIME);
            channelRouterContext2.logStats(this.log, currentTimeMillis23);
            boolean isRequestGapDetection10 = channelRouterContext2.isRequestGapDetection();
            channelRouterContext2.cleanup();
            if (isRequestGapDetection10) {
                dataGapDetector.beforeRouting();
            }
            throw th;
        }
    }

    /**
     * Routes data for every enabled node channel and reports the combined result.
     *
     * <p>A {@code ROUTER_JOB} {@link ProcessInfo} is registered for the identity node and
     * moved to {@code PROCESSING} before any channel is touched. It is set to {@code OK}
     * once all channels were handled, or to {@code ERROR} when a {@link RuntimeException}
     * escapes (the exception is rethrown unchanged).
     *
     * @param dataGapDetector gap detector handed through to {@code routeDataForChannel}
     * @return the sum of the values returned by {@code routeDataForChannel} for each enabled channel
     */
    protected int routeDataForEachChannel(DataGapDetector dataGapDetector) {
        int routedTotal = 0;
        Node identity = this.engine.getNodeService().findIdentity();
        ProcessInfo processInfo = this.engine.getStatisticManager().newProcessInfo(
                new ProcessInfoKey(identity.getNodeId(), null, ProcessInfoKey.ProcessType.ROUTER_JOB));
        processInfo.setStatus(ProcessInfo.Status.PROCESSING);
        try {
            List<NodeChannel> channels = this.engine.getConfigurationService().getNodeChannels(false);
            Map<String, List<TriggerRouter>> routersByChannel = this.engine.getTriggerRouterService()
                    .getTriggerRoutersByChannel(this.engine.getParameterService().getNodeGroupId());
            for (NodeChannel channel : channels) {
                if (!channel.isEnabled()) {
                    // Channels that are not enabled are skipped; only mention it at debug level.
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Not routing the {} channel.  It is either disabled or suspended.", channel.getChannelId());
                    }
                    continue;
                }
                processInfo.setCurrentChannelId(channel.getChannelId());
                routedTotal += routeDataForChannel(
                        processInfo,
                        channel,
                        identity,
                        producesCommonBatches(channel.getChannel(), routersByChannel.get(channel.getChannelId())),
                        dataGapDetector);
            }
            processInfo.setStatus(ProcessInfo.Status.OK);
            return routedTotal;
        } catch (RuntimeException failure) {
            processInfo.setStatus(ProcessInfo.Status.ERROR);
            throw failure;
        }
    }

    /**
     * Pulls rows from a freshly started reader and routes them one at a time.
     *
     * <p>The loop always holds one row of look-ahead: the row after the current one is
     * taken before the current row is routed, so the context can be told whether the
     * transaction id changes between the two. When there is no following row the
     * boundary flag is set to {@code false}.
     *
     * <p>After each row the accumulated data events are inserted when the event list has
     * reached the configured flush size or the context asks for a commit; a requested
     * commit is then carried out via {@code completeBatchesAndCommit}.
     *
     * @param processInfo          progress holder updated with the current table name and row count
     * @param channelRouterContext routing context for the channel being processed
     * @return the sum of the values returned by {@code routeData} for every row handled
     * @throws InterruptedException declared for the reader's {@code take()} call
     */
    protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext channelRouterContext) throws InterruptedException {
        IDataToRouteReader reader = startReading(channelRouterContext);
        int rowsRouted = 0;
        int eventsRouted = 0;
        // The "unreported" counters are periodically pushed to the statistic manager and reset.
        int unreportedRows = 0;
        int unreportedEvents = 0;
        final int flushSize = this.parameterService.getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE);
        try {
            Data upcoming = reader.take();
            while (upcoming != null) {
                Data current = upcoming;
                upcoming = reader.take();

                processInfo.setCurrentTableName(current.getTableName());
                processInfo.incrementCurrentDataCount();

                boolean transactionBoundary = false;
                if (upcoming != null) {
                    String upcomingTxId = upcoming.getTransactionId();
                    transactionBoundary = upcomingTxId == null || !upcomingTxId.equals(current.getTransactionId());
                }
                channelRouterContext.setEncountedTransactionBoundary(transactionBoundary);

                unreportedRows++;
                rowsRouted++;
                int routedForRow = routeData(processInfo, current, channelRouterContext);
                unreportedEvents += routedForRow;
                eventsRouted += routedForRow;

                long flushStarted = System.currentTimeMillis();
                try {
                    if (channelRouterContext.getDataEventList().size() >= flushSize || channelRouterContext.isNeedsCommitted()) {
                        this.engine.getDataService().insertDataEvents(channelRouterContext.getSqlTransaction(), channelRouterContext.getDataEventList());
                        channelRouterContext.clearDataEventsList();
                    }
                    if (channelRouterContext.isNeedsCommitted()) {
                        completeBatchesAndCommit(channelRouterContext);
                    }
                    // Only recorded when the flush/commit above did not throw.
                    channelRouterContext.setLastDataProcessed(current);
                } finally {
                    channelRouterContext.incrementStat(System.currentTimeMillis() - flushStarted, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
                    // Hand the local counters over once more than 1024 rows have piled up.
                    if (unreportedRows > 1024) {
                        this.engine.getStatisticManager().incrementDataRouted(channelRouterContext.getChannel().getChannelId(), unreportedRows);
                        unreportedRows = 0;
                        this.engine.getStatisticManager().incrementDataEventInserted(channelRouterContext.getChannel().getChannelId(), unreportedEvents);
                        unreportedEvents = 0;
                    }
                }
            }
            channelRouterContext.incrementStat(rowsRouted, ChannelRouterContext.STAT_DATA_ROUTED_COUNT);
            return eventsRouted;
        } finally {
            reader.setReading(false);
            // Report whatever is still sitting in the local counters, on success or failure.
            if (unreportedRows > 0) {
                this.engine.getStatisticManager().incrementDataRouted(channelRouterContext.getChannel().getChannelId(), unreportedRows);
            }
            if (unreportedEvents > 0) {
                this.engine.getStatisticManager().incrementDataEventInserted(channelRouterContext.getChannel().getChannelId(), unreportedEvents);
            }
        }
    }

    /**
     * Inserts reload events for every node returned as a target of the {@code P} and
     * {@code W} node group link actions.
     *
     * <p>Targets of both link actions are collected first ({@code P} targets before
     * {@code W} targets) and {@code insertReloadEvents(node, true)} is invoked for each
     * of them. If no target node is found at all, an informational message naming
     * {@link ParameterConstants#AUTO_RELOAD_REVERSE_ENABLED} is logged instead.
     */
    protected void sendReverseInitialLoad() {
        INodeService nodeService = this.engine.getNodeService();
        // Typed list instead of a raw ArrayList + Iterator + cast.
        List<Node> targets = new ArrayList<Node>();
        targets.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.P));
        targets.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.W));
        if (targets.isEmpty()) {
            this.log.info("{} was enabled but no nodes were linked to load", ParameterConstants.AUTO_RELOAD_REVERSE_ENABLED);
            return;
        }
        for (Node target : targets) {
            this.engine.getDataService().insertReloadEvents(target, true);
        }
    }

    /**
     * Asks the router configured for the given data whether it would route to one
     * specific node.
     *
     * <p>The router is looked up from {@code dataMetaData.getRouter()} and invoked with a
     * candidate set containing only {@code node}.
     *
     * @param simpleRouterContext context passed through to the router
     * @param dataMetaData        the data to be routed, including its router definition
     * @param node                the only candidate target node
     * @param z                   forwarded unchanged to {@code routeToNodes}
     *                            (presumably an initial-load flag; confirm against IDataRouter)
     * @param z2                  forwarded unchanged to {@code routeToNodes}
     *                            (presumably an initial-load-select flag; confirm against IDataRouter)
     * @param triggerRouter       trigger/router pairing passed through to the router
     * @return {@code true} only if the router returned a non-null set containing the node's id
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public boolean shouldDataBeRouted(SimpleRouterContext simpleRouterContext, DataMetaData dataMetaData, Node node, boolean z, boolean z2, TriggerRouter triggerRouter) {
        IDataRouter dataRouter = getDataRouter(dataMetaData.getRouter());
        // Typed, mutable single-element set (was a raw HashSet).
        Set<Node> candidates = new HashSet<Node>(1);
        candidates.add(node);
        Set<String> routedNodeIds = dataRouter.routeToNodes(simpleRouterContext, dataMetaData, candidates, z, z2, triggerRouter);
        return routedNodeIds != null && routedNodeIds.contains(node.getNodeId());
    }

    /**
     * Creates a {@link DataGapRouteReader} for the given context and starts it.
     *
     * <p>When {@link ParameterConstants#SYNCHRONIZE_ALL_JOBS} is set the reader's
     * {@code run()} is called directly on the calling thread. Otherwise the reader is
     * submitted to the {@code readThread} executor, which is created on first use as a
     * cached thread pool producing non-daemon, normal-priority threads named
     * {@code <engine name in lower case>-router-reader-<n>}.
     *
     * @param channelRouterContext routing context the reader is bound to
     * @return the reader that was started
     */
    protected IDataToRouteReader startReading(ChannelRouterContext channelRouterContext) {
        DataGapRouteReader reader = new DataGapRouteReader(channelRouterContext, this.engine);

        if (this.parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) {
            // Synchronous mode: read on the caller's thread.
            reader.run();
            return reader;
        }

        if (this.readThread == null) {
            this.readThread = Executors.newCachedThreadPool(new ThreadFactory() { // from class: org.jumpmind.symmetric.service.impl.RouterService.1
                final AtomicInteger threadNumber = new AtomicInteger(1);
                final String namePrefix = RouterService.this.parameterService.getEngineName().toLowerCase() + "-router-reader-";

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread worker = new Thread(runnable);
                    worker.setName(this.namePrefix + this.threadNumber.getAndIncrement());
                    // A new thread inherits daemon status and priority from its creator; normalise both.
                    if (worker.isDaemon()) {
                        worker.setDaemon(false);
                    }
                    if (worker.getPriority() != Thread.NORM_PRIORITY) {
                        worker.setPriority(Thread.NORM_PRIORITY);
                    }
                    return worker;
                }
            });
        }
        this.readThread.execute(reader);
        return reader;
    }

    /**
     * Shuts down the reader executor, if one has been created.
     *
     * <p>Does nothing when {@code readThread} is {@code null}. Otherwise logs the
     * shutdown, calls {@code shutdown()} on the executor and clears the field. Any
     * {@link Exception} raised along the way is logged and not propagated; in that case
     * the field is left as it was.
     */
    @Override // org.jumpmind.symmetric.service.IRouterService
    public synchronized void stop() {
        if (this.readThread == null) {
            return;
        }
        try {
            this.log.info("RouterService is shutting down");
            this.readThread.shutdown();
            this.readThread = null;
        } catch (Exception shutdownFailure) {
            this.log.error(shutdownFailure.getMessage(), (Throwable) shutdownFailure);
        }
    }
}
