ClickHouse – 06


9. Framework Integration


9.1 Java Read/Write ClickHouse API


Maven dependency


<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.6</version>
</dependency>

Reading data from ClickHouse in Java

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;

ClickHouseProperties props = new ClickHouseProperties();
props.setUser("");
props.setPassword("");
// The URL may list several host:port pairs; the data source balances queries across them.
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
ClickHouseConnection conn = dataSource.getConnection();
ClickHouseStatement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("select id, name from users");
while (rs.next()) {
  int id = rs.getInt("id");
  String name = rs.getString("name");
  System.out.println("id = " + id + ", name = " + name);
}
rs.close();
statement.close();
conn.close();
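
The multi-host JDBC URL above is how BalancedClickhouseDataSource spreads queries across replicas. As a rough illustration of the idea only (a hypothetical sketch, not the driver's actual implementation), the following extracts the host list from such a URL and picks hosts round-robin:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BalancedUrlSketch {
    // Extracts "host:port" entries from a jdbc:clickhouse:// URL with a comma-separated host list.
    static List<String> parseHosts(String url) {
        String body = url.substring("jdbc:clickhouse://".length());
        String hostPart = body.substring(0, body.indexOf('/')); // strip the /database suffix
        List<String> hosts = new ArrayList<>();
        for (String h : hostPart.split(",")) {
            hosts.add(h.trim());
        }
        return hosts;
    }

    private static final AtomicInteger counter = new AtomicInteger();

    // Naive round-robin selection over the parsed hosts.
    static String nextHost(List<String> hosts) {
        return hosts.get(Math.floorMod(counter.getAndIncrement(), hosts.size()));
    }

    public static void main(String[] args) {
        List<String> hosts = parseHosts("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default");
        System.out.println(hosts.size());    // 3
        System.out.println(nextHost(hosts)); // node01:8123
        System.out.println(nextHost(hosts)); // node02:8123
    }
}
```

The real data source also health-checks hosts; this sketch only shows why listing several hosts in one URL is useful.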

Writing data to ClickHouse from Java

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.PreparedStatement;

ClickHouseProperties props = new ClickHouseProperties();
props.setUser("");
props.setPassword("");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://192.168.110.150:8123/default", props);
ClickHouseConnection conn = dataSource.getConnection();
PreparedStatement statement = conn.prepareStatement("insert into test values(?, ?, ?)");
statement.setInt(1, 100);
statement.setString(2, "张三");
statement.setInt(3, 30);
boolean success = statement.execute();
System.out.println(success);
statement.close();
conn.close();

9.2 Writing to ClickHouse from Spark


Spark Core can write to ClickHouse directly. The example below uses Spark SQL to store results into the corresponding ClickHouse table; the target table must be created in ClickHouse beforehand.

Maven dependencies

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.6</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
</dependency>

Spark code

import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import java.util.Properties

object ClickHouse {

  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()
    val jsonList: Seq[String] = List[String](
      "{\"id\":1,\"name\":\"张三\",\"age\":18}",
      "{\"id\":2,\"name\":\"李四\",\"age\":19}",
      "{\"id\":3,\"name\":\"王五\",\"age\":20}"
    )

    //convert jsonList into a Dataset[String]
    import session.implicits._
    val ds: Dataset[String] = jsonList.toDS()

    val df: DataFrame = session.read.json(ds)
    df.show()

    //write the result to ClickHouse
    val url = "jdbc:clickhouse://node01:8123/default"
    val table = "test"

    val properties = new Properties()
    properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    properties.put("user", "default")
    properties.put("password", "")
    properties.put("socket_timeout", "300000")

    df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
  }

}

9.3 Writing to ClickHouse from Flink


Flink results can be written to ClickHouse through Flink's native JDBC connector. Flink refactored its JDBC connector in version 1.11.0:

  • Before the refactoring (1.10.x and earlier), the artifact was named flink-jdbc
  • After the refactoring (1.11.x and later), the artifact is named flink-connector-jdbc

The two differ in which APIs they support for writing to a ClickHouse sink:

API               flink-jdbc      flink-connector-jdbc
DataStream API    Not supported   Supported
Table API & SQL   Supported       Not supported

flink-jdbc

Maven dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

Flink code

/**
  *  Write Flink results to ClickHouse through the flink-jdbc API; only the Table API is supported.
  *
  *  Notes:
  *   1. Because single-row inserts into ClickHouse have high latency, set a BatchSize to insert rows in batches for better performance.
  *   2. In the JDBCAppendTableSink implementation, if the final batch holds fewer rows than BatchSize, the remaining rows are not inserted.
  */

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment

case class PersonInfo(id:Int,name:String,age:Int)

object FlinkWriteToClickHouse1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //set parallelism to 1; once a subtask accumulates a full batch, the rows are inserted into ClickHouse
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)

    //import implicit conversions
    import org.apache.flink.streaming.api.scala._

    //read data from the socket
    val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
    val ds: DataStream[PersonInfo] = sourceDS.map(line => {
      val arr: Array[String] = line.split(",")
      PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
    })

    //convert the DataStream into a Table
    import org.apache.flink.table.api.scala._
    val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)

    //write the Table into ClickHouse
    //the target table must exist in ClickHouse first: create table flink_result(id Int,name String,age Int) engine = MergeTree() order by id;
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //prepare the ClickHouse table sink
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl("jdbc:clickhouse://node1:8123/default")
      .setUsername("default")
      .setPassword("")
      .setQuery(insertIntoCkSql)
      .setBatchSize(2) //batch size; the default is 5000 rows
      .setParameterTypes(Types.INT, Types.STRING, Types.INT)
      .build()

    //register the ClickHouse table sink with its field names and types
    tableEnv.registerTableSink("ck-sink",
      sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))

    //insert the data into the ClickHouse sink
    tableEnv.insertInto(table,"ck-sink")

    //trigger execution
    env.execute("Flink Table API to ClickHouse Example")

  }
}
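
The BatchSize caveat above matters in practice: a buffered sink only issues an insert when a batch fills, so a trailing partial batch is lost unless something flushes it explicitly. A minimal, framework-independent Java sketch of that behavior (hypothetical buffer class, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBuffer {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>(); // stands in for executed batch inserts

    public BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffer a row; issue an "insert" only when the batch is full.
    public void add(String row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Without an explicit flush, a trailing partial batch stays pending and is never inserted.
    public void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    public int flushedBatches() {
        return flushed.size();
    }

    public int pendingRows() {
        return pending.size();
    }
}
```

With a batch size of 2 and three rows added, one batch of two is flushed automatically while one row stays pending until flush() is called, which is exactly the scenario the JDBCAppendTableSink note warns about.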

flink-connector-jdbc

Maven dependencies


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

Flink code

import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  *  Write data to ClickHouse through flink-connector-jdbc; currently only the DataStream API is supported.
  */
object FlinkWriteToClickHouse2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //set parallelism to 1
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._

    val ds: DataStream[String] = env.socketTextStream("node5",9999)

    val result: DataStream[(Int, String, Int)] = ds.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    //SQL for inserting data into ClickHouse
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //configure the ClickHouse sink
    val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
      //insert statement
      insertIntoCkSql,

      //bind each tuple to the statement parameters
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
          ps.setInt(1, tp._1)
          ps.setString(2, tp._2)
          ps.setInt(3, tp._3)
        }
      },
      //insert data in batches
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),

      //connection settings for ClickHouse
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUrl("jdbc:clickhouse://node1:8123/default")
        .withUsername("default")
        .withPassword("")
        .build()
    )

    //attach the sink to the stream
    result.addSink(ckSink)

    env.execute("Flink DataStream to ClickHouse Example")

  }
}
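
Both Flink jobs parse socket lines of the form id,name,age with a bare split, which throws on malformed input. A standalone sketch of the same parsing with minimal validation (a hypothetical helper for illustration, not part of the jobs above):

```java
public class PersonLineParser {
    // Parsed representation of one "id,name,age" line.
    public static final class Person {
        public final int id;
        public final String name;
        public final int age;

        Person(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // Parses "id,name,age"; returns null for lines that do not match.
    public static Person parse(String line) {
        String[] arr = line.split(",");
        if (arr.length != 3) {
            return null;
        }
        try {
            return new Person(Integer.parseInt(arr[0].trim()), arr[1].trim(), Integer.parseInt(arr[2].trim()));
        } catch (NumberFormatException e) {
            return null;
        }
    }
}
```

In a real job, the null case would typically route the bad line to a side output or a dead-letter sink instead of failing the whole stream.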

10. Visualization Tools


TABiX connects to ClickHouse directly from the browser, so ClickHouse can be accessed without installing any other software. There are two ways to use it: accessing the hosted web UI directly in a browser, or using ClickHouse's embedded mode.

Tabix has the following features:

  • A syntax-highlighting editor.
  • Command auto-completion.
  • A graphical analysis tool for query execution.
  • Color scheme options.

Direct browser access

Open http://ui.tabix.io/ and configure the ClickHouse connection.

The default username for connecting to ClickHouse is default, and the default password is empty.

The default ClickHouse HTTP port is 8123.

ClickHouse embedded mode

ClickHouse ships with configuration for connecting to TABiX, allowing browser-based operation of ClickHouse through the http://ui.tabix.io/ UI served via a ClickHouse Server node. On the ClickHouse Server node, go to /etc/clickhouse-server and uncomment the “” tag in config.xml.

Restart the ClickHouse Server; TABiX can then be accessed at http://node01:8123.
