Hive HCatalog Streaming API Usage


Hive Streaming

Hive's traditional approach to data loading is batch import, which cannot meet real-time requirements. Hive Streaming provides an API for streaming writes, so external data can be written into Hive continuously.

Prerequisites

Hive Streaming must be used together with Hive transactional tables, and the table's storage format must be ORC. Set the following parameters in hive-site.xml to enable Hive transactions:

- hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
- hive.compactor.initiator.on = true
- hive.compactor.worker.threads > 0

In addition, the table must be declared transactional at creation time with tblproperties("transactional"="true"), and it must be both partitioned and bucketed. A minimal hive-site.xml fragment with these settings is sketched below.
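For reference, such a hive-site.xml fragment might look like this (the worker-thread count of 1 is an illustrative value; anything greater than 0 works):

```
<!-- hive-site.xml: settings required for Hive transactional tables -->
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <!-- illustrative value; must be greater than 0 -->
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```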

Example

Environment: Hadoop 2.6.5, Hive 1.2.2

1. Create a table test.t3 in Hive:

```
CREATE TABLE t3 (id INT, name STRING, address STRING)
PARTITIONED BY (country STRING)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
```

2. The Java code:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

// JSONObject/toJSONString matches Alibaba fastjson; any JSON library works
// as long as the serialized keys match the Hive column names.
import com.alibaba.fastjson.JSONObject;

public class HiveStreamingDemo {

    /**
     * DelimitedInputWriter usage: writes comma-delimited text records.
     */
    public static void delimitedInputWriterDemo()
            throws InterruptedException, StreamingException, ClassNotFoundException {
        String dbName = "test";
        String tblName = "t3";
        List<String> partitionVals = new ArrayList<String>(1);
        partitionVals.add("china");
        HiveEndPoint hiveEP = new HiveEndPoint("thrift://192.168.61.146:9083",
                dbName, tblName, partitionVals);

        // Field order must match the order of the delimited values below.
        String[] fieldNames = new String[3];
        fieldNames[0] = "id";
        fieldNames[1] = "name";
        fieldNames[2] = "address";

        // true: create the partition if it does not exist yet.
        StreamingConnection connection = hiveEP.newConnection(true);
        DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", hiveEP);

        // Fetch a batch of 10 transactions; this demo uses only the first one.
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        for (int i = 0; i < 100; ++i) {
            txnBatch.write((i + ",zhangsan,beijing").getBytes());
        }
        txnBatch.commit();   // the written rows become visible to readers here
        txnBatch.close();
        connection.close();
    }

    /**
     * StrictJsonWriter usage: writes JSON records whose keys are column names.
     */
    public static void strictJsonWriterDemo()
            throws StreamingException, InterruptedException {
        String dbName = "test";
        String tblName = "t3";
        List<String> partitionVals = new ArrayList<String>(1);
        partitionVals.add("china");
        HiveEndPoint hiveEP = new HiveEndPoint("thrift://192.168.61.146:9083",
                dbName, tblName, partitionVals);

        StreamingConnection connection = hiveEP.newConnection(true);
        StrictJsonWriter writer = new StrictJsonWriter(hiveEP);

        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        for (int i = 0; i < 10; ++i) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", i);
            jsonObject.put("name", "chenli" + i);
            jsonObject.put("address", "beijing");
            txnBatch.write(jsonObject.toJSONString().getBytes());
        }
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }

    public static void main(String[] args)
            throws InterruptedException, StreamingException, ClassNotFoundException {
        strictJsonWriterDemo();
    }
}
```

pom.xml

```
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-streaming</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>1.2.2</version>
</dependency>
```

Note that strictJsonWriterDemo additionally needs a JSON library on the classpath; the JSONObject calls above match Alibaba fastjson.
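Both demos fetch a batch of 10 transactions but commit only the first, leaving the remaining nine unused. A long-running ingest job would normally drain the whole batch before fetching a new one. The following is a minimal sketch of that pattern, not part of the original post: it assumes the imports from the demo above plus java.util.Iterator, and a hypothetical record source supplying pre-encoded rows.

```
/**
 * Sketch: drain an entire TransactionBatch, committing a fixed-size
 * micro-batch of records per transaction. `connection` and `writer`
 * are assumed to be built as in the demos above.
 */
public static void drainBatch(StreamingConnection connection,
                              DelimitedInputWriter writer,
                              Iterator<byte[]> source)
        throws StreamingException, InterruptedException {
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    try {
        // Use every transaction in the batch instead of just the first one.
        while (txnBatch.remainingTransactions() > 0 && source.hasNext()) {
            txnBatch.beginNextTransaction();
            for (int i = 0; i < 100 && source.hasNext(); ++i) {
                txnBatch.write(source.next());
            }
            txnBatch.commit();   // rows written in this transaction become visible
        }
    } finally {
        txnBatch.close();        // afterwards, fetch a new batch or close the connection
    }
}
```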

3. Add hive-site.xml, hdfs-site.xml, and core-site.xml to the resources directory.
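Assuming a standard Maven project layout (an assumption about this project's structure), the configuration files sit alongside the code like this:

```
src
└── main
    ├── java
    │   └── HiveStreamingDemo.java
    └── resources
        ├── core-site.xml
        ├── hdfs-site.xml
        └── hive-site.xml
```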

4. Run the program.
5. Check the data in the Hive table:

```
hive> select * from t3;
OK
0   chenli0   beijing   china
1   chenli1   beijing   china
2   chenli2   beijing   china
3   chenli3   beijing   china
4   chenli4   beijing   china
5   chenli5   beijing   china
6   chenli6   beijing   china
7   chenli7   beijing   china
8   chenli8   beijing   china
9   chenli9   beijing   china
Time taken: 0.666 seconds, Fetched: 10 row(s)
```

References

https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest#StreamingDataIngest-Limitations


Author: 苍老流年



