I am using com.stratio.datasource to read a MongoDB collection from Spark: I run a SQL query through SparkSQL and write the result out as a CSV file with com.databricks.spark.csv. I imported the sample JSON collection provided by MongoDB into my database. This is my pom.xml with the versions of all the jars used:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>mongodb-spark</groupId>
    <artifactId>mongodb-spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.stratio.datasource</groupId>
            <artifactId>spark-mongodb_2.10</artifactId>
            <version>0.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-commons_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-query_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
</project>
Here is my code:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

public class MongoDBSparkIntegration {
    public static void main(String[] args) {
        String outputPath = "D:\\Dev\\MongoDb-Spark-Integration\\src\\main\\resources\\output";
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("MongoDB-Spark-Application");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        // Connection options for the Stratio MongoDB datasource
        Map<String, String> options = new HashMap<String, String>();
        options.put("host", "localhost:27017");
        options.put("database", "mydb");
        options.put("collection", "test");

        // Load the collection as a DataFrame and register it as a temp table
        DataFrame df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
        df.registerTempTable("tmp");

        // Run the filtered query, then export the DataFrame as a CSV file
        sqlContext.sql("SELECT * FROM tmp WHERE city = 'CHICOPEE'").show();
        df.write().format("com.databricks.spark.csv").save(outputPath);

        sqlContext.dropTempTable("tmp");
        sc.stop();
    }
}
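For clarity, the WHERE clause above is only meant to do the same thing as this DataFrame-API filter (a sketch to illustrate the intent; I have not swapped it into the program):

// Equivalent filter expressed with the DataFrame API instead of SQL:
DataFrame filtered = df.filter(org.apache.spark.sql.functions.col("city").equalTo("CHICOPEE"));
filtered.show();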
I get the following error, even though the SparkSQL syntax looks correct:
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.mapPartitionsInternal$default$2()Z
at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at com.mongodb.spark.integration.MongoDBSparkIntegration.main(MongoDBSparkIntegration.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
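Since a NoSuchMethodError normally points at a binary incompatibility on the classpath rather than at the query itself, one quick check is to print the Spark version that is actually loaded at runtime (a minimal sketch, reusing the JavaSparkContext from the code above):

// Prints the Spark version actually on the runtime classpath:
System.out.println("Spark runtime version: " + sc.version());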
A simple SELECT * FROM tmp, on the other hand, selects the entire content of the MongoDB collection without returning any error and exports it as a CSV file.
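This is a minimal sketch of the variant that runs cleanly for me (the "-full" suffix on the output directory is just a hypothetical name to avoid overwriting the path used above):

// The same pipeline without the WHERE clause completes and writes the CSV:
DataFrame all = sqlContext.sql("SELECT * FROM tmp");
all.write().format("com.databricks.spark.csv").save(outputPath + "-full");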
I am not sure where I am going wrong; can you help?
Thank you,
I.