溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm MongoDB接口怎么使用

發布時間:2021-12-23 14:01:46 來源:億速云 閱讀:428 作者:iii 欄目:云計算

本篇內容介紹了“Storm MongoDB接口怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

整體的Storn接口分為以下的幾個class

1:MongoBolt.java

2 : MongoSpout.java

3 : MongoTailableCursorTopology.java

4 : SimpleMongoBolt.java

看代碼說話:

package storm.mongo;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;

/**
 *
 * 注意在這里,沒有實現批處理的調用,并且只是一個抽象類,對于Mongo的Storm交互做了一次封裝
 *
 * @author Adrian Petrescu <apetresc@gmail.com>
 *
 */
public abstract class MongoBolt extends BaseRichBolt {
	private OutputCollector collector;
	
	// MOngDB的DB對象
	private DB mongoDB;
	
	
        //記錄我們的主機,端口,和MongoDB的數據DB民粹
	private final String mongoHost;
	private final int mongoPort;
	private final String mongoDbName;

	/**
	 * @param mongoHost The host on which Mongo is running.
	 * @param mongoPort The port on which Mongo is running.
	 * @param mongoDbName The Mongo database containing all collections being
	 * written to.
	 */
	protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
		this.mongoHost = mongoHost;
		this.mongoPort = mongoPort;
		this.mongoDbName = mongoDbName;
	}
	
	@Override
	public void prepare(
			@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		
		this.collector = collector;
		try {
		
		        //prepare方法目前在初始化的過程之中得到了一個Mongo的連接
			this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void execute(Tuple input) {
	
	    
	        //注意我們在這里還有一個判斷,判斷當前是否該發射
	
		if (shouldActOnInput(input)) {
			String collectionName = getMongoCollectionForInput(input);
			DBObject dbObject = getDBObjectForInput(input);
			if (dbObject != null) {
				try {
					mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
					collector.ack(input);
				} catch (MongoException me) {
					collector.fail(input);
				}
			}
		} else {
			collector.ack(input);
		}
	}

	/**
	 * Decide whether or not this input tuple should trigger a Mongo write.
	 *
	 * @param input the input tuple under consideration
	 * @return {@code true} iff this input tuple should trigger a Mongo write
	 */
	public abstract boolean shouldActOnInput(Tuple input);
	
	/**
	 * Returns the Mongo collection which the input tuple should be written to.
	 *
	 * @param input the input tuple under consideration
	 * @return the Mongo collection which the input tuple should be written to
	 */
	public abstract String getMongoCollectionForInput(Tuple input);
	
	/**
	 * Returns the DBObject to store in Mongo for the specified input tuple.
	 * 
	 
	 拿到DBObject的一個抽象類
	 
	 
	 * @param input the input tuple under consideration
	 * @return the DBObject to be written to Mongo
	 */
	public abstract DBObject getDBObjectForInput(Tuple input);
	
	
	//注意這里隨著計算的終結被關閉了。
	@Override
	public void cleanup() {
		this.mongoDB.getMongo().close();
	}

}

2 :

package storm.mongo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;

import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;

/**
* A Spout which consumes documents from a Mongodb tailable cursor.
*
* Subclasses should simply override two methods:
* <ul>
* <li>{@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
* <li>{@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns
* a Mongo document into a Storm tuple matching the declared output fields.
* </ul>
*
** <p>
* <b>WARNING:</b> You can only use tailable cursors on capped collections.
* 
* @author Dan Beaulieu <danjacob.beaulieu@gmail.com>
*
*/


// 在這里,抽象的過程中,依舊保持了第一層的Spout為一個抽象類,MongoSpout為abstract的一個抽象類,子類在繼承這// 個類的過程之中實現特定的方法即可
// 這里還有一個類似Cursor的操作。


public abstract class MongoSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;
	
	private LinkedBlockingQueue<DBObject> queue;
	private final AtomicBoolean opened = new AtomicBoolean(false);
	
	private DB mongoDB;
	private final DBObject query;
	
	private final String mongoHost;
	private final int mongoPort;
	private final String mongoDbName;
	private final String mongoCollectionName;
	
	
	public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) {
		
		this.mongoHost = mongoHost;
		this.mongoPort = mongoPort;
		this.mongoDbName = mongoDbName;
		this.mongoCollectionName = mongoCollectionName;
		this.query = query;
	}
	
	class TailableCursorThread extends Thread {
		
		
		// 內部類 TailableCursorThread線程
		
		
		//注意在其中我們使用了LinkedBlockingQueue的對象,有關java高并發的集合類,請參考本ID的【Java集合類型的博文】博文。
		LinkedBlockingQueue<DBObject> queue;
		String mongoCollectionName;
		DB mongoDB;
		DBObject query;

		public TailableCursorThread(LinkedBlockingQueue<DBObject> queue, DB mongoDB, String mongoCollectionName, DBObject query) {
			
			this.queue = queue;
			this.mongoDB = mongoDB;
			this.mongoCollectionName = mongoCollectionName;
			this.query = query;
		}

		public void run() {
			
			while(opened.get()) {
				try {
					// create the cursor
					mongoDB.requestStart();
					final DBCursor cursor = mongoDB.getCollection(mongoCollectionName)
												.find(query)
												.sort(new BasicDBObject("$natural", 1))
												.addOption(Bytes.QUERYOPTION_TAILABLE)
												.addOption(Bytes.QUERYOPTION_AWAITDATA);
					try {
						while (opened.get() && cursor.hasNext()) {
		                    final DBObject doc = cursor.next();
		
		                    if (doc == null) break;
		
		                    queue.put(doc);
		                }
					} finally {
						try { 
							if (cursor != null) cursor.close(); 
						} catch (final Throwable t) { }
	                    try { 
	                    	mongoDB.requestDone(); 
	                    	} catch (final Throwable t) { }
	                }
					
					Utils.sleep(500);
				} catch (final MongoException.CursorNotFound cnf) {
					// rethrow only if something went wrong while we expect the cursor to be open.
                    if (opened.get()) {
                    	throw cnf;
                    }
                } catch (InterruptedException e) { break; }
			}
		};
	}
	
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		
		this.collector = collector;
		this.queue = new LinkedBlockingQueue<DBObject>(1000);
		try {
			this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

		TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query);
		this.opened.set(true);
		listener.start();
	}

	@Override
	public void close() {
		this.opened.set(false);
	}

	@Override
	public void nextTuple() {
		
		DBObject dbo = this.queue.poll();
		if(dbo == null) {
            Utils.sleep(50);
        } else {
            this.collector.emit(dbObjectToStormTuple(dbo));
        }
	}

	@Override
	public void ack(Object msgId) {
		// TODO Auto-generated method stub	
	}

	@Override
	public void fail(Object msgId) {
		// TODO Auto-generated method stub	
	}

	public abstract List<Object> dbObjectToStormTuple(DBObject message);

}

“Storm MongoDB接口怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女