Scala HBaseUtils

论坛 期权论坛 编程之家     
选择匿名的用户   2021-6-2 20:20   1688   0
package com.sm.utils

import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}

import com.sm.common.conf.ConfigManager
import com.sm.constants.Constants
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result, Table}
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.conf.Configuration


/**
  * HBase 工具类
  *
  * create by LiuJinHe 2020/12/28
  */
object HBaseUtils {
  /** HBase client configuration, populated from the project properties file. */
  val conf: Configuration = HBaseConfiguration.create()
  val prop: Properties = ConfigManager.load(Constants.PROP_PROPERTIES)
  conf.set(Constants.HBASE_ZOOKEEPER_QUORUM, prop.getProperty(Constants.HBASE_ZOOKEEPER_QUORUM))
  conf.set(Constants.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, prop.getProperty(Constants.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT))
  // NOTE(review): the same constant is used both as the properties-file key and as the HBase
  // configuration key. The HBase client reads `hbase.client.operation.timeout` and
  // `hbase.client.scanner.timeout.period`; if these constants hold underscore-style keys (as the
  // sample config suggests), the two timeouts below are silently ignored — confirm in Constants.
  conf.setInt(Constants.HBASE_CLIENT_OPERATION_TIMEOUT, prop.getProperty(Constants.HBASE_CLIENT_OPERATION_TIMEOUT).toInt)
  conf.setInt(Constants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, prop.getProperty(Constants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD).toInt)
  conf.setBoolean(Constants.HBASE_DEFAULT_FOR_VERSION_SKIP, true)

  // Executor handed to ConnectionFactory. HBase does not shut down an externally supplied pool
  // when the connection is closed, so we keep a reference and shut it down ourselves.
  // Declared before `conn` so that `conn`'s initializer (which calls getConnection) sees it.
  private var pool: ExecutorService = _

  /** Shared connection, created eagerly when this object is initialized. */
  var conn: Connection = getConnection

  /**
    * Returns the shared connection, (re)creating it if it is missing or has been closed.
    *
    * The connection is backed by a fixed pool of 10 threads; tables obtained from it via
    * `getTable` share that pool.
    */
  def getConnection: Connection = synchronized {
    if (conn == null || conn.isClosed) {
      // Release the executor that belonged to a previous, now-closed connection.
      shutdownPool()
      val executor: ExecutorService = Executors.newFixedThreadPool(10)
      conn =
        try ConnectionFactory.createConnection(conf, executor)
        catch {
          case e: Exception =>
            // Don't leave non-daemon worker threads behind if the connection cannot be made.
            executor.shutdownNow()
            throw e
        }
      pool = executor
    }
    conn
  }

  /** Gets a `Table` handle for `tableName` from the given connection. The caller must close it. */
  def getTable(conn: Connection, tableName: String): Table = {
    conn.getTable(TableName.valueOf(tableName))
  }

  /** Gets a `Table` handle for `tableName` from the shared connection. The caller must close it. */
  def getTable(tableName: String): Table = {
    getConnection.getTable(TableName.valueOf(tableName))
  }

  /**
    * Reads a single cell by row key, column family and column qualifier.
    * Example: getData("dim_ad_click", "gdt_plan_14630499_171516810", "info", "aid")
    *
    * @return the cell value decoded as UTF-8, or "" if the row or cell does not exist
    */
  def getData(tableName: String, rowKey: String, columnFamily: String, column: String): String =
    parseData(getData(tableName, rowKey), columnFamily, column)

  /**
    * Reads a whole row by row key.
    * Example: getData("dim_ad_click", "gdt_plan_14630499_171516810")
    *
    * @return the HBase `Result` for the row (empty if the row does not exist)
    */
  def getData(tableName: String, rowKey: String): Result =
    withTable(tableName)(_.get(new Get(Bytes.toBytes(rowKey))))

  /**
    * Extracts one column value from a `Result`.
    *
    * @return the value decoded as UTF-8, or "" if `result` is null/empty or the cell is absent
    */
  def parseData(result: Result, columnFamily: String, column: String): String =
    Option(result)
      .filterNot(_.isEmpty)
      // getValue returns null (not an empty array) when the cell does not exist.
      .flatMap(r => Option(r.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column))))
      .filter(_.nonEmpty)
      .map(bytes => Bytes.toString(bytes))
      .getOrElse("")

  /**
    * Writes a single cell. A null `data` is stored as the empty string, as in `put`.
    */
  def putData(tableName: String, rowKey: String, columnFamily: String, column: String, data: String): Unit = {
    val mutation = new Put(Bytes.toBytes(rowKey))
    mutation.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), getBytesValue(data))
    withTable(tableName)(_.put(mutation))
  }

  /**
    * Writes an object into one row: each declared instance field becomes a column named after
    * the field, with the field's `toString` as the value (null is stored as "").
    * Static and compiler-generated (synthetic) fields are skipped.
    * Nothing is written if the object has no such fields.
    */
  def putObject(tableName: String, rowKey: String, columnFamily: String, obj: Object): Unit = {
    val family = Bytes.toBytes(columnFamily)
    val mutation = new Put(Bytes.toBytes(rowKey))

    obj.getClass.getDeclaredFields
      .filterNot(f => f.isSynthetic || java.lang.reflect.Modifier.isStatic(f.getModifiers))
      .foreach { field =>
        field.setAccessible(true)
        mutation.addColumn(family, Bytes.toBytes(field.getName), getBytesValue(field.get(obj)))
      }

    // HBase rejects a Put that carries no columns.
    if (!mutation.isEmpty) withTable(tableName)(_.put(mutation))
  }

  /**
    * Writes several columns of one row in a single Put.
    * Example:
    *   val map = Map("key1" -> "value1", "key2" -> "value2")
    *   put("table", "rowKey", "info", map)
    * An empty map is a no-op, because HBase rejects a Put that has no columns.
    */
  def put(tableName: String, rowKey: String, columnFamily: String, dataMap: Map[String, String]): Unit =
    if (dataMap.nonEmpty) {
      val family = Bytes.toBytes(columnFamily)
      val mutation: Put = new Put(getBytesValue(rowKey))
      dataMap.foreach { case (key, value) =>
        mutation.addColumn(family, getBytesValue(key), getBytesValue(value))
      }
      withTable(tableName)(_.put(mutation))
    }

  /** Closes a table handle; tables should be closed after every read/write. Null-safe. */
  def closeTable(table: Table): Unit = {
    if (table != null) table.close()
  }

  /**
    * Releases the shared connection and its thread pool. Intended to be called once, when the
    * whole process is shutting down. A later getConnection call creates a fresh connection.
    */
  def close(): Unit = synchronized {
    try {
      if (conn != null) conn.close()
    } finally {
      conn = null
      shutdownPool()
    }
  }

  /** Converts any value to UTF-8 bytes via `toString`; null becomes the empty string. */
  def getBytesValue(value: Any): Array[Byte] = {
    if (value != null) {
      Bytes.toBytes(value.toString)
    } else {
      Bytes.toBytes("")
    }
  }

  // Opens `tableName` on the shared connection, applies `f`, and always closes the table.
  private def withTable[T](tableName: String)(f: Table => T): T = {
    val table = getTable(tableName)
    try f(table) finally closeTable(table)
  }

  // Gracefully stops the executor backing the connection, if there is one.
  private def shutdownPool(): Unit = {
    if (pool != null) {
      pool.shutdown()
      pool = null
    }
  }
}

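配置文件（示例 properties 文件，由 `ConfigManager.load(Constants.PROP_PROPERTIES)` 加载；键名必须与 `Constants` 中对应常量的取值一致）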

hbase.zookeeper.quorum = cdh-master:2181,cdh-slave01:2181,cdh-slave02:2181,cdh-slave03:2181,cdh-slave04:2181
hbase.zookeeper.znode.parent = /hbase
hbase.sink.buffer-flush.max-size = 100mb
hbase.sink.buffer-flush.max-rows = 1000000
hbase.sink.buffer-flush.interval = 1s
hbase.zookeeper.property.clientPort = 2181
hbase_client_operation_timeout = 30000
hbase_client_scanner_timeout_period = 30000
hbase.acl = -1

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP