package org.jumpmind.symmetric.route;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class DataGapRouteReader implements IDataToRouteReader {
    protected ChannelRouterContext context;
    protected DataGap currentGap;
    protected List<DataGap> dataGaps;
    protected BlockingQueue<Data> dataQueue;
    protected ISymmetricEngine engine;
    protected int peekAheadCount;
    protected boolean reading = true;
    protected int takeTimeout;
    protected static final Logger log = LoggerFactory.getLogger(DataGapRouteReader.class);
    protected static Map<String, Boolean> lastSelectUsedGreaterThanQueryByEngineName = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes.dex */
    public class EOD extends Data {
        private static final long serialVersionUID = 1;

        EOD() {
        }
    }

    public DataGapRouteReader(ChannelRouterContext channelRouterContext, ISymmetricEngine iSymmetricEngine) {
        this.peekAheadCount = 1000;
        this.engine = iSymmetricEngine;
        IParameterService parameterService = iSymmetricEngine.getParameterService();
        this.peekAheadCount = parameterService.getInt(ParameterConstants.ROUTING_PEEK_AHEAD_WINDOW);
        this.takeTimeout = iSymmetricEngine.getParameterService().getInt(ParameterConstants.ROUTING_WAIT_FOR_DATA_TIMEOUT_SECONDS, 330);
        if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) {
            this.dataQueue = new LinkedBlockingQueue();
        } else {
            this.dataQueue = new LinkedBlockingQueue(this.peekAheadCount);
        }
        this.context = channelRouterContext;
        String engineName = parameterService.getEngineName();
        if (lastSelectUsedGreaterThanQueryByEngineName.get(engineName) == null) {
            lastSelectUsedGreaterThanQueryByEngineName.put(engineName, Boolean.FALSE);
        }
    }

    protected void copyToQueue(Data data) {
        long currentTimeMillis = System.currentTimeMillis();
        while (!this.dataQueue.offer(data) && this.reading) {
            AppUtils.sleep(50L);
        }
        this.context.incrementStat(System.currentTimeMillis() - currentTimeMillis, ChannelRouterContext.STAT_ENQUEUE_DATA_MS);
    }

    protected void execute() {
        ISymmetricDialect symmetricDialect = this.engine.getSymmetricDialect();
        ISqlReadCursor<Data> iSqlReadCursor = null;
        ProcessInfo newProcessInfo = this.engine.getStatisticManager().newProcessInfo(new ProcessInfoKey(this.engine.getNodeService().findIdentityNodeId(), null, ProcessInfoKey.ProcessType.ROUTER_READER));
        newProcessInfo.setCurrentChannelId(this.context.getChannel().getChannelId());
        int i = 0;
        try {
            try {
                long maxDataToRoute = this.context.getChannel().getMaxDataToRoute();
                String str = null;
                ArrayList arrayList = new ArrayList(this.peekAheadCount);
                boolean z = this.context.getChannel().getBatchAlgorithm().equals("nontransactional") || !symmetricDialect.supportsTransactionId();
                newProcessInfo.setStatus(ProcessInfo.Status.QUERYING);
                iSqlReadCursor = prepareCursor();
                newProcessInfo.setStatus(ProcessInfo.Status.EXTRACTING);
                boolean z2 = true;
                while (true) {
                    if (i > maxDataToRoute && str == null) {
                        break;
                    }
                    if (z2) {
                        z2 = fillPeekAheadQueue(arrayList, this.peekAheadCount, iSqlReadCursor);
                    }
                    if ((str == null || z) && arrayList.size() > 0) {
                        Data remove = arrayList.remove(0);
                        copyToQueue(remove);
                        i++;
                        newProcessInfo.incrementCurrentDataCount();
                        newProcessInfo.setCurrentTableName(remove.getTableName());
                        str = remove.getTransactionId();
                        this.context.addTransaction(str);
                    } else if (str != null && arrayList.size() > 0) {
                        Iterator<Data> it = arrayList.iterator();
                        int i2 = 0;
                        while (it.hasNext()) {
                            Data next = it.next();
                            if (str.equals(next.getTransactionId())) {
                                i2++;
                                it.remove();
                                copyToQueue(next);
                                i++;
                                newProcessInfo.incrementCurrentDataCount();
                                newProcessInfo.setCurrentTableName(next.getTableName());
                            } else {
                                this.context.addTransaction(next.getTransactionId());
                            }
                        }
                        if (i2 == 0) {
                            str = null;
                        }
                    } else if (arrayList.size() == 0) {
                        break;
                    }
                }
                newProcessInfo.setStatus(ProcessInfo.Status.OK);
                if (iSqlReadCursor != null) {
                    iSqlReadCursor.close();
                }
                this.reading = false;
                copyToQueue(new EOD());
            } catch (Throwable th) {
                newProcessInfo.setStatus(ProcessInfo.Status.ERROR);
                log.error(th.getMessage(), th);
                if (iSqlReadCursor != null) {
                    iSqlReadCursor.close();
                }
                this.reading = false;
                copyToQueue(new EOD());
            }
        } catch (Throwable th2) {
            if (iSqlReadCursor != null) {
                iSqlReadCursor.close();
            }
            this.reading = false;
            copyToQueue(new EOD());
            throw th2;
        }
    }

    protected boolean fillPeekAheadQueue(List<Data> list, int i, ISqlReadCursor<Data> iSqlReadCursor) throws SQLException {
        boolean z = true;
        int i2 = 0;
        long currentTimeMillis = System.currentTimeMillis();
        boolean z2 = this.context.getStartDataId() == 0;
        while (true) {
            if (!this.reading || i2 >= i) {
                break;
            }
            Data next = iSqlReadCursor.next();
            if (next == null) {
                z = false;
                break;
            }
            if (process(next)) {
                list.add(next);
                i2++;
                this.context.incrementStat(System.currentTimeMillis() - currentTimeMillis, ChannelRouterContext.STAT_READ_DATA_MS);
            } else {
                this.context.incrementStat(System.currentTimeMillis() - currentTimeMillis, ChannelRouterContext.STAT_REREAD_DATA_MS);
            }
            if (z2) {
                this.context.setStartDataId(next.getDataId());
                z2 = false;
            }
            this.context.setEndDataId(next.getDataId());
            currentTimeMillis = System.currentTimeMillis();
        }
        this.context.incrementDataReadCount(i2);
        this.context.incrementPeekAheadFillCount(1L);
        return z;
    }

    public BlockingQueue<Data> getDataQueue() {
        return this.dataQueue;
    }

    protected String getSql(String str, Channel channel) {
        String sql = this.engine.getRouterService().getSql(str);
        if (!channel.isUseOldDataToRoute()) {
            sql = sql.replace("d.old_data", "''");
        }
        if (!channel.isUseRowDataToRoute()) {
            sql = sql.replace("d.row_data", "''");
        }
        if (!channel.isUsePkDataToRoute()) {
            sql = sql.replace("d.pk_data", "''");
        }
        return this.engine.getSymmetricDialect().massageDataExtractionSql(sql, channel);
    }

    @Override // org.jumpmind.symmetric.route.IDataToRouteReader
    public boolean isReading() {
        return this.reading;
    }

    protected ISqlReadCursor<Data> prepareCursor() {
        String qualifyUsingDataGaps;
        Object[] objArr;
        int[] iArr;
        IParameterService parameterService = this.engine.getParameterService();
        int i = parameterService.getInt(ParameterConstants.ROUTING_MAX_GAPS_TO_QUALIFY_IN_SQL, 100);
        int i2 = parameterService.getInt(ParameterConstants.ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY, 100);
        this.dataGaps = this.engine.getDataService().findDataGaps();
        if (this.dataGaps != null) {
            this.context.setDataGaps(new ArrayList(this.dataGaps));
        }
        boolean z = false;
        if (i2 > 0 && this.dataGaps.size() > i2) {
            z = true;
        }
        String channelId = this.context.getChannel().getChannelId();
        Boolean bool = lastSelectUsedGreaterThanQueryByEngineName.get(parameterService.getEngineName());
        if (bool == null) {
            bool = Boolean.FALSE;
        }
        if (z) {
            qualifyUsingDataGaps = getSql("selectDataUsingStartDataId", this.context.getChannel().getChannel());
            if (!bool.booleanValue()) {
                log.info("Switching to select from the data table where data_id >= start gap because there were {} gaps found which was more than the configured threshold of {}", Integer.valueOf(this.dataGaps.size()), Integer.valueOf(i2));
                lastSelectUsedGreaterThanQueryByEngineName.put(parameterService.getEngineName(), Boolean.TRUE);
            }
        } else {
            qualifyUsingDataGaps = qualifyUsingDataGaps(this.dataGaps, i, getSql("selectDataUsingGapsSql", this.context.getChannel().getChannel()));
            if (bool.booleanValue()) {
                log.info("Switching to select from the data table where data_id between gaps");
                lastSelectUsedGreaterThanQueryByEngineName.put(parameterService.getEngineName(), Boolean.FALSE);
            }
        }
        if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) {
            qualifyUsingDataGaps = qualifyUsingDataGaps + this.engine.getRouterService().getSql("orderByDataId");
        }
        ISqlTemplate sqlTemplate = this.engine.getSymmetricDialect().getPlatform().getSqlTemplate();
        int sqlTypeForIds = this.engine.getSymmetricDialect().getSqlTypeForIds();
        if (z) {
            objArr = new Object[]{channelId, Long.valueOf(this.dataGaps.get(0).getStartId())};
            iArr = new int[]{12, sqlTypeForIds};
        } else {
            int size = ((i < this.dataGaps.size() ? i : this.dataGaps.size()) * 2) + 1;
            objArr = new Object[size];
            iArr = new int[size];
            objArr[0] = channelId;
            iArr[0] = 12;
            for (int i3 = 0; i3 < i && i3 < this.dataGaps.size(); i3++) {
                DataGap dataGap = this.dataGaps.get(i3);
                objArr[(i3 * 2) + 1] = Long.valueOf(dataGap.getStartId());
                iArr[(i3 * 2) + 1] = sqlTypeForIds;
                if (i3 + 1 != i || i3 + 1 >= this.dataGaps.size()) {
                    objArr[(i3 * 2) + 2] = Long.valueOf(dataGap.getEndId());
                } else {
                    objArr[(i3 * 2) + 2] = Long.valueOf(this.dataGaps.get(this.dataGaps.size() - 1).getEndId());
                }
                iArr[(i3 * 2) + 2] = sqlTypeForIds;
            }
        }
        this.currentGap = this.dataGaps.remove(0);
        return sqlTemplate.queryForCursor(qualifyUsingDataGaps, new ISqlRowMapper<Data>() { // from class: org.jumpmind.symmetric.route.DataGapRouteReader.1
            @Override // org.jumpmind.db.sql.ISqlRowMapper
            public Data mapRow(Row row) {
                return DataGapRouteReader.this.engine.getDataService().mapData(row);
            }
        }, objArr, iArr);
    }

    protected boolean process(Data data) {
        long dataId = data.getDataId();
        boolean z = false;
        while (!z && this.currentGap != null && dataId >= this.currentGap.getStartId()) {
            if (dataId <= this.currentGap.getEndId()) {
                z = true;
            } else if (this.dataGaps.size() > 0) {
                this.currentGap = this.dataGaps.remove(0);
            } else {
                this.currentGap = null;
            }
        }
        return z;
    }

    protected String qualifyUsingDataGaps(List<DataGap> list, int i, String str) {
        StringBuilder sb = new StringBuilder();
        for (int i2 = 0; i2 < i && i2 < list.size(); i2++) {
            if (i2 == 0) {
                sb.append(" and (");
            } else {
                sb.append(" or ");
            }
            sb.append("(d.data_id between ? and ?)");
        }
        sb.append(")");
        return FormatUtils.replace("dataRange", sb.toString(), str);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            execute();
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
    }

    @Override // org.jumpmind.symmetric.route.IDataToRouteReader
    public void setReading(boolean z) {
        this.reading = z;
    }

    @Override // org.jumpmind.symmetric.route.IDataToRouteReader
    public Data take() throws InterruptedException {
        Data poll;
        do {
            poll = this.dataQueue.poll(this.takeTimeout, TimeUnit.SECONDS);
            if (poll == null && !this.reading) {
                throw new SymmetricException("The read of the data to route queue has timed out", new Object[0]);
            }
            if (poll instanceof EOD) {
                poll = null;
            }
            if (poll != null) {
                break;
            }
        } while (this.reading);
        return poll;
    }
}
