Flink Connector
This is the official Apache Flink Sink Connector supported by ClickHouse. It is built using Flink's AsyncSinkBase and the official ClickHouse Java client.
The connector supports Apache Flink's DataStream API. Table API support is planned for a future release.
Requirements
- Java 11+ (for Flink 1.17+) or Java 17+ (for Flink 2.0+)
- Apache Flink 1.17+
Flink version compatibility matrix
The connector is split into two artifacts to support both Flink 1.17+ and Flink 2.0+. Choose the artifact that matches your desired Flink version:
| Flink Version | Artifact | ClickHouse Java Client Version | Required Java |
|---|---|---|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
The connector has not been tested against Flink versions earlier than 1.17.2.
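The matrix reduces to a simple rule: Flink 2.x uses the flink-connector-clickhouse-2.0.0 artifact and needs Java 17+, while Flink 1.17 through 1.20 use flink-connector-clickhouse-1.17 and need Java 11+. The Java sketch below encodes that rule so it can be checked in a build script or test. The class and method names are hypothetical and not part of the connector; also note that the oldest release listed as tested is 1.17.2, even though the sketch accepts any 1.17.x.

```java
import java.util.Optional;

public final class ArtifactSelector {

    private ArtifactSelector() {}

    /**
     * Maps a Flink version such as "1.19.3" or "2.0.1" to the connector artifact
     * listed in the compatibility matrix. Versions older than 1.17, and major
     * versions the matrix does not cover, return empty because they are untested.
     */
    public static Optional<String> artifactFor(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        if (major == 2) {
            return Optional.of("flink-connector-clickhouse-2.0.0");
        }
        if (major == 1 && minor >= 17) {
            return Optional.of("flink-connector-clickhouse-1.17");
        }
        return Optional.empty();
    }

    /** Minimum Java version per artifact, taken from the "Required Java" column. */
    public static int minimumJava(String artifact) {
        return artifact.endsWith("2.0.0") ? 17 : 11;
    }

    public static void main(String[] args) {
        for (String version : new String[] {"2.0.1", "1.20.2", "1.16.3"}) {
            String result = artifactFor(version)
                    .map(a -> a + " (Java " + minimumJava(a) + "+)")
                    .orElse("not tested");
            System.out.println(version + " -> " + result);
        }
    }
}
```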
Installation & setup
Import as a dependency
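For Flink 2.0+
- Add the flink-connector-clickhouse-2.0.0 artifact as a Maven, Gradle, or SBT dependency.
For Flink 1.17+
- Add the flink-connector-clickhouse-1.17 artifact as a Maven, Gradle, or SBT dependency.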
Download the binary
The name pattern of the binary JAR is:
where:
- flink_version is one of 2.0.0 or 1.17
- stable_version is a stable artifact release version
You can find all available released JAR files in the Maven Central Repository.
Using the DataStream API
Snippet
Let's say you want to insert raw CSV data into ClickHouse:
More examples and snippets can be found in our tests.
Quick start example
We have created a Maven-based example to help you get started with the ClickHouse Sink.
For more detailed instructions, see the Example Guide.
DataStream API connection options
ClickHouse client options
| Parameters | Description | Default Value | Required |
|---|---|---|---|
url | Fully qualified ClickHouse URL | N/A | Yes |
username | ClickHouse database username | N/A | Yes |
password | ClickHouse database password | N/A | Yes |
database | ClickHouse database name | N/A | Yes |
table | ClickHouse table name | N/A | Yes |
options | Map of Java client configuration options | Empty map | No |
serverSettings | Map of ClickHouse server session settings | Empty map | No |
enableJsonSupportAsString | ClickHouse server setting to expect a JSON formatted String for the JSON data type | true | No |
options and serverSettings should be passed to the client as Map<String, String>. An empty map for either will use client or server defaults, respectively.
All available Java client options are listed in ClientConfigProperties.java and this documentation page.
All available server session settings are listed in this documentation page.
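As a sketch, both maps are plain Map<String, String> instances. The server settings below (async_insert and wait_for_async_insert) are standard ClickHouse settings chosen purely for illustration, not as a recommendation, and the client options map is left empty so the Java client keeps its defaults. The class name is hypothetical, and the step of handing the maps to ClickHouseClientConfig is deliberately omitted because its exact constructor is not documented on this page.

```java
import java.util.HashMap;
import java.util.Map;

public final class ClientSettingsExample {

    private ClientSettingsExample() {}

    /** Java client options: empty, so the ClickHouse Java client falls back to its defaults. */
    public static Map<String, String> clientOptions() {
        return new HashMap<>();
    }

    /** Server session settings: values are passed as strings regardless of their ClickHouse type. */
    public static Map<String, String> serverSettings() {
        Map<String, String> settings = new HashMap<>();
        settings.put("async_insert", "1");          // let the server buffer incoming inserts
        settings.put("wait_for_async_insert", "1"); // acknowledge only after the buffer is flushed to the table
        return settings;
    }

    public static void main(String[] args) {
        System.out.println("options=" + clientOptions());
        System.out.println("serverSettings=" + serverSettings());
    }
}
```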
Sink options
The following options come directly from Flink's AsyncSinkBase:
| Parameters | Description | Default Value | Required |
|---|---|---|---|
maxBatchSize | The maximum number of records inserted in a single batch | N/A | Yes |
maxInFlightRequests | The maximum number of in-flight requests allowed before the sink applies backpressure | N/A | Yes |
maxBufferedRequests | The maximum number of records that may be buffered in the sink before backpressure is applied | N/A | Yes |
maxBatchSizeInBytes | The maximum size (in bytes) of a batch. All batches sent will be smaller than or equal to this size | N/A | Yes |
maxTimeInBufferMS | The maximum time (in milliseconds) a record may stay in the sink before being flushed | N/A | Yes |
maxRecordSizeInBytes | The maximum record size (in bytes) that the sink will accept. Larger records are automatically rejected | N/A | Yes |
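To make the interaction between the size, byte, and time limits concrete, here is a simplified, single-threaded model of when a buffered batch gets flushed. It is not Flink's AsyncSinkWriter implementation: it ignores maxInFlightRequests, maxBufferedRequests, and checkpoint-driven flushes, and only shows which limit fires first, in the spirit of the triggeredBy... counters in the Metrics table. The class, enum, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class BatchBufferModel {

    public enum Trigger { MAX_BATCH_SIZE, MAX_BATCH_SIZE_IN_BYTES, MAX_TIME_IN_BUFFER }

    private final int maxBatchSize;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMS;
    private final long maxRecordSizeInBytes;

    private int bufferedRecords = 0;
    private long bufferedBytes = 0;
    private long oldestRecordMs = 0;

    /** Flush count per trigger, analogous to the triggeredBy... counters. */
    public final Map<Trigger, Integer> flushesByTrigger = new EnumMap<>(Trigger.class);
    /** Record count of each flushed batch, analogous to actualRecordsPerBatch. */
    public final List<Integer> recordsPerBatch = new ArrayList<>();

    public BatchBufferModel(int maxBatchSize, long maxBatchSizeInBytes,
                            long maxTimeInBufferMS, long maxRecordSizeInBytes) {
        if (maxRecordSizeInBytes > maxBatchSizeInBytes) {
            throw new IllegalArgumentException("a single record must fit in one batch");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
    }

    /** Buffers one record of recordBytes bytes at time nowMs, flushing as the limits require. */
    public void add(long recordBytes, long nowMs) {
        if (recordBytes > maxRecordSizeInBytes) {
            throw new IllegalArgumentException("record rejected: larger than maxRecordSizeInBytes");
        }
        // Flush first if this record would push the batch past the byte limit,
        // so that no emitted batch ever exceeds maxBatchSizeInBytes.
        if (bufferedRecords > 0 && bufferedBytes + recordBytes > maxBatchSizeInBytes) {
            flush(Trigger.MAX_BATCH_SIZE_IN_BYTES);
        }
        if (bufferedRecords == 0) {
            oldestRecordMs = nowMs;
        }
        bufferedRecords++;
        bufferedBytes += recordBytes;
        if (bufferedRecords >= maxBatchSize) {
            flush(Trigger.MAX_BATCH_SIZE);
        }
    }

    /** Called periodically: flushes if the oldest buffered record has waited long enough. */
    public void tick(long nowMs) {
        if (bufferedRecords > 0 && nowMs - oldestRecordMs >= maxTimeInBufferMS) {
            flush(Trigger.MAX_TIME_IN_BUFFER);
        }
    }

    private void flush(Trigger trigger) {
        flushesByTrigger.merge(trigger, 1, Integer::sum);
        recordsPerBatch.add(bufferedRecords);
        bufferedRecords = 0;
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        BatchBufferModel model = new BatchBufferModel(3, 1_000, 500, 400);
        model.add(100, 0);
        model.add(100, 10);
        model.add(100, 20); // third record reaches maxBatchSize = 3
        model.add(400, 30);
        model.add(400, 40);
        model.add(400, 50); // 800 + 400 bytes would exceed maxBatchSizeInBytes = 1000
        model.tick(600);    // last record has waited 550 ms >= maxTimeInBufferMS = 500
        System.out.println(model.recordsPerBatch); // [3, 2, 1]
        System.out.println(model.flushesByTrigger);
    }
}
```

In this model each of the three limits fires exactly once, which is why tuning only maxBatchSize rarely tells the whole story: with large records the byte limit can cut batches earlier, and with low traffic the time limit dominates.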
Supported data types
The table below provides a quick reference for converting data types when inserting from Flink into ClickHouse.
Inserting data from Flink into ClickHouse
| Java Type | ClickHouse Type | Supported | Serialization Method |
|---|---|---|---|
byte/Byte | Int8 | ✅ | DataWriter.writeInt8 |
short/Short | Int16 | ✅ | DataWriter.writeInt16 |
int/Integer | Int32 | ✅ | DataWriter.writeInt32 |
long/Long | Int64 | ✅ | DataWriter.writeInt64 |
BigInteger | Int128 | ✅ | DataWriter.writeInt128 |
BigInteger | Int256 | ✅ | DataWriter.writeInt256 |
short/Short | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt8 | ✅ | DataWriter.writeUInt8 |
int/Integer | UInt16 | ✅ | DataWriter.writeUInt16 |
long/Long | UInt32 | ✅ | DataWriter.writeUInt32 |
long/Long | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt64 | ✅ | DataWriter.writeUInt64 |
BigInteger | UInt128 | ✅ | DataWriter.writeUInt128 |
BigInteger | UInt256 | ✅ | DataWriter.writeUInt256 |
BigDecimal | Decimal | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal32 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal64 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal128 | ✅ | DataWriter.writeDecimal |
BigDecimal | Decimal256 | ✅ | DataWriter.writeDecimal |
float/Float | Float | ✅ | DataWriter.writeFloat32 |
double/Double | Double | ✅ | DataWriter.writeFloat64 |
boolean/Boolean | Boolean | ✅ | DataWriter.writeBoolean |
String | String | ✅ | DataWriter.writeString |
String | FixedString | ✅ | DataWriter.writeFixedString |
LocalDate | Date | ✅ | DataWriter.writeDate |
LocalDate | Date32 | ✅ | DataWriter.writeDate32 |
LocalDateTime | DateTime | ✅ | DataWriter.writeDateTime |
ZonedDateTime | DateTime | ✅ | DataWriter.writeDateTime |
LocalDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
ZonedDateTime | DateTime64 | ✅ | DataWriter.writeDateTime64 |
int/Integer | Time | ❌ | N/A |
long/Long | Time64 | ❌ | N/A |
byte/Byte | Enum8 | ✅ | DataWriter.writeInt8 |
int/Integer | Enum16 | ✅ | DataWriter.writeInt16 |
java.util.UUID | UUID | ✅ | DataWriter.writeIntUUID |
String | JSON | ✅ | DataWriter.writeJSON |
Array<Type> | Array<Type> | ✅ | DataWriter.writeArray |
Map<K,V> | Map<K,V> | ✅ | DataWriter.writeMap |
Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | DataWriter.writeTuple |
Object | Variant | ❌ | N/A |
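Java has no unsigned integer types, which is why several unsigned ClickHouse types take a wider signed Java type: a UInt8 value (0 to 255) does not fit in a signed byte, so it is carried as short or int, and a UInt64 value above Long.MAX_VALUE cannot be represented as a non-negative long, so BigInteger is the natural carrier. The sketch below checks that a value fits a target unsigned range before it reaches the sink. The helper names are hypothetical, and the connector's own validation may behave differently.

```java
import java.math.BigInteger;

public final class UnsignedRanges {

    private UnsignedRanges() {}

    /** Largest value of an unsigned integer with the given bit width: 2^bits - 1. */
    public static BigInteger maxUnsigned(int bits) {
        return BigInteger.ONE.shiftLeft(bits).subtract(BigInteger.ONE);
    }

    /** True if value fits ClickHouse UInt{bits}, i.e. 0 <= value <= 2^bits - 1. */
    public static boolean fitsUnsigned(BigInteger value, int bits) {
        return value.signum() >= 0 && value.compareTo(maxUnsigned(bits)) <= 0;
    }

    /** Convenience overload for values carried in a Java long (for example UInt32). */
    public static boolean fitsUnsigned(long value, int bits) {
        return fitsUnsigned(BigInteger.valueOf(value), bits);
    }

    public static void main(String[] args) {
        // The UInt32 maximum overflows int, so it has to travel as a long.
        System.out.println(fitsUnsigned(4_294_967_295L, 32)); // true
        // The UInt64 maximum is above Long.MAX_VALUE, so it has to travel as a BigInteger.
        System.out.println(fitsUnsigned(new BigInteger("18446744073709551615"), 64)); // true
        // Negative values never fit an unsigned type.
        System.out.println(fitsUnsigned(-1L, 8)); // false
    }
}
```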
Notes:
- A ZoneId must be provided when performing date operations.
- Precision and scale must be provided when performing decimal operations.
- In order for ClickHouse to parse a Java String as JSON, you need to enable enableJsonSupportAsString in ClickHouseClientConfig.
- The connector requires an ElementConverter to map elements in the input DataStream to ClickHouse payloads. To this end, the connector provides ClickHouseConvertor and POJOConvertor, which you can use to implement this mapping using the DataWriter serialization methods above.
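The first two notes follow from the ClickHouse types themselves. DateTime stores seconds since the Unix epoch, so a LocalDateTime, which carries no time zone, only becomes a point in time once a ZoneId is chosen. Decimal(P, S) keeps exactly S fractional digits within P total digits, so a BigDecimal has to be brought to scale S and checked against precision P. The sketch below shows both conversions with standard java.time and java.math APIs; it illustrates the requirement rather than reproducing the connector's DataWriter code, and the HALF_UP rounding mode is an assumption.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.time.ZoneId;

public final class ConversionNotes {

    private ConversionNotes() {}

    /** Interprets a zone-less LocalDateTime in the given zone and returns Unix epoch seconds. */
    public static long toEpochSeconds(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toEpochSecond();
    }

    /**
     * Brings value to exactly `scale` fractional digits and verifies that it fits
     * Decimal(precision, scale), i.e. the unscaled value has at most `precision` digits.
     * HALF_UP rounding is an assumption of this sketch.
     */
    public static BigDecimal toDecimal(BigDecimal value, int precision, int scale) {
        BigDecimal scaled = value.setScale(scale, RoundingMode.HALF_UP);
        if (scaled.precision() > precision) {
            throw new ArithmeticException(value + " does not fit Decimal(" + precision + ", " + scale + ")");
        }
        return scaled;
    }

    public static void main(String[] args) {
        LocalDateTime midnight = LocalDateTime.of(2024, 1, 1, 0, 0);
        // The same wall-clock time is a different instant in each zone.
        System.out.println(toEpochSeconds(midnight, ZoneId.of("UTC")));           // 1704067200
        System.out.println(toEpochSeconds(midnight, ZoneId.of("Europe/Berlin"))); // 1704063600
        System.out.println(toDecimal(new BigDecimal("123.456"), 5, 2));          // 123.46
    }
}
```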
Supported input formats
You can find the list of available ClickHouse input formats in this documentation page and ClickHouseFormat.java.
To specify the format the connector should use to serialize your DataStream to ClickHouse payloads, use the setClickHouseFormat function.
If no format is set, the connector uses RowBinaryWithDefaults when setSupportDefault in ClickHouseClientConfig is explicitly set to true, and RowBinary when it is set to false.
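For intuition about what these two default formats carry, here is a sketch that hand-encodes one row with an Int32 column and a String column. In RowBinary, integers are written little-endian and each String is prefixed with its byte length as an unsigned LEB128 varint; RowBinaryWithDefaults adds one byte before each column, where 0x01 tells the server to use the column's DEFAULT and 0x00 means a value follows. The encoder and its names are illustrative only; in practice the payload is produced through the connector's DataWriter methods.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class RowBinarySketch {

    private RowBinarySketch() {}

    /** Unsigned LEB128 varint, used by RowBinary for String lengths. */
    static void writeVarUInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            value >>>= 7;
        }
        out.write((int) value);
    }

    /** Int32 as 4 little-endian bytes. */
    static void writeInt32(ByteArrayOutputStream out, int value) {
        for (int i = 0; i < 4; i++) {
            out.write((value >>> (8 * i)) & 0xFF);
        }
    }

    /** String as varint byte length followed by UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarUInt(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    /** Encodes a row (id Int32, name String); with defaults enabled, a null name means "use DEFAULT". */
    public static byte[] encodeRow(int id, String name, boolean withDefaults) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (withDefaults) {
            out.write(0x00); // a value follows for id
        }
        writeInt32(out, id);
        if (name == null) {
            if (!withDefaults) {
                throw new IllegalArgumentException("RowBinary requires a value for every column");
            }
            out.write(0x01); // use the column's DEFAULT; no value bytes follow
        } else {
            if (withDefaults) {
                out.write(0x00); // a value follows for name
            }
            writeString(out, name);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeRow(1, "ab", false))); // [1, 0, 0, 0, 2, 97, 98]
        System.out.println(Arrays.toString(encodeRow(1, "ab", true)));  // [0, 1, 0, 0, 0, 0, 2, 97, 98]
    }
}
```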
Metrics
The connector exposes the following additional metrics on top of Flink's existing metrics:
| Metric | Description | Type | Status |
|---|---|---|---|
numBytesSend | Total number of bytes sent to ClickHouse | Counter | ✅ |
numRecordSend | Total number of records sent to ClickHouse | Counter | ✅ |
numRequestSubmitted | Total number of requests sent (actual number of flushes performed) | Counter | ✅ |
numOfDroppedBatches | Total number of batches dropped due to non-retryable failures | Counter | ✅ |
numOfDroppedRecords | Total number of records dropped due to non-retryable failures | Counter | ✅ |
totalBatchRetries | Total number of batch retries due to retryable failures | Counter | ✅ |
writeLatencyHistogram | Histogram of successful write latency distribution (ms) | Histogram | ✅ |
writeFailureLatencyHistogram | Histogram of failed write latency distribution (ms) | Histogram | ✅ |
triggeredByMaxBatchSizeCounter | Total number of flushes triggered by reaching maxBatchSize | Counter | ✅ |
triggeredByMaxBatchSizeInBytesCounter | Total number of flushes triggered by reaching maxBatchSizeInBytes | Counter | ✅ |
triggeredByMaxTimeInBufferMSCounter | Total number of flushes triggered by reaching maxTimeInBufferMS | Counter | ✅ |
actualRecordsPerBatch | Histogram of actual batch size distribution | Histogram | ✅ |
actualBytesPerBatch | Histogram of actual bytes per batch distribution | Histogram | ✅ |
Limitations
- The sink currently provides an at-least-once delivery guarantee. Work toward exactly-once semantics is being tracked here.
- The sink does not yet support a dead-letter queue (DLQ) for buffering unprocessable messages. In the meantime, the connector will stop processing at the first row it is unable to handle. This feature is being tracked here.
- The sink does not yet support creation via Flink's Table API or Flink SQL. This feature is being tracked here.
ClickHouse version compatibility and security
- The connector is tested against a range of recent ClickHouse versions, including latest and head, via a daily CI workflow. The tested versions are updated periodically as new ClickHouse releases become active. See here for the versions the connector is tested against daily.
- See the ClickHouse security policy for known security vulnerabilities and how to report a vulnerability.
- We recommend upgrading the connector regularly so that you don't miss security fixes and improvements.
- If you have an issue with migration, please create a GitHub issue and we will respond!
Advanced and recommended usage
- For optimal performance, ensure your DataStream element type is not a generic type (see here for how Flink distinguishes types). Non-generic elements avoid the serialization overhead incurred by Kryo and improve throughput to ClickHouse.
- We recommend setting maxBatchSize to at least 1,000 and ideally between 10,000 and 100,000. See this guide on bulk inserts for more information.
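Flink treats a class as a POJO, and serializes it with its PojoSerializer instead of falling back to Kryo, only if the class is public and standalone (or a static nested class), has a public no-argument constructor, and exposes every non-static, non-transient field either as a public non-final field or through a public getter and setter. The sketch below approximates those rules with plain reflection so you can sanity-check a row type before wiring it into the sink. It is a simplification (it ignores superclass fields and the further checks in Flink's type extraction), the class names are hypothetical, and Flink's own type extraction remains the authoritative test.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public final class PojoRules {

    private PojoRules() {}

    /** Follows the rules: public static nested class, public no-arg constructor, public fields. */
    public static class SensorReading {
        public String sensorId;
        public long timestamp;
        public double value;

        public SensorReading() {}
    }

    /** A private field is also fine when a matching public getter and setter exist. */
    public static class Temperature {
        private double celsius;

        public Temperature() {}

        public double getCelsius() { return celsius; }

        public void setCelsius(double celsius) { this.celsius = celsius; }
    }

    /** Breaks the rules: no no-arg constructor and a private final field without accessors. */
    public static class RawLine {
        private final String line;

        public RawLine(String line) { this.line = line; }
    }

    /** Approximates Flink's POJO check for the fields declared directly on the class. */
    public static boolean looksLikePojo(Class<?> type) {
        int mods = type.getModifiers();
        if (!Modifier.isPublic(mods) || (type.isMemberClass() && !Modifier.isStatic(mods))) {
            return false;
        }
        try {
            type.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int fm = field.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) {
                continue; // directly accessible field
            }
            String suffix = Character.toUpperCase(field.getName().charAt(0)) + field.getName().substring(1);
            boolean getter = hasPublicMethod(type, "get" + suffix) || hasPublicMethod(type, "is" + suffix);
            boolean setter = hasPublicMethod(type, "set" + suffix, field.getType());
            if (!getter || !setter) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasPublicMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params); // getMethod only finds public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```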
Contributing and support
If you'd like to contribute to the project or report any issues, we welcome your input! Visit our GitHub repository to open an issue, suggest improvements, or submit a pull request.
Contributions are welcome! Please check the contribution guide in the repository before starting. Thank you for helping improve the ClickHouse Flink connector!