The Postgres Source Connector connects to a database with the provided url and then calls Ping to test the connection. If the Ping fails, the Open method will fail and the Connector will not be ready to Read.
Change Data Capture
This connector implements CDC features for PostgreSQL by reading WAL log events into a buffer that is checked on each Read request after the initial table Rows have been read.
When Open is called, the connector will attempt to start all necessary connections and will run some initial setup commands to create the logical replication slots it needs to run and start its own subscription.
The connector user specified in the connection URL must have sufficient privileges to run all of these commands; otherwise the setup will fail.
If the cdc field in the plugin.Configuration is set to any truthy value, CDC will be enabled.
Publication and slot name are user configurable, and must be correctly set. The plugin will do what it can to be smart about publication and slot management, but it can't handle everything.
If a replication_url is provided, it will be used for CDC features instead of the url. If no replication_url is provided but cdc is enabled, the connector will attempt to use the url value for CDC features and logical replication setup.
Example configuration for CDC features:
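As a hedged sketch, a CDC configuration might look like the map below. The field names come from the configuration table in this document; every value is a placeholder. The helper functions (cdcEnabled, replicationURL, validateCDC) are hypothetical, and treating "truthy" as anything strconv.ParseBool accepts as true is an assumption, not the plugin's confirmed behaviour.

```go
package main

import (
	"fmt"
	"strconv"
)

// exampleCDCConfig uses the field names documented in this README.
// All values are placeholders for illustration only.
func exampleCDCConfig() map[string]string {
	return map[string]string{
		"table":            "records",
		"url":              "postgres://user:password@localhost:5432/exampledb",
		"replication_url":  "postgres://repuser:password@localhost:5432/exampledb",
		"key":              "id",
		"cdc":              "true",
		"publication_name": "example_publication",
		"slot_name":        "example_slot",
	}
}

// cdcEnabled assumes "truthy" means a value strconv.ParseBool reads as true.
func cdcEnabled(cfg map[string]string) bool {
	enabled, err := strconv.ParseBool(cfg["cdc"])
	return err == nil && enabled
}

// replicationURL falls back to url when no replication_url is set.
func replicationURL(cfg map[string]string) string {
	if u := cfg["replication_url"]; u != "" {
		return u
	}
	return cfg["url"]
}

// validateCDC checks the fields this README marks as required for CDC mode.
func validateCDC(cfg map[string]string) error {
	if !cdcEnabled(cfg) {
		return nil
	}
	for _, field := range []string{"publication_name", "slot_name"} {
		if cfg[field] == "" {
			return fmt.Errorf("%s is required when cdc is enabled", field)
		}
	}
	return nil
}

func main() {
	cfg := exampleCDCConfig()
	fmt.Println("cdc enabled:", cdcEnabled(cfg))
	fmt.Println("replication url:", replicationURL(cfg))
	fmt.Println("validation error:", validateCDC(cfg))
}
```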
Internal Event Buffer
A private variable, bufferSize, sets the size of the channel buffer that holds WAL events. When the buffer is full, pushing to the channel blocks, so execution stops whenever the WAL event handler cannot push into it. That blocking could have unknown negative performance consequences, so bufferSize should be sufficiently high and possibly configurable through an environment variable.
WAL Events
In all cases we should aim for consistency between WAL event records and standard Row records, so any discrepancies between their formats should be pointed out. I've addressed some where I've found them. For example, the integer handling in the withValues function treats all numbers as a single integer type, because that's how the row queries handle them as well.
Go and PostgreSQL Data Types
We handle the basic Postgres types for now, but we will need an exhaustive test suite of all the different data types that Postgres can handle.
The WAL uses Postgres' internal LSN (log sequence number) to track the positions of WAL events, but LSNs are not an exact science, and they are subject to wrap-around at their upper limit. This means that once the plugin is reading only WAL events, the LSN will likely be orders of magnitude higher than the row Position of the last read row. This could cause some issues, but the Connector can still handle it by setting Position back to 0 and reading the table from the start again.
However, that isn't the greatest long-term solution and we should handle this by adding a highwater mark for the last read Row in our database query.
The position argument lets the connector know the current position of the Read. When you call Read, it takes that current position and attempts to get the next one. Because of this, we attempt to parse the Position as an integer and then increment it.
The read query looks up a row where key >= $1, where key is the name of the key column and $1 is the value of the incremented Position. plugins.ErrEndData is returned if no matching row can be found.
Position currently references the key column when querying.
key must be a column of integer, serial or bigserial type, or be a string that can be parsed as an integer.
Positions are parsed to integers with ParseInt and then converted back to strings.
withPosition respectively handle
If no key field is provided, the connector will attempt to look up the primary key column of the table. If that can't be determined, it will return an error.
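The position handling described above can be sketched roughly as below. This is an illustration under stated assumptions, not the connector's implementation: nextPosition, formatPosition, and buildReadQuery are hypothetical helpers, and the ORDER BY and LIMIT clauses are assumptions added so the query returns a single next row. Identifiers are interpolated unquoted for brevity; real code should quote or validate them.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// nextPosition parses the current position as an integer and increments it.
func nextPosition(current string) (int64, error) {
	n, err := strconv.ParseInt(current, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("position %q is not an integer: %w", current, err)
	}
	return n + 1, nil
}

// formatPosition converts an integer position back to its string form.
func formatPosition(n int64) string {
	return strconv.FormatInt(n, 10)
}

// buildReadQuery builds a lookup for the first row where key >= $1.
// ORDER BY and LIMIT are assumptions of this sketch.
func buildReadQuery(table, key string, columns []string) string {
	return fmt.Sprintf(
		"SELECT %s FROM %s WHERE %s >= $1 ORDER BY %s LIMIT 1",
		strings.Join(columns, ", "), table, key, key,
	)
}

func main() {
	next, err := nextPosition("41")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("next position:", formatPosition(next))
	fmt.Println(buildReadQuery("records", "id", []string{"id", "column1"}))
}
```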
The config passed to Open can contain the following fields.

| name | description | required |
|------|-------------|----------|
| table | the name of the table in Postgres that the connector should read | yes |
| url | formatted connection string to the database | yes |
| columns | comma-separated list of column names that should be built into each Record's payload | no |
| key | column name used to key records | no |
| cdc | enables CDC features | req. for CDC mode |
| publication_name | name of the publication to listen to for WAL events | req. for CDC mode |
| slot_name | name of the slot opened for replication events | req. for CDC mode |
| replication_url | URL for the CDC connection to use. If no replication_url is provided, the CDC connection attempts to use the url value | optional in CDC mode |

If no column names are provided in the config, then the plugin will assume that all columns in the table should be queried. It will attempt to get the column names for the configured table and set them in memory.
Currently the Postgres source takes a
key property and uses that column name
to key records. The key column must be unique and must be able to be parsed
as an integer.
We plan to support alphanumeric and composite record keys soon.