czwartek, 29 października 2015

[MessageQueues] RabbitMQ

RabbitMQ jest open-source'owym message brokerem zbudowanym w oparciu o standard AMQP w języku Erlang. Jego główne cechy wyróżniające go spośród innych rozwiązan tej klasy to:
- high availability
- clustering
- zarządzianie (web UI)
- security (autentykacja, uprawnienia)

Aby rozpocząć pracę z RMQ należy pobrać i zainstalować go ze strony. Aby uruchomić instalator należy wcześniej pobrać i zainstalować dystrybucję Erlanga. Obie instalacje warto przeprowadzać jako administrator systemu (unikniemy problemów z cookie Erlanga). RabbitMQ dostarcza przyjazny interfejs webowy na porcie 15672. Aby móc z niego skorzystać należy wywołać polecenie:
rabbitmq-plugins enable rabbitmq_management

AMQP (Advanced Message Queue Protocol) jest otwartym standarded dla messagingu, niezależnym od producenta. Opiera się o następujące koncepcje:
- message broker - scentralizowany serwer pośredniczący
- exchanges :
 - direct (do konkretnej kolejki)
 - fan-out (do wielu kolejek)
 - topic (routing na podstawie wyrażen tekstowych)
 - headers (routing na podstawie nagłówków)

Exchange transferuje wiadomość do kolejek. Każda kolejka ma przypisany jakiś exchange poprzez mechanizm bindingu.

Kolejki i exchange można zakładać zarówno w proceduralnym kodzie jak i w UI administracyjnym, wyklikując odpowiednie opcje:



W projekcie .NET-owym należy zainstalować nugetem pakiet klienta RabbitMQ:



Najprostszy kod wstawiający do nazwanej kolejki poprzez domyślny exchange wygląda następująco:


var connectionFac = new ConnectionFactory()
{
    HostName = "192.168.1.5",
    UserName = "admin",
    Password = "admin1",
    VirtualHost = "/"
};

var connection = connectionFac.CreateConnection();
var model = connection.CreateModel();

var properties = model.CreateBasicProperties();
properties.SetPersistent(true);

var bytes = Encoding.UTF8.GetBytes("my message");

var queue = "myqueue";
var exchange = ""; //default

model.BasicPublish(exchange, queue, properties, bytes);

Console.ReadKey();
W przypadku połączen spoza localhost musimy założyć swojego usera (takiego jak admin) i nadać mu uprawnienia do vhosta.

Przykład kodu czytającego wiadomości:

model.BasicQos(0, 1, false); //przetwarzany po jednej wiadomości

var consumer = new QueueingBasicConsumer();
model.BasicConsume("myqueue", false, consumer);

while (true)
{
    var delArgs = consumer.Queue.Dequeue();
    byte[] bytes = delArgs.Body;
    Console.WriteLine(Encoding.UTF8.GetString(bytes));
    model.BasicAck(delArgs.DeliveryTag, false);
}

Procesów słuchających można uruchomić wiele przypisanych do tej samej kolejki. Będą wtedy rywalizowały o pobranie wiadomości. Jeśli chcemy, by kolejka działało jako publish / subscribe, to powinniśmy zbindować nasze kolejki do odpowiedniego exchange (typu fanout). Po założeniu takiego exchange'a możemy to skonfigurować w UI administracyjnym:

W kodzie po stronie wstawiającej należy ustawić odpowiedni, nazwany exchange, routing key tym razem nie będzie nazwą kolejki, ale pustym stringiem.

Po stronie odbierającej wiadomości mamy kilka zmian:

model.QueueBind("sub1", "my_fanout_exchange", "");
var subscriber = new EventingBasicConsumer(model);

subscriber.Received += (mod, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] {0}", message);
};

model.BasicConsume(queue: "sub1", noAck: true, consumer: subscriber);
break;

Pozostałe wzorce komunikacyjne w RabbitMQ:

  1. RPC -  wiadomość jest wysyłana do kolejki przez default Exchange, ale zawiera resposne queue, odbiorca dostaje wiadomość i umieszcza odpowiedź w reponse queue
  2. Routing - consumers zbindowani po kluczu, producent wysyła wiadomość z kluczem, a następnie na podstawie tego klucza wiadomość jest przekazywana do jednej lub więcej kolejek.
  3. Topic może zawierać wyrażenie tekstowe a'la regex wyliczane na podstawie znaków * (dowolny znak) i # (zero lub więcej słów). Wyrażeniem tym przetwarzany jest RoutingKey. Przykładowe topiki to: *.high.* lub corporate.#.
  4. Headers: nazwany exchange, wiadomość zawiera nagłówki (tekstowe), warunkiem może być all lub any. Umożliwia matchowanie po jednym lub każdym headerze (własności wiadomości). Analogia do tagowania postów
  5. Scatter Gather - producent wstawia wiadomość do kolejki, tworzy tymczasową kolejkę odpowiedzi. Wielu przetwarzających pobiera dane, przetwarza i wstawia swoje odpowiedzi do response queue. 

Brak komentarzy:

Prześlij komentarz