GZipped JSON files and Logstash
Preamble
While moving a large part of our data from the database into offline storage, my team at JN Solutions and I faced a problem: we wanted to keep the data gzipped on the filesystem while still making it easy to send to Elasticsearch for processing. The obvious solution was to use Logstash as the middleman, but it doesn’t handle archived data out of the box.
Of course, if something doesn’t have a feature by itself, we can always add one. Let’s see how we can write a codec plugin for Logstash to make it work.
NB: the files and code given here are based on Logstash version 1.3.2. If you’re reading this in the distant future (hi there!), YMMV.
First, let me point out that despite being packed up in a jar file and looking like Java, Logstash is really a Ruby tool, running on JRuby in the standard distribution. That makes it trivially easy for any Ruby developer to modify and extend, and a good place to start is by examining the code in the GitHub repository.
In general, the things one would be interested in are inputs, outputs, codecs and filters.
Building the codec
But let’s get back and focus on the task at hand:
1. We receive a message through a Redis list with the absolute path of the file that needs to be imported.
2. The file is expected to be a gzipped file, available to the Logstash server under the path provided.
3. Logstash has to unpack the file, parse it as JSON, and send it on for further processing.
Looking at the available inputs and codecs for Logstash, it seems that we could use the redis input to interact with Redis, and the json codec as the basis for our data processing. Alas, the json codec expects its data to be readily available as a string, and we’re receiving file names instead, so a codec that takes a file name and loads the data itself is what we need to write.
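To make the mismatch concrete, here is a minimal sketch of what a parse-or-fall-back codec does when it is handed a path instead of a JSON string. The json_or_plain helper is hypothetical and uses plain hashes as stand-ins for Logstash events; it only imitates the behaviour, it is not Logstash code.

```ruby
require 'json'

# Hypothetical helper: parse the input as JSON, or fall back to wrapping
# the raw string in a "message" field when it is not valid JSON.
def json_or_plain(data)
  JSON.parse(data)
rescue JSON::ParserError
  { 'message' => data }
end

# A real JSON payload is parsed into fields
from_json = json_or_plain('{"user": "alice"}')

# A file name is not JSON, so it ends up as a plain-text message
from_path = json_or_plain('/tmp/test.gz')

puts from_json.inspect
puts from_path.inspect
```

In other words, with the stock codec the file name itself would become the event, and the file contents would never be read.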
Let’s start with a simple config file for Logstash:
input {
  redis {
    data_type => list
    # set list name in redis to gzfiles, and add it as the source field
    # to the final data
    key => gzfiles
    add_field => { source => gzfiles }
    debug => true
    # and this is the codec we need to implement
    codec => json_file_gz
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
I’m going to ignore the output here and just leave it as debug output. What matters is the codec => json_file_gz line, which names the codec plugin we need to write.
As we’re just loading and parsing JSON files, it makes sense to use the json codec as the basis for the new codec. We’ll need the jar file of Logstash itself: just grab the one available on their website and save it in your working dir. Next, we need the source code of the json codec as the starting point: grab the file here and save it under the "logstash/codecs/json_file_gz.rb" path in the same dir as the jar file (make note of the relative path, it IS important).
Open up the file, and the only thing we’re really interested in is its decode method:
def decode(data)
  data = @converter.convert(data)
  begin
    yield LogStash::Event.new(JSON.parse(data))
  rescue JSON::ParserError => e
    @logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data)
    yield LogStash::Event.new("message" => data)
  end
end
And the key here is the yield LogStash::Event.new(JSON.parse(data)) line: it just parses the incoming data as JSON and sends it on its way. That’s our hook point: we need to treat the data as a file name instead, read and unzip the file, and only then parse it. Along the way, let’s get rid of the charset conversion, as we’re in a controlled environment and know exactly what encoding we’re going to get.
Using Ruby’s Zlib::GzipReader, the modification is trivial, just with some error handling making it more verbose:
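require "zlib" # goes at the top of the codec file

def decode(path)
  begin
    # read and decompress the whole file named in the incoming message
    json_data = Zlib::GzipReader.open(path) { |f| f.read }
  rescue Zlib::GzipFile::Error => e
    # not a valid gzip file: log the path and emit nothing
    @logger.info('Gzip failure, skipped', :error => e, :path => path)
  end

  begin
    yield LogStash::Event.new(JSON.parse(json_data)) if json_data
  rescue JSON::ParserError => e
    # unpacked fine but is not JSON: pass the raw contents on as plain text
    @logger.info('JSON parse failure. Falling back to plain-text', :error => e, :data => json_data)
    yield LogStash::Event.new('message' => json_data)
  end
end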
And that’s all. Change the name of the class and its config name, clean up the register method, and you arrive at a final file similar to the one I have uploaded here.
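If you want to sanity-check the decoding logic before repacking the jar, it can be exercised in plain Ruby. The sketch below is not the codec itself: decode_gz_json is a hypothetical stand-in that yields plain hashes instead of LogStash::Event objects and writes failures to stderr instead of the Logstash logger. It also rescues SystemCallError so that a missing file is skipped too, which is an addition of this sketch.

```ruby
require 'zlib'
require 'json'
require 'tempfile'

# Hypothetical stand-in for the codec's decode method, free of Logstash
# classes: yields a Hash per file instead of a LogStash::Event.
def decode_gz_json(path)
  json_data = Zlib::GzipReader.open(path) { |f| f.read }
  yield JSON.parse(json_data)
rescue Zlib::GzipFile::Error, SystemCallError => e
  # Not a gzip file, or the file could not be opened: skip it
  warn "Gzip failure, skipped: #{e.message} (#{path})"
rescue JSON::ParserError
  # Unpacked fine but is not JSON: pass the raw contents on
  yield({ 'message' => json_data })
end

# Write a small gzipped JSON document to a temporary file
file = Tempfile.new(['sample', '.json.gz'])
file.close
Zlib::GzipWriter.open(file.path) do |gz|
  gz.write(JSON.generate({ 'user' => 'alice', 'count' => 3 }))
end

# Feed the path through the decoder and collect whatever it yields
events = []
decode_gz_json(file.path) { |event| events << event }
puts events.inspect
```

Once the plain-Ruby version behaves as expected, the same logic can go into the codec’s decode method with the Logstash event and logger calls put back.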
Packaging and testing
Now all that is left is to repack the jar file with the new codec file:
% jar -uf logstash-1.3.2-flatjar.jar logstash/codecs/json_file_gz.rb
and start logstash up:
% java -jar logstash-1.3.2-flatjar.jar agent -f test.conf -v
Given that we have a Redis server running locally, and a valid gzipped JSON file at /tmp/test.gz, we can make sure our codec works by sending it a message via redis-cli:
% redis-cli
127.0.0.1:6379> LPUSH gzfiles '/tmp/test.gz'
(integer) 1
and if everything works as it should, you should see the file contents printed in Logstash’s output.
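If you don’t have a gzipped JSON file handy for this test, a few lines of Ruby can produce one. This is only a sketch: the record contents are made up for illustration, and the path is the /tmp/test.gz used in the redis-cli session above.

```ruby
require 'zlib'
require 'json'

# Hypothetical sample record; any JSON object will do
record = { 'user' => 'alice', 'action' => 'login' }
path = '/tmp/test.gz'

# Serialize the record and write it out gzip-compressed
Zlib::GzipWriter.open(path) { |gz| gz.write(JSON.generate(record)) }

# Read it back to confirm the file round-trips cleanly
restored = JSON.parse(Zlib::GzipReader.open(path) { |gz| gz.read })
puts restored.inspect
```

Run it with the same user that Logstash runs as, or make sure the resulting file is readable by that user.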
Got a question? Use the comments!