- high availability
- clustering
- zarządzianie (web UI)
- security (autentykacja, uprawnienia)
Aby rozpocząć pracę z RMQ należy pobrać i zainstalować go ze strony. Aby uruchomić instalator należy wcześniej pobrać i zainstalować dystrybucję Erlanga. Obie instalacje warto przeprowadzać jako administrator systemu (unikniemy problemów z cookie Erlanga). RabbitMQ dostarcza przyjazny interfejs webowy na porcie 15672. Aby móc z niego skorzystać należy wywołać polecenie:
rabbitmq-plugins enable rabbitmq_management
AMQP (Advanced Message Queue Protocol) jest otwartym standarded dla messagingu, niezależnym od producenta. Opiera się o następujące koncepcje:
- message broker - scentralizowany serwer pośredniczący
- exchanges :
- direct (do konkretnej kolejki)
- fan-out (do wielu kolejek)
- topic (routing na podstawie wyrażen tekstowych)
- headers (routing na podstawie nagłówków)
Exchange transferuje wiadomość do kolejek. Każda kolejka ma przypisany jakiś exchange poprzez mechanizm bindingu.
Kolejki i exchange można zakładać zarówno w proceduralnym kodzie jak i w UI administracyjnym, wyklikując odpowiednie opcje:
W projekcie .NET-owym należy zainstalować nugetem pakiet klienta RabbitMQ:
Najprostszy kod wstawiający do nazwanej kolejki poprzez domyślny exchange wygląda następująco:
var connectionFac = new ConnectionFactory() { HostName = "192.168.1.5", UserName = "admin", Password = "admin1", VirtualHost = "/" }; var connection = connectionFac.CreateConnection(); var model = connection.CreateModel(); var properties = model.CreateBasicProperties(); properties.SetPersistent(true); var bytes = Encoding.UTF8.GetBytes("my message"); var queue = "myqueue"; var exchange = ""; //default model.BasicPublish(exchange, queue, properties, bytes); Console.ReadKey();
Przykład kodu czytającego wiadomości:
model.BasicQos(0, 1, false); //przetwarzany po jednej wiadomości var consumer = new QueueingBasicConsumer(); model.BasicConsume("myqueue", false, consumer); while (true) { var delArgs = consumer.Queue.Dequeue(); byte[] bytes = delArgs.Body; Console.WriteLine(Encoding.UTF8.GetString(bytes)); model.BasicAck(delArgs.DeliveryTag, false); }
Procesów słuchających można uruchomić wiele przypisanych do tej samej kolejki. Będą wtedy rywalizowały o pobranie wiadomości. Jeśli chcemy, by kolejka działało jako publish / subscribe, to powinniśmy zbindować nasze kolejki do odpowiedniego exchange (typu fanout). Po założeniu takiego exchange'a możemy to skonfigurować w UI administracyjnym:
W kodzie po stronie wstawiającej należy ustawić odpowiedni, nazwany exchange, routing key tym razem nie będzie nazwą kolejki, ale pustym stringiem.
Po stronie odbierającej wiadomości mamy kilka zmian:
model.QueueBind("sub1", "my_fanout_exchange", ""); var subscriber = new EventingBasicConsumer(model); subscriber.Received += (mod, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; model.BasicConsume(queue: "sub1", noAck: true, consumer: subscriber); break;
Pozostałe wzorce komunikacyjne w RabbitMQ:
- RPC - wiadomość jest wysyłana do kolejki przez default Exchange, ale zawiera resposne queue, odbiorca dostaje wiadomość i umieszcza odpowiedź w reponse queue
- Routing - consumers zbindowani po kluczu, producent wysyła wiadomość z kluczem, a następnie na podstawie tego klucza wiadomość jest przekazywana do jednej lub więcej kolejek.
- Topic może zawierać wyrażenie tekstowe a'la regex wyliczane na podstawie znaków * (dowolny znak) i # (zero lub więcej słów). Wyrażeniem tym przetwarzany jest RoutingKey. Przykładowe topiki to: *.high.* lub corporate.#.
- Headers: nazwany exchange, wiadomość zawiera nagłówki (tekstowe), warunkiem może być all lub any. Umożliwia matchowanie po jednym lub każdym headerze (własności wiadomości). Analogia do tagowania postów
- Scatter Gather - producent wstawia wiadomość do kolejki, tworzy tymczasową kolejkę odpowiedzi. Wielu przetwarzających pobiera dane, przetwarza i wstawia swoje odpowiedzi do response queue.
Brak komentarzy:
Prześlij komentarz