RabbitMQ 스트림 대기열에 대한 클라이언트로 이동하십시오
BatchSend
Send
RabbitMQ 스트림 대기열에 대한 클라이언트로 이동하십시오
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
수입 :
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly
로컬로 테스트하려면 서버가 필요할 수 있습니다. 브로커를 시작합시다 :
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= ' -rabbitmq_stream advertised_host localhost -rabbit loopback_users "none" '
rabbitmq:3-management
중개인은 몇 초 안에 시작해야합니다. 준비되면 stream
플러그인 및 stream_management
활성화하십시오.
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
관리 UI : http : // localhost : 15672/
스트림 URI : rabbitmq-stream://guest:guest@localhost:5552
시작 예를 참조하십시오.
더 많은 사용 사례는 예제 디렉토리를 참조하십시오.
단일 노드를 연결하는 표준 방법 :
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5552 ).
SetUser ( "guest" ).
SetPassword ( "guest" ))
CheckErr ( err )
연결 당 생산자 수를 정의 할 수 있고 기본값은 1입니다.
stream . NewEnvironmentOptions ().
SetMaxProducersPerClient ( 2 ))
연결 당 소비자 수를 정의 할 수 있고 기본값은 1입니다.
stream . NewEnvironmentOptions ().
SetMaxConsumersPerClient ( 2 ))
최상의 성능을 얻으려면 기본값을 사용해야합니다. 연결 당 여러 소비자에 대한 참고 사항 : IO 스레드는 소비자 전체에서 공유되므로 한 소비자가 느리면 다른 소비자 성능에 영향을 줄 수 있습니다.
클라이언트를 연결하지 않으면 멀티 호스트를 정의 할 수 있습니다.
addresses := [] string {
"rabbitmq-stream://guest:guest@host1:5552/%2f" ,
"rabbitmq-stream://guest:guest@host2:5552/%2f" ,
"rabbitmq-stream://guest:guest@host3:5552/%2f" }
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions (). SetUris ( addresses ))
스트림 클라이언트는 모든 호스트 이름에 도달해야합니다.로드 밸런서의 경우 stream.AddressResolver
매개 변수를 사용할 수 있습니다.
addressResolver := stream. AddressResolver {
Host : "load-balancer-ip" ,
Port : 5552 ,
}
env , err := stream. NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( addressResolver . Host ).
SetPort ( addressResolver . Port ).
SetAddressResolver ( addressResolver ).
이 구성에서 클라이언트는 오른쪽 노드에 도달 할 때까지 연결을 시도합니다.
이 RabbitMQ 블로그 게시물은 세부 사항을 설명합니다.
예제 디렉토리의 "로드 밸런서 사용"예제도 참조
TLS를 구성하려면 IsTLS
매개 변수를 설정해야합니다.
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetHost ( "localhost" ).
SetPort ( 5551 ). // standard TLS port
SetUser ( "guest" ).
SetPassword ( "guest" ).
IsTLS ( true ).
SetTLSConfig ( & tls. Config {}),
)
tls.Config
는 표준 Golang TLS 라이브러리 https://pkg.go.dev/crypto/tls입니다
예제 디렉토리의 "시작 TLS"예제도 참조하십시오.
스키마 URI와 같은 스키마를 사용하여 TLS를 구성 할 수도 있습니다.
env , err := stream . NewEnvironment (
stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://guest:guest@localhost:5551/" ).
SetTLSConfig ( & tls. Config {}),
)
SASL을 구성하려면 SaslMechanism
매개 변수 Environment.SetSaslConfiguration
설정해야합니다 .SETSASLConfiguration :
cfg := new (tls. Config )
cfg . ServerName = "my_server_name"
cfg . RootCAs = x509 . NewCertPool ()
if ca , err := os . ReadFile ( "certs/ca_certificate.pem" ); err == nil {
cfg . RootCAs . AppendCertsFromPEM ( ca )
}
if cert , err := tls . LoadX509KeyPair ( "certs/client/cert.pem" , "certs/client/key.pem" ); err == nil {
cfg . Certificates = append ( cfg . Certificates , cert )
}
env , err := stream . NewEnvironment ( stream . NewEnvironmentOptions ().
SetUri ( "rabbitmq-stream+tls://my_server_name:5551/" ).
IsTLS ( true ).
SetSaslConfiguration ( stream . SaslConfigurationExternal ). // SASL EXTERNAL
SetTLSConfig ( cfg ))
스트림을 정의하려면 environment
인터페이스 DeclareStream
및 DeleteStream
사용해야합니다.
MaxLengthBytes
또는 MaxAge
와 같은 스트림 생성 중에 스트림 유지 정책을 정의하는 것이 좋습니다.
err = env . DeclareStream ( streamName ,
stream . NewStreamOptions ().
SetMaxLengthBytes (stream. ByteCapacity {}. GB ( 2 )))
스트림이 이미 동일한 매개 변수로 정의 된 경우 함수 DeclareStream
오류를 반환하지 않습니다. 스트림이 존재하는지 확인하기 위해 StreamExists
사용하는 동일한 매개 변수가 없을 때 전제 조건이 실패한 경우를 반환합니다.
스트림 통계를 얻으려면 environment.StreamStats
메소드를 사용해야합니다.
stats , err := environment . StreamStats ( testStreamName )
// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
firstOffset , err := stats . FirstOffset () // first offset of the stream
// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
committedChunkId , err := statsAfter . CommittedChunkId ()
메시지를 게시하려면 *stream.Producer
인스턴스가 필요합니다.
producer , err := env . NewProducer ( "my-stream" , nil )
프로듀서 동작을 사용자 정의하기 위해 ProducerOptions
사용하면 다음과 같습니다.
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
클라이언트는 메시지를 보내는 두 개의 인터페이스를 제공합니다. send
:
var message message. StreamMessage
message = amqp . NewMessage ([] byte ( "hello" ))
err = producer . Send ( message )
그리고 BatchSend
:
var messages []message. StreamMessage
for z := 0 ; z < 10 ; z ++ {
messages = append ( messages , amqp . NewMessage ([] byte ( "hello" )))
}
err = producer . BatchSend ( messages )
producer.Send
:
requestedMaxFrameSize
보다 큰 경우 메시지를 자동으로 분할합니다.producer-send-timeout
에서 아무 일도 일어나지 않는 경우 메시지를 보냅니다 producer.BatchSend
:
생산자를 닫으십시오 : producer.Close()
생산자는 서버에서 제거됩니다. 다른 생산자가없는 경우 TCP 연결이 닫힙니다
BatchSend
Send
BatchSend
는 메시지를 보내기위한 원시적이며, Send
메시지를 게시하고 내부적으로 BatchSend
사용하는 스마트 레이어를 소개합니다.
Send
Interface는 대부분의 경우에 작동하며 일부 조건에서는 BatchSend
보다 약 15/20 느릅니다. 이 스레드도 참조하십시오.
예제 디렉토리의 "클라이언트 성능"예제도 참조하십시오
각 게시에 대해 서버는 클라이언트에게 확인 또는 오류를 보냅니다. 클라이언트는 확인을 받기위한 인터페이스를 제공합니다.
//optional publish confirmation channel
chPublishConfirm := producer . NotifyPublishConfirmation ()
handlePublishConfirm ( chPublishConfirm )
func handlePublishConfirm ( confirms stream. ChannelPublishConfirm ) {
go func () {
for confirmed := range confirms {
for _ , msg := range confirmed {
if msg . IsConfirmed () {
fmt . Printf ( "message %s stored n " , msg . GetMessage (). GetData ())
} else {
fmt . Printf ( "message %s failed n " , msg . GetMessage (). GetData ())
}
}
}
}()
}
Messagestatus struct에서는 두 가지 publishingId
찾을 수 있습니다.
//first one
messageStatus . GetMessage (). GetPublishingId ()
// second one
messageStatus . GetPublishingId ()
첫 번째는 사용자가 중복 제거와 같은 특별한 경우에 제공합니다. 두 번째는 클라이언트가 자동으로 할당합니다. 사용자가 다음과 함께 publishingId
지정하는 경우.
msg = amqp . NewMessage ([] byte ( "mymessage" ))
msg . SetPublishingId ( 18 ) // <---
제출 된 : messageStatus.GetMessage().HasPublishingId()
사실이며
messageStatus.GetMessage().GetPublishingId()
및 messageStatus.GetPublishingId()
값은 동일합니다.
예제 디렉토리의 "시작"예제도 참조하십시오
스트림 플러그인은 중복 제거 데이터를 처리 할 수 있습니다. 자세한 내용은이 블로그 게시물을 참조하십시오.
예제 디렉토리에서 "중복 제거"예제를 찾을 수 있습니다.
시간 이상을 실행하면 메시지 수는 항상 10입니다.
생산자의 마지막 시퀀스 ID를 검색하려면 다음을 사용할 수 있습니다.
publishingId, err := producer.GetLastPublishingId()
하위 엔트리에 넣을 메시지 수. 하위 엔트리는 게시 프레임에서 하나의 "슬롯"이며, 아웃 바운드 메시지는 출판 프레임뿐만 아니라 하위 엔트리에서도 배치됩니다. 이 기능을 사용하여 대기 시간이 증가하는 비용으로 처리량을 늘리십시오.
예제 디렉토리에서 "하위 항목 배치"예제를 찾을 수 있습니다.
기본 압축은 None
(압축 없음)이지만 다른 종류의 압축을 정의 할 수 있습니다 : GZIP
, SNAPPY
, LZ4
, ZSTD
압축은 유효 SubEntrySize > 1
producer , err := env . NewProducer ( streamName , stream . NewProducerOptions ().
SetSubEntrySize ( 100 ).
SetCompression (stream. Compression {}. Gzip ()))
스트림 필터링은 RabbitMQ 3.13의 새로운 기능입니다. 해당 응용 프로그램이 스트림의 메시지의 하위 집합 만 필요할 때 브로커와 소비 애플리케이션간에 대역폭을 저장할 수 있습니다. 자세한 내용은이 블로그 게시물을 참조하십시오. 블로그 게시물에는 Java 예제도 포함되어 있지만 Go 클라이언트는 비슷합니다. 예제 디렉토리의 필터링 예제를 참조하십시오.
스트림에서 메시지를 소비하려면 NewConsumer
인터페이스를 사용해야합니다.
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
fmt . Printf ( "consumer name: %s, text: %s n " , consumerContext . Consumer . GetName (), message . Data )
}
consumer , err := env . NewConsumer (
"my-stream" ,
handleMessages ,
... .
ConsumerOptions
사용하면 소비자 행동을 사용자 정의 할 수 있습니다.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). // set a consumer name
SetCRCCheck ( false ). // Enable/Disable the CRC control.
SetOffset (stream. OffsetSpecification {}. First ())) // start consuming from the beginning
CRC 제어를 비활성화하면 성능이 향상 될 수 있습니다.
예제 디렉토리의 "오프셋 시작"예제도 참조하십시오
소비자를 닫으십시오 : consumer.Close()
소비자는 서버에서 제거됩니다. 다른 소비자가없는 경우 TCP 연결이 닫힙니다
서버는 이러한 방식으로 소비자가 주어진 현재 전달 된 오프셋을 저장할 수 있습니다.
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
if atomic . AddInt32 ( & count , 1 ) % 1000 == 0 {
err := consumerContext . Consumer . StoreOffset () // commit all messages up to the current message's offset
... .
consumer , err := env. NewConsumer (
..
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ). <- - - - - -
소비자는 오프셋을 저장할 수있는 이름이 있어야합니다.
참고 : 각 단일 메시지에 대한 오프셋을 저장하지 않으면 성능이 줄어 듭니다.
예제 디렉토리의 "오프셋 추적"예제도 참조하십시오
서버는 또한 이러한 방식으로 전류 전달 된 오프셋 대신 이전 전달 된 오프셋을 저장할 수 있습니다.
processMessageAsync := func ( consumer stream. Consumer , message * amqp. Message , offset int64 ) {
... .
err := consumer . StoreCustomOffset ( offset ) // commit all messages up to this offset
... .
이것은 메시지를 비동기 적으로 처리해야하며 원래 메시지 핸들러를 차단할 수없는 상황에 유용합니다. 즉, 위의 handleMessages
함수에서 보았 듯이 현재 또는 최신 전달 된 오프셋을 저장할 수 없습니다.
다음 스 니펫은 기본값으로 자동 추적을 활성화하는 방법을 보여줍니다.
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy () ...
nil
또한 유효한 가치입니다. 기본값이 사용됩니다
stream . NewConsumerOptions ().
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( nil ) ...
소비자 이름 설정 (오프셋 추적의 필수)
자동 추적 전략에는 다음과 같은 설정이 있습니다.
스토리지 전 메시지 수 : 클라이언트는 지정된 수의 메시지 후 오프셋을 저장합니다.
메시지 처리기 실행 직후. 기본값은 10,000 개의 메시지마다입니다.
플러시 간격 : 클라이언트는 지정된 간격으로 마지막 수신 된 오프셋을 저장해야합니다.
이것은 비 활동의 경우 오프셋을 저장하지 않고 보류 중이되는 것을 피합니다. 기본값은 5 초입니다.
이러한 설정은 다음 스 니펫과 같이 구성 가능합니다.
stream . NewConsumerOptions ().
// set a consumerOffsetNumber name
SetConsumerName ( "my_consumer" ).
SetAutoCommit ( stream . NewAutoCommitStrategy ().
SetCountBeforeStorage ( 50 ). // store each 50 messages stores
SetFlushInterval ( 10 * time . Second )). // store each 10 seconds
SetOffset (stream. OffsetSpecification {}. First ()))
예제 디렉토리의 "자동 오프셋 추적"예를 참조하십시오.
다음을 사용하여 소비자 오프셋을 쿼리 할 수 있습니다.
offset , err := env . QueryOffset ( "consumer_name" , "streamName" )
오프셋이 존재하지 않으면 오류가 반환됩니다.
스트림 필터링은 RabbitMQ 3.13의 새로운 기능입니다. 해당 응용 프로그램이 스트림의 메시지의 하위 집합 만 필요할 때 브로커와 소비 애플리케이션간에 대역폭을 저장할 수 있습니다. 자세한 내용은이 블로그 게시물을 참조하십시오. 블로그 게시물에는 Java 예제도 포함되어 있지만 Go 클라이언트는 비슷합니다. 예제 디렉토리의 필터링 예제를 참조하십시오.
단일 활성 소비자 패턴은 하나의 소비자가 한 번에 스트림에서 메시지를 처리하도록합니다. 단일 활성 소비자 예제를 참조하십시오.
단일 활성 소비자 패턴으로 소비자를 만들려면 SingleActiveConsumer
옵션을 설정해야합니다.
consumerName := "MyFirstGroupConsumer"
consumerUpdate := func ( isActive bool ) stream. OffsetSpecification {..}
stream . NewConsumerOptions ().
SetConsumerName ( consumerName ).
SetSingleActiveConsumer (
stream . NewSingleActiveConsumer ( consumerUpdate ))
소비자가 홍보 될 때 ConsumerUpdate
기능이 호출됩니다.
새로운 소비자는 consumerUpdate
기능에 의해 반환 된 오프셋에서 소비를 다시 시작합니다.
반환 할 오프셋을 결정하는 것은 사용자에게 달려 있습니다.
그 중 하나는 오프셋 서버 쪽을 저장하고 마지막 오프셋에서 다시 시작하는 것입니다.
단일 활성 소비자 예제는 서버 측 오프셋을 사용하여 소비자를 다시 시작합니다.
ConsumerName
은 SAC를 활성화해야합니다. 다른 소비자 그룹을 만드는 방법입니다.
다른 소비자 그룹은 동시에 동일한 스트림을 소비 할 수 있습니다.
SAC ConsumerUpdate
활성화 될 때 NewConsumerOptions().SetOffset()
필요하지 않습니다.
자세한 내용은이 게시물도 참조하십시오
클라이언트는 생산자/소비자를 가까이에서 처리하기위한 인터페이스를 제공합니다.
channelClose := consumer . NotifyClose ()
defer consumerClose ( channelClose )
func consumerClose ( channelClose stream. ChannelClose ) {
event := <- channelClose
fmt . Printf ( "Consumer: %s closed on the stream: %s, reason: %s n " , event . Name , event . StreamName , event . Reason )
}
이런 식으로 실패를 처리 할 수 있습니다
ReliableProducer
및 ReliableConsumer
는 표준 생산자/소비자로 구축됩니다.
둘 다 표준 이벤트를 사용하여 클로즈를 처리합니다. 따라서 장애를 처리하기 위해 자신의 코드를 작성할 수 있습니다.
특징:
Both
] 단절의 경우 자동 레코 니트.Both
ReliableConsumer
스트림 ReliableProducer
존재하는지 확인하십시오.Both
] 스트림에 유효한 리더와 복제본이 있는지 확인하십시오. 스트림이 준비 될 때까지 재 시도하지 않으면.ReliableProducer
] 실패의 경우 확인되지 않은 메시지를 자동으로 처리합니다.ReliableConsumer
] 재시작의 경우 마지막 오프셋에서 다시 시작합니다. 예제 디렉토리에서 "신뢰할 수있는"예제를 찾을 수 있습니다.
Super Stream 기능은 RabbitMQ 3.11의 새로운 기능입니다. 여러 파티션이있는 스트림을 만들 수 있습니다.
각 파티션은 별도의 스트림이지만 클라이언트는 슈퍼 스트림을 단일 스트림으로 본다.
예제 디렉토리에서 "슈퍼 스트림"예제를 찾을 수 있습니다.
이 블로그 게시물에서 https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams를 찾을 수 있습니다.
Java Stream-Client 블로그 게시물 : https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams를 읽을 수 있습니다.
Super Stream은 게시 필터링 및 소비 필터링 기능을 지원합니다.
슈퍼 스트림 소비자를 위해 오프셋 추적이 지원됩니다.
표준 스트림과 같은 방식으로 SetAutoCommit
또는 SetManualCommit
옵션을 사용하여 자동 오프셋 추적을 활성화/비활성화 할 수 있습니다.
슈퍼 스트림 소비자 메시지 핸들러에서 파티션, 소비자 및 오프셋을 식별 할 수 있습니다.
handleMessages := func ( consumerContext stream. ConsumerContext , message * amqp. Message ) {
... .
consumerContext . Consumer . GetName () // consumer name
consumerContext . Consumer . GetOffset () // current offset
consumerContext . Consumer . GetStreamName () // stream name (partition name )
... .
}
수동 추적 API :
consumerContext.Consumer.StoreOffset()
: 현재 오프셋을 저장합니다.consumerContext.Consumer.StoreCustomOffset(xxx)
사용자 정의 오프셋을 저장합니다.표준 스트림과 마찬가지로 각 단일 메시지에 대한 오프셋을 저장하지 않아야합니다. 성능이 줄어 듭니다.
성능 테스트 도구 테스트를 실행하는 데 유용합니다. Java 성능 도구도 참조하십시오
설치하려면 GitHub에서 버전을 다운로드 할 수 있습니다.
스코틀랜드 사람:
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz
Linux :
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz
창
https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip
stream-perf-test --help
실행하여 매개 변수를보십시오. 기본적으로 하나의 생산자, 한 소비자와 테스트를 실행합니다.
여기서 예 :
stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10
Docker 이미지를 사용할 수 있습니다 : pivotalrabbitmq/go-stream-perf-test
, 테스트 : 테스트 :
서버 실행 서버는 호스트 모드입니다.
docker run -it --rm --name rabbitmq --network host
rabbitmq:3-management
플러그인 활성화 :
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
그런 다음 Docker 이미지를 실행하십시오.
docker run -it --network host pivotalrabbitmq/go-stream-perf-test
모든 매개 변수를 보려면 :
docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help
make build
Docker 이미지가 필요한 테스트를 실행하려면 다음을 사용할 수 있습니다.
make rabbitmq-server
테스트를 위해 스트림이 활성화 된 준비 RabbitMQ-Server를 실행합니다.
그런 다음 make test