Tuesday, 26 August 2014

PostgreSQL and ElasticSearch

Recently i had to evaluate ElasticSearch for a possible installation, elasticsearch is basically a search server that provides a distributed full-text search engine using a restful web interface, stores documents in json, it is written in java it is fast and really works out of the box with almost minimum effort. After the installation, it's just reading the documentation and adding / searching documents, I didn't really experiment much with searching but the API looks really good.
One interesting question that i had to answer was about connectivity with postgres, and how to maintain a table in both datastores and that's what this post is all about.

The first (fast and easy) answer here was rivers, it creates a jdbc connection with another datastore and based on a query it can pump data from any database table. It is available for postgres, twitter, mongo etc.. Because its jdbc its relatively slow and elasticsearch will (re)pump the data once every restart so pay extra attention if you use this and read the documentation first.

One other way is to use LISTEN/NOTIFY commands of postgres which is basically a message queueing system. The idea is to raise a notification on every insert, a deamon would grab that and insert the record into elasticsearch..

For a single postgres table it would work like this :

DROP TABLE IF EXISTS messages CASCADE;
create table messages (
 id serial primary key,
 date timestamp without time zone,
 carrier text,
 message text
); 


CREATE OR REPLACE FUNCTION table_message_notify() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('table_messages_notifier',CAST(NEW.id AS text));
   RETURN NEW;
  END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER object_post_insert_notify AFTER insert ON messages FOR EACH ROW EXECUTE PROCEDURE table_message_notify();

This will simply send a notification on 'table_messages_notifier' channel after an insert that a new record has been inserted. Now you need something that would grab and handle these notifications, i tried with various ways like python requests, but after a while i just did it with python elasticsearch library, and my life was suddenly much easier :). Here's the python script that i end up having to work pretty well (managed to replicate about 50m rows with no errors).

NOTE that i've intentionally left garbage in the code just to show alternatives that i personally tried. Also, this was just a proof of concept and not an actual properly working solution, but it should be enough for someone who knows what he is doing to create a deamon that would actually work even in production.



Thanks for reading
-- Vasilis

2 comments:

  1. I take a similar approach and it has been work well. One thing to be wary of when relying on the content of the notification itself for indexing is that you risk missing some rows if they are inserted whilst your python script is not running.

    To avoid this, I have an xid column in my table with a default of txid_current(). My ES indexing script (triggered by the notification in a similar select loop) then finds the last_indexed_xmin from an ancillary document in ES and indexes all rows with xid >= last_indexed_xmin, finally updating the last_indexed_xmin to the current txid_snapshot_xmin(txid_current_snapshot()) in the ancillary document.

    ReplyDelete
  2. yeah, i was thinking something similar in case i used this for production. Something else that would work would be a status column stating the "replication status" of a row, that way would give you a potential flexibility if you want to partially replicate "some" data.

    ReplyDelete