statusCode
attribute of the active CronJobHistory
item with a message similar to the following one: 9,456,049 item(s) indexed within 14m 23s (10,957 items/s), 0 error(s)
IndexerListener
and IndexerBatchListener
interfaces to intercept the batch end events and report the number of items indexed in a given period. The hard part is that the batches are not connected to the indexing job, as they run in their own session and in a different thread. Moreover, the job is not exposed to the batches. The only information connecting the indexing job to the batches is the indexing operation identifier. The logic to implement is consequently the following:
AtomicInteger
and AtomicLong
classes to prevent issues with concurrent updates. First, I needed a container holding the indexing statistics for an indexing job.public class SolrIndexerCronJobStatistics {
/** Cron job whose active history item receives the progress status line. */
private final SolrIndexerCronJobModel cronJob;
/** Running total of indexed items; incremented concurrently by batch callbacks. */
private final AtomicLong totalIndexedItems = new AtomicLong(0);
/** Running total of reported errors; incremented concurrently by batch callbacks. */
private final AtomicInteger totalErrors = new AtomicInteger(0);
// volatile: the end time is written when the index operation is unhooked, while batch threads
// holding this object may still read both dates to build the status line. Without volatile,
// those reads are data races with no guarantee of seeing a fully published Date.
private volatile Date indexStartTime;
private volatile Date indexEndTime;

/**
 * Creates the statistics holder for the given cron job and records the current time as the
 * index start time.
 *
 * @param cronJob the Solr indexer cron job being tracked
 */
SolrIndexerCronJobStatistics(final SolrIndexerCronJobModel cronJob) {
    this.cronJob = cronJob;
    // Assign the field directly: calling the overridable setter from a constructor would let a
    // subclass observe a partially constructed object.
    this.indexStartTime = new Date();
}
/**
 * Gives access to the cron job these statistics were created for.
 *
 * @return the tracked cron job, exactly as passed to the constructor
 */
public SolrIndexerCronJobModel getCronJob() {
    final SolrIndexerCronJobModel trackedJob = this.cronJob;
    return trackedJob;
}
/**
 * Atomically increases the indexed item counter.
 *
 * @param additionalIndexedItems number of items to add to the running total
 * @return the running total after the increment
 */
public long addIndexedItems(final int additionalIndexedItems) {
    final long delta = additionalIndexedItems;
    return totalIndexedItems.addAndGet(delta);
}

/**
 * Atomically increases the error counter.
 *
 * @param additionalErrors number of errors to add to the running total
 * @return the running total after the increment
 */
public int addErrors(final int additionalErrors) {
    final int updatedTotal = totalErrors.addAndGet(additionalErrors);
    return updatedTotal;
}
/**
 * Reads the current number of indexed items.
 *
 * @return the running total of indexed items
 */
public long getTotalIndexedItems() {
    return totalIndexedItems.longValue();
}

/**
 * Reads the current number of reported errors.
 *
 * @return the running total of errors
 */
public int getTotalErrors() {
    return totalErrors.intValue();
}
/**
 * Returns when indexing started.
 *
 * @return a copy of the start time, or {@code null} if none is set
 */
public Date getIndexStartTime() {
    return copyOf(indexStartTime);
}

/**
 * Sets when indexing started.
 *
 * @param indexStartTime the start time; copied so later mutation by the caller has no effect
 */
public void setIndexStartTime(Date indexStartTime) {
    this.indexStartTime = copyOf(indexStartTime);
}

/**
 * Returns when indexing ended.
 *
 * @return a copy of the end time, or {@code null} while indexing is still running
 */
public Date getIndexEndTime() {
    return copyOf(indexEndTime);
}

/**
 * Sets when indexing ended.
 *
 * @param indexEndTime the end time; copied so later mutation by the caller has no effect
 */
public void setIndexEndTime(Date indexEndTime) {
    this.indexEndTime = copyOf(indexEndTime);
}

/** Defensive copy of a mutable {@link Date}; {@code null} stays {@code null}. */
private static Date copyOf(final Date date) {
    return date == null ? null : new Date(date.getTime());
}
}
statusCode
attribute of the active CronJobHistory
item. Note that the modifiedTime
attribute is updated explicitly, which would normally be pointless since SAP Commerce overwrites it when modelService.save()
is executed. The reason for this odd line is to prevent parallel updates of the CronJobHistory
item. Under high parallelism, batches will very likely end at the same time, and it can take a couple of milliseconds to save the CronJobHistory
in the database and update the modifiedTime
attribute.public class SolrIndexerCronJobListener implements IndexerListener, IndexerBatchListener {
private static final Logger LOG = LogManager.getLogger(SolrIndexerCronJobListener.class);
/** Default minimum delay between two non-forced status line updates: one minute, in milliseconds. */
private static final long DEFAULT_UPDATE_INTERVAL_MS = 60L * 1000L;
private ModelService modelService;
private SessionService sessionService;
/** Statistics of the running indexer cron jobs, keyed by index operation id. */
private final Map<Long, SolrIndexerCronJobStatistics> solrIndexerCronJobStatistics = new ConcurrentHashMap<>();
/** Minimum delay, in milliseconds, between two non-forced updates of the cron job history. */
private long updateInterval = DEFAULT_UPDATE_INTERVAL_MS;
/**
 * Batch start hook; intentionally a no-op because progress is only reported when a batch ends.
 *
 * @param ctx the batch context (unused)
 */
@Override
public void beforeBatch(final IndexerBatchContext ctx) throws IndexerException {
    // Intentionally empty.
}
/**
 * Adds the items of the completed batch to the statistics of the owning cron job.
 *
 * @param ctx the context of the batch that just completed
 */
@Override
public void afterBatch(final IndexerBatchContext ctx) throws IndexerException {
    updateSolrIndexerCronJobStatistics(ctx, ctx.getItems().size(), 0);
}
/**
 * Adds batch results to the statistics registered for the batch's index operation and possibly
 * refreshes the cron job history status line.
 *
 * @param ctx                    the batch context carrying the index operation id
 * @param additionalIndexedItems number of items to add to the indexed total
 * @param additionalErrors       number of errors to add to the error total
 */
protected void updateSolrIndexerCronJobStatistics(final IndexerBatchContext ctx, final int additionalIndexedItems, final int additionalErrors) {
    final long operationId = ctx.getIndexOperationId();
    final SolrIndexerCronJobStatistics operationStats = solrIndexerCronJobStatistics.get(operationId);
    if (operationStats == null) {
        // No statistics registered for this operation (not hooked, or already unhooked).
        return;
    }
    operationStats.addIndexedItems(additionalIndexedItems);
    operationStats.addErrors(additionalErrors);
    updateSolrIndexerCronJobHistory(operationStats, false);
}
/**
 * Writes a fresh status line to the active history item of the tracked cron job.
 *
 * <p>The update is skipped when there is no active history item. It is also skipped when
 * {@code force} is false and {@link #needsUpdate(CronJobHistoryModel)} reports that the last
 * update is recent enough. Saving is best-effort: a failure is logged rather than propagated,
 * so progress reporting cannot fail the batch or index operation that triggered it.
 *
 * @param stats the statistics to report
 * @param force {@code true} to bypass the update interval check
 */
protected void updateSolrIndexerCronJobHistory(final SolrIndexerCronJobStatistics stats, final boolean force) {
    final CronJobHistoryModel cronJobHistory = stats.getCronJob().getActiveCronJobHistory();
    if (cronJobHistory == null || !(force || needsUpdate(cronJobHistory))) {
        return;
    }
    // Bump modifiedtime before saving so batches finishing concurrently see a fresh timestamp in
    // needsUpdate() and skip their own update. This narrows the race but does not remove it.
    cronJobHistory.setModifiedtime(new Date());
    final String statusLine = generateStatusLine(stats);
    if (LOG.isInfoEnabled()) {
        LOG.info("(" + stats.getCronJob().getCode() + ") " + statusLine);
    }
    cronJobHistory.setStatusLine(statusLine);
    try {
        getModelService().save(cronJobHistory);
    } catch (final RuntimeException e) {
        // Best-effort: an exception escaping a listener callback could fail the batch (and be
        // counted again through afterBatchError), even though the indexing itself succeeded.
        LOG.warn("(" + stats.getCronJob().getCode() + ") Could not save the status line of the active cron job history", e);
    }
}
/**
 * Builds the human-readable progress line, e.g.
 * {@code "9,456,049 item(s) indexed within 14m23s (10,957 items/s), 0 error(s)"}.
 *
 * <p>The duration runs from the index start time (falling back to the cron job start time) to
 * the index end time, or to "now" while indexing is still running. When no positive duration
 * can be determined, both the duration and the throughput are rendered as {@code "-"}.
 *
 * @param stats the statistics to describe
 * @return the formatted status line
 */
protected String generateStatusLine(final SolrIndexerCronJobStatistics stats) {
    final long totalIndexedItems = stats.getTotalIndexedItems();
    final int totalIndexErrors = stats.getTotalErrors();
    final long totalIndexDuration = computeIndexDurationMillis(stats);
    final NumberFormat nf = NumberFormat.getIntegerInstance(Locale.ENGLISH);

    final String formattedDuration;
    final String formattedThroughput;
    if (totalIndexDuration > 0) {
        // Drop the sub-second part, then turn ISO-8601 "PT14M23S" into "14m23s".
        formattedDuration = Duration.ofMillis(totalIndexDuration - (totalIndexDuration % 1000))
                .toString().substring(2).toLowerCase(Locale.ROOT);
        // Compute the rate from milliseconds. Integer-dividing by whole seconds divided by zero
        // for runs under 1 s (printing infinity/NaN) and overstated the rate of short runs.
        formattedThroughput = nf.format(totalIndexedItems * 1000.0d / totalIndexDuration);
    } else {
        formattedDuration = "-";
        formattedThroughput = "-";
    }
    return String.format("%s item(s) indexed within %s (%s items/s), %s error(s)",
            nf.format(totalIndexedItems), formattedDuration, formattedThroughput, nf.format(totalIndexErrors));
}

/** Elapsed indexing time in milliseconds, or 0 when no start time is known. */
private long computeIndexDurationMillis(final SolrIndexerCronJobStatistics stats) {
    final Date end = stats.getIndexEndTime();
    final long endMillis = end == null ? System.currentTimeMillis() : end.getTime();
    Date start = stats.getIndexStartTime();
    if (start == null) {
        start = stats.getCronJob().getStartTime();
    }
    if (start == null) {
        return 0L;
    }
    return endMillis - start.getTime();
}
/**
 * Tells whether the history item is due for a status line refresh.
 *
 * @param cronJobHistory the active cron job history item
 * @return {@code true} when its {@code modifiedtime} is unset or older than
 *         {@link #getUpdateInterval()} milliseconds
 */
protected boolean needsUpdate(final CronJobHistoryModel cronJobHistory) {
    final Date lastModified = cronJobHistory.getModifiedtime();
    if (lastModified == null) {
        // Never saved with a timestamp: refresh instead of failing with a NullPointerException.
        return true;
    }
    return lastModified.getTime() < System.currentTimeMillis() - getUpdateInterval();
}
/**
 * Records one error against the owning cron job when a batch fails.
 *
 * @param ctx the context of the failed batch
 */
@Override
public void afterBatchError(final IndexerBatchContext ctx) throws IndexerException {
    final int failedBatchErrors = 1;
    updateSolrIndexerCronJobStatistics(ctx, 0, failedBatchErrors);
}
/**
 * Registers statistics for the index operation when it is driven by a Solr indexer cron job.
 *
 * @param ctx the context of the index operation about to start
 */
@Override
public void beforeIndex(final IndexerContext ctx) throws IndexerException {
    this.hookSolrIndexerCronJob(ctx);
}
/**
 * Publishes the final status line and releases the statistics of a completed index operation.
 *
 * @param ctx the context of the index operation that just completed
 */
@Override
public void afterIndex(final IndexerContext ctx) throws IndexerException {
    this.unhookSolrIndexerCronJob(ctx);
}
/**
 * Publishes the final status line and releases the statistics of a failed index operation.
 *
 * @param ctx the context of the index operation that failed
 */
@Override
public void afterIndexError(final IndexerContext ctx) throws IndexerException {
    this.unhookSolrIndexerCronJob(ctx);
}
/**
 * Associates the index operation with the cron job stored in the session attribute
 * {@code "currentCronJob"}, if that attribute holds a {@link SolrIndexerCronJobModel}.
 *
 * @param ctx the context of the index operation about to start
 */
protected void hookSolrIndexerCronJob(final IndexerContext ctx) {
    final Object sessionCronJob = getSessionService().getAttribute("currentCronJob");
    if (!(sessionCronJob instanceof SolrIndexerCronJobModel)) {
        // Not started by a Solr indexer cron job: nothing to report to.
        return;
    }
    final SolrIndexerCronJobModel solrCronJob = (SolrIndexerCronJobModel) sessionCronJob;
    final long operationId = ctx.getIndexOperationId();
    solrIndexerCronJobStatistics.put(operationId, new SolrIndexerCronJobStatistics(solrCronJob));
}
/**
 * Removes the statistics of the index operation and, if any were registered, stamps the end time
 * and forces a final status line update.
 *
 * @param ctx the context of the index operation that finished
 */
protected void unhookSolrIndexerCronJob(final IndexerContext ctx) {
    // Capture the end time before any other work so it reflects when the callback fired.
    final Date finishedAt = new Date();
    final long operationId = ctx.getIndexOperationId();
    final SolrIndexerCronJobStatistics operationStats = solrIndexerCronJobStatistics.remove(operationId);
    if (operationStats == null) {
        return;
    }
    operationStats.setIndexEndTime(finishedAt);
    updateSolrIndexerCronJobHistory(operationStats, true);
}
/** @return the model service used to save the cron job history */
protected ModelService getModelService() {
    return this.modelService;
}

/**
 * Injects the model service.
 *
 * @param modelService the model service to use
 */
@Resource
public void setModelService(final ModelService modelService) {
    this.modelService = modelService;
}
/** @return the session service used to look up the current cron job */
protected SessionService getSessionService() {
    return this.sessionService;
}

/**
 * Injects the session service.
 *
 * @param sessionService the session service to use
 */
@Resource
public void setSessionService(final SessionService sessionService) {
    this.sessionService = sessionService;
}
/** @return the minimum delay, in milliseconds, between two non-forced status line updates */
public long getUpdateInterval() {
    return this.updateInterval;
}

/**
 * Sets the minimum delay between two non-forced status line updates.
 *
 * @param updateInterval the delay in milliseconds
 */
public void setUpdateInterval(final long updateInterval) {
    this.updateInterval = updateInterval;
}
}
<!-- Listener that reports indexer cron job progress to the active CronJobHistory status line.
     updateInterval (milliseconds) is read from 'solr.indexer.cronjob.status.updateInterval',
     defaulting to 60000 when the property is not set. -->
<bean id="solrIndexerCronJobListener" class="com.discounttire.core.search.solrfacetsearch.indexer.listeners.SolrIndexerCronJobListener">
<property name="updateInterval" value="#{configurationService.configuration.getInt('solr.indexer.cronjob.status.updateInterval', 60000)}"/>
</bean>
<!-- Registers the listener above through the 'solrListenerDefinition' parent bean.
     NOTE(review): priority 2000 presumably orders it relative to other indexer listeners;
     confirm the ordering semantics against the parent definition. -->
<bean id="solrIndexerCronJobListenerDefinition" parent="solrListenerDefinition">
<property name="priority" value="2000"/>
<property name="listener" ref="solrIndexerCronJobListener"/>
</bean>
CronJobHistory
updates are not synchronized, so as not to slow down operations. Even though modifiedTime
is updated manually, batches can still end at the same time and simultaneously detect that the CronJobHistory
should be updated.
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
7 | |
4 | |
3 | |
2 | |
2 | |
1 | |
1 | |
1 | |
1 | |
1 |