Big Data Stream Processing

Alex Egg,

The trivial way to read a file is to load all of its contents into memory. However, this becomes impractical as the file size grows, and particularly once the file is larger than available memory. The problem is compounded by the fact that many data formats, like XML or JSON, need the whole file in memory in order to parse it or build the DOM representation.

The first problem of reading a large file can be handled by working w/ streams.

Stream Reading

Instead of processing the file once it has been loaded into memory (which is not possible b/c of memory issues), process only sequential chunks of the data. For example, if you are loading a large file from disk, break it into batches of S MB each. That way you can process an arbitrarily large file using constant memory.
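
For instance, here is a minimal sketch of constant-memory chunked reading from a local file; the 1 MB chunk size and the process_chunk callback are illustrative assumptions, not part of the original example:

# Read a file in fixed-size chunks so memory use stays constant
# no matter how large the file is.
CHUNK_SIZE = 1024 * 1024 # 1 MB per read; tune as needed

File.open("huge_file.csv", "rb") do |f|
  while (chunk = f.read(CHUNK_SIZE))
    process_chunk(chunk) # stand-in for whatever per-chunk work you do
  end
end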

Concrete Example

You want to load some data over the network from a ~30GB CSV file into your local DB.

  1. Read a chunk of the CSV file from the network socket
    1. Split the chunk by newlines
    2. Iterate over each line and parse out the CSV row
    3. Add the data from the row to the DB
  2. Repeat #1 until the end of the file

Pseudo CSV file format example:

There are two fields, a company name and an ID, delimited by :

  CORCORAN PROPERTY PARTNERS, LLC:0001658659:
  MEDIA GROUP, INC.:0000824104:
  THIRD AVE. GC II LLC:0001341596:
  THIRD AVE. GC LLC:0001341595:
  THIRD AVE. REALTY CORP.:0001336718:
  CENTRAL LLC:0001281138:

Ruby example

You have to remember that since we are reading data off the socket, it will arrive in chunks that do not obey newline breaks, so CSV rows can come cut in half. We handle this in the parsing below by accumulating the row in the line variable (the code is wrapped in an each_row method so the yield has a block to call):

require "net/http"
require "uri"

# Streams the remote CSV, yielding one complete row (line) at a time.
def each_row(url)
  uri = URI(url)
  Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
    request = Net::HTTP::Get.new uri

    http.request request do |response|
      line = "" # local queue/cache for a partially received row

      response.read_body do |chunk|
        chunk.split("\n").each do |partial|
          line += partial

          # every complete row in this format ends with ":"
          if line[-1] == ":"
            yield line
            line = "" # reset for the next row
          end
        end
      end
    end
  end
end

The key is the yield call: it lets the calling code add each record to the DB in an evented manner as the data arrives; the contrary would be waiting for the whole file to load and then iterating over it.
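
As a sketch of the calling side (using the each_row wrapper above, plus a hypothetical db object with an insert method), the caller can split each yielded row on : and write it straight to the DB:

# Consume rows as they stream in; only one row is held in memory at a time.
each_row("https://example.com/companies.csv") do |row|
  name, id = row.split(":") # the trailing ":" leaves just the two fields
  db.insert(name: name, id: id) # db and insert are hypothetical placeholders
end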

Stream Processing

The stream reading example above gets more complicated with richer data formats like XML or JSON, which normally cannot be parsed unless the whole document is available. However, there is a way around this called stream processing: an alternative to the DOM approach that parses the document as it becomes available. One implementation that has been around the XML world for a while is SAX, which gives you an evented way to parse XML.

For example, as in the CSV example above, each chunk we get would be passed to the evented parser, which would trigger a series of event callbacks telling us details of the document. Consider this example I stole from Wikipedia [1] for the following XML document:

 <?xml version="1.0" encoding="UTF-8"?>
 <DocumentElement param="value">
     <FirstElement>
         &#xb6; Some Text
     </FirstElement>
     <?some_pi some_attr="some_value"?>
     <SecondElement param2="something">
         Pre-Text <Inline>Inlined text</Inline> Post-text.
     </SecondElement>
 </DocumentElement>

This XML document, when passed through a SAX parser, will generate a sequence of events like the following:

  1. XML Element start, named DocumentElement, with an attribute param equal to “value”
  2. XML Element start, named FirstElement
  3. XML Text node, with data equal to “¶ Some Text” (note: certain white spaces can be changed)
  4. XML Element end, named FirstElement
  5. Processing Instruction event, with the target some_pi and data some_attr=“some_value” (the content after the target is just text; however, it is very common to imitate the syntax of XML attributes, as in this example)
  6. XML Element start, named SecondElement, with an attribute param2 equal to “something”
  7. XML Text node, with data equal to “Pre-Text”
  8. XML Element start, named Inline
  9. XML Text node, with data equal to “Inlined text”
  10. XML Element end, named Inline
  11. XML Text node, with data equal to “Post-text.”
  12. XML Element end, named SecondElement
  13. XML Element end, named DocumentElement
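
To make the evented model concrete, here is a minimal Ruby sketch using Nokogiri's SAX API (Nokogiri is my choice of library here, not something the example above dictates); each callback just prints the event it receives, mirroring the sequence above:

require "nokogiri"

# A SAX handler: override the callbacks you care about and the parser
# fires them as it walks the document, never building a full DOM.
class EventPrinter < Nokogiri::XML::SAX::Document
  def start_element(name, attrs = [])
    puts "start: #{name} #{attrs.inspect}"
  end

  def end_element(name)
    puts "end:   #{name}"
  end

  def characters(text)
    puts "text:  #{text.strip}" unless text.strip.empty?
  end

  def processing_instruction(name, content)
    puts "pi:    #{name} #{content}"
  end
end

# PushParser lets us feed the document chunk by chunk, just like the
# CSV socket example above.
parser = Nokogiri::XML::SAX::PushParser.new(EventPrinter.new)
parser << File.read("document.xml") # or feed network chunks as they arrive
parser.finish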

Concrete Example

I want the first 10 item elements from this JSON:

{
  "directory": {
    "item": [
      {
        "last-modified": "2017-06-12 16:42:20",
        "name": "000156218017002256",
        "type": "folder.gif",
        "size": ""
      },
      {
        "last-modified": "2017-06-09 19:14:46",
        "name": "000156218017002252",
        "type": "folder.gif",
        "size": ""
      },
      {
        "last-modified": "2017-06-09 19:14:40",
        "name": "000156218017002251",
        "type": "folder.gif",
        "size": ""
      }
      ...
    ]
  }
}

Ruby example

This is really ugly code, but what I am doing is essentially hacking together a state machine that keeps track of when the parser is passing through an item element. If the is_item state is true and the end_object callback fires, I yield the completed item (the code is wrapped in a stream_items method so the yield has a block to call).

The advantage of stream parsing here is that since I just want the first n items, I don’t have to load the whole file in memory or even stream the whole file!

require "json/stream" # json-stream gem
require "net/http"
require "uri"

# Streams the remote JSON, yielding each completed "item" hash.
def stream_items(url)
  is_item = false
  key = nil
  item = {}

  parser = JSON::Stream::Parser.new
  parser.start_document {}
  parser.end_document   {}
  parser.start_object   {}
  parser.end_object do
    if is_item # done parsing an item
      yield item
      item = {} # start a fresh hash for the next item
    end
  end
  parser.start_array {}
  parser.end_array   { is_item = false } # we have left the "item" array
  parser.key do |k|
    key = k
    if k == "item"
      is_item = true
      item = {}
    end
  end
  parser.value { |v| item[key] = v if is_item }

  uri = URI(url)
  Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
    request = Net::HTTP::Get.new uri
    http.request request do |response|
      response.read_body do |chunk|
        parser << chunk # feed chunks to the evented parser as they arrive
      end
    end
  end
end
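
Finally, a sketch of the calling side using the stream_items wrapper above (the method name and the early break are my assumptions): because items are yielded as soon as they are complete, the caller can stop after the first 10 and never stream the rest of the file:

first_items = []
stream_items("https://example.com/directory.json") do |item|
  first_items << item
  break if first_items.size == 10 # breaking out of the block stops the streaming
end

puts first_items.first["name"]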
