Implementing a "distributed" reporting server using some Postgres 10 features

Today I will try to show how strong Postgres 10 is by combining different features to create a "distributed" reporting server. The features that I will be using are:
  • Logical Replication
  • Partitioning
  • Foreign Data Wrappers
  • Table Inheritance 
The scenario that we want to implement is the following:
We have one central point for inserts, which we will call the bucket. The bucket is partitioned by range, yearly. In my example we have 3 partitions, for 2016, 2017 and 2018, and each partition is logically replicated to one of 3 data nodes, each responsible for 1 year of data. Finally, we have a reporting proxy that is responsible for all selects and connects to each node through foreign data wrappers.
The setup consists of 5 docker containers that have the following roles:
  • 10.0.0.2, bucket, insert / update / delete
  • 10.0.0.3, node2016, data holder for 2016
  • 10.0.0.4, node2017, data holder for 2017
  • 10.0.0.5, node2018, data holder for 2018
  • 10.0.0.6, reporting proxy, main point for selects 
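
One prerequisite worth spelling out before running any SQL: logical replication needs wal_level set to logical on the publishing server, which here is the bucket. A minimal sketch, assuming a superuser session and that the bucket container can be restarted afterwards. It also assumes that pg_hba.conf on the bucket already allows connections from the data nodes:

```sql
-- On the bucket (10.0.0.2). Requires superuser; takes effect only after a restart.
ALTER SYSTEM SET wal_level = 'logical';

-- After the restart, confirm that the setting is active.
SHOW wal_level;
```

The Postgres 10 defaults for max_wal_senders and max_replication_slots should be enough for the three subscriptions used here.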


Now let's start with the bucket:


CREATE TABLE data_bucket (
  id int , 
  data text, 
  insert_time timestamp without time zone DEFAULT now()) 
  PARTITION BY RANGE (insert_time);

CREATE TABLE data_p2016 PARTITION OF data_bucket 
  FOR VALUES FROM ('2016-01-01 00:00:00') TO ('2017-01-01 00:00:00');
CREATE TABLE data_p2017 PARTITION OF data_bucket 
  FOR VALUES FROM ('2017-01-01 00:00:00') TO ('2018-01-01 00:00:00');
CREATE TABLE data_p2018 PARTITION OF data_bucket 
  FOR VALUES FROM ('2018-01-01 00:00:00') TO ('2019-01-01 00:00:00');

create unique index data_p2016_uniq on data_p2016 (id);
create unique index data_p2017_uniq on data_p2017 (id);
create unique index data_p2018_uniq on data_p2018 (id);

create index data_p2016_time on data_p2016 (insert_time);
create index data_p2017_time on data_p2017 (insert_time);
create index data_p2018_time on data_p2018 (insert_time);

CREATE PUBLICATION pub_data_p2016 FOR TABLE data_p2016 
  WITH (publish='insert,update');
CREATE PUBLICATION pub_data_p2017 FOR TABLE data_p2017 
  WITH (publish='insert,update');
CREATE PUBLICATION pub_data_p2018 FOR TABLE data_p2018 
  WITH (publish='insert,update');

Here we created the data_bucket table that we will insert into, its yearly partitions, and some indexes for uniqueness and for searching by date. The indexes are optional; more about that later on. Last, we created 3 publications that we will use in the next step. Notice that we only replicate inserts and updates, not deletes. Just keep that in mind for later.
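
One caveat about publishing updates: as far as I know, Postgres rejects an UPDATE on a table that publishes updates unless that table has a replica identity, and a plain unique index on a nullable column does not count as one. A minimal sketch of one way to provide it, assuming id is never NULL. The NOT NULL constraint is my assumption and not part of the setup above:

```sql
-- On the bucket; repeat for data_p2017 and data_p2018.
-- Assumption: id is never NULL. REPLICA IDENTITY USING INDEX only accepts
-- a unique index whose columns are all marked NOT NULL.
ALTER TABLE data_p2016 ALTER COLUMN id SET NOT NULL;
ALTER TABLE data_p2016 REPLICA IDENTITY USING INDEX data_p2016_uniq;
```

The matching table on each data node would need the same two statements once it exists, so that the subscriber can locate the row to update. If you only ever insert, none of this is needed.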

The next step is setting up the data nodes. On each container (node2016, node2017 and node2018), run the matching block of the following SQL:


-- node 2016
CREATE TABLE data_p2016 (
  id int,
  data text,
  insert_time timestamp without time zone );

create unique index data_p2016_uniq on data_p2016 (id);
create index data_p2016_time on data_p2016 (insert_time);

CREATE SUBSCRIPTION sub_data_p2016 
  CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
  PUBLICATION pub_data_p2016;

-- node 2017

CREATE TABLE data_p2017 (
  id int,
  data text,
  insert_time timestamp without time zone );

create unique index data_p2017_uniq on data_p2017 (id);
create index data_p2017_time on data_p2017 (insert_time);

CREATE SUBSCRIPTION sub_data_p2017 
  CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
  PUBLICATION pub_data_p2017;

-- node 2018

CREATE TABLE data_p2018 (
  id int,
  data text,
  insert_time timestamp without time zone ) ;

create unique index data_p2018_uniq on data_p2018 (id);
create index data_p2018_time on data_p2018 (insert_time);

CREATE SUBSCRIPTION sub_data_p2018 
  CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
  PUBLICATION pub_data_p2018;

Here, for each node, we create the data table, its indexes and a subscription pointing to the bucket server.
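
Before moving on, it can help to confirm that the subscriptions are actually connected. A minimal sketch using the standard catalog and statistics views; which columns are worth watching is my own choice:

```sql
-- On a data node: one row per subscription worker.
-- A non-NULL pid means the apply worker is running.
SELECT subname, pid, received_lsn, latest_end_lsn, latest_end_time
FROM pg_stat_subscription;

-- On the bucket: which table each publication exposes.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
ORDER BY pubname;

-- On the bucket: one active replication slot per subscription is expected.
SELECT slot_name, plugin, active
FROM pg_replication_slots;
```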

Right now, every row that gets into the bucket is transferred to the appropriate node. One last thing is missing: putting everything together. For aggregating all nodes we have the reporting proxy container. On this server we need to run the following SQL statements:


create extension if not exists postgres_fdw;

CREATE SERVER data_node_2016 
  FOREIGN DATA WRAPPER postgres_fdw 
  OPTIONS(host '10.0.0.3',port '5432',dbname 'monkey');
CREATE SERVER data_node_2017 
  FOREIGN DATA WRAPPER postgres_fdw 
  OPTIONS(host '10.0.0.4',port '5432',dbname 'monkey');
CREATE SERVER data_node_2018 
  FOREIGN DATA WRAPPER postgres_fdw 
  OPTIONS(host '10.0.0.5',port '5432',dbname 'monkey');

CREATE USER MAPPING FOR postgres SERVER data_node_2016 OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER data_node_2017 OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER data_node_2018 OPTIONS(user 'postgres');

CREATE TABLE reporting_table (
  id int,
  data text, 
  insert_time timestamp without time zone);

CREATE FOREIGN TABLE data_node_2016 (
  CHECK ( insert_time >= DATE '2016-01-01' AND insert_time < DATE '2017-01-01' ))
  INHERITS (reporting_table) SERVER data_node_2016 
  options (table_name 'data_p2016');
CREATE FOREIGN TABLE data_node_2017 (
  CHECK ( insert_time >= DATE '2017-01-01' AND insert_time < DATE '2018-01-01' ))
  INHERITS (reporting_table) SERVER data_node_2017 
  options (table_name 'data_p2017');
CREATE FOREIGN TABLE data_node_2018 (
  CHECK ( insert_time >= DATE '2018-01-01' AND insert_time < DATE '2019-01-01' ))
  INHERITS (reporting_table) SERVER data_node_2018 
  options (table_name 'data_p2018');

We first create the postgres_fdw extension, then create remote servers and user mappings for each data node, then create the main reporting table, and finally create three foreign tables, one for each node, using table inheritance.
The structure is ready, everything is now connected and we should be good for testing. But before we test this, let's describe what to expect. Rows inserted into data_bucket should be routed to the yearly partitions, these partitions will be replicated to their data nodes, and the reporting proxy should aggregate all nodes by using foreign scans.
Let's insert some randomly generated data into data_bucket:


insert into data_bucket
  select generate_series(1,1000000), 
  md5(random()::text),
  timestamp '2016-01-01 00:00:00' + random() * 
  (timestamp '2019-01-01 00:00:00' - timestamp '2016-01-01 00:00:00');
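
To see how the rows were routed before looking at the reporting side, the hidden tableoid column can be grouped on. A small sketch, run on the bucket; the column aliases are my own:

```sql
-- On the bucket: tableoid identifies the partition that stores each row.
SELECT tableoid::regclass::text AS partition_name,
       count(*)                 AS row_count
FROM data_bucket
GROUP BY 1
ORDER BY 1;
```

With uniformly random timestamps, each of the three partitions should hold roughly a third of the rows.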

Data should be distributed across all three nodes. Now, from the reporting_table we created on the reporting proxy, we should be able to see everything. Notice the explain plans:


monkey=# select count (*) from reporting_table ;
  count
---------
 1000000
(1 row)

monkey=# select min (insert_time),max(insert_time) from reporting_table;
            min             |            max
----------------------------+----------------------------
 2016-01-01 00:03:17.062862 | 2018-12-31 23:59:39.671967
(1 row)

monkey=# explain analyze select min (insert_time),max(insert_time) from reporting_table;
                                                              QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=598.80..598.81 rows=1 width=16) (actual time=1708.333..1708.334 rows=1 loops=1)
   ->  Append  (cost=0.00..560.40 rows=7681 width=8) (actual time=0.466..1653.186 rows=1000000 loops=1)
         ->  Seq Scan on reporting_table  (cost=0.00..0.00 rows=1 width=8) (actual time=0.002..0.002 rows=0 loops=1)
         ->  Foreign Scan on data_node_2016  (cost=100.00..186.80 rows=2560 width=8) (actual time=0.464..544.597 rows=334088 loops=1)
         ->  Foreign Scan on data_node_2017  (cost=100.00..186.80 rows=2560 width=8) (actual time=0.334..533.149 rows=332875 loops=1)
         ->  Foreign Scan on data_node_2018  (cost=100.00..186.80 rows=2560 width=8) (actual time=0.323..534.776 rows=333037 loops=1)
 Planning time: 0.220 ms
 Execution time: 1709.252 ms
(8 rows)

monkey=# select * from reporting_table where insert_time = '2016-06-21 17:59:44';
 id | data | insert_time
----+------+-------------
(0 rows)

monkey=# select * from reporting_table where insert_time = '2016-06-21 17:59:44.154904';
 id  |               data               |        insert_time
-----+----------------------------------+----------------------------
 150 | 27da6c5606ea26d4ca51c6b642547d44 | 2016-06-21 17:59:44.154904
(1 row)

monkey=# explain analyze select * from reporting_table where insert_time = '2016-06-21 17:59:44.154904';
                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
 Append  (cost=0.00..125.17 rows=7 width=44) (actual time=0.383..0.384 rows=1 loops=1)
   ->  Seq Scan on reporting_table  (cost=0.00..0.00 rows=1 width=44) (actual time=0.002..0.002 rows=0 loops=1)
         Filter: (insert_time = '2016-06-21 17:59:44.154904'::timestamp without time zone)
   ->  Foreign Scan on data_node_2016  (cost=100.00..125.17 rows=6 width=44) (actual time=0.381..0.381 rows=1 loops=1)
 Planning time: 0.172 ms
 Execution time: 0.801 ms
(6 rows)
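
Two things stand out in these plans. The second query touches only data_node_2016, because the CHECK constraints on the foreign tables let the planner exclude the other two nodes. And the row estimates (2560 per node) are far from the actual counts (about 333,000 per node), because the proxy has no statistics for the remote tables. A minimal sketch of how one might inspect and improve this, assuming the default constraint_exclusion setting; I have not measured its effect on this particular setup:

```sql
-- On the reporting proxy.
-- Show the query each data node receives (look for the "Remote SQL" lines).
EXPLAIN (VERBOSE)
SELECT * FROM reporting_table
WHERE insert_time = '2016-06-21 17:59:44.154904';

-- Collect local statistics for a foreign table so row estimates get closer to reality.
ANALYZE data_node_2016;

-- Alternatively, let the planner ask the remote server for estimates at plan time.
ALTER SERVER data_node_2016 OPTIONS (ADD use_remote_estimate 'true');
```

The last two statements would be repeated for data_node_2017 and data_node_2018.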

Some might say: OK, but we have all the data in 2 places. That is true, but do we actually need the data in the bucket? The answer is no, we only need it there in case we need to update it. Remember that we set logical replication to replicate only inserts and updates? This means that we can delete whatever we want from either the bucket or its partitions, so we can have any custom data retention. We can even truncate them if we want to remove data fast.
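
A retention pass on the bucket could look like the sketch below. It relies on Postgres 10 not replicating TRUNCATE through logical replication at all; later versions can publish it, but the publish list used here (insert, update) would still leave it out. The cutoff date is only an example:

```sql
-- On the bucket: empty the 2016 partition on the insert side only.
TRUNCATE data_p2016;

-- Or remove a slice with a plain delete; deletes are not in the publish list.
DELETE FROM data_bucket WHERE insert_time < '2017-07-01';

-- On the reporting proxy: the rows should still be visible through the data nodes.
SELECT count(*) FROM reporting_table WHERE insert_time < '2017-07-01';
```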
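Now, is this solution perfect? No, it's not. Foreign data wrappers are obviously slower than local tables and they can't perform all operations remotely (the min/max query above, for example, pulls every row to the proxy before aggregating), but with each Postgres version they are getting better.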


Thanks for reading.

Vasilis Ventirozos
OmniTI Computer Consulting
