hive传统的数据导入采用批量导入的方式,这中数据导入难以满足实时性的要求。hive streaming提供了数据流式写入的API,这样外部数据可以连续不断的写入hive中。
必备条件
hive streaming 需要配合hive 事务表使用,表的数据存储格式式必须为 orc 在 hive-site.xml 中设置如下参数以支持hive事务表hive.txn.manager =org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true (See more important details here)
hive.compactor.worker.threads > 0
建表时指定表为事务表 tblproperties(“transactional”=“true”)
hive表必须为分区分桶表
案例
hadoop版本:2.6.5
hive版本:1.2.2
1.在hive中新建一张表test.t3
CREATE TABLE t3 (id INT, name STRING, address STRING) partitioned by (country string) CLUSTERED BY (id) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true');
2.代码
```
public class HiveStreamingDemo {
/**
* DelimitedInputWriter使用
* @throws InterruptedException
* @throws StreamingException
* @throws ClassNotFoundException
*/
public static void delimitedInputWriterDemo() throws InterruptedException, StreamingException, ClassNotFoundException {
String dbName = "test";
String tblName = "t3";
List partitionVals = new ArrayList(1);
partitionVals.add("china");
HiveEndPoint hiveEP = new HiveEndPoint("thrift://192.168.61.146:9083", dbName, tblName, partitionVals);
String[] fieldNames = new String[3];
fieldNames[0] = "id";
fieldNames[1] = "name";
fieldNames[2] = "address";
StreamingConnection connection = hiveEP.newConnection(true);
DelimitedInputWriter writer =
new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch= connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
for (int i = 0 ; i < 100; ++i) {
txnBatch.write((i + ",zhangsan,beijing").getBytes());
}
txnBatch.commit();
txnBatch.close();
connection.close();
}
/**
* StrictJsonWriter 使用
* @throws StreamingException
* @throws InterruptedException
*/
public static void strictJsonWriterDemo() throws StreamingException, InterruptedException {
String dbName = "test";
String tblName = "t3";
List partitionVals = new ArrayList(1);
partitionVals.add("china");
HiveEndPoint hiveEP = new HiveEndPoint("thrift://192.168.61.146:9083", dbName, tblName, partitionVals);
StreamingConnection connection = hiveEP.newConnection(true);
StrictJsonWriter writer = new StrictJsonWriter(hiveEP);
TransactionBatch txnBatch= connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
for (int i = 0 ; i < 10; ++i) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", i);
jsonObject.put("name", "chenli" + i);
jsonObject.put("address", "beijing");
txnBatch.write(jsonObject.toJSONString().getBytes());
}
txnBatch.commit();
txnBatch.close();
connection.close();
}
public static void main(String[] args) throws InterruptedException, StreamingException, ClassNotFoundException {
strictJsonWriterDemo();
}
```
pom.xml
```
org.apache.hive.hcatalog
hive-hcatalog-streaming
1.2.2
org.apache.hive.hcatalog
hive-hcatalog-core
1.2.2
```
3.添加hive-site.xml,hdfs-site.xml,core-site.xml到resources目录下
4.运行结果
5.查看hive表中的数据
```
hive> select * from t3;
OK
0 chenli0 beijing china
1 chenli1 beijing china
2 chenli2 beijing china
3 chenli3 beijing china
4 chenli4 beijing china
5 chenli5 beijing china
6 chenli6 beijing china
7 chenli7 beijing china
8 chenli8 beijing china
9 chenli9 beijing china
Time taken: 0.666 seconds, Fetched: 10 row(s)
```
参考
https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest#StreamingDataIngest-Limitations