In CouchDB we can fetch a single document with an HTTP request like this:
GET mydb/doc-1
The response is the document in JSON.
We can also fetch multiple documents at once:
POST mydb/_all_docs?include_docs=true {"keys": ["doc-1", "doc-2"]}
The response contains a list of the requested documents.
We can optimize our workflow by aggregating all parallel requests into one!
For the purpose of this example, let’s imagine that we have these two endpoints:
case class SingleDocument(id: String) {
type Data = JsValue
...
}
case class MultipleDocuments(ids: Set[String]) {
type Data = List[JsValue]
...
}
Now we want to define a Source
for a document, that would “glue” to the other documents being fetched.
The first step is to understand that Source
can take 4 parameters:
The first one is easy:
def document[A](id: String)(implicit rule: Rule[JsValue, Resolvable[A]]) =
// use SingleDocument by default
Source[A](SingleDocument(id)) {
...,
...,
...
}
For the second one, we’ll teach our Source
to handle both SingleDocument
and MultipleDocuments
:
{
case e @ SingleDocument(`id`) ⇒
// just load normally
Resolvable[A].fromEndpoint(e)
case e @ MultipleDocuments(ids) if ids contains id ⇒
// inside the data returned by the endpoint,
// find the document we are looking for
Resolvable[A].fromEndpointPath(e)(_.find(_ \ "_id" == JsString(id)).get)
}
For aggregated requests to work, we need to give MultipleDocuments
higher priority.
The priority also increases with the number of documents in the batch.
{
case MultipleDocuments(ids) ⇒ Seq(1, ids.size)
case SingleDocument(_) ⇒ Seq(0) // default
}
Finally, we need to supply the function that will add MultipleDocuments
endpoints.
This function is of the form (ExecutionContext, EndpointPool) ⇒ Future[EndpointPool]
and looks like this:
{ (ec, pool) ⇒
Future.successful(pool.fold(Set.empty[String]) {
case (ids, SingleDocument(id)) ⇒ ids + id
case (ids, _) ⇒ ids
} match {
case x if x.size > 1 ⇒ EndpointPool(MultipleDocuments(x))
case _ ⇒ EndpointPool.empty
})
}
Assembling everything together, we get:
def document[A](id: String)(implicit rule: Rule[JsValue, Resolvable[A]]) =
Source[A](SingleDocument(id))({
case e @ SingleDocument(`id`) ⇒
Resolvable[A].fromEndpoint(e)
case e @ MultipleDocuments(ids) if ids contains id ⇒
Resolvable[A].fromEndpointPath(e)(_.find(_ \ "_id" == JsString(id)).get)
}, {
case MultipleDocuments(ids) ⇒ Seq(1, ids.size)
case SingleDocument(_) ⇒ Seq(0)
}, { (ec, pool) ⇒
Future.successful(pool.fold(Set.empty[String]) {
case (ids, SingleDocument(id)) ⇒ ids + id
case (ids, _) ⇒ ids
} match {
case x if x.size > 1 ⇒ EndpointPool(MultipleDocuments(x))
case _ ⇒ EndpointPool.empty
})
})
Here is a typical use-case:
def needBook(id: String) = document[Book](id)
// this will issue only one request!
Resolvable.fromList(List("book-1", "book-2").map(needBook)).go