Preamble

Working on moving a large part of data from our database into the offline storage, I and my team at JN Solutions have faced a problem of keeping it gzipped in the filesystem, but at the same time making it easy to send it for processing to Elastic Search. The obvious solution was to use Logstash as the middleman, but it doesn’t really handle archived data from the box.

Of course, if something doesn’t have a feature by itself – we can always add one.Let’s see how we can write an input plugin for Logstash to make it work.

NB: please note that the files and code given here is based on Logstash version 1.3.2. If you’re reading this in a distant future (hi there!) – YMMV.

First, let me point it out that despite being packed up in a jar file and looking like Java – Logstash is really a ruby tool, or JRuby in the standard distribution. When makes it trivially easy to modify and extend it for any Ruby developer, and a good place to start is by examining the code at GitHub repository

In general the things one would be interested in are inputs, outputs, codes and filters.

Building the codec

But let’s get back and focus at the task we have at hand:

1. We receive a message through a Redis list with an absolute path of the file that needs to be imported. 2. The file is expected to be a gzipped file, available to the Logstash server under the path provided. 3. Logstash has to unpack the file, parse it as a JSON data, and send it on for further processing.

Looking at the available inputs and codecs for Logstash, it seems that we could use redis input to interact with Redis, and json codec as the basis for our data processing. Alas, json codec expects its data to be readily available as a string, and we’re receiving file names instead, so that’s what we need to write.

Let’s start with a simple config file for Logstash:

input {
  redis {
    data_type => list
    # set list name in redis to gzfiles, and add it as the source field
    # to the final data
    key => gzfiles
    add_field => { source => gzfiles }
    debug => true

    # and this is the codec we need to implement
    codec => json_file_gz
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

I’m going to ignore output here – let’s just leave it at debug output, as what matters here is codec => json_file_gz line, which indicates the codec plugin we need to write.

As we’re just loading and parsing JSON files, it makes sense to use json codec as the basis for the new codec. We’ll need a jar file of the logstash itself – just grab the one available on their website, and save in your working dir. Next, we need the source code of json code as the basis – grab the file here and save it under "logstash/codec/json_file_gz.rb" path in the same dir as the jar file (make note of the relative path – it IS important).

Open up the file, and the only thing we’re really interested in is its decode method:

def decode(data)
  data = @converter.convert(data)
  begin
    yield LogStash::Event.new(JSON.parse(data))
  rescue JSON::ParserError => e
    @logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data)
    yield LogStash::Event.new("message" => data)
  end
end

And the key here is yield LogStash::Event.new(JSON.parse(data)) line – it just parses the incoming data as JSON, and sends it on its way. That’s our hook point – we need to use the data as the file name instead, read and unzip it, and only then parse it. Along the way, let’s get rid of charset conversion, as we’re in controlled environment and know exactly what encoding we’re going to get.

Using Ruby’s Zlib::GzipReader, the modification is trivial, just with some error handling making it more verbose:

def decode(path)
  begin
    json_data = Zlib::GzipReader.open(path) { |f| f.read }
  rescue Zlib::GzipFile::Error => e
    @logger.info('Gzip failure, skipped', :error => e, :data => data)
  end

  begin
    yield LogStash::Event.new(JSON.parse(json_data)) if json_data
  rescue JSON::ParserError => e
    @logger.info('JSON parse failure. Falling back to plain-text', :error => e, :data => data)
    yield LogStash::Event.new('message' => data)
  end
end

And that’s all. Change the name of the class and its config name, clean up the register method, and arrive to the final file similar to the one I have uploaded here

Packaging and testing

Now all that is left is to repack the jar file with the new codec file:

% jar -uf logstash-1.3.2-flatjar.jar logstash/codecs/json_file_gz.rb

and start logstash up:

% java -jar logstash-1.3.2-flatjar.jar agent -f test.conf -v

Given that we have redis server running locally, and a valid gzipped json file at /tmp/test.gz, we can make sure our codec works by sending it a message via redis cli:

% redis-cli
127.0.0.1:6379> LPUSH gzfiles '/tmp/test.gz'
(integer) 1

and if everything works as it should, you’ve got the file contents already printed in logstash’s output.

Got a question? Use the comments!