Presto with Doobie

Martin Lechner · October 30, 2019 · 4 min read

Big Data · Scala

At Scout24 we have a sizable data infrastructure with a huge data lake at its center. The high-level interfaces for accessing the data include (non-exhaustive):

  • Alation (a data catalog plus query interface)
  • Personal analytics clusters that can be spun up for ad-hoc analysis with notebook engines such as Jupyter and Zeppelin
  • An abstraction on top of AWS Data Pipeline for scheduled ETL
  • A wrapper around AWS SageMaker, which provides an easily accessible interface for publishing machine learning APIs
  • MicroStrategy as the main BI tool, providing a GUI for easy access

Those interfaces cover most of the access patterns we encounter in day-to-day use. For a more complete overview, check out this presentation by my colleague Raffael Dzikowski:

https://www.slideshare.net/RaffaelDzikowski/the-scout24-data-platform-a-technical-deep-dive-138235234

For raw data access one can load the data directly from S3, connect to the Hive metastore, or use Presto, a distributed query engine that provides a nice SQL interface.

Recently we started working on a scripting task where we needed to extract data from the data lake and fill a DynamoDB table to backfill some data gaps. For this task we decided to use Presto via JDBC, because the SQL-based interface is quite convenient, and since we love functional programming, Doobie was the library of choice.

Unfortunately, getting Doobie to work with the Presto JDBC driver (v0.227) didn't work out of the box, and a quick Google search didn't reveal any in-depth insights, so I'd like to share our solution here.
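For reference, here is a minimal sketch of the sbt setup assumed in the rest of this post. presto-jdbc 0.227 is the driver version mentioned above; the Doobie version is an assumption (any 0.8.x release with cats-effect 2 should behave the same):

// build.sbt (sketch; doobie version is an assumption)
libraryDependencies ++= Seq(
  "org.tpolecat"        %% "doobie-core" % "0.8.4",
  "com.facebook.presto"  % "presto-jdbc" % "0.227"
)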

For executing SQL queries Doobie uses a Transactor. For most database systems setting this up is really easy and straightforward, but the Presto JDBC driver has certain subtleties attached to it.
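For contrast, here is what the usual easy path looks like: a minimal sketch assuming doobie 0.8.x with cats-effect 2 and a PostgreSQL driver; all connection details are placeholders:

import cats.effect.{Blocker, ContextShift, IO}
import doobie.Transactor
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// One call sets up the whole Transactor (placeholder credentials).
val xa = Transactor.fromDriverManager[IO](
  "org.postgresql.Driver",                              // JDBC driver class
  "jdbc:postgresql://localhost/your_db",                // connection URL
  "user",
  "password",
  Blocker.liftExecutionContext(ExecutionContext.global) // pool for blocking JDBC calls
)

With Presto, this simple route didn't work for us, hence the more manual setup below.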

In the following snippet, the first function converts a java.sql.Connection into a Transactor wrapped in a Resource. The second function creates a java.sql.Connection, wraps it in a Resource, and then builds the Transactor from it.

import java.sql.{Connection, DriverManager}
import java.util.Properties

import cats.effect.{Blocker, ContextShift, IO, Resource}
import doobie.Transactor

import scala.concurrent.ExecutionContext
import scala.util.Try

// Transactor.fromConnection needs a ContextShift[IO]; an IOApp provides one for you.
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

private def buildTransactor(c: Connection): Resource[IO, Transactor[IO]] =
  Blocker[IO].map { blocker =>
    // The Blocker is the thread pool on which blocking JDBC calls run.
    Transactor.fromConnection[IO](c, blocker)
  }

private def prestoTransactor(): Resource[IO, Transactor[IO]] =
  Resource
    .fromAutoCloseable[IO, Connection] {
      IO.fromTry(Try {
        val url = "jdbc:presto://your-presto-jdbc-url"
        val properties = new Properties()
        properties.setProperty("user", System.getenv("PRESTO_USER"))
        properties.setProperty("password", System.getenv("PRESTO_PW"))
        val connection = DriverManager.getConnection(url, properties)
        connection.setSchema("your_schema")
        connection.setAutoCommit(true) // required by the Presto driver, see below
        connection
      })
    }
    .flatMap(buildTransactor)

Why do you want to use a Resource? The answer is simple: to prevent leaking Connections. Resource from cats-effect is a powerful abstraction for managing resources that need cleanup after the task is finished.

Resource.fromAutoCloseable is especially cool, since it lets you plug in an IO[T <: AutoCloseable] and then manages the resource acquisition and release for you. Fortunately, a Connection is indeed AutoCloseable, so this is a good fit.
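To make the acquire/release guarantee visible, here is a tiny standalone sketch; the Noisy class is purely illustrative:

import cats.effect.{IO, Resource}

// A toy AutoCloseable that makes the release step observable.
final class Noisy extends AutoCloseable {
  def use(): Unit = println("using")
  override def close(): Unit = println("closed")
}

val program: IO[Unit] =
  Resource
    .fromAutoCloseable(IO(new Noisy))
    .use(n => IO(n.use()))

// program.unsafeRunSync() prints "using" and then "closed";
// close() runs even if the use step fails.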

One important thing to note is that the Presto Connection needs auto-commit set to true, otherwise querying breaks.

You can then run the query by using the Resource and passing the Transactor to your query function:

prestoTransactor().use { xa => queryProgram(xa) }.unsafeRunSync()

Take note of the unsafeRunSync! All code seen so far (plus the queryProgram example below) just lazily builds up a program, which needs to be executed at some point. You can either do this manually with unsafeRunSync or use an IOApp and plug your program in.
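The IOApp variant could look like this, a minimal sketch assuming cats-effect 2; IOApp supplies the ContextShift and runs the IO at the edge, so no unsafeRunSync is needed:

import cats.effect.{ExitCode, IO, IOApp}

object Main extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    prestoTransactor()
      .use(xa => queryProgram(xa)) // queryProgram is defined below
      .map(_ => ExitCode.Success)
}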

To complete the exercise, I'd like to show you some code that runs a query against Presto, converts the result to your domain model, and streams the output via fs2. evalMap runs some effectful code for every entry in the stream, in our case printing out the entries. fs2 has a rich API for streaming operations, which is worth checking out on its own.

import cats.effect.IO
import doobie.Transactor
import doobie.implicits._

def queryProgram(xa: Transactor[IO]): IO[Unit] =
  sql"""SELECT your_fields FROM your_table"""
    .query[YourModel]             // map each row to your domain model
    .stream                       // fs2.Stream[ConnectionIO, YourModel]
    .transact(xa)                 // fs2.Stream[IO, YourModel]
    .evalMap(d => IO(println(d))) // run an effect per element
    .compile
    .drain                        // discard the elements, keep the effects
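Since our actual task was backfilling DynamoDB, one could swap the println for batched writes. The sketch below uses fs2's chunkN to group elements; writeBatch is a hypothetical stand-in for whatever DynamoDB client call you use, and 25 is DynamoDB's per-call limit for BatchWriteItem:

import cats.effect.IO
import doobie.Transactor
import doobie.implicits._

// Hypothetical stand-in for your DynamoDB client call.
def writeBatch(items: List[YourModel]): IO[Unit] = ???

def backfillProgram(xa: Transactor[IO]): IO[Unit] =
  sql"""SELECT your_fields FROM your_table"""
    .query[YourModel]
    .stream
    .transact(xa)
    .chunkN(25) // BatchWriteItem accepts at most 25 items per call
    .evalMap(chunk => writeBatch(chunk.toList))
    .compile
    .drain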

This article has shown my approach to querying Presto with Doobie and then running some effectful code on the entries. If you'd like to share some feedback, feel free to drop me a message on Twitter at https://twitter.com/m4nl5r or via email at martin.lechner@autoscout24.com.


About the author

Martin Lechner

Father, Data Nerd, Functional Programming Advocate, Web Developer, Photographer.
