溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

.net中rabbitmq如何使用

發布時間:2021-09-14 13:39:12 來源:億速云 閱讀:138 作者:小新 欄目:開發技術

小編給大家分享一下.net中rabbitmq如何使用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

    什么是rabbitMQ

            RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue 高級消息隊列協議 )的開源實現,
            能夠實現異步消息處理      

            RabbitMQ是一個消息代理:它接受和轉發消息。
            你可以把它想象成一個郵局:當你把你想要發布的郵件放在郵箱中時,你可以確定郵差先生最終將郵件發送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
            RabbitMQ和郵局的主要區別在于它不處理紙張,而是接受,存儲和轉發二進制數據塊      

            優點:異步消息處理
                  業務解耦(下訂單操作:扣減庫存、生成訂單、發紅包、發短信),
                            將下單操作主流程:扣減庫存、生成訂單
                            然后通過MQ消息隊列完成通知,發紅包、發短信
                  錯峰流控 (通知量 消息量 訂單量大的情況實現MQ消息隊列機制,淡季情況下訪問量會少)     

                  靈活的路由(Flexible Routing)
                    在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。       

            RabbitMQ網站端口號:15672
            程序里面實現的端口為:5672

    Rabbitmq的關鍵術語

      1、綁定器(Binding):根據路由規則綁定Queue和Exchange。

      2、路由鍵(Routing Key):Exchange根據關鍵字進行消息投遞。

      3、交換機(Exchange):指定消息按照路由規則進入指定隊列

      4、消息隊列(Queue):消息的存儲載體

      5、生產者(Producer):消息發布者。

      6、消費者(Consumer):消息接收者。

    Rabbitmq的運作

      從下圖可以看出,發布者(Publisher)是把消息先發送到交換器(Exchange),再從交換器發送到指定隊列(Queue),而先前已經聲明交換器與隊列綁定關系,最后消費者(Customer)通過訂閱或者主動取指定隊列消息進行消費。

    .net中rabbitmq如何使用

      那么剛剛提到的訂閱和主動取可以理解成,推(被動),拉(主動)。

      推,只要隊列增加一條消息,就會通知空閑的消費者進行消費。(我不找你,就等你找我,觀察者模式)

      拉,不會通知消費者,而是由消費者主動輪循或者定時去取隊列消息。(我需要才去找你)

      使用場景我舉個例子,假如有兩套系統 訂單系統和發貨系統,從訂單系統發起發貨消息指令,為了及時發貨,發貨系統需要訂閱隊列,只要有指令就處理。

      可是程序偶爾會出異常,例如網絡或者DB超時了,把消息丟到失敗隊列,這個時候需要重發機制。但是我又不想while(IsPostSuccess == True),因為只要出異常了,會在某個時間段內都會有異常,這樣的重試是沒意義的。

      這個時候不需要及時的去處理消息,有個JOB定時或者每隔幾分鐘(失敗次數*間隔分鐘)去取失敗隊列消息,進行重發。

    Publish(發布)的封裝

      步驟:初始化鏈接->聲明交換器->聲明隊列->換機器與隊列綁定->發布消息。注意的是,我將Model存到了ConcurrentDictionary里面,因為聲明與綁定是非常耗時的,其次,往重復的隊列發送消息是不需要重新初始化的。

    /// <summary>
              /// 交換器聲明
              /// </summary>
              /// <param name="iModel"></param>
              /// <param name="exchange">交換器</param>
              /// <param name="type">交換器類型:
              /// 1、Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全
              /// 匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的
              /// 消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog
             /// 2、Fanout Exchange – 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都
             /// 會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout
             /// 交換機轉發消息是最快的。
             /// 3、Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多
             /// 個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”
             /// 只會匹配到“audit.irs”。</param>
             /// <param name="durable">持久化</param>
             /// <param name="autoDelete">自動刪除</param>
             /// <param name="arguments">參數</param>
             private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
                 bool durable = true,
                 bool autoDelete = false, IDictionary<string, object> arguments = null)
             {
                 exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
                 iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
             }
     
             /// <summary>
             /// 隊列聲明
             /// </summary>
             /// <param name="channel"></param>
             /// <param name="queue">隊列</param>
             /// <param name="durable">持久化</param>
             /// <param name="exclusive">排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,
             /// 并在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基于連接可見的,同一連接的不同信道是可
             /// 以同時訪問同一個連接創建的排他隊列的。其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連
             /// 接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關閉或者
             /// 客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用于只限于一個客戶端發送讀取消息的應用場景。</param>
             /// <param name="autoDelete">自動刪除</param>
             /// <param name="arguments">參數</param>
             private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
                bool autoDelete = false, IDictionary<string, object> arguments = null)
             {
                 queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
                 channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
            }
     
            /// <summary>
            /// 獲取Model
            /// </summary>
           /// <param name="exchange">交換機名稱</param>
            /// <param name="queue">隊列名稱</param>
             /// <param name="routingKey"></param>
             /// <param name="isProperties">是否持久化</param>
            /// <returns></returns>
             private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
            {
              return ModelDic.GetOrAdd(queue, key =>
                {
                     var model = _conn.CreateModel();
                    ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
                     QueueDeclare(model, queue, isProperties);
                    model.QueueBind(queue, exchange, routingKey);
                    ModelDic[queue] = model;
                     return model;
                });
            }
     
           /// <summary>
           /// 發布消息
             /// </summary>
           /// <param name="routingKey">路由鍵</param>
             /// <param name="body">隊列信息</param>
            /// <param name="exchange">交換機名稱</param>
            /// <param name="queue">隊列名</param>
           /// <param name="isProperties">是否持久化</param>
            /// <returns></returns>
             public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
             {
               var channel = GetModel(exchange, queue, routingKey, isProperties);
    
                 try
                 {
                   channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
                 }
                catch (Exception ex)
                {
                   throw ex.GetInnestException();
               }
           }

      下次是本機測試的發布速度截圖:

    .net中rabbitmq如何使用

      4.2W/S屬于穩定速度,把反序列化(ToJson)會稍微快一些。

    Subscribe(訂閱)的封裝

      發布的時候是申明了交換器和隊列并綁定,然而訂閱的時候只需要聲明隊列就可。從下面代碼能看到,捕獲到異常的時候,會把消息送到自定義的“死信隊列”里,由另外的JOB進行定時重發,因此,finally是應答成功的。

    /// <summary>
            /// 獲取Model
            /// </summary>
            /// <param name="queue">隊列名稱</param>
            /// <param name="isProperties"></param>
            /// <returns></returns>
            private static IModel GetModel(string queue, bool isProperties = false)
            {
                return ModelDic.GetOrAdd(queue, value =>
                 {
                     var model = _conn.CreateModel();
                     QueueDeclare(model, queue, isProperties);
    
                     //每次消費的消息數
                     model.BasicQos(0, 1, false);
    
                     ModelDic[queue] = model;
    
                     return model;
                 });
            }    
    
            /// <summary>
            /// 接收消息
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="queue">隊列名稱</param>
            /// <param name="isProperties"></param>
            /// <param name="handler">消費處理</param>
            /// <param name="isDeadLetter"></param>
            public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
            {
                //隊列聲明
                var channel = GetModel(queue, isProperties);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var msgStr = body.DeserializeUtf8();
                    var msg = msgStr.FromJson<T>();
                    try
                    {
                        handler(msg);
                    }
                    catch (Exception ex)
                    {
                        ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq");
                        if (!isDeadLetter)
                            PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                    }
                    finally
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                };
                channel.BasicConsume(queue, false, consumer);
            }

      下次是本機測試的發布速度截圖:

    .net中rabbitmq如何使用

      快的時候有1.9K/S,慢的時候也有1.7K/S

    Pull(拉)的封裝

      直接上代碼:

     /// <summary>
            /// 獲取消息
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="routingKey"></param>
            /// <param name="handler">消費處理</param>
            private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
            {
                var channel = GetModel(exchange, queue, routingKey);
    
                var result = channel.BasicGet(queue, false);
                if (result.IsNull())
                    return;
    
                var msg = result.Body.DeserializeUtf8().FromJson<T>();
                try
                {
                    handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq");
                }
                finally
                {
                    channel.BasicAck(result.DeliveryTag, false);
                }
            }

    .net中rabbitmq如何使用

      快的時候有1.8K/s,穩定是1.5K/S

    Rpc(遠程調用)的封裝

      首先說明下,RabbitMq只是提供了這個RPC的功能,但是并不是真正的RPC,為什么這么說:

      1、傳統Rpc隱藏了調用細節,像調用本地方法一樣傳參、拋出異常

      2、RabbitMq的Rpc是基于消息的,消費者消費后,通過新隊列返回響應結果。

     /// <summary>
            /// RPC客戶端
            /// </summary>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="routingKey"></param>
            /// <param name="body"></param>
            /// <param name="isProperties"></param>
            /// <returns></returns>
            public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
            {
                var channel = GetModel(exchange, queue, routingKey, isProperties);
    
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue, true, consumer);
    
                try
                {
                    var correlationId = Guid.NewGuid().ToString();
                    var basicProperties = channel.CreateBasicProperties();
                    basicProperties.ReplyTo = queue;
                    basicProperties.CorrelationId = correlationId;
    
                    channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());
    
                    var sw = Stopwatch.StartNew();
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        if (ea.BasicProperties.CorrelationId == correlationId)
                        {
                            return ea.Body.DeserializeUtf8();
                        }
    
                        if (sw.ElapsedMilliseconds > 30000)
                            throw new Exception("等待響應超時");
                    }
                }
                catch (Exception ex)
                {
                    throw ex.GetInnestException();
                }
            }    
    
            /// <summary>
            /// RPC服務端
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="exchange"></param>
            /// <param name="queue"></param>
            /// <param name="isProperties"></param>
            /// <param name="handler"></param>
            /// <param name="isDeadLetter"></param>
            public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
            {
                //隊列聲明
                var channel = GetModel(queue, isProperties);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var msgStr = body.DeserializeUtf8();
                    var msg = msgStr.FromJson<T>();
    
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;
    
                    try
                    {
                        msg = handler(msg);
                    }
                    catch (Exception ex)
                    {
                        ex.GetInnestException().WriteToFile("隊列接收消息", "RabbitMq");
                    }
                    finally
                    {
                        channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                };
                channel.BasicConsume(queue, false, consumer);
            }

      可以用,但不建議去用??梢钥紤]其他的RPC框架。grpc、thrift等。

    以上是“.net中rabbitmq如何使用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女