-
[ZeroMQ] Socket 생성 및 동작 확인 및 구현Computer Programs 2021. 6. 29. 16:32
https://zguide.zeromq.org/docs/chapter2/
chapter2 sockets and patterns
zeroMQ 는 개발자들이 제멋대로인 컨셉들 사용하는거 대신에 소켓이랑 메시지를 편하게 사용할 수 있도록 해준다.
소켓은 4개의 파트로 life가 나눠진다.
- Creating & Destroying
zmq_socket(), zmq_close()
- Configuring
zmq_setsockopt(), zmq_getsockopt()
- Plugging
zmq_bind(), zmq_connect()
- Writting & Reciving Messages
zmq_msg_send(), zmq_msg_recv()
*NOTE ) Socket = Pointer, Message = Structure
1. Plugging Sockets into the Topology
To create a connection between two nodes, use zmq_binds() in one node and zmq_connect() in the other.
As a general, zmq_bind() is a "server", zmq_connect() is a "client" .
대부분의 server/client모델은 무조건 static한 server가 먼저 시작하고, 그 다음 dynamic한 client가 시작하는 것이 원ㅊ기. 이게 꼬이게 되면 => get big red Fail flag
근데, ZeroMQ는 유동적이고 평화롭게 처리하기 때문에, 서버가 살아있지 않은 상태에서 클라이언트가 연결을 하면 정사엊ㄱ인 연결을 아니더라도 connection은 존재하게 되고, 메시지를 보낼 수 있게 된다. 그 후 server가 bind 되면, ZeroMQ는 정상적으로 message를 deliver하게 된다.
server는 단일 소켓으로 한번에 다양한 endpoint에 bind할 수 있다. 이는 다른 운송프로토콜의 연결을 허용한다는 뜻이다. 같은 프로토콜을 bind했다면 crash후 recovery용으로 사용하겠다는 뜻이다.2. Sending & Receiving Messages
To send and receive messages you use the zmq_msg_send(), zmq_msg_recv() methods.
TCP vs ZeroMQ
TCP ZeroMQ carry stream of bytes carry messages, like UDP ( messsage is length-specified binary data ) - do I/O in background thread, ( messages arrive in local input queues and are sent from local output queues ==> nomatter what application is busy ) - one-to-N routing behavior built-in zmq_send()는 연결되어있다고 바로 보내는 것이 아니고, queue에 저장되어있다가 I/O thread가 비동기식으로 내보낸다. 그래서 예외 케이스에 걸려도, 메시지 전송이 block되지 않는다.
3. Unicast Transports
ZeroMQ는 unicast transport ( inproc, ipc, tcp ) 뿐만 아니라 multicast ( epgm, pgm )까지 지원한다.
Unicast Common case -> "Disconnected TCP transport" : elastic, portable, fast enough. (disconnected라 칭한 이유는 ZeroMQ는 endpoint끼리의 정확한 연결을 요구하지 않기 때문에 )
Unicast case -> inter process "ipc" : disconnected tcp와 비슷하지만, Window OS환경에서는 동작안하는 한계점.
Unicast case -> inter thread transport "inproc" : musch faster than tcp or ipc, 근데 연결지향성 , server must issue a bind before any client issuses a connect.
4. I/O Threads
when create a new context, starts with one I/O thread. the general rule , one I/O thread per gigabyte of data in or out per second.
To raise number of I/O threads, use zmq_ctx_set() call before creating any socket.
one socket can handle dozens, even thousands of connections at once.
5. Messaging Patterns
built-in core ZeroMQ patterns
Reqyest-Reply set of clients - set of services remote procedure call , task distribution Pub-Sub set of publisher - set of subscriber data distirbution pattern Pipeline nodes in fan-out/fan-in pattern that can have multiple steps and loops paralle task distribution and collection patter Exclusive Pair two scoket exclusively, two threads in a process 6. Working with Messages
- Initialise a message: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data().
- Sending and receiving a message: zmq_msg_send(), zmq_msg_recv().
- Release a message: zmq_msg_close().
- Access message content: zmq_msg_data(), zmq_msg_size(), zmq_msg_more().
- Work with message properties: zmq_msg_get(), zmq_msg_set().
- Message manipulation: zmq_msg_copy(), zmq_msg_move().
Basic ground rules for usin ZeroMQ messages
▶ zmq_msg_t objects, not blocks of data
▶ to read message, use zmq_msg_init() to create an empty message and pass message to zmq_msg_recv()
▶ to write message, use zmq_msg_init_size() to create message and same time, allocate a block of data of some size. and send message to zmq_msg_sned()
▶ to release a message, call zmq_msg_close(), ZeroMQ will drops reference, and destroy message.
▶ to access the messsage content, use zmq_msg_data() to know how much data message contains, zmq_msg_size()
▶ do not use zmq_msg_move(), zmq_msg_copy(), zmq_msg_init_data() -> if want to send same messsage more than once, create second message, initialize using zmq_msg_init(), then use zmq_msg_copy(). -> 데이터를 복사하는 것이 아닌,레퍼런스를 복사하는 것.
▶ after pass message to zmq_msg_send(), ZeroMQ will clear the message ( set size zero ).
7. Handling Multiple Sockets
read from multiple endpoints at the same time -> use zmq_poll().
Example : two sockets using nonblocking reads. acts both as a subscriber to weather updates.
zhelpers.hpp ->
https://github.com/booksbyus/zguide/blob/master/examples/C++/zhelpers.hpp
// msreader // Reading from multiple sockets in C++ // This version uses a simple recv loop // #include "zhelpers.hpp" int main (int argc, char *argv[]) { // Prepare our context and sockets zmq::context_t context(1); // Connect to task ventilator zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); // Connect to weather server zmq::socket_t subscriber(context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6); // Process messages from both sockets // We prioritize traffic from the task ventilator while (1) { // Process any waiting tasks bool rc; do { zmq::message_t task; if ((rc = receiver.recv(&task, ZMQ_DONTWAIT)) == true) { // process task } } while(rc == true); // Process any waiting weather updates do { zmq::message_t update; if ((rc = subscriber.recv(&update, ZMQ_DONTWAIT)) == true) { // process weather update } } while(rc == true); // No activity, so sleep for 1 msec s_sleep(1); } return 0; }
// mspoller // Reading from multiple sockets in C++ // This version uses zmq_poll() // #include "zhelpers.hpp" int main (int argc, char *argv[]) { zmq::context_t context(1); // Connect to task ventilator zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); // Connect to weather server zmq::socket_t subscriber(context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6); // Initialize poll set zmq::pollitem_t items [] = { { static_cast<void*>(receiver), 0, ZMQ_POLLIN, 0 }, { static_cast<void*>(subscriber), 0, ZMQ_POLLIN, 0 } }; // Process messages from both sockets while (1) { zmq::message_t message; zmq::poll (&items [0], 2, -1); if (items [0].revents & ZMQ_POLLIN) { receiver.recv(&message); // Process task } if (items [1].revents & ZMQ_POLLIN) { subscriber.recv(&message); // Process weather update } } return 0; }
'Computer Programs' 카테고리의 다른 글
[Capnp] Cap'n Proto 란 (0) 2022.11.07 [Protocol Buffer] google protobuf 란 (0) 2021.06.29 [ZeroMQ] Mac Os BigSur & Linux Ubuntun 18.04 에서 설치 및 helloworld 테스트 (0) 2021.06.28 [CMake] CMakeLists.txt 작성하기 (0) 2021.06.28