當前位置:成語大全網 - 書法字典 - 如何使用Spark提高Solr批量插入的效率

如何使用Spark提高Solr批量插入的效率

假設妳將數據批量插入SolrCloud。

1: MapPartitions用於遍歷和插入每個分區的數據,而不是使用Map來插入每個數據。

原因:插入時需要獲得與SolrCloud的連接。如果用Map插入每壹條數據,需要得到n個連接(n是數據總數)。如果使用MapPartitions進行插入,只需要獲得m個連接(m是分區總數)。

2:在執行程序端初始化1個鏈路池,執行程序端的每個鏈路都是從這個鏈路池中獲取的。這樣做的好處是:1)鏈接池持有與SolrCloud的長鏈接。壹旦打開,除非Excutor退出,否則不會關閉;2)鏈路池可以控制每個執行程序連接到SolrCloud的鏈路數量,防止SolrCloud因為Rdd分區過多而崩潰。

Java實例代碼如下:

1)使用映射分區插入代碼塊:

[java]查看純文本

//finalRdd是Java RDD

JavaRDD & ltSomeObjects & gtmapPartitionsRdd = final rdd . map partitions(新的FlatMapFunction & lt叠代器& ltSomeObjects & gt,SomeObjects & gt() {

public Iterable & ltSomeObjects & gt調用(叠代器& ltSomeObjects & gt叠代器)

引發異常{

最終字符串集合= source

//初始化連接池

BlockingQueue & ltCloudSolrServer & gtserver list = solrcollectionpool . instance . getcollectionpool(ZK host,collection,poolSize);

CloudSolrServer cloud server = server list . take();

列表& ltSomeObjects & gtbatchSomeObjects = new linked list & lt;SomeObjects & gt();

列表& ltSomeObjects & gtresult objects = new linked list & lt;SomeObjects & gt();

嘗試{

while (flag) {

for(int I = 0;我& ltdmlRatePerBatch & amp& ampiterator . has next();i++) {

batchsomeobjects . add(iterator . next());

}

if(batchsomeobjects . size()& gt;0) {

//插入solr

列表& ltSolrInputDocument & gtsolrInputDocumentList = RecordConverterAdapter。*****(fieldInfoList,batchSomeObjects);//將數據轉換成Solr格式。

cloudserver . add(solrInputDocumentList);

result objects . add all(batchSomeObjects);

//空的

batchsomeobjects . clear();

}否則{

flag = false

}

}

} catch(異常e) {

e . printstacktrace();

}最後{

server list . put(cloud server);

}

返回resultObjects

}

});

2)鏈接池代碼塊:

[java]查看純文本

公共類SolrCollectionPool實現Serializable {

private static Logger log = Logger . get Logger(solrcollectionpool . class);

public static SolrCollectionPool實例= new SolrCollectionPool();

私有靜態地圖& ltString,BlockingQueue & ltCloudSolrServer & gt& gtpool map = new concurrent hashmap & lt;String,BlockingQueue & ltCloudSolrServer & gt& gt();

公共SolrCollectionPool() {

}

公共同步阻塞隊列& ltCloudSolrServer & gtgetCollectionPool(String zkHost,String collection,final int size) {

if(pool map . get(collection)= = null){

log . info(" Solr:"+collection+" pool size:"+size);

system . set property(" javax . XML . parsers . documentbuilderfactory ",

" com . sun . org . Apache . Xerces . internal . JAXP . documentbuilderfactoryimpl ");

system . set property(" javax . XML . parsers . saxparserfactory ",

" com . sun . org . Apache . Xerces . internal . JAXP . saxparserfactoryimpl ");

BlockingQueue & ltCloudSolrServer & gtserver list = new LinkedBlockingQueue & lt;CloudSolrServer & gt(大小);

for(int I = 0;我& lt尺寸;i++) {

CloudSolrServer cloudServer =新的CloudSolrServer(ZK host);

cloud server . setdefaultcollection(集合);

cloud server . setzkclientime out(Utils。zkclienttime out);

cloud server . setzkconnectdimeout(Utils。zkconnectimeout);

cloud server . connect();

server list . add(cloud server);

}

poolMap.put(collection,server list);

}

返回poolMap.get(集合);

}

公共靜態SolrCollectionPool實例(){

返回SolrCollectionPool.instance

}

}