這篇文章主要介紹nutch中如何實現索引去重,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
一、主程序調用 SolrDeleteDuplicates dedup = new SolrDeleteDuplicates(); dedup.setConf(getConf()); dedup.dedup(solrUrl); 二、job任務配置 JobConf job = new NutchJob(getConf()); job.setInputFormat(SolrInputFormat.class); job.setMapperClass(IdentityMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SolrRecord.class); job.setReducerClass(SolrDeleteDuplicates.class); job.setOutputFormat(NullOutputFormat.class); JobClient.runJob(job); 三、Map、reduce任務的輸入和輸出 Map任務輸入、輸出 public void map( K key, V val, OutputCollector<K, V> output reduce任務輸入、輸出 輸入:Text/Iterator<SolrRecord> 輸出:Text/SolrRecord public void reduce( Text key, Iterator<SolrRecord> values, OutputCollector<Text, SolrRecord> output 四、job任務輸入類SolrInputFormat getSplits方法將所有的文檔按照數量平均分片 getRecordReader方法中利用solrserver查詢了當前分片包含的所有doc記錄,solrrecord返回了的當前的RecordReader<Text, SolrRecord>記錄(RecordReader是一個全局的變量),并且有獲取下一個方法。 (1)、SolrInputFormat的getSplits方法 1、根據job對象的參數,獲取solrserver對象。 2、構建并執行查詢(查詢參數:[*:*、id、setRow(1)] ),獲取響應對象 3、根據響應對象獲取索引總數,除以分片數,得到每一片分配多少個索引 4、根據分片數創建 SolrInputSplit數組對象, 5、根據solr輸入分片的開始和結束位置,實例化SolrInputSplit對象 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job); final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD); solrQuery.setRows(1); QueryResponse response; try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); } int numResults = (int)response.getResults().getNumFound(); int numDocsPerSplit = (numResults / numSplits); int currentDoc = 0; SolrInputSplit[] splits = new SolrInputSplit[numSplits]; for (int i = 0; i < numSplits - 1; i++) { splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit); currentDoc += numDocsPerSplit; } splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc); return splits; } (2)、SolrInputFormat的getRecordReader()方法 1、獲取solrserver對象 2、將傳入的split參數,強轉成SolrInputSplit對象,并獲取這個分片的文檔總數 3、構建查詢對象,執行查詢(參數[*:*,id,boost,tstamp,digest, SolrInputSplit中的開始位置,文檔總數 ])。 4、根據響應對象,獲取結果集 5、對匿名內部內RecordReader做了實現,并且返回 public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split, final JobConf job, Reporter reporter) throws IOException { //1、獲取solrserver對象 SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job); //2、將傳入的split參數,強轉成SolrInputSplit對象,并獲取這個分片的文檔總數 SolrInputSplit solrSplit = (SolrInputSplit) split; final int numDocs = solrSplit.getNumDocs();
//3、構建查詢對象,執行查詢(參數[*:*,id,boost,tstamp,digest, SolrInputSplit中的開始位置,文檔總數 ]) SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD, SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD); solrQuery.setStart(solrSplit.getDocBegin()); solrQuery.setRows(numDocs); QueryResponse response;
try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); } //4、根據響應對象,獲取結果集 final SolrDocumentList solrDocs = response.getResults(); return new RecordReader<Text, SolrRecord>() { //當前的文檔 private int currentDoc = 0; public void close() throws IOException { } public Text createKey() { return new Text(); } public SolrRecord createValue() { return new SolrRecord(); } //獲取當前的指針 public long getPos() throws IOException { return currentDoc; } //獲取進度 public float getProgress() throws IOException { return currentDoc / (float) numDocs; } //獲取下一個 public boolean next(Text key, SolrRecord value) throws IOException { if (currentDoc >= numDocs) { return false; } // SolrDocument doc = solrDocs.get(currentDoc); //獲取摘要 String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD); //把摘要作為key key.set(digest); //value(SolrRecord) //賦值:通過doc給solrrecord的id,tstamp,boost 3個字段賦值 value.readSolrDocument(doc); //指針加自增1 currentDoc++; return true; } }; }
五、map()方法和reduce()方法中的實現 (1)、map任務 (2)、reduce任務 去重邏輯: reduce任務會遍歷每一個record,并執行reduce()方法中的代碼 reduce()方法中,會遍歷處于當前文檔之后的所有文檔,如果分值和時間都比當前的小,會調用solrj刪除這個文檔,如果比當前的大,會刪除當前的,并把當前的替換成這個大的。 public void reduce(Text key, Iterator<SolrRecord> values, OutputCollector<Text, SolrRecord> output, Reporter reporter) throws IOException { //1、下一個SolrRecord對象 SolrRecord recordToKeep = new SolrRecord(values.next()); //2、遍歷了SolrRecord while (values.hasNext()) { // SolrRecord solrRecord = values.next(); //boost、tstamp參與比較 //如果當前的分值, 比保持的分支高,并且時間比保持的新,就根據id刪除這條索引, if (solrRecord.getBoost() > recordToKeep.getBoost() || (solrRecord.getBoost() == recordToKeep.getBoost() && solrRecord.getTstamp() > recordToKeep.getTstamp())) { updateRequest.deleteById(recordToKeep.id); recordToKeep = new SolrRecord(solrRecord); } else { updateRequest.deleteById(solrRecord.id); } numDeletes++; reporter.incrCounter("SolrDedupStatus", "Deleted documents", 1); if (numDeletes >= NUM_MAX_DELETE_REQUEST) { try { LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates"); updateRequest.process(solr); } catch (SolrServerException e) { throw new IOException(e); } updateRequest = new UpdateRequest(); numDeletes = 0; } } } 六、關于digest doc中的digest字段,是在IndexerMapReduce類中的reduce方法中加入的 // add digest, used by dedup doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY)); Metadata中包含了一個HashMap final Metadata metadata = parseData.getContentMeta(); |
以上是“nutch中如何實現索引去重”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。