The first set of customizations included classic optimizations: use the ValueResolver instead of the FieldValueProvider interface, use a dedicated DAO to retrieve data instead of accessing it via the Model API, pre-load data for all batch items to index, etc. These optimizations significantly reduced the indexing time, but the indexing job did not scale: the more items were indexed, the more time it took to index one item. I tracked the indexing speed of the jobs (see my blog post for more details) and found out that the speed dropped over time. The reason was again too many queries being executed against the database, except that this time the queries originated from the caches, not from the indexing logic. The second set of customizations therefore focused on actively managing the caches to keep them from executing queries. Once all optimizations were in place, the Solr indexing job ran at a high and constant indexing speed from start to finish.

Each ValueResolver or FieldValueProvider implementation should load all data required for indexing with one query. The approach for this customization is to implement the following logic in the AbstractValueResolver.loadData() method:

1. Check whether the data has already been loaded by a previous call of the ValueResolver implementation for the current batch. All items to be indexed by the current batch can be retrieved via the IndexerBatchContext.getItems() method.
2. If not, load the data for all items of the batch with one query and store it in the IndexerBatchContext attributes.
3. Return the data for the current item from the IndexerBatchContext attribute.

Note that you will have to migrate your FieldValueProvider implementations to ValueResolver, as it is otherwise not possible to implement the logic described above. You can extend the AbstractValueResolver class as follows to ease the implementation of bulk loading in your ValueResolver implementations.
// T: item type, M: data loaded per item, Q: query-level data (inherited from AbstractValueResolver),
// K: key identifying an item in the bulk-loaded map
abstract class AbstractBulkValueResolver<T extends ItemModel, M, Q, K> extends AbstractValueResolver<T, M, Q> {

    // Name of the batch context attribute under which the bulk-loaded data is stored
    private final String attribute;

    public AbstractBulkValueResolver(final String attribute) {
        super();
        this.attribute = attribute;
    }

    @Override
    protected M loadData(final IndexerBatchContext batchContext, final Collection<IndexedProperty> indexedProperties,
            final T model) throws FieldValueProviderException {
        // Load the data for all items of the batch once and cache it in the batch context.
        // The cast is safe as long as the attribute is only written by this resolver.
        final Map<K, M> allData = (Map<K, M>) batchContext.getAttributes()
            .computeIfAbsent(attribute, (attr) -> loadAllData(batchContext, indexedProperties));
        final K key = getKey(model);
        return allData.get(key);
    }

    protected abstract Map<K, M> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties);

    protected abstract K getKey(T model);
}
Bulk loading can then be implemented in your ValueResolver implementation as shown in the example below.

class ProductWarehouseCodeValueResolver extends AbstractBulkValueResolver<ProductModel, List<String>, List<String>, PK> {
    private FlexibleSearchService flexibleSearchService;

    public ProductWarehouseCodeValueResolver() {
        super("productWarehouseCodes");
    }

    @Override
    protected Map<PK, List<String>> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) {
        final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
            "SELECT {p.PK}, {w.code} FROM {Product AS p INNER JOIN StockLevel AS s ON {s.productCode} = {p.code} " +
            "INNER JOIN Warehouse AS w ON {w.PK} = {s.warehouse}} WHERE {p.PK} IN (?products)"
        );
        fQuery.addQueryParameter("products",
            batchContext.getItems().stream().map(ItemModel::getPk).collect(Collectors.toList()));
        fQuery.setDisableCaching(true);
        fQuery.setResultClassList(asList(PK.class, String.class));
        return flexibleSearchService.<List<Object>>search(fQuery).getResult().stream().collect(
            Collectors.groupingBy(row -> (PK) row.get(0),
                Collectors.mapping(row -> (String) row.get(1), Collectors.toList())));
    }

    @Override
    protected PK getKey(final ProductModel product) {
        return product.getPk();
    }

    [...]
}
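As a side note, the resolver has to be registered as a Spring bean whose id matches the valueProvider configured on the indexed property. Here is a minimal sketch using Java configuration; SAP Commerce extensions usually declare this in their *-spring.xml instead, the configuration class and setter names are assumptions, and any required properties inherited from AbstractValueResolver are omitted for brevity.

@Configuration
public class MyIndexerConfig {

    // Hypothetical registration: the bean id must match the valueProvider
    // column of the SolrIndexedProperty configuration.
    @Bean("productWarehouseCodeValueResolver")
    public ProductWarehouseCodeValueResolver productWarehouseCodeValueResolver(
            final FlexibleSearchService flexibleSearchService) {
        final ProductWarehouseCodeValueResolver resolver = new ProductWarehouseCodeValueResolver();
        resolver.setFlexibleSearchService(flexibleSearchService); // setter assumed, not shown above
        return resolver;
    }
}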
Be careful with the number of parameters in the IN clause of the query. Certain databases like Oracle limit the maximum number of parameters in the IN clause to 1,000. Other databases do not enforce a hard limit but suffer from performance degradation when the number of parameters exceeds certain values.
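If a single batch can exceed that limit, one option is to split the parameter list and merge the partial results. Here is a minimal sketch, assuming a hypothetical searchInChunks helper added to the resolver; the chunk size of 1,000 matches the Oracle constraint mentioned above and only standard java.util types are used.

private static final int MAX_IN_PARAMETERS = 1000;

// Hypothetical helper: runs the given search once per chunk of at most
// MAX_IN_PARAMETERS keys and concatenates the partial results.
protected <K, R> List<R> searchInChunks(final List<K> keys, final Function<List<K>, List<R>> search) {
    final List<R> results = new ArrayList<>();
    for (int from = 0; from < keys.size(); from += MAX_IN_PARAMETERS) {
        final List<K> chunk = keys.subList(from, Math.min(from + MAX_IN_PARAMETERS, keys.size()));
        results.addAll(search.apply(chunk));
    }
    return results;
}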
When you iterate over a query result, the platform lazily fetches the items from the database in chunks. The chunk size is called prefetchSize and is set to 100 by default. For example, if you query 500 Product items, it will perform 5 queries in the background, fetching data for 100 Product items each time. Generally, fetching 500 items at once takes slightly longer than fetching 100 items, but significantly less than 5 times as long.

During indexing, the prefetchSize should be the same as the Solr index batch size to reduce the number of queries performed by the system. You can achieve that by extending the DefaultIndexerWorker as shown below.
public class MyIndexerWorker extends DefaultIndexerWorker {

    private IndexerWorkerParameters workerParameters;

    @Override
    public void initialize(final IndexerWorkerParameters workerParameters) {
        super.initialize(workerParameters);
        this.workerParameters = workerParameters;
    }

    @Override
    protected void initializeSession() {
        super.initializeSession();
        // Align the prefetch size with the number of items indexed by this worker's batch
        final int prefetchSize = this.workerParameters.getPks().size();
        getSessionService().setAttribute(FlexibleSearch.PREFETCH_SIZE, prefetchSize);
    }
}
If your ValueResolver implementations query items and you know that the query result size will be significantly greater than the Solr index batch size, you might want to adjust the prefetch size there as well to reduce the number of executed queries even further, as in the example below.

abstract class AbstractBulkValueResolver<T extends ItemModel, M, Q, K> extends AbstractValueResolver<T, M, Q> {
    private SessionService sessionService;

    // Runs the given code with the requested prefetch size and restores the previous value afterwards
    protected <R> R executeWithPrefetchSize(final int prefetchSize, final Supplier<R> executable) {
        // Assumes the prefetch size was already set in the session by the indexer worker (see above)
        final int currentPrefetchSize = getSessionService().getAttribute(FlexibleSearch.PREFETCH_SIZE);
        final boolean adjustPrefetchSize = currentPrefetchSize != prefetchSize;
        if (adjustPrefetchSize) {
            getSessionService().setAttribute(FlexibleSearch.PREFETCH_SIZE, prefetchSize);
        }
        try {
            return executable.get();
        } finally {
            if (adjustPrefetchSize) {
                getSessionService().setAttribute(FlexibleSearch.PREFETCH_SIZE, currentPrefetchSize);
            }
        }
    }

    [...]
}
class ProductStockLevelValueResolver<P extends ProductModel> extends AbstractBulkValueResolver<P, List<StockLevelModel>, List<StockLevelModel>, String> {

    @Override
    protected Map<String, List<StockLevelModel>> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) {
        // Assume that a product has on average 3 associated stock level items
        final List<StockLevelModel> stockLevels = executeWithPrefetchSize(batchContext.getItems().size() * 3, () -> {
            final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
                "SELECT {s.PK} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
            );
            fQuery.addQueryParameter("productCodes",
                batchContext.getItems().stream()
                    .map(ProductModel.class::cast)
                    .map(ProductModel::getCode)
                    .collect(Collectors.toList()));
            fQuery.setDisableCaching(true);
            return flexibleSearchService.<StockLevelModel>search(fQuery).getResult();
        });
        return stockLevels.stream().collect(Collectors.groupingBy(StockLevelModel::getProductCode));
    }

    [...]
}
When the platform loads a chunk of items, it fetches their data from the database with a query like SELECT * FROM <table> WHERE PK IN (?, ?, ...). The items with their data are cached in the entity cache, while the batch query fetching the item data is cached in the query cache, even when caching is disabled in the Flexible Search query. When a cache hits its maximum size, it evicts items to make room for new ones. If an evicted item is needed again, the system will fetch its data from the database by executing SELECT * FROM <table> WHERE PK = ? and will cache the item, potentially causing the eviction of another item if the cache is full.

An indexing job streams a huge number of items through the caches that will mostly never be read again, evicting entries other processes still need. To avoid this, you can extend the DefaultIndexerBatchStrategy to remove the indexed items from the entity, Jalo item and finder caches once the batch has been processed, as shown below.

public class MyIndexerBatchStrategy extends DefaultIndexerBatchStrategy {
    public static final String PARAM_DISABLE_CACHING = "disableCaching";

    private static final Logger LOG = LogManager.getLogger(MyIndexerBatchStrategy.class);

    private GenerationalCacheDelegate generationalCacheDelegate;

    @Override
    public void execute() throws InterruptedException, IndexerException {
        try {
            super.execute();
        } finally {
            if (shouldRemoveIndexedItemsFromCache()) {
                removeIndexedItemsFromCache();
            }
        }
    }

    protected boolean shouldRemoveIndexedItemsFromCache() {
        if (getIndexOperation() != IndexOperation.DELETE) {
            final IndexedType indexedType = this.getIndexedType();
            if (indexedType != null) {
                return Boolean.valueOf(indexedType.getAdditionalParameters().get(PARAM_DISABLE_CACHING));
            }
        }
        return false;
    }

    protected void removeIndexedItemsFromCache() {
        final long startTimestamp = System.currentTimeMillis();
        final Cache cache = Registry.getCurrentTenantNoFallback().getCache();
        final Object[] jaloItemCacheKey = new Object[] { Cache.CACHEKEY_JALOITEMCACHE, null };
        final Object[] entityCacheKey = new Object[] { Cache.CACHEKEY_HJMP, Cache.CACHEKEY_ENTITY, null, null };
        long evictedUnits = 0;
        for (final PK pk : this.getPks()) {
            entityCacheKey[2] = pk.getTypeCodeAsString();
            entityCacheKey[3] = pk;
            final AbstractCacheUnit entityCacheUnit = cache.getUnit(new AnonymousCacheUnit(cache, entityCacheKey));
            if (entityCacheUnit != null) {
                cache.removeUnit(entityCacheUnit);
                evictedUnits++;
            }
            jaloItemCacheKey[1] = pk;
            final AbstractCacheUnit jaloItemCacheUnit = cache.getUnit(new AnonymousCacheUnit(cache, jaloItemCacheKey));
            if (jaloItemCacheUnit != null) {
                cache.removeUnit(jaloItemCacheUnit);
                evictedUnits++;
            }
        }
        final Object[] ejbFindByPkListCacheKey = new Object[] { Cache.CACHEKEY_HJMP, Cache.CACHEKEY_FIND,
                this.getPks().iterator().next().getTypeCodeAsString(), "ejbFindByPKList",
                Collections.unmodifiableList(Arrays.asList(new HashSet<>(this.getPks()))) };
        final AbstractCacheUnit ejbFindByPkListCacheUnit = cache.getUnit(new AnonymousCacheUnit(cache, ejbFindByPkListCacheKey) {
            @Override
            public CacheKey getKey() {
                if (this.cacheKey == null) {
                    final Object[] keyAsArray = this.getKeyAsArray();
                    this.cacheKey = getGenerationalCacheDelegate().getGenerationalCacheKey(
                        new FinderResult.FinderResultCacheKey((String) keyAsArray[2], keyAsArray, cache.getTenantId())
                    );
                }
                return this.cacheKey;
            }
        });
        if (ejbFindByPkListCacheUnit != null) {
            cache.removeUnit(ejbFindByPkListCacheUnit);
            evictedUnits++;
        }
        final long endTimestamp = System.currentTimeMillis();
        LOG.debug("Removed {} item(s) from cache within {}ms", evictedUnits, endTimestamp - startTimestamp);
    }

    protected GenerationalCacheDelegate getGenerationalCacheDelegate() {
        return generationalCacheDelegate;
    }

    @Resource
    public void setGenerationalCacheDelegate(GenerationalCacheDelegate generationalCacheDelegate) {
        this.generationalCacheDelegate = generationalCacheDelegate;
    }
}
Two things are worth noting. Firstly, the items are removed from the cache only once the batch has been fully processed, i.e. after they are no longer needed by the ValueResolver implementations during the indexing. Secondly, you should create an additional parameter named disableCaching and set it to true in your index configuration in order to enable the logic. As it is configurable per index, you could for example decide to clean the cache for all Solr indexing jobs except the product catalog one, as keeping products in the cache is critical for your performance.
Your ValueResolver implementations will still perform queries, causing the entity or query caches to fill up. The trick is to query the item data rather than the items themselves. For example, if you need to fetch the StockLevel items associated with the indexed products in order to index the available stock quantities, you should directly query the attribute available along with the product key, rather than the StockLevel primary key. Instead of loading the StockLevel items:

protected Map<String, List<StockLevelModel>> loadAllData(final IndexerBatchContext batchContext,
        final Collection<IndexedProperty> indexedProperties) {
    final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
        "SELECT {s.PK} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
    );
    fQuery.addQueryParameter("productCodes",
        batchContext.getItems().stream()
            .map(ProductModel.class::cast)
            .map(ProductModel::getCode)
            .collect(Collectors.toList()));
    fQuery.setDisableCaching(true);
    return flexibleSearchService.<StockLevelModel>search(fQuery).getResult()
        .stream().collect(Collectors.groupingBy(StockLevelModel::getProductCode));
}
query the stock data directly and group it by product code:

protected Map<String, List<Integer>> loadAllData(final IndexerBatchContext batchContext,
        final Collection<IndexedProperty> indexedProperties) {
    final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
        "SELECT {s.productCode}, {s.available} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
    );
    fQuery.addQueryParameter("productCodes",
        batchContext.getItems().stream()
            .map(ProductModel.class::cast)
            .map(ProductModel::getCode)
            .collect(Collectors.toList()));
    fQuery.setDisableCaching(true);
    fQuery.setResultClassList(asList(String.class, Integer.class));
    return flexibleSearchService.<List<Object>>search(fQuery).getResult()
        .stream().collect(Collectors.groupingBy(row -> (String) row.get(0),
            Collectors.mapping(row -> (Integer) row.get(1), Collectors.toList())));
}
It is common to see projects with many FieldValueProvider or ValueResolver implementations, each developed for a very specific purpose and indexing one specific piece of information. It definitely helps to separate concerns but does not always serve indexing performance. The ValueResolver interface introduced an important but often unnoticed optimization compared to the old FieldValueProvider interface: it can resolve the values for multiple indexed properties. When multiple indexed properties are configured with the same provider implementing the ValueResolver interface, the system calls the provider only once per item to index, with the list of indexed properties as parameter. Notice that the SolrIndexedProperty item offers the valueProviderParameter and valueProviderParameters attributes to configure the provider behavior. In the example below, one resolver loads the stock level data once per item and serves several indexed properties, using a SpEL expression configured in valueProviderParameter to extract the value to index.

class ProductStockLevel {
    private ProductModel product;
    private WarehouseModel warehouse;
    private Integer available;

    // Getter and setter methods for the POJO
    [...]
}
class ProductStockLevelValueResolver<P extends ProductModel> extends AbstractBulkValueResolver<P, List<ProductStockLevel>, List<ProductStockLevel>, String> {

    [...]

    @Override
    protected Map<String, List<ProductStockLevel>> loadAllData(final IndexerBatchContext batchContext,
            final Collection<IndexedProperty> indexedProperties) {
        // Ensure all warehouses are loaded in the entity cache so that modelService.get() will hit the cache and not the database
        loadAllWarehouses();
        // Load the stock level information the provider should be able to index (e.g. availability, warehouse)
        final FlexibleSearchQuery fQuery = new FlexibleSearchQuery(
            "SELECT {s.productCode}, {s.available}, {s.warehouse} FROM {StockLevel AS s} WHERE {s.productCode} IN (?productCodes)"
        );
        fQuery.addQueryParameter("productCodes",
            batchContext.getItems().stream().map(ProductModel.class::cast).map(ProductModel::getCode).collect(Collectors.toList()));
        fQuery.setDisableCaching(true);
        fQuery.setResultClassList(asList(String.class, Integer.class, PK.class));
        return flexibleSearchService.<List<Object>>search(fQuery).getResult().stream().collect(
            Collectors.groupingBy(row -> (String) row.get(0), Collectors.mapping(row -> {
                final ProductStockLevel productStock = new ProductStockLevel();
                productStock.setWarehouse(modelService.get((PK) row.get(2)));
                productStock.setAvailable((Integer) row.get(1));
                return productStock;
            }, Collectors.toList())));
    }

    protected void loadAllWarehouses() {
        // Fetch all warehouses to add them to the entity cache. Since the query will be cached, it will be executed
        // the first time and not the subsequent times (unless evicted from the cache).
        final FlexibleSearchQuery fQuery = new FlexibleSearchQuery("SELECT {w.PK} FROM {Warehouse AS w}");
        fQuery.setDisableCaching(false);
        fQuery.setResultClassList(asList(WarehouseModel.class));
        // Iterate through the result to force the lazy list to load all warehouses into the entity cache
        flexibleSearchService.<WarehouseModel>search(fQuery).getResult().forEach((warehouse) -> {});
    }

    @Override
    protected String getKey(final P product) {
        return product.getCode();
    }

    @Override
    protected void addFieldValues(final InputDocument inputDocument, final IndexerBatchContext batchContext,
            final IndexedProperty indexedProperty, final P product,
            final ValueResolverContext<List<ProductStockLevel>, List<ProductStockLevel>> valueResolverContext)
            throws FieldValueProviderException {
        final List<ProductStockLevel> productStockLevels = valueResolverContext.getData();
        if (isNotEmpty(productStockLevels)) {
            final Expression expression = getSpelExpressionForIndexedProperty(batchContext, indexedProperty);
            inputDocument.addField(indexedProperty, productStockLevels.stream()
                .map((productStockLevel) -> expression.getValue(productStockLevel))
                .filter(Objects::nonNull)
                .collect(Collectors.toList()));
        }
    }

    protected Expression getSpelExpressionForIndexedProperty(final IndexerBatchContext batchContext,
            final IndexedProperty indexedProperty) {
        // The SpEL expression will be the same for all indexed items. Instead of parsing it over and over, it is cached
        // in the batch context (ideally, it should be parsed at the very beginning of the indexing process, cached in
        // the indexing context and retrieved from there any time needed).
        final Map<IndexedProperty, Expression> expressions = (Map<IndexedProperty, Expression>) batchContext.getAttributes()
            .computeIfAbsent("spelExpressions", (key) -> new HashMap<>());
        return expressions.computeIfAbsent(indexedProperty, (key) -> {
            String expression = key.getValueProviderParameter();
            if (isBlank(expression)) {
                expression = key.getName();
            }
            return new SpelExpressionParser().parseExpression(expression);
        });
    }

    [...]
}
All indexed properties are then configured with the same resolver, and the path to the value to index is passed as a SpEL expression in valueProviderParameter:

INSERT_UPDATE SolrIndexedProperty; id[unique=true]; ... ; valueProvider; valueProviderParameter;
; available ; ... ; productStockLevelValueResolver ; available ;
; warehouseName ; ... ; productStockLevelValueResolver ; warehouse.name ;
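To make the SpEL mechanics concrete, here is a minimal standalone sketch (not part of the resolver) showing what the configured valueProviderParameter expressions evaluate to against the ProductStockLevel POJO; the warehouse variable is a placeholder for a WarehouseModel instance whose name is assumed to be "Main".

final ProductStockLevel stock = new ProductStockLevel();
stock.setAvailable(42);
stock.setWarehouse(warehouse); // assumed WarehouseModel with name "Main"

// Same parser as used by getSpelExpressionForIndexedProperty() above
final ExpressionParser parser = new SpelExpressionParser();
parser.parseExpression("available").getValue(stock);      // 42
parser.parseExpression("warehouse.name").getValue(stock); // "Main"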
The last optimization targets Solr itself: how often Solr commits the indexed documents to the index is controlled by the autoCommit settings in the solrconfig.xml file.

<config>
    [...]
    <updateHandler>
        [...]
        <autoCommit>
            <maxDocs>${solr.autoCommit.maxDocs:50000}</maxDocs>
            <maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
            <openSearcher>false</openSearcher>
        </autoCommit>
        [...]
    </updateHandler>
    [...]
</config>
I experimented with different values for the maxDocs setting and compared the speed of my indexing jobs to pick the best one. I ended up setting the parameter to 500k, which is the number of documents added or modified by the indexing job within 30 seconds (roughly 16,600 documents per second).