package io.picodata
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.functions._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.reflect.io.Directory
import scala.util.Using
import java.nio.file.Files
object PicodataJDBCSparkExample extends App {
val logger = LoggerFactory.getLogger(PicodataJDBCSparkExample.getClass.getSimpleName)
// 1. Set up the Spark session
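// Using.Manager closes every resource registered with use(...) (here, the SparkSession)
// when the block exits, even if an exception is thrown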
Using.Manager { use =>
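// A temporary directory backs the Hive warehouse and the embedded Derby metastore configured below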
val warehouseLocation = Files.createTempDirectory("spark-warehouse").toFile
val warehouseLocationPath = warehouseLocation.getAbsolutePath
val spark = use(SparkSession.builder()
.appName("Test Spark with picodata-jdbc")
.master("local[*]") // use all available threads
.config("spark.ui.enabled", false)
.config("spark.sql.warehouse.dir", warehouseLocationPath)
.config("hive.metastore.warehouse.dir", warehouseLocationPath)
.config(
"javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$warehouseLocationPath/tarantoolTest;create=true"
)
.config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=log4j2.properties")
.enableHiveSupport()
.getOrCreate()
)
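// Obtain the underlying SparkContext from the session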
val sc = spark.sparkContext
logger.info("Spark context created")
// 2. Load the CSV into a DataFrame
var df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/main/resources/onemillion.csv")
.select(col("id"), col("unique_key"), col("book_name"), col("author"), col("year"))
logger.info("Loaded 1M rows into memory")
val jdbcUrl = "jdbc:picodata://localhost:5432/"
// The next block is only needed if the table has not already been created on the Picodata server:
// the basic JDBC connector does not support primary keys, so the table is created manually here
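// Connection properties for the driver-side JDBC connection used to run the DDL below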
val options = Map(
("driver", "io.picodata.jdbc.Driver"),
("url", jdbcUrl),
("user", "sqluser"),
("password", "P@ssw0rd"),
("sslmode", "disable"),
("dbtable", "test")
)
val jdbcOptions = new JDBCOptions(options)
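// createConnectionFactory returns a function from partition id to Connection;
// -1 is conventionally used for a connection opened on the driver rather than on an executor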
val connection = JdbcDialects.get(jdbcUrl).createConnectionFactory(jdbcOptions)(-1)
var statement = connection.prepareStatement("DROP TABLE IF EXISTS test")
statement.executeUpdate()
statement = connection.prepareStatement("CREATE TABLE test" +
"(id UNSIGNED PRIMARY KEY, unique_key VARCHAR(1000), book_name VARCHAR(100), author VARCHAR(100), year INTEGER)")
statement.executeUpdate()
connection.close()
// 3. Write a Dataset to a Picodata table
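// SaveMode.Append inserts into the table created above; SaveMode.Overwrite would let Spark
// recreate the table from the DataFrame schema, losing the primary key definition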
df.write
.format("jdbc")
.option("driver", "io.picodata.jdbc.Driver")
.mode(SaveMode.Append)
// Picodata server connection options
.option("url", jdbcUrl)
.option("sslmode", "disable")
.option("user", "sqluser")
.option("password", "P@ssw0rd")
// this option is important as it rewrites single-row INSERT statements into multi-value INSERTs
.option("reWriteBatchedInserts", "true")
// this option value can be tuned according to the number of Spark workers you have
.option("numPartitions", "8")
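// number of rows sent to the server in each batched INSERT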
.option("batchsize", "1000")
// table to write into (created above)
.option("dbtable", "test")
.save()
logger.info("Saved 1M rows into Picodata table 'test'")
// 4. Print first 3 rows from the table
df = spark.read
.format("jdbc")
.option("driver", "io.picodata.jdbc.Driver")
.option("url", jdbcUrl)
.option("sslmode", "disable")
.option("user", "sqluser")
.option("password", "P@ssw0rd")
// The next two options are necessary for querying large amounts of data.
// They must be tuned empirically depending on the expected size of the dataset.
// If these values are too small, you'll see errors like
// "Exceeded maximum number of rows (10000) in virtual table: 41530" or
// "Reached a limit on max executed vdbe opcodes. Limit: 1024000"
.option("options", "vtable_max_rows=512000,vdbe_max_steps=10240000")
// Set the number of partitions empirically depending on the
// available amount of CPU and memory resources
.option("numPartitions", "8")
// The following 3 options cannot be used together with the "query" option.
//
// PartitionColumn must be a numeric, date, or timestamp column
//.option("partitionColumn", "id")
// Set the real first and last id values here if you want to process all
// the data in the table
//.option("lowerBound", "1")
//.option("upperBound", "1000000")
// Using the "query" option until the bug with integer boundaries is fixed in Picodata.
// Note that this query will not give us the complete result set.
.option("query", "SELECT * FROM \"test\" LIMIT 10")
// This option is to be used with "partitionColumn"
//.option("dbtable", "test")