
/**
 * Serializes the given devices to JSON with Gson, loads that JSON into a Spark dataset and
 * writes the dataset out as GZIP-compressed CSV under a temporary path.
 *
 * <p>NOTE(review): with {@code compression=GZIP} Spark is expected to name its part files
 * {@code *.csv.gz}; confirm that {@code FILE_EXTENSION} (declared elsewhere) matches that
 * suffix, otherwise the lookup performed after the write will find nothing.
 *
 * @param list devices to export
 * @return the file selected by {@code retrieveCSVFileFromPath} under the temporary path, or
 *         {@code null} when no temporary path was created, the conversion failed, or no
 *         matching file was found
 */
private File convertToCSV(List<Device> list) {
    // The session is closed by try-with-resources, which also stops its SparkContext,
    // so no separate JavaSparkContext handle is needed for cleanup.
    try (SparkSession spark = SparkSession.builder()
            .master("local[4]")
            .appName("sample-application")
            .getOrCreate()) {
        File tempFile = this.createTempFile();

        // The whole list is serialized as a single JSON document and read back as one record source.
        List<String> data = Arrays.asList(new Gson().toJson(list));
        Dataset<String> stringDataSet = spark.createDataset(data, Encoders.STRING());
        Dataset<Row> csvDataSet = spark.read().json(stringDataSet);

        log.info("Inserted json conversion schema and value");
        csvDataSet.printSchema();
        csvDataSet.show();

        if (tempFile == null) {
            return null;
        }
        csvDataSet.write()
                .option("compression", "GZIP")
                .csv(tempFile.getPath());
        return this.retrieveCSVFileFromPath(tempFile);
    } catch (Exception ex) {
        // Pass the exception as the trailing argument (no placeholder) so the logger records
        // the stack trace, and return null so the caller never mistakes the bare temp path
        // for a finished CSV file.
        log.error("Failed to convert device list to CSV", ex);
        return null;
    }
}
/**
 * Looks among the entries directly under the given directory for the converted CSV file.
 *
 * @param tempFilePath directory the converted data was written to
 * @return any entry whose path ends with {@code FILE_EXTENSION}, or {@code null} when there
 *         is no such entry or the directory cannot be listed
 */
private File retrieveCSVFileFromPath(File tempFilePath) {
    // listFiles() returns null when the path is not a directory or an I/O error occurs.
    File[] entries = tempFilePath.listFiles();
    if (entries == null) {
        return null;
    }
    // endsWith already implies contains, so a single suffix check is sufficient.
    return Arrays.asList(entries).stream()
            .filter(entry -> entry.getPath().endsWith(FILE_EXTENSION))
            .findAny()
            .orElse(null);
}
/** Path suffix used to pick the ORC output file out of the directory written by Spark. */
public static final String FILE_EXTENSION_ORC = ".orc";
/**
 * Serializes the given devices to JSON with Gson, loads that JSON into a Spark dataset and
 * writes the dataset out in ORC format under a temporary path.
 *
 * @param list devices to export
 * @return the file selected by {@code retrieveORCFileFromPath} under the temporary path, or
 *         {@code null} when no temporary path was created, the conversion failed, or no
 *         matching file was found
 */
private File convertToORC(List<Device> list) {
    // The session is closed by try-with-resources, which also stops its SparkContext,
    // so no separate JavaSparkContext handle is needed for cleanup.
    try (SparkSession spark = SparkSession.builder()
            .master("local[4]")
            .appName("sample-application")
            .getOrCreate()) {
        File tempFile = this.createTempFile();

        // The whole list is serialized as a single JSON document and read back as one record source.
        List<String> data = Arrays.asList(new Gson().toJson(list));
        Dataset<String> stringDataSet = spark.createDataset(data, Encoders.STRING());
        Dataset<Row> orcDataSet = spark.read().json(stringDataSet);

        log.info("Inserted json conversion schema and value");
        orcDataSet.printSchema();
        orcDataSet.show();

        if (tempFile == null) {
            return null;
        }
        orcDataSet.write().orc(tempFile.getPath());
        return this.retrieveORCFileFromPath(tempFile);
    } catch (Exception ex) {
        // Pass the exception as the trailing argument (no placeholder) so the logger records
        // the stack trace, and return null so the caller never mistakes the bare temp path
        // for a finished ORC file.
        log.error("Failed to convert device list to ORC", ex);
        return null;
    }
}
/**
 * Looks among the entries directly under the given directory for the converted ORC file.
 *
 * @param tempFilePath directory the converted data was written to
 * @return any entry whose path ends with {@code FILE_EXTENSION_ORC}, or {@code null} when
 *         there is no such entry or the directory cannot be listed
 */
private File retrieveORCFileFromPath(File tempFilePath) {
    // listFiles() returns null when the path is not a directory or an I/O error occurs.
    File[] entries = tempFilePath.listFiles();
    if (entries == null) {
        return null;
    }
    // endsWith already implies contains, so a single suffix check is sufficient.
    return Arrays.asList(entries).stream()
            .filter(entry -> entry.getPath().endsWith(FILE_EXTENSION_ORC))
            .findAny()
            .orElse(null);
}
A few of my other blogs:
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
19 | |
19 | |
16 | |
8 | |
7 | |
7 | |
6 | |
6 | |
6 | |
5 |