1: MapPartitions用於遍歷和插入每個分區的數據,而不是使用Map來插入每個數據。
原因:插入時需要獲得與SolrCloud的連接。如果用Map插入每壹條數據,需要得到n個連接(n是數據總數)。如果使用MapPartitions進行插入,只需要獲得m個連接(m是分區總數)。
2:在執行程序端初始化1個鏈路池,執行程序端的每個鏈路都是從這個鏈路池中獲取的。這樣做的好處是:1)鏈接池持有與SolrCloud的長鏈接。壹旦打開,除非Excutor退出,否則不會關閉;2)鏈路池可以控制每個執行程序連接到SolrCloud的鏈路數量,防止SolrCloud因為Rdd分區過多而崩潰。
Java實例代碼如下:
1)使用映射分區插入代碼塊:
[java]查看純文本
//finalRdd是Java RDD
JavaRDD & ltSomeObjects & gtmapPartitionsRdd = final rdd . map partitions(新的FlatMapFunction & lt叠代器& ltSomeObjects & gt,SomeObjects & gt() {
public Iterable & ltSomeObjects & gt調用(叠代器& ltSomeObjects & gt叠代器)
引發異常{
最終字符串集合= source
//初始化連接池
BlockingQueue & ltCloudSolrServer & gtserver list = solrcollectionpool . instance . getcollectionpool(ZK host,collection,poolSize);
CloudSolrServer cloud server = server list . take();
列表& ltSomeObjects & gtbatchSomeObjects = new linked list & lt;SomeObjects & gt();
列表& ltSomeObjects & gtresult objects = new linked list & lt;SomeObjects & gt();
嘗試{
while (flag) {
for(int I = 0;我& ltdmlRatePerBatch & amp& ampiterator . has next();i++) {
batchsomeobjects . add(iterator . next());
}
if(batchsomeobjects . size()& gt;0) {
//插入solr
列表& ltSolrInputDocument & gtsolrInputDocumentList = RecordConverterAdapter。*****(fieldInfoList,batchSomeObjects);//將數據轉換成Solr格式。
cloudserver . add(solrInputDocumentList);
result objects . add all(batchSomeObjects);
//空的
batchsomeobjects . clear();
}否則{
flag = false
}
}
} catch(異常e) {
e . printstacktrace();
}最後{
server list . put(cloud server);
}
返回resultObjects
}
});
2)鏈接池代碼塊:
[java]查看純文本
公共類SolrCollectionPool實現Serializable {
private static Logger log = Logger . get Logger(solrcollectionpool . class);
public static SolrCollectionPool實例= new SolrCollectionPool();
私有靜態地圖& ltString,BlockingQueue & ltCloudSolrServer & gt& gtpool map = new concurrent hashmap & lt;String,BlockingQueue & ltCloudSolrServer & gt& gt();
公共SolrCollectionPool() {
}
公共同步阻塞隊列& ltCloudSolrServer & gtgetCollectionPool(String zkHost,String collection,final int size) {
if(pool map . get(collection)= = null){
log . info(" Solr:"+collection+" pool size:"+size);
system . set property(" javax . XML . parsers . documentbuilderfactory ",
" com . sun . org . Apache . Xerces . internal . JAXP . documentbuilderfactoryimpl ");
system . set property(" javax . XML . parsers . saxparserfactory ",
" com . sun . org . Apache . Xerces . internal . JAXP . saxparserfactoryimpl ");
BlockingQueue & ltCloudSolrServer & gtserver list = new LinkedBlockingQueue & lt;CloudSolrServer & gt(大小);
for(int I = 0;我& lt尺寸;i++) {
CloudSolrServer cloudServer =新的CloudSolrServer(ZK host);
cloud server . setdefaultcollection(集合);
cloud server . setzkclientime out(Utils。zkclienttime out);
cloud server . setzkconnectdimeout(Utils。zkconnectimeout);
cloud server . connect();
server list . add(cloud server);
}
poolMap.put(collection,server list);
}
返回poolMap.get(集合);
}
公共靜態SolrCollectionPool實例(){
返回SolrCollectionPool.instance
}
}