Abstract
The aim of this article is to provide some insights into high-performance, low-latency data read and search capability using Apache Spark and Apache Solr for large data sets. We use data sets provided by Kaggle as test data for our analysis.
Introduction
For our experiment, we use a data set of hotel reviews stored in CSV format. The CSV file contains more than 120,000 rows of data. The columns in the table are SL, Name, City, Cuisine Style, Ranking, Rating, Price Range, Number of Reviews and URL_TA.
The file provides reviews for hotels from 31 cities. We use Apache Spark to read the large data volume and Apache Solr for indexing the data records.
Prior knowledge of Apache Spark and Apache Solr is helpful for following the data processing in detail, but it is not mandatory.
Problem Statement
The aim of this experiment is to design an application that can read large volumes of data and then be queried on a near real-time basis for analysis. The main goal is to improve read performance and to reduce the latency of the search capability using Apache Solr.
Solution
The entire experiment can be split into two main parts: reading data from the data source, and indexing the data for querying.
1. Reading data with Apache Spark
We use the DataFrame API in Spark (available from Spark 2.x) to read the data into memory. For large data sets (on the order of GBs or TBs), it is recommended to split the entire data set into chunks, which can then be stored on the file system for faster processing. Partitioning the data this way also helps to avoid out-of-memory (OOM) errors.
@GetMapping(value = "/splitData")
public void splitData()
{
Dataset<Row> df = SparkInit.getDF();
String[] areas = {"Amsterdam", "Athens", "Barcelona", "Berlin", "Bratislava", "Brussels", "Budapest", "Copenhagen", "Dublin", "Edinburgh", "Geneva", "Hamburg", "Helsinki", "Krakow", "Lisbon", "Ljubljana", "London", "Luxembourg", "Lyon", "Madrid", "Milan", "Munich", "Oporto", "Oslo", "Paris", "Prague", "Rome", "Stockholm", "Vienna", "Warsaw", "Zurich"};
for(String entry: areas)
{
Dataset<Row> tempDf = df.select("*").where(df.col("City").equalTo(entry));
tempDf.coalesce(1).write().mode(SaveMode.Overwrite).option("inferSchema","true").option("header","true").csv("C:\\Work\\Spark\\Test\\DataServices\\DB\\" + entry);
}
}
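The SparkInit helper used above is part of the linked code base and is not listed in this article. As a minimal sketch of what it might look like (the application name, master setting and CSV path below are assumptions, not the project's actual values), it could be:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch of a SparkInit helper: lazily creates a local SparkSession
// and exposes the full hotel-review CSV as a DataFrame.
public final class SparkInit {
    private static SparkSession session;
    private static Dataset<Row> df;

    public static synchronized SparkSession getInstance() {
        if (session == null) {
            session = SparkSession.builder()
                    .appName("DataServices")   // assumed application name
                    .master("local[*]")        // local mode for this experiment
                    .getOrCreate();
        }
        return session;
    }

    public static synchronized Dataset<Row> getDF() {
        if (df == null) {
            df = getInstance().read()
                    .option("inferSchema", "true")
                    .option("header", "true")
                    .csv("C:\\Work\\Spark\\Test\\DataServices\\DB\\HotelReviews.csv"); // placeholder path
        }
        return df;
    }
}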
Once the data is read into a Spark DataFrame, it needs to be mapped into a data structure (Beans/POJOs) suited to our use-case. The mapped data can then be serialized and returned as responses to web-based requests from a client, or used for other advanced analytics.
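The HotelModelBean used in the next section is also part of the linked code base. A minimal sketch consistent with the constructor and getters used later in this article (the field names here are assumptions) could look like this:

// Sketch of the HotelModelBean POJO (field names assumed from the getters used below).
public class HotelModelBean {
    private final String name;
    private final String city;
    private final String rank;
    private final String rating;
    private final String reviewCount;
    private final String url;

    public HotelModelBean(String name, String city, String rank, String rating,
                          String reviewCount, String url) {
        this.name = name;
        this.city = city;
        this.rank = rank;
        this.rating = rating;
        this.reviewCount = reviewCount;
        this.url = url;
    }

    public String getName() { return name; }
    public String getCity() { return city; }
    public String getRank() { return rank; }
    public String getRating() { return rating; }
    public String getReviewCount() { return reviewCount; }
    public String getUrl() { return url; }
}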
2. Indexing and Storing data with Apache Solr
Once the modeled data is available, it can be indexed and stored in Solr. In our experiment, we use Solr as both a search engine and a database. In an actual production environment, Solr can also be used as a data cache for super-fast CRUD operations. This mechanism can be provided as a micro-service to add/modify data.
@GetMapping(value = "/solrSparkWrite")
public String solrSparkWrite(@RequestParam String location) throws SolrServerException, IOException {
SolrSparkController instance = new SolrSparkController();
Dataset<Row> dfAll = SparkInit.getInstance().read()
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter", ",")
.csv("C:\\Work\\Spark\\Test\\DataServices\\DB\\"+ location + ".csv");
dfAll.persist(StorageLevel.MEMORY_ONLY_2());
instance.maxCount = dfAll.count();
instance.currentCount = 0L;
dfAll.foreach(e -> {
String name = e.getAs("Name") != null? e.getAs("Name").toString() : "nil";
String city = e.getAs("City") != null? e.getAs("City").toString() : "nil";
String ranking = e.getAs("Ranking") != null? e.getAs("Ranking").toString() : "0";
String rating = e.getAs("Rating") != null? e.getAs("Rating").toString() : "0";
String reviewCount = e.getAs("Number of Reviews") != null? e.getAs("Number of Reviews").toString() : "0";
String url = e.getAs("URL_TA") != null? e.getAs("URL_TA").toString() : "/";
instance.writeToDB(new HotelModelBean(name, city, ranking, rating, reviewCount, url));
});
log.info("processing Completed!!!");
return "OK";
}
private void writeToDB(HotelModelBean hotelModelBean) throws SolrServerException, IOException {
    String urlString = "http://localhost:8983/solr/spark_hotel";
    SolrClient solr = new HttpSolrClient.Builder(urlString).build();
    // Build a Solr document from the bean and index it
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("name", hotelModelBean.getName());
    doc.addField("city", hotelModelBean.getCity());
    doc.addField("rank", hotelModelBean.getRank());
    doc.addField("rating", hotelModelBean.getRating());
    doc.addField("reviewCount", hotelModelBean.getReviewCount());
    doc.addField("url", hotelModelBean.getUrl());
    solr.add(doc);
    solr.commit();
    // Log progress every 10 records
    if (++this.currentCount % 10 == 0)
        log.info("{} / {}", this.currentCount, this.maxCount);
}
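Note that writeToDB above builds a new SolrClient and issues a commit for every single document, which keeps the example simple but is expensive. A minimal sketch of a batched variant (an assumption on our part, not the project's actual code; it reuses one client, adds all documents in one call and commits once, and assumes the same SolrJ classes plus java.util.List/ArrayList and the HotelModelBean described earlier) could look like:

// Sketch: batched indexing with a single bulk add and a single commit.
private void writeAllToDB(List<HotelModelBean> beans) throws SolrServerException, IOException {
    String urlString = "http://localhost:8983/solr/spark_hotel";
    try (SolrClient solr = new HttpSolrClient.Builder(urlString).build()) {
        List<SolrInputDocument> docs = new ArrayList<>();
        for (HotelModelBean bean : beans) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("name", bean.getName());
            doc.addField("city", bean.getCity());
            doc.addField("rank", bean.getRank());
            doc.addField("rating", bean.getRating());
            doc.addField("reviewCount", bean.getReviewCount());
            doc.addField("url", bean.getUrl());
            docs.add(doc);
        }
        solr.add(docs);   // single bulk add for the whole batch
        solr.commit();    // single commit instead of one per document
    }
}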
Analysis
Spark Querying
Using the capabilities of Spark, querying the locations of a particular hotel took around 15 seconds. This can be improved to around 2 seconds if the data is persisted in memory via the persist() API.
@GetMapping(value = "/fetchCustomerInfo", params = {"customer", "location"}, produces = {MediaType.APPLICATION_JSON_VALUE})
public List<?> fetchCustomerInfo(@RequestParam String customer, @RequestParam String location)
{
PerformanceUtil obj = new PerformanceUtil();
obj.startClock();
Dataset<Row> dfAll = SparkInit.getInstance().read()
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter", ",")
.csv("C:\\Work\\Spark\\Test\\DataServices\\DB\\*.csv");
dfAll.persist(StorageLevel.MEMORY_ONLY_2());
Dataset<Row> dfRegional = null;
if(location.equals("all"))
{
Dataset<Row> newDf = dfAll.filter(dfAll.col("Name").contains(customer));
stringList.clear();
newDf.foreach( e -> {
String val = e.getAs("Name").toString() + "--> " + e.getAs("City").toString();
stringList.add(val);
});
}
else
{
if(locationMap.getOrDefault(location, null)== null)
{
dfRegional = SparkInit.getInstance().read()
.option("inferSchema", "true")
.option("header", "true")
.option("delimiter", ",")
.csv("C:\\Work\\Spark\\Test\\DataServices\\DB\\"+ location + ".csv");
}
else
{
dfRegional = locationMap.getOrDefault(location, null);
}
if(dfRegional != null)
{
locationMap.put(location, dfRegional);
dfRegional.persist(StorageLevel.MEMORY_ONLY_2());
}
Dataset<Row> newDf = dfRegional.filter(dfRegional.col("City").
equalTo(location)).filter(dfRegional.col("Name").contains(customer));
stringList.clear();
newDf.foreach( e -> {
String val = e.getAs("Name").toString() + "--> " + e.getAs("City").toString();
stringList.add(val);
});
}
obj.endClock("/fetchCustomerInfo");
return stringList;
}
}
Spark query without caching in memory (takes around 15s)
Spark query after caching (takes around 2s)
Solr Querying
With Solr, querying the locations of a hotel took less than 50 milliseconds, which is a huge boost in performance.
@GetMapping(value = "/solrTestRead")
public SolrDocumentList solrTestRead(@RequestParam String q, @RequestParam Integer rows) throws SolrServerException, IOException {
PerformanceUtil obj = new PerformanceUtil();
obj.startClock();
String urlString = "http://localhost:8983/solr/spark_hotel";
SolrClient Solr = new HttpSolrClient.Builder(urlString).build();
SolrQuery query = new SolrQuery();
query.setQuery(q);
query.addField("*");
query.setRows(rows);
QueryResponse queryResponse = Solr.query(query);
SolrDocumentList docs = queryResponse.getResults();
log.info("Count = {0}", docs.size());
Solr.commit();
obj.endClock("/solrTestRead");
return docs;
}
Solr query (takes around 50ms)
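As a usage example (a hypothetical query on our part, assuming the field names indexed in the previous section), calling the endpoint as /solrTestRead?q=city:Paris&rows=10 would return up to 10 documents for hotels in Paris; the exact query syntax accepted depends on the query parser and schema configured for the Solr core.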
Conclusion
From the above experiment, we can conclude that Apache Solr is optimal for search queries on large data sets. We are able to fetch query results from a data set of more than 120,000 rows in a few milliseconds.
The full code base is available here: Git
Note:
- All tests were done on a dual-core Windows 10 machine with a Core i5 CPU @ 2.30 GHz and 16 GB RAM.
- We assume that the initial Solr indexing of all records is a one-time activity, hence the time taken to index records is not included in this study.