I've got a bunch of code from various projects over the years that I've been sitting on with plans to document and share it. First to get that treatment is some code to handle parsing of the Twitter stream, which feels apt given the recent creation of dev.twitter.com and the conclusion of the first @Chirp developer conference.
Essentially it's a feed of the public Twitter stream (so no private tweets or direct messages), cleaned for relevance (so hopefully known spammers are excluded), and down-sampled to a manageable size based on your level of access and requirements (the current default sampling rate is 5% of statuses). It's different from just requesting the RSS/XML/JSON of the public timeline: it's an open connection that constantly streams data to you, so you only need to start another request if the connection is dropped for some reason.
I've been using the ruby-yajl library because it's a C-based JSON parser that's a bit quicker than the standard JSON parsing libraries, but also because it has a built-in HttpStream class. Let's take a look at some code:
require 'uri'
require 'yajl/http_stream'

url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
Yajl::HttpStream.get(url) do |status|
  puts status.inspect
end
The code above will connect to the given url and iterate over each JSON object it gets back. In the example, I'm just outputting the object so we can see what it is.
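For reference, here's a heavily trimmed sketch of the kind of hash each status parses into. The field names are real Twitter status fields, but the values here are made up and the real objects carry many more fields:

```ruby
# A trimmed, made-up example of a parsed status (illustration only):
status = {
  "created_at" => "Sat Apr 17 20:41:52 +0000 2010",
  "id"         => 12345678901,
  "text"       => "Just setting up my stream parser",
  "user"       => {
    "screen_name"     => "mytwittername",
    "followers_count" => 42
  }
}

status["text"]                 # => "Just setting up my stream parser"
status["user"]["screen_name"]  # => "mytwittername"
```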
Of course things will sometimes go wrong (connections will be dropped, etc.), at which point Yajl typically complains that it can't process the stream. To get around that, I rescue the exception and wrap the whole thing in a loop:
loop do
  url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
  begin
    Yajl::HttpStream.get(url) do |status|
      puts status.inspect
    end
  rescue Yajl::HttpStream::InvalidContentType
    # Swallow the error; the next pass through the loop reconnects
  end
end
Now being a good citizen requires that if you start getting errors you back off your retry rate, so it's best to put that in place now so it isn't forgotten. Keep track of the number of consecutive errors, and switch to a while loop:
max_allowed_errors = 1200
consecutive_errors = 0
while consecutive_errors < max_allowed_errors do
  url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
  begin
    Yajl::HttpStream.get(url) do |status|
      consecutive_errors = 0
      puts status.inspect
    end
  rescue Yajl::HttpStream::InvalidContentType
    consecutive_errors += 1
  end
  sleep(0.25 * consecutive_errors)
end
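The sleep(0.25 * consecutive_errors) line backs off linearly. If you'd rather back off more aggressively on repeated failures (a variation of mine, not something the library provides), a capped exponential schedule is an easy swap:

```ruby
# Capped exponential backoff: 0.25s, 0.5s, 1s, 2s, ... up to a 16s ceiling.
# An alternative to the linear 0.25 * consecutive_errors ramp above.
def backoff_delay(consecutive_errors, base = 0.25, cap = 16.0)
  return 0 if consecutive_errors.zero?
  [base * (2 ** (consecutive_errors - 1)), cap].min
end

backoff_delay(1)   # => 0.25
backoff_delay(3)   # => 1.0
backoff_delay(10)  # => 16.0 (capped)
```

You'd then replace the sleep line with `sleep(backoff_delay(consecutive_errors))`.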
And you're more-or-less done. Of course it would make sense to wrap this code up into a reusable class, so:
require 'uri'
require 'yajl/http_stream'

class Twitter
  MAX_ALLOWED_ERRORS = 1200

  def self.stream(username, password, &block)
    url = URI.parse("http://#{username}:#{password}@stream.twitter.com/1/statuses/sample.json")
    consecutive_errors = 0
    while consecutive_errors < MAX_ALLOWED_ERRORS do
      begin
        Yajl::HttpStream.get(url, :symbolize_keys => true) do |status|
          consecutive_errors = 0
          yield(status)
        end
      rescue Yajl::HttpStream::InvalidContentType
        consecutive_errors += 1
      end
      sleep(0.25 * consecutive_errors)
    end
  end
end
You may have noticed a couple of other changes in the code above. I'm no longer outputting the status but rather yielding it to a block. I've also set an extra parameter on the Yajl::HttpStream.get call to automatically symbolize the keys of all the objects returned.
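To see what that option buys you, here's the same idea demonstrated with Ruby's standard JSON library (purely for illustration; Yajl's option is :symbolize_keys, the stdlib's equivalent is :symbolize_names):

```ruby
require 'json'

raw = '{"text":"Hello from the stream","id":123}'

# Without the option you access fields with string keys:
string_keyed = JSON.parse(raw)
string_keyed["text"]  # => "Hello from the stream"

# With keys symbolized, you use the lighter symbol syntax instead:
symbol_keyed = JSON.parse(raw, :symbolize_names => true)
symbol_keyed[:text]   # => "Hello from the stream"
```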
Putting the code into action, I use it like the following:
Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status)
end
The benefit of having our stream method yield each status to a block is that you now have the flexibility to do almost anything with each returned status. In my simple example above I've got it storing straight to a MongoDB collection I've created, but I could just as easily add some additional information to the status object before saving:
Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status.merge(
    :weather => current_weather(status[:location]),
    :mood => determine_mood(status[:text])
  ))
end
Or store the status, in addition to adding it to a queue for post-processing and aggregation by our army of machines in the cloud:
Twitter.stream("mytwittername", "secret") do |status|
  Tweet.create!(status.merge(
    :weather => current_weather(status[:location]),
    :mood => determine_mood(status[:text])
  ))
  TweetProcessors.queue(status)
end
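TweetProcessors above is hypothetical; stand in your favourite queueing layer (Resque, SQS, etc.). A minimal in-process sketch using Ruby's built-in thread-safe Queue might look like:

```ruby
# Minimal stand-in for a TweetProcessors-style queue. A real version
# would likely hand statuses to Resque, SQS, or similar for the
# worker machines to chew through.
module TweetProcessors
  @queue = Queue.new  # Queue is Ruby's built-in thread-safe FIFO

  # Push a status onto the queue for later processing
  def self.queue(status)
    @queue << status
  end

  # Pop the oldest queued status (blocks if the queue is empty)
  def self.pop
    @queue.pop
  end
end
```

A pool of worker threads (or separate processes fed via a real broker) can then pop statuses off and do the heavy aggregation without slowing down the stream consumer.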
In coming posts I'll expand on the previous examples and show you how to easily:

- use MongoDB to store the data you need
- spin up Amazon EC2 instances to help you deal with processing load
- get Chef involved to handle the provisioning and setup of your EC2 instances automatically
- run MongoDB instances across EC2