Monday, June 20, 2016

Reading a MongoDB collection from Spark, selecting its content via SparkSQL and outputting it as a csv file

I am using com.stratio.datasource to read a MongoDB collection from Spark, select its content with a SQL query via SparkSQL, and write the result out as a csv file via com.databricks.spark.csv. I am using the sample JSON collection provided by MongoDB, which I imported into my database. This is my pom.xml with the versions of all the jars used:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>mongodb-spark</groupId>
    <artifactId>mongodb-spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.stratio.datasource</groupId>
            <artifactId>spark-mongodb_2.10</artifactId>
            <version>0.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-commons_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-query_2.10</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
</project>
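
For reference, the documents in the collection look like this (this example is from MongoDB's zips.json sample data set, which I am assuming here since it has the city field used in the query further down):

{ "_id" : "01001", "city" : "AGAWAM", "loc" : [ -72.622739, 42.070206 ], "pop" : 15338, "state" : "MA" }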

Here is my code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

public class MongoDBSparkIntegration {

    public static void main(String[] args) {

        // Note: backslashes have to be escaped in a Java string literal.
        String outputPath = "D:\\Dev\\MongoDb-Spark-Integration\\src\\main\\resources\\output";

        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("MongoDB-Spark-Application");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
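        // Connection options for the Stratio MongoDB datasource.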
        Map<String, String> options = new HashMap<String, String>();
        options.put("host", "localhost:27017");
        options.put("database", "mydb");
        options.put("collection", "test");

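        // Load the collection as a DataFrame and register it as a temp table.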
        DataFrame df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();
        df.registerTempTable("tmp");
        sqlContext.sql("SELECT * FROM tmp WHERE city = CHICOPEE").show();
        df.write().format("com.databricks.spark.csv").save(outputPath);
        sqlContext.dropTempTable("tmp");
    }
}

I get the following error, even though the SparkSQL syntax looks correct:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.mapPartitionsInternal$default$2()Z
    at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at com.mongodb.spark.integration.MongoDBSparkIntegration.main(MongoDBSparkIntegration.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Running a simple SELECT * FROM tmp, however, selects the entire content of the MongoDB collection without any error and exports it as a csv file.
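
For completeness, here is a sketch of that unfiltered variant (it reuses the same sqlContext and outputPath as in the code above; note that spark-csv's default save mode errors out if the output path already exists):

// Dropping the WHERE clause runs without error and exports the whole collection.
DataFrame all = sqlContext.sql("SELECT * FROM tmp");
all.show();
all.write().format("com.databricks.spark.csv").save(outputPath);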

I am not sure where I am going wrong; can you help?

Thank you,

I.
