Implementing a "distributed" reporting server using some of postgres10 features.
Today I will try to show how strong Postgres 10 is by combining different features in order to create a "distributed" reporting server. The features I will be using are:
- Logical Replication
- Partitioning
- Foreign Data Wrappers
- Table Inheritance
We have one central point for inserts that we will call the bucket; the bucket is partitioned by range, yearly. In my example we have 3 partitions, for 2016, 2017 and 2018, and each partition is logically replicated to one of 3 data nodes, each responsible for 1 year of data. Finally, we have a reporting proxy that is responsible for all selects and connects to each node through foreign data wrappers.
The setup consists of 5 docker containers with the following roles:
- 10.0.0.2, bucket, insert / update / delete
- 10.0.0.3, node2016, data holder for 2016
- 10.0.0.4, node2017, data holder for 2017
- 10.0.0.5, node2018, data holder for 2018
- 10.0.0.6, reporting proxy, main point for selects
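One prerequisite worth spelling out: logical replication needs wal_level set to logical on the publishing side (the bucket), and pg_hba.conf on the bucket must allow the data nodes to connect; the containers are assumed to already have this configured. A minimal sketch of the settings involved (a restart is required after changing wal_level):
-- on the bucket (10.0.0.2)
ALTER SYSTEM SET wal_level = 'logical';
-- the Postgres 10 defaults (10) are normally enough for three subscriptions,
-- shown here only for completeness
ALTER SYSTEM SET max_wal_senders = 10;
ALTER SYSTEM SET max_replication_slots = 10;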
Now let's start with the bucket:
CREATE TABLE data_bucket (
id int ,
data text,
insert_time timestamp without time zone DEFAULT now())
PARTITION BY RANGE (insert_time);
CREATE TABLE data_p2016 PARTITION OF data_bucket
FOR VALUES FROM ('2016-01-01 00:00:00') TO ('2017-01-01 00:00:00');
CREATE TABLE data_p2017 PARTITION OF data_bucket
FOR VALUES FROM ('2017-01-01 00:00:00') TO ('2018-01-01 00:00:00');
CREATE TABLE data_p2018 PARTITION OF data_bucket
FOR VALUES FROM ('2018-01-01 00:00:00') TO ('2019-01-01 00:00:00');
create unique index data_p2016_uniq on data_p2016 (id);
create unique index data_p2017_uniq on data_p2017 (id);
create unique index data_p2018_uniq on data_p2018 (id);
create index data_p2016_time on data_p2016 (insert_time);
create index data_p2017_time on data_p2017 (insert_time);
create index data_p2018_time on data_p2018 (insert_time);
CREATE PUBLICATION pub_data_p2016 FOR TABLE data_p2016
WITH (publish='insert,update');
CREATE PUBLICATION pub_data_p2017 FOR TABLE data_p2017
WITH (publish='insert,update');
CREATE PUBLICATION pub_data_p2018 FOR TABLE data_p2018
WITH (publish='insert,update');
Here we created the data bucket table that we will insert into, its yearly partitions, and some indexes for uniqueness and for searching by date (the indexes are optional, more about that later on). Lastly, we created 3 publications that we will use in the next step. Notice that we only replicate inserts and updates, not deletes; keep that in mind for later.
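If you want to double-check what the publications will actually send, the publication catalogs can be queried directly. A quick, optional sanity check (nothing below is required for the setup):
-- on the bucket: confirm each publication replicates only inserts and updates
select pubname, pubinsert, pubupdate, pubdelete from pg_publication;
-- and which table belongs to which publication
select * from pg_publication_tables;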
The next step is setting up the data nodes. On each container (node2016, node2017 and node2018) run the following SQL:
-- node 2016
CREATE TABLE data_p2016 (
id int,
data text,
insert_time timestamp without time zone );
create unique index data_p2016_uniq on data_p2016 (id);
create index data_p2016_time on data_p2016 (insert_time);
CREATE SUBSCRIPTION sub_data_p2016
CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
PUBLICATION pub_data_p2016;
-- node 2017
CREATE TABLE data_p2017 (
id int,
data text,
insert_time timestamp without time zone ) ;
create unique index data_p2017_uniq on data_p2017 (id);
create index data_p2017_time on data_p2017 (insert_time);
CREATE SUBSCRIPTION sub_data_p2017
CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
PUBLICATION pub_data_p2017;
-- node 2018
CREATE TABLE data_p2018 (
id int,
data text,
insert_time timestamp without time zone ) ;
create unique index data_p2018_uniq on data_p2018 (id);
create index data_p2018_time on data_p2018 (insert_time);
CREATE SUBSCRIPTION sub_data_p2018
CONNECTION 'dbname=monkey host=10.0.0.2 user=postgres port=5432'
PUBLICATION pub_data_p2018;
Here, for each node we create the data table, indexes and a subscription pointing to the bucket server.
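On each node, an optional check that the subscription is enabled and that a replication worker is actually streaming changes might look like this:
-- subscription definition on the node
select subname, subenabled, subpublications from pg_subscription;
-- worker status; no rows here would mean no replication worker is running
select subname, received_lsn, latest_end_lsn from pg_stat_subscription;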
Right now every row that gets into the bucket is transferred to the appropriate node. One last thing is missing: putting everything together. For aggregating all nodes we have the reporting proxy container. On this server we need to run the following SQL statements:
create extension if not exists postgres_fdw;
CREATE SERVER data_node_2016
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '10.0.0.3',port '5432',dbname 'monkey');
CREATE SERVER data_node_2017
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '10.0.0.4',port '5432',dbname 'monkey');
CREATE SERVER data_node_2018
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '10.0.0.5',port '5432',dbname 'monkey');
CREATE USER MAPPING FOR postgres SERVER data_node_2016 OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER data_node_2017 OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER data_node_2018 OPTIONS(user 'postgres');
CREATE TABLE reporting_table (
id int,
data text,
insert_time timestamp without time zone);
CREATE FOREIGN TABLE data_node_2016 (
CHECK ( insert_time >= DATE '2016-01-01' AND insert_time < DATE '2017-01-01' ))
INHERITS (reporting_table) SERVER data_node_2016
options (table_name 'data_p2016');
CREATE FOREIGN TABLE data_node_2017 (
CHECK ( insert_time >= DATE '2017-01-01' AND insert_time < DATE '2018-01-01' ))
INHERITS (reporting_table) SERVER data_node_2017 options (table_name 'data_p2017');
CREATE FOREIGN TABLE data_node_2018 (
CHECK ( insert_time >= DATE '2018-01-01' AND insert_time < DATE '2019-01-01' ))
INHERITS (reporting_table) SERVER data_node_2018 options (table_name 'data_p2018');
We first create the postgres_fdw extension, then the remote servers and user mappings for each data node, then the main reporting table, and finally three foreign tables, one per node, using table inheritance.
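To see how this metadata hangs together, the foreign server and foreign table catalogs can be inspected; again, this is only an optional sanity check:
-- on the reporting proxy
select srvname, srvoptions from pg_foreign_server;
select ftrelid::regclass as foreign_table, ftoptions from pg_foreign_table;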
The structure is ready; everything is now connected and we should be good for testing. But before we test this, let's describe what to expect. Rows inserted into data_bucket should be routed to the yearly partitions, these partitions will be replicated to their data nodes, and the reporting proxy should aggregate all nodes by using foreign scans.
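Before inserting anything, it can also be reassuring to confirm on the bucket that all three subscribers are connected; one walsender row and one replication slot per subscription is what we expect to see:
-- on the bucket
select application_name, state, replay_lsn from pg_stat_replication;
select slot_name, plugin, active from pg_replication_slots;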
Let's insert some randomly generated data into data_bucket:
insert into data_bucket
select generate_series(1,1000000),
md5(random()::text),
timestamp '2016-01-01 00:00:00' + random() *
(timestamp '2019-01-01 00:00:00' - timestamp '2016-01-01 00:00:00');
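On the bucket itself we can quickly see how the rows were routed across the yearly partitions; the tableoid cast below is just one convenient way to do it:
-- rows per partition on the bucket
select tableoid::regclass as partition, count(*)
from data_bucket
group by 1
order by 1;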
Data should now be distributed across all three nodes. From the reporting_table we created on the reporting proxy, we should be able to see everything; notice the explain plans:
monkey=# select count (*) from reporting_table ;
count
---------
1000000
(1 row)
monkey=# select min (insert_time),max(insert_time) from reporting_table;
min | max
----------------------------+----------------------------
2016-01-01 00:03:17.062862 | 2018-12-31 23:59:39.671967
(1 row)
monkey=# explain analyze select min (insert_time),max(insert_time) from reporting_table;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=598.80..598.81 rows=1 width=16) (actual time=1708.333..1708.334 rows=1 loops=1)
-> Append (cost=0.00..560.40 rows=7681 width=8) (actual time=0.466..1653.186 rows=1000000 loops=1)
-> Seq Scan on reporting_table (cost=0.00..0.00 rows=1 width=8) (actual time=0.002..0.002 rows=0 loops=1)
-> Foreign Scan on data_node_2016 (cost=100.00..186.80 rows=2560 width=8) (actual time=0.464..544.597 rows=334088 loops=1)
-> Foreign Scan on data_node_2017 (cost=100.00..186.80 rows=2560 width=8) (actual time=0.334..533.149 rows=332875 loops=1)
-> Foreign Scan on data_node_2018 (cost=100.00..186.80 rows=2560 width=8) (actual time=0.323..534.776 rows=333037 loops=1)
Planning time: 0.220 ms
Execution time: 1709.252 ms
(8 rows)
monkey=# select * from reporting_table where insert_time = '2016-06-21 17:59:44';
id | data | insert_time
----+------+-------------
(0 rows)
monkey=# select * from reporting_table where insert_time = '2016-06-21 17:59:44.154904';
id | data | insert_time
-----+----------------------------------+----------------------------
150 | 27da6c5606ea26d4ca51c6b642547d44 | 2016-06-21 17:59:44.154904
(1 row)
monkey=# explain analyze select * from reporting_table where insert_time = '2016-06-21 17:59:44.154904';
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Append (cost=0.00..125.17 rows=7 width=44) (actual time=0.383..0.384 rows=1 loops=1)
-> Seq Scan on reporting_table (cost=0.00..0.00 rows=1 width=44) (actual time=0.002..0.002 rows=0 loops=1)
Filter: (insert_time = '2016-06-21 17:59:44.154904'::timestamp without time zone)
-> Foreign Scan on data_node_2016 (cost=100.00..125.17 rows=6 width=44) (actual time=0.381..0.381 rows=1 loops=1)
Planning time: 0.172 ms
Execution time: 0.801 ms
(6 rows)
Some might say: OK, but now we have all the data in two places. That's true, but do we actually need the data in the bucket? The answer is no, we don't; we only need it in case we need to update. Remember that we set logical replication to replicate only inserts and updates? This means we can delete whatever we want from the bucket or its partitions, so we can implement any custom data retention, and we can even truncate the partitions if we want to remove data fast.
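As a sketch of what such a retention policy might look like on the bucket side (remember, deletes and truncates are not published, so the data nodes keep their copies):
-- free space on the bucket once a year has been replicated;
-- this does not touch the copy on node2016
truncate table data_p2016;
-- or keep only a rolling window of recent rows in the bucket
delete from data_bucket where insert_time < now() - interval '6 months';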
Now, is this solution perfect? No, it's not: foreign data wrappers are obviously slower than local access and they can't push down every operation, but they are getting better with each Postgres version.
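If the foreign scans ever become the bottleneck, postgres_fdw exposes a few server-level options that can help; the values below are arbitrary examples, not tuned recommendations:
-- ask the remote node for row estimates and fetch larger batches per round trip
alter server data_node_2016
    options (add use_remote_estimate 'true', add fetch_size '10000');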
Thanks for reading.
Vasilis Ventirozos
OmniTI Computer Consulting