Implementing a ClickHouse source and sink for Flink SQL

Category: flink

1. Scenario

Implement a custom ClickHouse connector (source and sink) for Flink SQL.

2. Versions

MySQL        Flink          ClickHouse
5.7.20-log   flink-1.13.1   20.11.4.13
5.7.20-log   flink-1.13.2   20.11.4.13
5.7.20-log   flink-1.13.5   20.11.4.13

The Flink package for connecting to ClickHouse

3. Code structure of the custom connector

(Figure: structure of the custom connector code.)

4. Code: POM file and classes

4.1 POM file

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.glab</groupId>
  <artifactId>flink-connector-clickhouse</artifactId>
  <version>13.1</version>

  <name>flink-connector-clickhouse</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <clickhouse-jdbc-version>0.3.0</clickhouse-jdbc-version>
  </properties>

  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>${clickhouse-jdbc-version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>30.1.1-jre</version>
    </dependency>


    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpmime</artifactId>
      <version>4.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.4</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.2.3</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
      <scope>provided</scope>
    </dependency>

    <!-- Kafka connector, for testing only -->

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Table ecosystem -->
    <!-- Projects depending on this project won't depend on flink-table-*. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
      <!--<optional>true</optional>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- test dependencies -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <!--<type>test-jar</type>-->
      <scope>provided</scope>
    </dependency>
    <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<!--    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>-->

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <shadedArtifactAttached>true</shadedArtifactAttached>
          <outputFile>out/flink-connector-clickhouse-${project.version}.jar</outputFile>
          <artifactSet>
            <includes>
              <include>*:*</include>
            </includes>
          </artifactSet>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>


4.2 ClickHouseDynamicTableFactory.java

package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.dialect.ClickHouseDialect;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.*;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    public static final String IDENTIFIER = "clickhouse";

    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final ConfigOption<String> URL = ConfigOptions.key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse url in format `clickhouse://<host>:<port>`.");
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse username.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse password.");

    public static final ConfigOption<String> DATABASE_NAME = ConfigOptions.key("database-name")
            .stringType()
            .defaultValue("default")
            .withDescription("the ClickHouse database name. Default to `default`.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the ClickHouse table name.");

    public static final ConfigOption<Integer> SINK_BATCH_SIZE = ConfigOptions.key("sink.batch-size")
            .intType()
            .defaultValue(Integer.valueOf(1000))
            .withDescription("the max number of buffered records; once it is reached, the buffer is flushed. The default value is 1000.");

    public static final ConfigOption<Duration> SINK_FLUSH_INTERVAL = ConfigOptions.key("sink.flush-interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(1L))
            .withDescription("the flush interval; once it elapses, an asynchronous thread flushes the buffered data. The default value is 1s.");

    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries")
            .intType()
            .defaultValue(Integer.valueOf(3))
            .withDescription("the max retry times if writing records to database failed.");

    public static final ConfigOption<Boolean> SINK_WRITE_LOCAL = ConfigOptions.key("sink.write-local")
            .booleanType()
            .defaultValue(Boolean.valueOf(false))
            .withDescription("directly write to local tables in case of Distributed table.");

    public static final ConfigOption<String> SINK_PARTITION_STRATEGY = ConfigOptions.key("sink.partition-strategy")
            .stringType()
            .defaultValue("balanced")
            .withDescription("partition strategy. available: balanced, hash, shuffle.");

    public static final ConfigOption<String> SINK_PARTITION_KEY = ConfigOptions.key("sink.partition-key")
            .stringType()
            .noDefaultValue()
            .withDescription("partition key used for hash strategy.");

    public static final ConfigOption<Boolean> SINK_IGNORE_DELETE = ConfigOptions.key("sink.ignore-delete")
            .booleanType()
            .defaultValue(Boolean.valueOf(true))
            .withDescription("whether to treat update statements as insert statements and ignore deletes. defaults to true.");

    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
            .longType()
            .defaultValue(-1L)
            .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will be eliminated. " +
                    "cache.max-rows and cache.ttl options must all be specified if any of them is specified. cache is not enabled as default.");

    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl")
            .durationType()
            .defaultValue(Duration.ofSeconds(10))
            .withDescription("the cache time to live");

    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if lookup database failed.");

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        // Let invalid options fail table creation instead of only printing a stack trace
        validateConfigOptions(config);

        // Variants with "New" use the Flink 1.13 API; those without use the 1.12 API
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        return new ClickHouseDynamicTableSource(resolvedSchema, getOptions(config), getJdbcLookupOptions(config));
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        // Let invalid options fail table creation instead of only printing a stack trace
        validateConfigOptions(config);

        // Variants with "New" use the Flink 1.13 API; those without use the 1.12 API
        ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
        return new ClickHouseDynamicTableSink(resolvedSchema, getOptions(config));
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(USERNAME);
        optionalOptions.add(PASSWORD);
        optionalOptions.add(DATABASE_NAME);
        optionalOptions.add(SINK_BATCH_SIZE);
        optionalOptions.add(SINK_FLUSH_INTERVAL);
        optionalOptions.add(SINK_MAX_RETRIES);
        optionalOptions.add(SINK_WRITE_LOCAL);
        optionalOptions.add(SINK_PARTITION_STRATEGY);
        optionalOptions.add(SINK_PARTITION_KEY);
        optionalOptions.add(SINK_IGNORE_DELETE);
        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
        optionalOptions.add(LOOKUP_CACHE_TTL);
        optionalOptions.add(LOOKUP_MAX_RETRIES);
        return optionalOptions;
    }

    private void validateConfigOptions(ReadableConfig config) {
        String partitionStrategy = config.get(SINK_PARTITION_STRATEGY);
        if (!Arrays.asList("hash", "balanced", "shuffle").contains(partitionStrategy))
            throw new IllegalArgumentException("Unknown sink.partition-strategy `" + partitionStrategy + "`");
        if (partitionStrategy.equals("hash") && !config.getOptional(SINK_PARTITION_KEY).isPresent())
            throw new IllegalArgumentException("A partition key must be provided for hash partition strategy");
        if (config.getOptional(USERNAME).isPresent() ^ config.getOptional(PASSWORD).isPresent())
            throw new IllegalArgumentException("Either all or none of username and password should be provided");
    }

    private ClickHouseOptions getOptions(ReadableConfig config) {
        return (new ClickHouseOptions.Builder()).withUrl((String)config.get(URL))
                .withUsername((String)config.get(USERNAME))
                .withPassword((String)config.get(PASSWORD))
                .withDatabaseName((String)config.get(DATABASE_NAME))
                .withTableName((String)config.get(TABLE_NAME))
                .withBatchSize(((Integer)config.get(SINK_BATCH_SIZE)).intValue())
                .withFlushInterval((Duration)config.get(SINK_FLUSH_INTERVAL))
                .withMaxRetries(((Integer)config.get(SINK_MAX_RETRIES)).intValue())
                .withWriteLocal((Boolean)config.get(SINK_WRITE_LOCAL))
                .withPartitionStrategy((String)config.get(SINK_PARTITION_STRATEGY))
                .withPartitionKey((String)config.get(SINK_PARTITION_KEY))
                .withIgnoreDelete(((Boolean)config.get(SINK_IGNORE_DELETE)).booleanValue())
                .setDialect(new ClickHouseDialect())
                .build();
    }

/*    private JdbcOptions getJdbcOptions(ReadableConfig config) {
        return JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(config.get(URL))
                .setTableName(config.get(TABLE_NAME))
                .setDialect(new ClickHouseDialect())
                .build();
    }*/


    private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig config) {
        return JdbcLookupOptions.builder()
                .setCacheExpireMs(config.get(LOOKUP_CACHE_TTL).toMillis())
                .setMaxRetryTimes(config.get(LOOKUP_MAX_RETRIES))
                .setCacheMaxSize(config.get(LOOKUP_CACHE_MAX_ROWS))
                .build();
    }

}
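The three checks in validateConfigOptions can be tried out without a Flink cluster. The JDK-only sketch below reproduces the same rules; a plain Map<String, String> of WITH options stands in for Flink's ReadableConfig (an assumption for illustration), and the names ClickHouseOptionValidation, opts and isValid are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, JDK-only sketch of the rules in validateConfigOptions.
// A Map of WITH options stands in for Flink's ReadableConfig (assumption).
public class ClickHouseOptionValidation {

    private static final List<String> STRATEGIES = Arrays.asList("hash", "balanced", "shuffle");

    static void validate(Map<String, String> options) {
        // Same default as SINK_PARTITION_STRATEGY
        String strategy = options.getOrDefault("sink.partition-strategy", "balanced");
        if (!STRATEGIES.contains(strategy)) {
            throw new IllegalArgumentException("Unknown sink.partition-strategy `" + strategy + "`");
        }
        // The hash strategy needs a key to hash on
        if (strategy.equals("hash") && !options.containsKey("sink.partition-key")) {
            throw new IllegalArgumentException("A partition key must be provided for hash partition strategy");
        }
        // username and password must be set together; an empty password still counts as set
        if (options.containsKey("username") ^ options.containsKey("password")) {
            throw new IllegalArgumentException("Either all or none of username and password should be provided");
        }
    }

    static boolean isValid(Map<String, String> options) {
        try {
            validate(options);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    // Builds an option map from alternating key/value arguments
    static Map<String, String> opts(String... keyValues) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(isValid(opts("username", "default", "password", ""))); // true
        System.out.println(isValid(opts("username", "default")));                 // false
        System.out.println(isValid(opts("sink.partition-strategy", "hash")));     // false
    }
}
```

Under these rules, a table that sets username but omits password is rejected, which is why the examples in this article pass `'password' = ''` explicitly.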

4.3 ClickHouseDynamicTableSink.java

package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.AbstractClickHouseSinkFunction;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;


public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final ResolvedSchema resolvedSchema;

    private final ClickHouseOptions options;

    public ClickHouseDynamicTableSink(ResolvedSchema resolvedSchema, ClickHouseOptions options) {
        this.resolvedSchema = resolvedSchema;
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        validatePrimaryKey(requestedMode);
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    private void validatePrimaryKey(ChangelogMode requestedMode) {
        Preconditions.checkState(
                ChangelogMode.insertOnly().equals(requestedMode) || this.resolvedSchema.getPrimaryKey().isPresent(),
                "please declare primary key for sink table when query contains update/delete record.");
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        AbstractClickHouseSinkFunction sinkFunction =
                (new AbstractClickHouseSinkFunction.Builder())
                        .withOptions(this.options)
                        .withFieldNames(this.resolvedSchema.getColumnNames())
                        .withFieldDataTypes(this.resolvedSchema.getColumnDataTypes())
                        .withPrimaryKey(this.resolvedSchema.getPrimaryKey())
                        .withRowDataTypeInfo(context.createTypeInformation(this.resolvedSchema.toSinkRowDataType()))
                        .build();
        return SinkFunctionProvider.of(sinkFunction);
    }

    @Override
    public ClickHouseDynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(this.resolvedSchema, this.options);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse sink";
    }
}
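getSinkRuntimeProvider hands the options to AbstractClickHouseSinkFunction, whose code is in the download rather than shown here. Judging from the option descriptions, sink.batch-size and sink.flush-interval combine as "flush when the buffer is full, or when the interval has elapsed". The following is a minimal JDK-only sketch of that rule, assuming a single-threaded caller and explicit timestamps instead of a real scheduler; BatchFlushSketch and onTimer are hypothetical names, and retries (sink.max-retries) are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of size- or time-triggered flushing; not the connector's actual sink function.
public class BatchFlushSketch<T> {
    private final int batchSize;          // plays the role of sink.batch-size
    private final long flushIntervalMs;   // plays the role of sink.flush-interval
    private final Consumer<List<T>> writer;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs;

    public BatchFlushSketch(int batchSize, long flushIntervalMs, Consumer<List<T>> writer, long startMs) {
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.writer = writer;
        this.lastFlushMs = startMs;
    }

    // Called for every incoming row; flushes as soon as the batch is full
    public void add(T row, long nowMs) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush(nowMs);
        }
    }

    // In a real sink a scheduled thread would call this periodically (assumption)
    public void onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        writer.accept(new ArrayList<>(buffer)); // e.g. one batched INSERT
        buffer.clear();
        lastFlushMs = nowMs;
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchFlushSketch<String> sink = new BatchFlushSketch<>(3, 1000, batch -> batchSizes.add(batch.size()), 0);
        sink.add("a", 10);
        sink.add("b", 20);
        sink.add("c", 30);   // batch full: 3 rows flushed
        sink.add("d", 100);
        sink.onTimer(500);   // only 470 ms since the last flush: nothing happens
        sink.onTimer(1030);  // 1000 ms elapsed: the remaining row is flushed
        System.out.println(batchSizes); // [3, 1]
    }
}
```

A very large sink.batch-size (such as the 1000000 in the example in section 5) therefore means most flushes are triggered by the interval rather than the size.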

4.4 ClickHouseDynamicTableSource.java

package com.glab.flink.connector.clickhouse.table;

import com.glab.flink.connector.clickhouse.table.internal.ClickHouseRowDataLookupFunction;
import com.glab.flink.connector.clickhouse.table.internal.dialect.ClickHouseDialect;
import com.glab.flink.connector.clickhouse.table.internal.options.ClickHouseOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.*;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.http.client.utils.URIBuilder;


public class ClickHouseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsLimitPushDown {

    private final ResolvedSchema resolvedSchema;

    private final ClickHouseOptions options;

    private final JdbcLookupOptions lookupOptions;
    private long limit = -1;

    public ClickHouseDynamicTableSource(ResolvedSchema resolvedSchema, ClickHouseOptions options, JdbcLookupOptions lookupOptions) {
        this.resolvedSchema = resolvedSchema;
        this.options = options;
        this.lookupOptions = lookupOptions;
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
        String[] keyNames = new String[lookupContext.getKeys().length];
        for (int i = 0; i < keyNames.length; i++) {
            int[] innerKeyArr = lookupContext.getKeys()[i];
            Preconditions.checkArgument(innerKeyArr.length == 1, "ClickHouse only supports non-nested lookup keys");
            keyNames[i] = resolvedSchema.getColumnNames().get(innerKeyArr[0]);
        }

        final RowType rowType = (RowType)resolvedSchema.toSourceRowDataType().getLogicalType();
        ClickHouseRowDataLookupFunction lookupFunction =
                new ClickHouseRowDataLookupFunction(options, lookupOptions,
                        resolvedSchema.getColumnNames().stream().toArray(String[]::new),
                        resolvedSchema.getColumnDataTypes().stream().toArray(DataType[]::new), keyNames, rowType);
        return TableFunctionProvider.of(lookupFunction);
    }


    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .build();
    }


    // For data exploration only
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        ClickHouseDialect dialect = (ClickHouseDialect)options.getDialect();
        String query = dialect.getSelectFromStatement(options.getTableName(), resolvedSchema.getColumnNames().stream().toArray(String[]::new), new String[0]);

        // Flink 1.13 supports SupportsLimitPushDown; without it, scanning a large table would hang
        if (limit >= 0) {
            query = String.format("%s %s", query, dialect.getLimitClause(limit));
        }

        RowType rowType = (RowType)resolvedSchema.toSourceRowDataType().getLogicalType();
        JdbcRowDataInputFormat build = JdbcRowDataInputFormat.builder()
                .setDrivername(options.getDialect().defaultDriverName().get())
                .setDBUrl(getJdbcUrl(options.getUrl(), options.getDatabaseName()))
                .setUsername(options.getUsername().orElse(null))
                .setPassword(options.getPassword().orElse(null))
                .setQuery(query)
                .setRowConverter(dialect.getRowConverter(rowType))
                .setRowDataTypeInfo(scanContext.createTypeInformation(resolvedSchema.toSourceRowDataType()))
                .build();
        return InputFormatProvider.of(build);
    }

    @Override
    public DynamicTableSource copy() {
        ClickHouseDynamicTableSource tableSource = new ClickHouseDynamicTableSource(resolvedSchema, options, lookupOptions);
        // Keep a pushed-down limit on copies of this source
        tableSource.limit = this.limit;
        return tableSource;
    }

    @Override
    public String asSummaryString() {
        return "clickhouse source";
    }

    private String getJdbcUrl(String url, String dbName) {
        try {
            return "jdbc:" + (new URIBuilder(url)).setPath("/" + dbName).build().toString();
        } catch (Exception e) {
            throw new RuntimeException("get JDBC url failed.", e);
        }
    }

    @Override
    public void applyLimit(long limit) {
        this.limit = limit;
    }
}
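Two details of the scan path are easy to check in isolation: getJdbcUrl turns the connector's url plus database-name into a JDBC URL, and applyLimit makes the scan append a LIMIT clause. The sketch below redoes both with the JDK only, using java.net.URI in place of Apache's URIBuilder. The exact SELECT text produced by ClickHouseDialect.getSelectFromStatement is not shown in this article, so the backtick-quoted query here is an assumption, and ClickHouseScanQuery is a hypothetical name.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical JDK-only sketch of JDBC URL derivation and LIMIT pushdown.
public class ClickHouseScanQuery {

    // Same idea as getJdbcUrl: replace the path with /<database> and prefix "jdbc:"
    static String jdbcUrl(String url, String dbName) {
        try {
            URI u = new URI(url);
            URI withDb = new URI(u.getScheme(), u.getUserInfo(), u.getHost(), u.getPort(),
                    "/" + dbName, u.getQuery(), u.getFragment());
            return "jdbc:" + withDb;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("get JDBC url failed.", e);
        }
    }

    // Assumed SELECT shape; a limit of -1 (the field's default) means "no LIMIT pushed down"
    static String selectWithLimit(String table, List<String> columns, long limit) {
        String cols = columns.stream().map(c -> "`" + c + "`").collect(Collectors.joining(", "));
        String query = "SELECT " + cols + " FROM `" + table + "`";
        return limit >= 0 ? query + " LIMIT " + limit : query;
    }

    public static void main(String[] args) {
        System.out.println(jdbcUrl("clickhouse://192.168.1.161:8123", "wudldb"));
        // jdbc:clickhouse://192.168.1.161:8123/wudldb
        System.out.println(selectWithLimit("clickhosuetable", Arrays.asList("id", "name"), 10));
        // SELECT `id`, `name` FROM `clickhosuetable` LIMIT 10
    }
}
```

With limit pushdown, a query such as `SELECT * FROM wutable2 LIMIT 10` in the SQL client only pulls ten rows from ClickHouse instead of scanning the whole table.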

4.5 Code for the remaining classes
The remaining classes are available for download:
Custom ClickHouse source and sink for Flink 1.13: https://download.csdn.net/download/wudonglianga/86501949
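ClickHouseRowDataLookupFunction is part of the download, so its cache is not shown. The lookup.cache.* option descriptions describe a bounded cache: at most max-rows entries with the oldest evicted first, entries expiring after ttl, and no caching while max-rows keeps its default of -1. Below is a hypothetical JDK-only sketch of those semantics with explicit timestamps instead of a clock; LookupCacheSketch is not a class from the connector. Note that Flink reads a duration option without a unit as milliseconds, so `'lookup.cache.ttl' = '10'` means 10 ms; write `'10s'` for ten seconds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of lookup.cache.max-rows / lookup.cache.ttl semantics.
public class LookupCacheSketch<K, V> {

    // A cached row plus the time at which it expires
    private static final class Cached<T> {
        final T value;
        final long expireAtMs;

        Cached(T value, long expireAtMs) {
            this.value = value;
            this.expireAtMs = expireAtMs;
        }
    }

    private final long maxRows;
    private final long ttlMs;
    private final LinkedHashMap<K, Cached<V>> rows;

    public LookupCacheSketch(long maxRows, long ttlMs) {
        this.maxRows = maxRows;
        this.ttlMs = ttlMs;
        // Insertion order: once the size exceeds maxRows, the oldest inserted row is evicted
        this.rows = new LinkedHashMap<K, Cached<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
                return size() > LookupCacheSketch.this.maxRows;
            }
        };
    }

    // max-rows defaults to -1, which means the cache is disabled
    public boolean enabled() {
        return maxRows > 0;
    }

    // Returns the cached value, or null on a miss or an expired entry
    public V get(K key, long nowMs) {
        if (!enabled()) {
            return null;
        }
        Cached<V> cached = rows.get(key);
        if (cached == null) {
            return null;
        }
        if (nowMs >= cached.expireAtMs) {
            rows.remove(key);
            return null;
        }
        return cached.value;
    }

    public void put(K key, V value, long nowMs) {
        if (enabled()) {
            rows.put(key, new Cached<>(value, nowMs + ttlMs));
        }
    }

    public static void main(String[] args) {
        LookupCacheSketch<Long, String> cache = new LookupCacheSketch<>(2, 10_000);
        cache.put(1L, "a", 0);
        cache.put(2L, "b", 0);
        cache.put(3L, "c", 0);                      // exceeds max-rows: key 1 is evicted
        System.out.println(cache.get(1L, 5));       // null
        System.out.println(cache.get(3L, 5));       // c
        System.out.println(cache.get(3L, 10_000));  // null (expired)
    }
}
```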

4.5.1 JARs in the Flink lib directory

[root@node01 flink-1.13.1]# cd lib/
[root@node01 lib]# ll
total 384180
-rw-r--r-- 1 root      root      358385 Aug 27 18:39 clickhouse-jdbc-0.3.0.jar
-rw-r--r-- 1 root      root     4585064 Aug 28 22:51 flink-connector-clickhouse-13.1-jar-with-dependencies.jar
-rw-r--r-- 1 root      root      248980 Aug 28 22:12 flink-connector-jdbc_2.11-1.13.1.jar
-rw-r--r-- 1 root      root    30087268 Aug 28 18:10 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 zookeeper hadoop     92311 May 25  2021 flink-csv-1.13.1.jar
-rw-r--r-- 1 zookeeper hadoop 115530972 May 25  2021 flink-dist_2.11-1.13.1.jar
-rw-r--r-- 1 root      root       81363 Oct  5  2021 flink-hadoop-compatibility_2.12-1.12.0.jar
-rw-r--r-- 1 zookeeper hadoop    148131 May 25  2021 flink-json-1.13.1.jar
-rw-r--r-- 1 root      root    43317025 Oct  5  2021 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-rw-r-- 1 zookeeper hadoop   7709740 Apr  8  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root      root    38101480 Oct  5  2021 flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
-rw-r--r-- 1 zookeeper hadoop  36417228 May 25  2021 flink-table_2.11-1.13.1.jar
-rw-r--r-- 1 zookeeper hadoop  40965908 May 25  2021 flink-table-blink_2.11-1.13.1.jar
-rw-r--r-- 1 root      root     1654821 Oct  5  2021 hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar
-rw-r--r-- 1 root      root    52191593 Oct  5  2021 hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
-rw-r--r-- 1 root      root    17427063 Oct  5  2021 hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar
-rw-rw-r-- 1 zookeeper hadoop     67114 Oct 10  2019 log4j-1.2-api-2.12.1.jar
-rw-rw-r-- 1 zookeeper hadoop    276771 Oct 10  2019 log4j-api-2.12.1.jar
-rw-rw-r-- 1 zookeeper hadoop   1674433 Oct 10  2019 log4j-core-2.12.1.jar
-rw-rw-r-- 1 zookeeper hadoop     23518 Oct 10  2019 log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 root      root     2397321 Aug 28 22:13 mysql-connector-java-8.0.21.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.1/flink-1.13.1/lib
[root@node01 lib]# 

4.6 Table schemas

4.6.1 MySQL table schema


SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for Flink_cdc
-- ----------------------------
DROP TABLE IF EXISTS `Flink_cdc`;
CREATE TABLE `Flink_cdc`  (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `age` int(20) NULL DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 10225 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of Flink_cdc
-- ----------------------------

INSERT INTO `Flink_cdc` VALUES (190, '乜荷爽', 5, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (191, '嵇露影', 4, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (192, '富胜', 18, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (193, '孟言', 7, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (194, '漆维光', 16, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (195, '澹巧', 7, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (196, '司玉', 23, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (197, '唐栋豪', 5, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (198, '姚以', 22, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (199, '仲亨', 15, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (200, '凌燕翠', 11, '2022-02-19 19:29:39', '2022-02-19 19:29:39');
INSERT INTO `Flink_cdc` VALUES (201, '琴荷亚', 13, '2022-02-19 19:29:39', '2022-02-19 19:29:39');


SET FOREIGN_KEY_CHECKS = 1;

4.6.2 ClickHouse table schema


create table  clickhosuetable ( id UInt64 , name String, age UInt64, birthday  Datetime   ) engine =MergeTree partition by toYYYYMMDD(birthday) primary key (id);

insert into	clickhosuetable values (10001,'flink',25,'2022-08-28 12:00:00');

#*******************source*********************************
CREATE TABLE source_mysql2 (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   age INT,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3)
 ) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.168.1.162:3306/wudldb',
'table-name' = 'Flink_cdc',
 'username' = 'root',
 'password' = '123456'
 );

#*************************sink table***************************

CREATE TABLE if not exists wutable2 (
   id BIGINT,
   name STRING,
   age BIGINT,
   birthday  TIMESTAMP,
   PRIMARY KEY( id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://192.168.1.161:8123',
    'username' = 'default',  
    'password' = '',  
    'database-name' = 'wudldb', 
    'table-name' = 'clickhosuetable', 
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl' = '10',
    'lookup.max-retries' = '3'
);

#***************************insert *************************
insert into wutable2  select id ,name , age, birthday   from source_mysql2;


4.7 Flink CDC to ClickHouse

CREATE TABLE source_mysql (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   age INT,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3)
 ) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '192.168.1.162',
 'port' = '3306',
 'username' = 'root',
'password' = '123456',
 'server-time-zone' = 'Asia/Shanghai',
 'debezium.snapshot.mode' = 'initial',
 'database-name' = 'wudldb',
 'table-name' = 'Flink_cdc'
 );


#****************************

CREATE TABLE if not exists wutable2 (
   id BIGINT,
   name STRING,
   age BIGINT,
   birthday  TIMESTAMP,
   PRIMARY KEY( id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://192.168.1.161:8123',
    'username' = 'default',  
    'password' = '',
    'database-name' = 'wudldb', 
    'table-name' = 'clickhosuetable', 
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl' = '10',
    'lookup.max-retries' = '3'
);


# *******************************************************************
Flink SQL> insert into wutable2  select id ,name , age, birthday   from source_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1712c4e583d900b5523c08150ad9dd70


Flink SQL> 
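The mysql-cdc source emits a changelog (inserts, updates and deletes) rather than insert-only rows, which is why wutable2 declares `PRIMARY KEY(id) NOT ENFORCED`: validatePrimaryKey rejects non-insert-only input without one. How the sink function then handles each row kind is not shown in this article; the sketch below only encodes the sink.ignore-delete description (updates written as inserts, deletes ignored, default true). The Kind and Action enums are hypothetical stand-ins so that no Flink dependency is needed, and the UPDATE/DELETE actions for ignore-delete = false are assumptions.

```java
// Hypothetical sketch: primary-key check and per-row routing under sink.ignore-delete.
public class ChangelogRoutingSketch {

    // Stand-in for org.apache.flink.types.RowKind
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // What the sink would do with a row (assumed actions)
    enum Action { INSERT, UPDATE, DELETE, SKIP }

    // Mirrors validatePrimaryKey: only insert-only input may omit a primary key
    static void checkPrimaryKey(boolean insertOnly, boolean hasPrimaryKey) {
        if (!insertOnly && !hasPrimaryKey) {
            throw new IllegalStateException(
                    "please declare primary key for sink table when query contains update/delete record.");
        }
    }

    static Action route(Kind kind, boolean ignoreDelete) {
        switch (kind) {
            case INSERT:
                return Action.INSERT;
            case UPDATE_AFTER:
                // ignore-delete = true: an update is written as a plain insert
                return ignoreDelete ? Action.INSERT : Action.UPDATE;
            case DELETE:
                // ignore-delete = true: deletes are dropped
                return ignoreDelete ? Action.SKIP : Action.DELETE;
            default:
                // UPDATE_BEFORE is not in the sink's ChangelogMode, so the planner does not send it
                return Action.SKIP;
        }
    }

    public static void main(String[] args) {
        checkPrimaryKey(false, true); // CDC input with PRIMARY KEY(id): accepted
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " -> " + route(kind, true));
        }
    }
}
```

With the default ignore-delete = true and a plain MergeTree target table, updated MySQL rows arrive in ClickHouse as additional inserted rows.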

Result in ClickHouse:

SELECT count(*)
FROM clickhosuetable

Query id: 93ee83d4-7092-46e4-9954-736af4e09548

┌─count()─┐
│   20449 │
└─────────┘

1 rows in set. Elapsed: 0.005 sec. 

node01.com :) 

(Figure: the running Flink job.)


5. Flink to ClickHouse data type mapping

Flink Type             ClickHouse Type
CHAR                   String
VARCHAR                String / IP / UUID
STRING                 String / Enum
BOOLEAN                UInt8
BYTES                  FixedString
DECIMAL                Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
TINYINT                Int8
SMALLINT               Int16 / UInt8
INTEGER                Int32 / UInt16 / Interval
BIGINT                 Int64 / UInt32
FLOAT                  Float32
DOUBLE                 Float64
DATE                   Date
TIME                   DateTime
TIMESTAMP              DateTime
TIMESTAMP_LTZ          DateTime
INTERVAL_YEAR_MONTH    Int32
INTERVAL_DAY_TIME      Int64
ARRAY                  Array
MAP                    Map
ROW                    Not supported
MULTISET               Not supported
RAW                    Not supported
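To check a DDL against this mapping programmatically, the table can be held in a simple lookup structure. A JDK-only sketch follows; the class and method names are hypothetical, and the entries are copied from the table, with ROW, MULTISET and RAW left out because they are unsupported.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical lookup table built from the Flink -> ClickHouse type mapping.
public class FlinkClickHouseTypeMapping {

    private static final Map<String, List<String>> MAPPING = new HashMap<>();

    static {
        MAPPING.put("CHAR", Arrays.asList("String"));
        MAPPING.put("VARCHAR", Arrays.asList("String", "IP", "UUID"));
        MAPPING.put("STRING", Arrays.asList("String", "Enum"));
        MAPPING.put("BOOLEAN", Arrays.asList("UInt8"));
        MAPPING.put("BYTES", Arrays.asList("FixedString"));
        MAPPING.put("DECIMAL", Arrays.asList("Decimal", "Int128", "Int256", "UInt64", "UInt128", "UInt256"));
        MAPPING.put("TINYINT", Arrays.asList("Int8"));
        MAPPING.put("SMALLINT", Arrays.asList("Int16", "UInt8"));
        MAPPING.put("INTEGER", Arrays.asList("Int32", "UInt16", "Interval"));
        MAPPING.put("BIGINT", Arrays.asList("Int64", "UInt32"));
        MAPPING.put("FLOAT", Arrays.asList("Float32"));
        MAPPING.put("DOUBLE", Arrays.asList("Float64"));
        MAPPING.put("DATE", Arrays.asList("Date"));
        MAPPING.put("TIME", Arrays.asList("DateTime"));
        MAPPING.put("TIMESTAMP", Arrays.asList("DateTime"));
        MAPPING.put("TIMESTAMP_LTZ", Arrays.asList("DateTime"));
        MAPPING.put("INTERVAL_YEAR_MONTH", Arrays.asList("Int32"));
        MAPPING.put("INTERVAL_DAY_TIME", Arrays.asList("Int64"));
        MAPPING.put("ARRAY", Arrays.asList("Array"));
        MAPPING.put("MAP", Arrays.asList("Map"));
        // ROW, MULTISET and RAW are not supported, so they have no entry
    }

    // ClickHouse types listed for a Flink type; empty if the type is unsupported
    static List<String> clickHouseTypesFor(String flinkType) {
        return MAPPING.getOrDefault(flinkType, Collections.emptyList());
    }

    static boolean isSupported(String flinkType) {
        return MAPPING.containsKey(flinkType);
    }

    public static void main(String[] args) {
        System.out.println(clickHouseTypesFor("BIGINT")); // [Int64, UInt32]
        System.out.println(isSupported("ROW"));           // false
    }
}
```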

Example:


CREATE TABLE if not exists clickhouseTable (
   ts BIGINT,
   id STRING,
   geohash12 STRING,
   loc_type STRING,
   wifimac STRING,
   id_type STRING,
   .....
   address STRING,
   PRIMARY KEY(ts, id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',  -- use the ClickHouse connector
    'url' = 'clickhouse://xxxxx:8123',  -- any node in the cluster
    'username' = '',
    'password' = '',
    'database-name' = 'test',
    'table-name' = 'lbs',
    ----- sink options -----
    'sink.batch-size' = '1000000',  -- number of rows per batch insert
    'sink.flush-interval' = '5000',  -- flush interval (5000 ms here); default 1s
    'sink.max-retries' = '3',  -- max number of retries
    'sink.partition-strategy' = 'hash',  -- partition strategy: hash / balanced / shuffle
    'sink.partition-key' = 'id',
    'sink.write-local' = 'true',  -- write directly to the local tables
    'sink.ignore-delete' = 'true',
    ----- source (lookup) options -----
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl' = '10',
    'lookup.max-retries' = '3'
);
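-- 1. When sink.partition-strategy is hash, sink.partition-key must be set, and sink.write-local = 'true' so that rows are written to the local tables.
--    The hash function is murmur3_32, consistent with how ClickHouse's own murmurHash3_32() distributes rows for a Distributed (cluster) table.
-- 2. When sink.write-local = 'false', rows are written to the Distributed (cluster) table; sink.partition-strategy then has no effect, and distribution depends on the Distributed table's configuration in ClickHouse.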
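Note 1 says the hash strategy uses murmur3_32 so that rows land on the same shard ClickHouse's own murmurHash3_32() sharding would pick. Below is a self-contained sketch of MurmurHash3 x86_32 plus a shard choice. Assumptions, also marked in the code: the key is hashed as the UTF-8 bytes of its string form with seed 0, and every shard has weight 1, so the shard index is the unsigned hash modulo the shard count. The connector's actual key serialization and the Distributed table's shard weights may differ; shardFor is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;

// Sketch of MurmurHash3 x86_32 and a hypothetical shard choice based on it.
public class Murmur3ShardSketch {

    // MurmurHash3 x86_32: 4-byte little-endian blocks, a 1-3 byte tail, then the fmix32 finalizer
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int nblocks = data.length / 4;

        for (int i = 0; i < nblocks; i++) {
            int j = i * 4;
            int k = (data[j] & 0xff) | (data[j + 1] & 0xff) << 8
                    | (data[j + 2] & 0xff) << 16 | (data[j + 3] & 0xff) << 24;
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h1 ^= k;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }

        int tail = nblocks * 4;
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= data[tail] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }

        // Finalization mix
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Assumptions: key hashed as UTF-8 bytes with seed 0, all shards have weight 1
    static int shardFor(String key, int numShards) {
        int hash = murmur3_32(key.getBytes(StandardCharsets.UTF_8), 0);
        // Treat the hash as unsigned (ClickHouse returns UInt32) before taking the remainder
        return (int) (Integer.toUnsignedLong(hash) % numShards);
    }

    public static void main(String[] args) {
        System.out.printf("0x%08X%n", murmur3_32(new byte[0], 1)); // 0x514E28B7
        System.out.println(shardFor("10001", 3));
    }
}
```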

6. Flink 1.13.5 requires the MySQL 8 driver (mysql-connector-java 8.x), and Flink checkpointing requires HDFS to be running.
