Skip to content
Snippets Groups Projects
PicodataJDBCSparkExample.scala 5.22 KiB
Newer Older
package io.picodata

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.functions._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.reflect.io.Directory
import scala.util.Using

import java.nio.file.Files


object PicodataJDBCSparkExample extends App {
  val logger = LoggerFactory.getLogger(PicodataJDBCSparkExample.getClass.getSimpleName)

  // 1. Set up the Spark session
  Using.Manager { use =>
    val warehouseLocation = Files.createTempDirectory("spark-warehouse").toFile
    val warehouseLocationPath = warehouseLocation.getAbsolutePath

    val spark = use(SparkSession.builder()
      .appName("Test Spark with picodata-jdbc")
      .master("local[*]") // use all available threads
      .config("spark.ui.enabled", false)
      .config("spark.sql.warehouse.dir", warehouseLocationPath)
      .config("hive.metastore.warehouse.dir", warehouseLocationPath)
      .config(
        "javax.jdo.option.ConnectionURL",
        s"jdbc:derby:;databaseName=$warehouseLocationPath/tarantoolTest;create=true"
      )
      .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=log4j2.properties")
      .enableHiveSupport()
      .getOrCreate()
    )

    val sc = spark.sparkContext

    logger.info("Spark context created")

    // 2. Load the CSV into a DataFrame
    var df = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("src/main/resources/onemillion.csv")
      .select(col("id"), col("unique_key"), col("book_name"), col("author"), col("year"))

    logger.info("Loaded 1M rows into memory")

    val jdbcUrl = "jdbc:picodata://localhost:5432/"
      // only needed if the table is not created on Picodata server
      // basic JDBC connector does not support primary keys
      val options = Map(
        ("driver", "io.picodata.jdbc.Driver"),
        ("url", jdbcUrl),
        ("user", "sqluser"),
        ("password", "P@ssw0rd"),
        ("sslmode", "disable"),
        ("dbtable", "test")
      )
      val jdbcOptions = new JDBCOptions(options)
      val connection = JdbcDialects.get(jdbcUrl).createConnectionFactory(jdbcOptions)(-1)
      var statement = connection.prepareStatement("DROP TABLE IF EXISTS test")
      statement.executeUpdate()
      statement = connection.prepareStatement("CREATE TABLE test" +
        "(id UNSIGNED PRIMARY KEY, unique_key VARCHAR(1000), book_name VARCHAR(100), author VARCHAR(100), year INTEGER)")
      statement.executeUpdate()
      connection.close()

      // 3. Write a Dataset to a Picodata table
      df.write
        .format("jdbc")
        .option("driver", "io.picodata.jdbc.Driver")
        .mode(SaveMode.Append)
        // Picodata server connection options
        .option("url", jdbcUrl)
        .option("sslmode", "disable")
        .option("user", "sqluser")
        .option("password", "P@ssw0rd")
        // this option is important as it optimizes single INSERT statements into multi-value INSERTs
        .option("reWriteBatchedInserts", "true")
        // this option value can be tuned according to the number of Spark workers you have
        .option("numPartitions", "8")
        .option("batchsize", "1000")
        // table to create / overwrite
        .option("dbtable", "test")
        .save()

      logger.info("Saved 1M rows into Picodata table 'test'")

      // 4. Print first 3 rows from the table
      df = spark.read
        .format("jdbc")
        .option("driver", "io.picodata.jdbc.Driver")
        .option("url", jdbcUrl)
        .option("sslmode", "disable")
        .option("user", "sqluser")
        .option("password", "P@ssw0rd")
        // The next two options are necessary for querying large amounts of data.
        // They must be set empirically depending on expected size of the dataset.
        // If these values are too small, you'll see errors like
        // "Exceeded maximum number of rows (10000) in virtual table: 41530" or
        // "Reached a limit on max executed vdbe opcodes. Limit: 1024000"
        .option("options", "vtable_max_rows=512000,vdbe_max_steps=10240000")
        // Set the number of partitions empirically depending on the
        // available amount of CPU and memory resources
        .option("numPartitions", "8")
        // The following 3 options cannot be used together with ".query()" option.
        //
        // PartitionColumn must be a numeric, date, or timestamp column
        //.option("partitionColumn", "id")
        // Set here real first and last index values if you want to process all
        // the data in table
        //.option("lowerBound", "1")
        //.option("upperBound", "1000000")
        // Using query option until the bug with integer boundaries is fixed in Picodata.
        // This query will not get us accurate results.
        .option("query", "SELECT * FROM \"test\" LIMIT 10")
        // This option is to be used with "partitionColumn"
        //.option("dbtable", "test")
        .load()
      df.printSchema()
      df.limit(3).show()
    } catch {
      case throwable: Throwable => throwable.printStackTrace()
    } finally {
      sc.stop()
      Directory(warehouseLocation).deleteRecursively()
    }
  }
}