“We’ll Do It Live!” -or- Just Give Me Every Result

In the previous two posts we’ve looked at ways to get distinct results from a search index. Suppose neither of those approaches works for you, and you’ve decided to do the grouping and/or sorting yourself. As we discovered earlier, any single call to a search index returns at most 200 results. All hope is not lost, though: let’s look at the actual JSON we get back when calling a search index URI:

{
	"total_rows":980,
	"bookmark":"g1AAAAGneJzLYWBgYMtgTmGQT0lKzi9KdUhJMjTUy0zKNTCw1EvOyS9NScwr0ctLLckBKmRKZEiS____f1YGk5uD0sugB0CxJAahPbYgA-TgBhjj1J-UANJRDzfifWwD2Ah28To0NxjhNCOPBUgyNAApoDHzIebMzYKYI_bxDapTzAgYswBizH6IMV9OHwAbw6-cS6yPIMYcgBhzH2LMw9UQY9inPCDNmAcQY6CB870aakx6dRYA32qFdg",
	"rows":
	[
		{
			"id":"0bd8dab855b66e643350625a33d79b00",
			"order":[9.491182327270508,925635],
			"fields":
			{
				"default":"0000789019",
				"conceptNameSplit":"Deferred Revenue Revenue Recognized"
			}
		}
		... continues ...
	]
}

Notice the first field in the result, “total_rows”: Cloudant knows how many results there are; however, it will only give them to you in batches of up to 200. Now notice the second field, “bookmark”. To get the next batch of results, simply call the same URI again with a “bookmark=” parameter set to the bookmark returned with the previous batch. Using the miracles of Node.js, streaming, and recursion, we’ll look at a simple method to first stream every result, and then get a list of n distinct results. All code is shown in CoffeeScript, since I like it better than plain JavaScript.

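First we’ll need a stream that can extract the bookmark value and make it visible to the caller. JavaScript has no true pass-by-reference, so we store the value in a field of an object the caller passes in (e.g. {value: null}). Because the caller holds a reference to that same object, it sees the update.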

stream = require('stream')

# Passes a byte stream through unchanged while watching for the "bookmark"
# field of a Cloudant search response and recording its value in
# @bookmark.value.
class exports.BookmarkExtractorStream extends stream.Transform

  # @bookmark must be an object with a `value` field (e.g. myBookmark = {value: 'blah'}).
  # The caller keeps a reference to the same object, so it sees the value
  # written by _transform.
  constructor: (@bookmark) ->
    # Call the base constructor with no options; a bare `super` would forward
    # @bookmark to stream.Transform as its options object.
    super()
    @buffer = ""
    @found = false

  _transform: (chunk, enc, next) ->

    # Until the bookmark is found, accumulate the text seen so far so that a
    # value split across chunk boundaries is still matched. The bookmark field
    # appears near the beginning of the JSON, so the buffer stays small.
    if not @found
      @buffer += chunk.toString('utf8')
      # JSON allows whitespace around the colon, so tolerate it.
      match = @buffer.match(/"bookmark"\s*:\s*"([^"]+)"/)

      # Found it: record the value, stop searching, and release the buffer.
      if match?
        @bookmark.value = match[1]
        @found = true
        @buffer = null

    # Every chunk is forwarded untouched.
    @push chunk
    next()

Next we’ll need a stream that takes incoming objects and outputs them as a JSON array. This presumes we’re sending the results somewhere, like a response stream, a file, or the console.

stream = require('stream')

# Serialises every incoming object as one element of a JSON array.
# The writable side accepts arbitrary objects; the readable side emits the
# text fragments '[', element JSON, ',\n', ... ']' which concatenate into a
# valid JSON array ('[]' when no objects arrive).
class exports.JSONArrayTransformStream extends stream.Transform

  constructor: ->
    @first = true
    super(objectMode: true)

  _transform: (chunk, enc, next) ->
    # The first element opens the array; later ones are comma-separated.
    separator = if @first then '[' else ',\n'
    @first = false
    @push(separator)
    @push(JSON.stringify(chunk))
    next()

  _flush: (next) ->
    closing = if @first then '[]' else ']'
    @push(closing)
    next()

Finally we’ll use a recursive function to execute each API call in sequence, using the bookmark from the previous call. The JSONStream package is very useful here: it parses the rows out of each response as the response streams in.

request = require('request')
JSONStream = require('JSONStream')
fs = require('fs')
# BookmarkExtractorStream and JSONArrayTransformStream are the classes shown
# above; require them from wherever you saved them.

# Destination for the JSON array of results. Replace with whatever you need
# (an HTTP response, process.stdout, ...).
output = fs.createWriteStream('results.json')

# Wire the output side of the pipeline exactly once. Each page of results is
# piped into objectJSONArrayStream with {end: false}, so finishing one page
# does not close it (and re-piping it to the output per page would duplicate
# every write). It is ended explicitly after the last page, which makes it
# write the closing "]".
objectJSONArrayStream = new JSONArrayTransformStream()
objectJSONArrayStream.pipe(output)

# Fetch one page (up to 200 rows) of search results, stream each row's
# "fields" object into objectJSONArrayStream, then recurse with the bookmark
# returned by that page.
#
# designDocUri: URI of the search index
# query:        search query; URL-encoded before being put in the query string
# bookmark:     an object {value: <bookmark string or null>}, updated in place
# callback:     called with (err) if the HTTP request fails, otherwise with no
#               arguments once the last page has been processed
recursiveCloudantSearch = (designDocUri, query, bookmark, callback) ->

  readUrl = "#{designDocUri}?q=#{encodeURIComponent(query)}&limit=200"
  oldBookmarkValue = bookmark.value
  if oldBookmarkValue?
    readUrl += "&bookmark=#{encodeURIComponent(oldBookmarkValue)}"
  rowCount = 0
  request({url: readUrl})
  .on('error', callback)
  .pipe(new BookmarkExtractorStream(bookmark))
  .pipe(JSONStream.parse('rows.*.fields'))
  .on('data', -> rowCount += 1)
  .on('end', ->
    # JSONStream.parse returns a `through` stream, which signals completion
    # with 'end' rather than 'finish'.
    if rowCount > 0 and bookmark.value isnt oldBookmarkValue
      recursiveCloudantSearch(designDocUri, query, bookmark, callback)
    else
      # An empty page or an unchanged bookmark means there is nothing left.
      objectJSONArrayStream.end()
      callback()
  )
  .pipe(objectJSONArrayStream, {end: false})

designDocUri = "http://myAccount.cloudant.com/myDb/_design/searches/_search/mySearch"
query = "query goes heere"

recursiveCloudantSearch(designDocUri, query, {value:null}, (err) ->
  if err?
    console.error(err)
  else
    console.log('done')
)

If we want distinct results, we’re going to need a new stream that accumulates results and outputs them only at the very end.

stream = require('stream')

# Collects up to @limit distinct values of one field from a stream of objects
# and emits them, in first-seen order, only when the input ends.
class exports.DistinctingTransformStream extends stream.Transform

  #@keySelector: the name of a field in the object that we wish to find distinct values of
  #@limit: stop collecting after finding this many distinct values (default: no limit)
  constructor: (@keySelector, @limit = Infinity) ->
    super(objectMode: true)
    # A Set rather than a plain object, for two reasons. Values are not coerced
    # to strings, so 1 and "1" stay distinct. Values like "__proto__" cannot
    # clobber Object.prototype. Values are compared by SameValueZero, so this
    # is intended for scalar fields.
    @distinctValues = new Set()

  _transform: (chunk, enc, next) ->
    value = chunk?[@keySelector]
    # Skip objects that lack the field. Pushing null later would end the
    # readable side, and undefined is not a JSON value.
    if value? and @distinctValues.size < @limit
      @distinctValues.add(value)
    next()

  _flush: (next) ->
    @distinctValues.forEach (value) => @push(value)
    next()

Our recursive function now looks like this:

request = require('request')
JSONStream = require('JSONStream')
fs = require('fs')
# BookmarkExtractorStream, DistinctingTransformStream and
# JSONArrayTransformStream are the classes shown above; require them from
# wherever you saved them.

# Destination for the JSON array of distinct values. Replace with whatever
# you need (an HTTP response, process.stdout, ...).
output = fs.createWriteStream('results.json')

# Wire the downstream pipeline exactly once:
#   distinct values -> JSON array -> output
# Pages are piped into distinctingTransformStream with {end: false}. Ending it
# after the last page makes it push its collected values. The JSON array
# stream then writes the closing "]" and ends the output.
distinctingTransformStream = new DistinctingTransformStream('<key property name>', 100)
objectJSONArrayStream = new JSONArrayTransformStream()
distinctingTransformStream.pipe(objectJSONArrayStream).pipe(output)

# Fetch one page (up to 200 rows) of search results, stream each row's
# "fields" object into distinctingTransformStream, then recurse with the
# bookmark returned by that page.
#
# designDocUri: URI of the search index
# query:        search query; URL-encoded before being put in the query string
# bookmark:     an object {value: <bookmark string or null>}, updated in place
# callback:     called with (err) if the HTTP request fails, otherwise with no
#               arguments once the last page has been processed
recursiveCloudantSearch = (designDocUri, query, bookmark, callback) ->

  readUrl = "#{designDocUri}?q=#{encodeURIComponent(query)}&limit=200"
  oldBookmarkValue = bookmark.value
  if oldBookmarkValue?
    readUrl += "&bookmark=#{encodeURIComponent(oldBookmarkValue)}"
  rowCount = 0
  request({url: readUrl})
  .on('error', callback)
  .pipe(new BookmarkExtractorStream(bookmark))
  .pipe(JSONStream.parse('rows.*.fields'))
  .on('data', -> rowCount += 1)
  .on('end', ->
    # JSONStream.parse returns a `through` stream, which signals completion
    # with 'end' rather than 'finish'.
    if rowCount > 0 and bookmark.value isnt oldBookmarkValue
      recursiveCloudantSearch(designDocUri, query, bookmark, callback)
    else
      # An empty page or an unchanged bookmark means there is nothing left:
      # ending the distinct stream emits its results downstream.
      distinctingTransformStream.end()
      callback()
  )
  .pipe(distinctingTransformStream, {end: false})

designDocUri = "http://myAccount.cloudant.com/myDb/_design/searches/_search/mySearch"
query = "query goes heere"

recursiveCloudantSearch(designDocUri, query, {value:null}, (err) ->
  if err?
    console.error(err)
  else
    console.log('done')
)
Advertisements
“We’ll Do It Live!” -or- Just Give Me Every Result

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s