Consuming the Twitter Stream

Apr 28, 2010

I've got a bunch of code I've been using for various projects for years that I've been sitting on with plans of documenting and sharing, so first to get that treatment is some code to handle parsing of the Twitter Stream. Apt timing given the recent creation of dev.twitter.com and the conclusion of the first @Chirp developer conference.

What is the Twitter Streaming API

Essentially it's a feed of the public twitter stream (so not private tweets or direct messages), cleaned for relevance (so hopefully known spammers are excluded), and downsampled to a manageable size based on your level of access and requirements (the default current sampling rate is 5% of statuses). It's different to just requesting the rss/xml/json of the public timeline as it is an open connection that is constantly streaming data to you, so you only need to start another request if the connection is dropped for some reason.

How to process the incoming stream

I've been using the ruby-yajl library because it's a C-based JSON parser which is a bit quicker than the standard JSON parsing libraries, but also because it has a built-in HttpStream class. Lets take a look at some code:

url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
Yajl::HttpStream.get(url) do |status|
  puts status.inspect
end

The code above will connect to the given url, and iterate over each JSON object it gets back. In the example, I'm just outputting the object so we can see what it is.

Of course things will sometimes go wrong, connections will be dropped, etc. at which point Yajl typically complains that it can't process the stream. So to get around it I rescue the exception and wrap the whole thing in a loop:

loop do
  url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
  begin
    Yajl::HttpStream.get(url) do |status|
      puts status.inspect
    end
  rescue Yajl::HttpStream::InvalidContentType
  end
end

Now being a good citizen requires that if you start getting errors you need to back off your retry rate, so best to put that in place so it's not forgotten. Keep track of the number of consecutive errors, and switch to a while loop:

max_allowed_errors = 1200
consecutive_errors = 0
while consecutive_errors < max_allowed_errors do
  url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
  begin
    Yajl::HttpStream.get(url) do |status|
      consecutive_errors = 0
      puts status.inspect
    end
  rescue Yajl::HttpStream::InvalidContentType
    consecutive_errors += 1
  end
  sleep(0.25*consecutive_errors)
end

And you're moreorless done. Of course it would make sense to wrap this code up into a re-usable class, so:

require 'uri'
require 'yajl/http_stream'

class Twitter
  MAX_ALLOWED_ERRORS = 1200
  def self.stream(username, password, &block)
    url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
    consecutive_errors = 0
    while consecutive_errors < max_allowed_errors  do
      begin
        Yajl::HttpStream.get(url, :symbolize_keys => true) do |status|
          consecutive_errors = 0
          yield(status)
        end
      rescue Yajl::HttpStream::InvalidContentType
        consecutive_errors += 1
      end
      sleep(0.25*consecutive_errors)
    end
  end
end

You may have noticed a couple of other changes in the code above. I'm no longer outputting the status but rather I'm yielding it to a block. I've also set an extra parameter on the Yajl::HttpStream.get call to automatically symbolize the keys of all the objects returned.

Using out new Twitter stream

Putting the code into action, I use it like the following:

Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status)
end

The benefit of having our stream method yield it to a block is that you now have the flexibility to do almost anything with each returned status. In my simple example above I've got it storing straight to a MongoDB collection I've created, but I could just as easily add some additional information to the status object before saving:

Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status.merge(
    :weather => current_weather(status[:location]),
    :mood => determine_mood(status[:text])
  )
end

Or store the status, in addition to adding it to a queue for post-processing and aggregation by our army of machines in the cloud:

Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status.merge(
    :weather => current_weather(status[:location]),
    :mood => determine_mood(status[:text])
  )
  TweetProcessors.queue(status)
end

Taking it further

In coming posts I'll expand on the previous examples, show you how to easily:

Hi, I'm Glenn! 👋 I've spent most of my career working with or at startups. I'm currently the Director of Product @ Ockam where I'm helping developers build applications and systems that are secure-by-design. It's time we started securely connecting apps, not networks.

Previously I led the Terraform product team @ HashiCorp, where we launched Terraform Cloud and set the stage for a successful IPO. Prior to that I was part of the Startup Team @ AWS, and earlier still an early employee @ Heroku. I've also invested in a couple of dozen early stage startups.