溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

nutch中如何實現索引去重

發布時間:2021-12-04 09:20:50 來源:億速云 閱讀:174 作者:小新 欄目:云計算

這篇文章主要介紹nutch中如何實現索引去重,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!


一、主程序調用


SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();

dedup.setConf(getConf());

dedup.dedup(solrUrl);



二、job任務配置


JobConf job = new NutchJob(getConf());


job.setInputFormat(SolrInputFormat.class);

job.setMapperClass(IdentityMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(SolrRecord.class);


job.setReducerClass(SolrDeleteDuplicates.class);

job.setOutputFormat(NullOutputFormat.class);


JobClient.runJob(job);



三、Map、reduce任務的輸入和輸出

Map任務輸入、輸出

public void map(

K key, V val,

      OutputCollector<K, V> output


reduce任務輸入、輸出

輸入:Text/Iterator<SolrRecord>

輸出:Text/SolrRecord


public void reduce(

Text key, Iterator<SolrRecord> values,

      OutputCollector<Text, SolrRecord> output




四、job任務輸入類SolrInputFormat


getSplits方法將所有的文檔按照數量平均分片

getRecordReader方法中利用solrserver查詢了當前分片包含的所有doc記錄,solrrecord返回了的當前的RecordReader<Text, SolrRecord>記錄(RecordReader是一個全局的變量),并且有獲取下一個方法。


(1)、SolrInputFormat的getSplits方法


1、根據job對象的參數,獲取solrserver對象。

2、構建并執行查詢(查詢參數:[*:*、id、setRow(1)] ),獲取響應對象

3、根據響應對象獲取索引總數,除以分片數,得到每一片分配多少個索引

4、根據分片數創建 SolrInputSplit數組對象,

5、根據solr輸入分片的開始和結束位置,實例化SolrInputSplit對象




    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

      SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);


      final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);

      solrQuery.setFields(SolrConstants.ID_FIELD);

      solrQuery.setRows(1);


      QueryResponse response;

      try {

        response = solr.query(solrQuery);

      } catch (final SolrServerException e) {

        throw new IOException(e);

      }


      int numResults = (int)response.getResults().getNumFound();

      int numDocsPerSplit = (numResults / numSplits); 

      int currentDoc = 0;

      SolrInputSplit[] splits = new SolrInputSplit[numSplits];

      for (int i = 0; i < numSplits - 1; i++) {

        splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit);

        currentDoc += numDocsPerSplit;

      }

      splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);


      return splits;

    }



(2)、SolrInputFormat的getRecordReader()方法


1、獲取solrserver對象

2、將傳入的split參數,強轉成SolrInputSplit對象,并獲取這個分片的文檔總數

3、構建查詢對象,執行查詢(參數[*:*,id,boost,tstamp,digest, SolrInputSplit中的開始位置,文檔總數 ])。

4、根據響應對象,獲取結果集

5、對匿名內部內RecordReader做了實現,并且返回


    public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split,

        final JobConf job, 

        Reporter reporter)

        throws IOException {


//1、獲取solrserver對象

      SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);


//2、將傳入的split參數,強轉成SolrInputSplit對象,并獲取這個分片的文檔總數

SolrInputSplit solrSplit = (SolrInputSplit) split;

      final int numDocs = solrSplit.getNumDocs();

      

//3、構建查詢對象,執行查詢(參數[*:*,id,boost,tstamp,digest,

SolrInputSplit中的開始位置,文檔總數

])

      SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY);

      solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD,

                          SolrConstants.TIMESTAMP_FIELD,

                          SolrConstants.DIGEST_FIELD);

      solrQuery.setStart(solrSplit.getDocBegin());

      solrQuery.setRows(numDocs);


      QueryResponse response;

      


try {

        response = solr.query(solrQuery);

      } catch (final SolrServerException e) {

        throw new IOException(e);

      }


//4、根據響應對象,獲取結果集

      final SolrDocumentList solrDocs = response.getResults();



      return new RecordReader<Text, SolrRecord>() {

//當前的文檔

        private int currentDoc = 0;


        public void close() throws IOException { }


        public Text createKey() {

          return new Text();

        }


        public SolrRecord createValue() {

          return new SolrRecord();

        }

//獲取當前的指針

        public long getPos() throws IOException {

          return currentDoc;

        }

//獲取進度

        public float getProgress() throws IOException {

          return currentDoc / (float) numDocs;

        }

//獲取下一個

        public boolean next(Text key, SolrRecord value) throws IOException {

          if (currentDoc >= numDocs) {

            return false;

          }

//

          SolrDocument doc = solrDocs.get(currentDoc);


//獲取摘要

          String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD);

//把摘要作為key

          key.set(digest);

//value(SolrRecord)

//賦值:通過doc給solrrecord的id,tstamp,boost 3個字段賦值

          value.readSolrDocument(doc);

//指針加自增1

          currentDoc++;

          return true;

        }    

      };

    }


  


五、map()方法和reduce()方法中的實現


(1)、map任務

(2)、reduce任務


去重邏輯:


reduce任務會遍歷每一個record,并執行reduce()方法中的代碼

reduce()方法中,會遍歷處于當前文檔之后的所有文檔,如果分值和時間都比當前的小,會調用solrj刪除這個文檔,如果比當前的大,會刪除當前的,并把當前的替換成這個大的。



  public void reduce(Text key, Iterator<SolrRecord> values,

      OutputCollector<Text, SolrRecord> output, Reporter reporter)

  throws IOException {

     //1、下一個SolrRecord對象

    SolrRecord recordToKeep = new SolrRecord(values.next());


//2、遍歷了SolrRecord

    while (values.hasNext()) {

//

      SolrRecord solrRecord = values.next();


//boost、tstamp參與比較

//如果當前的分值, 比保持的分支高,并且時間比保持的新,就根據id刪除這條索引,

      if (solrRecord.getBoost() > recordToKeep.getBoost() ||

          (solrRecord.getBoost() == recordToKeep.getBoost() && 

              solrRecord.getTstamp() > recordToKeep.getTstamp())) {

        updateRequest.deleteById(recordToKeep.id);

        recordToKeep = new SolrRecord(solrRecord);

      } else {

        updateRequest.deleteById(solrRecord.id);

      }

      numDeletes++;

      reporter.incrCounter("SolrDedupStatus", "Deleted documents", 1);

      if (numDeletes >= NUM_MAX_DELETE_REQUEST) {

        try {

          LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates");

          updateRequest.process(solr);

        } catch (SolrServerException e) {

          throw new IOException(e);

        }

        updateRequest = new UpdateRequest();

        numDeletes = 0;

      }

    }

  }



六、關于digest


doc中的digest字段,是在IndexerMapReduce類中的reduce方法中加入的

// add digest, used by dedup

doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));


Metadata中包含了一個HashMap

final Metadata metadata = parseData.getContentMeta();

以上是“nutch中如何實現索引去重”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女