本篇內容介紹了“Spring Cloud Stream如何使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Spring Cloud Stream是構建消息驅動的微服務應用程序的框架。Spring Cloud Stream基于Spring Boot建立獨立的生產級Spring應用程序,并使用Spring Integration提供與消息代理的連接。它提供了來自幾家供應商的中間件的意見配置,介紹了持久發布訂閱語義,消費者組和分區的概念。
您可以將@EnableBinding注釋添加到應用程序,以便立即連接到消息代理,并且可以將@StreamListener添加到方法中,以使其接收流處理的事件。以下是接收外部消息的簡單接收器應用程序。
@SpringBootApplication@EnableBinding(Sink.class)public class VoteRecordingSinkApplication { public static void main(String[] args) { SpringApplication.run(VoteRecordingSinkApplication.class, args); } @StreamListener(Sink.INPUT) public void processVote(Vote vote) { votingService.recordVote(vote); }}
@EnableBinding注釋需要一個或多個接口作為參數(在這種情況下,該參數是單個Sink接口)。接口聲明輸入和/或輸出通道。Spring Cloud Stream提供了接口Source,Sink和Processor; 您還可以定義自己的界面。
以下是Sink接口的定義:
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input();}
@Input注釋標識輸入通道,通過該輸入通道接收到的消息進入應用程序; @Output注釋標識輸出通道,發布的消息將通過該通道離開應用程序。@Input和@Output注釋可以使用頻道名稱作為參數; 如果未提供名稱,將使用注釋方法的名稱。
Spring Cloud Stream將為您創建一個界面的實現。您可以在應用程序中通過自動連接來使用它,如下面的測試用例示例。
@RunWith(SpringJUnit4ClassRunner.class)@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)@WebAppConfiguration@DirtiesContextpublic class StreamApplicationTests { @Autowired private Sink sink; @Test public void contextLoads() { assertNotNull(this.sink.input()); }}
編程模型
Binder
Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與消息中間件之間的粘合劑。
目前 Spring Cloud Stream 實現了 Kafka 和 Rabbit MQ 的binder。通過 binder ,可以很方便的連接中間件,可以動態的改變消息的destinations(對應于 Kafka 的topic,Rabbit MQ 的 exchanges),這些都可以通過外部配置項來做到。甚至可以任意的改變中間件的類型而不需要修改一行代碼。
Publish-Subscribe
消息的發布(Publish)和訂閱(Subscribe)是事件驅動的經典模式。Spring Cloud Stream 的數據交互也是基于這個思想。生產者把消息通過某個 topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務,通過訂閱特定 topic 來獲取廣播出來的消息來觸發業務的進行。
這種模式,極大的降低了生產者與消費者之間的耦合。即使有新的應用的引入,也不需要破壞當前系統的整體結構。
Consumer Groups
“Group”, Kafka 中的概念。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。
微服務中動態的縮放同一個應用的數量以此來達到更高的處理能力是非常必須的。對于這種情況,同一個事件防止被重復消費,只要把這些應用放置于同一個 “group” 中,就能夠保證消息只會被其中一個應用消費一次。
Message
Message,就是所說的消息體,用來承載傳輸的信息用的。Message分為兩部分,header和payload。header是頭部信息,用來存儲傳輸的一些特性屬性參數。payload是用來裝載數據的,他可以攜帶的任何Object對象 不同的對象在binder中傳輸 可以指定不同的mini類型
可以通過application.yml中設置 輸入input和輸出output的mini類型
spring.cloud.stream.bindings..content-type
MessageChannel
消息管道,生產者生產一個消息到channel,消費者從channel消費一個消息,所以channel可以對消息組件解耦,并且提供一個方便的攔截功能和監控功能。
默認的通道
輸入(SubscribableChannel)和輸出通道(MessageChannel)參考 Processor接口
springcloudstream提供通道的定義 比如自定義通過可以使用接口
public interface OrderChannel { String INPUT = "input_order"; String OUTPUT="ouput_order"; /** * input注解制定通道的名稱 將來在yml中配置該通道的實際綁定的topic或者訂閱組 * @return */ @Input(INPUT) SubscribableChannel orderInput(); /** * output注解指定輸出通道的名稱 * @return */ @Output(OUTPUT) MessageChannel orderOutput();}
以下 代碼參考 Source Sink Processor接口 將來在yml關于該通道的配置既可以
spring: cloud: stream: bindings: 通道名稱: destination: mydest
“Spring Cloud Stream如何使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。