Consuming the Twitter Stream
Apr 28, 2010
I've got a bunch of code I've been using across various projects for years, which I've been sitting on with plans to document and share. First to get that treatment is some code for consuming the Twitter stream. The timing seems apt, given the recent launch of dev.twitter.com and the conclusion of the first @Chirp developer conference.
What is the Twitter Streaming API?
Essentially it's a feed of the public Twitter stream (so no protected tweets or direct messages), cleaned for relevance (so known spammers are hopefully excluded), and down-sampled to a manageable size based on your level of access and requirements (the current default sampling rate is 5% of statuses). It's different from simply requesting the RSS/XML/JSON of the public timeline: it's an open connection that streams data to you continuously, so you only need to start a new request if the connection is dropped for some reason.
How to process the incoming stream
I've been using the ruby-yajl library because it's a C-based JSON parser that's a bit quicker than the standard JSON parsing libraries, and also because it has a built-in HttpStream class. Let's take a look at some code:
require 'uri'
require 'yajl/http_stream'

url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
Yajl::HttpStream.get(url) do |status|
  puts status.inspect
end
The code above will connect to the given url and iterate over each JSON object it gets back. In the example, I'm just outputting each object so we can see what it looks like.
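To give a feel for what gets yielded, here's an illustrative (and heavily trimmed) version of the hash each status decodes to. The exact fields below are examples only; real statuses carry many more.

```ruby
# Illustrative only: a trimmed-down version of the hash yielded for
# each status. Keys are strings because we haven't asked the parser
# to symbolize them (more on that later).
status = {
  "id"         => 12996589944,
  "text"       => "Just setting up my stream",
  "created_at" => "Wed Apr 28 19:23:42 +0000 2010",
  "user"       => {
    "id"          => 12345,
    "screen_name" => "example_user"
  }
}

# Pull out the pieces you care about
puts "@#{status["user"]["screen_name"]}: #{status["text"]}"
```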
Of course things will sometimes go wrong: connections get dropped, and so on, at which point Yajl typically complains that it can't process the stream. To get around that, I rescue the exception and wrap the whole thing in a loop:
loop do
  url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
  begin
    Yajl::HttpStream.get(url) do |status|
      puts status.inspect
    end
  rescue Yajl::HttpStream::InvalidContentType
  end
end
Now, being a good citizen requires that you back off your retry rate if you start getting errors, so it's best to put that in place from the start so it's not forgotten. Keep track of the number of consecutive errors, and switch to a while loop:
max_allowed_errors = 1200
consecutive_errors = 0
while consecutive_errors < max_allowed_errors do
  url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
  begin
    Yajl::HttpStream.get(url) do |status|
      consecutive_errors = 0
      puts status.inspect
    end
  rescue Yajl::HttpStream::InvalidContentType
    consecutive_errors += 1
  end
  sleep(0.25 * consecutive_errors)
end
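The sleep above backs off linearly. A common alternative, which I'm sketching here only as an option (the post's code uses the linear version), is a capped exponential backoff:

```ruby
# A capped exponential backoff: 1s, 2s, 4s, 8s ... up to a ceiling.
# The cap of 240 seconds is an arbitrary choice for illustration.
def backoff_delay(consecutive_errors, cap = 240)
  return 0 if consecutive_errors.zero?
  [2 ** (consecutive_errors - 1), cap].min
end
```

You'd then call `sleep(backoff_delay(consecutive_errors))` in place of the linear sleep.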
And you're more-or-less done. Of course it would make sense to wrap this code up into a reusable class:
require 'uri'
require 'yajl/http_stream'

class Twitter
  MAX_ALLOWED_ERRORS = 1200

  def self.stream(username, password, &block)
    url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
    consecutive_errors = 0
    while consecutive_errors < MAX_ALLOWED_ERRORS do
      begin
        Yajl::HttpStream.get(url, :symbolize_keys => true) do |status|
          consecutive_errors = 0
          yield(status)
        end
      rescue Yajl::HttpStream::InvalidContentType
        consecutive_errors += 1
      end
      sleep(0.25 * consecutive_errors)
    end
  end
end
You may have noticed a couple of other changes in the code above. I'm no longer outputting the status but rather yielding it to a block. I've also set an extra parameter on the Yajl::HttpStream.get call to automatically symbolize the keys of all the objects returned.
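To see what symbolized keys buy you without needing yajl installed, here's the same idea demonstrated with Ruby's standard JSON library, where the equivalent option is called symbolize_names:

```ruby
require 'json'

raw = '{"text":"hello","user":{"screen_name":"example_user"}}'

# Without the option, keys come back as strings...
JSON.parse(raw)["text"]                          # => "hello"

# ...with it, keys (including nested ones) come back as symbols,
# so lookups like status[:text] work.
JSON.parse(raw, :symbolize_names => true)[:text] # => "hello"
```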
Using our new Twitter stream
Putting the code into action, I use it like this:
Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status)
end
The benefit of having our stream method yield each status to a block is that you now have the flexibility to do almost anything with it. In my simple example above I've got it storing straight to a MongoDB collection I've created, but I could just as easily add some additional information to the status object before saving:
Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status.merge(
    :weather => current_weather(status[:location]),
    :mood => determine_mood(status[:text])
  ))
end
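The current_weather and determine_mood helpers above are placeholders. Just to make the example concrete, here's a toy determine_mood based on keyword matching; a real implementation would use a proper sentiment analysis library:

```ruby
# Toy sentiment scorer, for illustration only: counts how many
# words from each list appear in the tweet text.
POSITIVE_WORDS = %w[love great happy awesome]
NEGATIVE_WORDS = %w[hate awful sad angry]

def determine_mood(text)
  words = text.downcase.scan(/\w+/)
  score = (words & POSITIVE_WORDS).size - (words & NEGATIVE_WORDS).size
  if score > 0
    :positive
  elsif score < 0
    :negative
  else
    :neutral
  end
end
```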
Or store the status, in addition to adding it to a queue for post-processing and aggregation by our army of machines in the cloud:
Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status.merge(
    :weather => current_weather(status[:location]),
    :mood => determine_mood(status[:text])
  ))
  TweetProcessors.queue(status)
end
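The TweetProcessors used above isn't defined anywhere yet. As a sketch, a minimal in-process stand-in using Ruby's thread-safe Queue might look like this; in practice you'd want something durable between machines, such as RabbitMQ:

```ruby
# Hypothetical stand-in for TweetProcessors: an in-process,
# thread-safe FIFO. Work pushed here would be popped off by
# worker threads (or, in a real setup, a message broker).
module TweetProcessors
  QUEUE = Queue.new

  def self.queue(status)
    QUEUE.push(status)
  end

  def self.pop
    QUEUE.pop
  end
end
```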
Taking it further
In coming posts I'll expand on the previous examples and show you how to easily:
- Supply additional options to the stream to filter it down to just the tweets you are interested in
- Set up MongoDB to store the data you need
- Provision Amazon EC2 instances to help you deal with processing load
- Get Chef involved to handle the provisioning and setup of your EC2 instances automatically
- Use RabbitMQ to dispatch work to multiple servers
- Load balance your MongoDB instances across EC2