Ticket #168: asio.diff

File asio.diff, 11.4 KB (added by jinmei, 10 years ago)
  • main.cc

     
    2828#include <iostream>
    2929
    3030#include <boost/foreach.hpp>
    31 #include <boost/bind.hpp>
    32 #include <asio.hpp>
    3331
    3432#include <exceptions/exceptions.h>
    3533
     
    4947#include "spec_config.h"
    5048#include "common.h"
    5149#include "auth_srv.h"
     50#include "asio_link.h"
    5251
    5352using namespace std;
    5453#ifdef USE_XFROUT
    5554using namespace isc::xfr;
    5655#endif
    5756
    58 using namespace asio;
    59 using ip::udp;
    60 using ip::tcp;
    61 
    6257using namespace isc::data;
    6358using namespace isc::cc;
    6459using namespace isc::config;
     
    7671 * class itself? */
    7772AuthSrv *auth_server;
    7873
    79 // TODO: this should be a property of AuthSrv, and AuthSrv needs
    80 // a stop() method (so the shutdown command can be handled)
    81 asio::io_service io_service_;
     74asio_link::IOService* io_service;
    8275
    8376ElementPtr
    8477my_config_handler(ElementPtr new_config) {
     
    9487        /* let's add that message to our answer as well */
    9588        answer->get("result")->add(args);
    9689    } else if (command == "shutdown") {
    97         io_service_.stop();
     90        io_service->stop();
    9891    }
    9992   
    10093    return answer;
     
    133126}
    134127#endif
    135128
    136 //
    137 // Helper classes for asynchronous I/O using asio
    138 //
    139 class TCPClient {
    140 public:
    141     TCPClient(io_service& io_service) :
    142         socket_(io_service),
    143         response_buffer_(0),
    144         responselen_buffer_(TCP_MESSAGE_LENGTHSIZE),
    145         response_renderer_(response_buffer_),
    146         dns_message_(Message::PARSE)
    147     {}
    148 
    149     void start() {
    150         async_read(socket_, asio::buffer(data_, TCP_MESSAGE_LENGTHSIZE),
    151                    boost::bind(&TCPClient::headerRead, this,
    152                                placeholders::error,
    153                                placeholders::bytes_transferred));
    154     }
    155 
    156     tcp::socket& getSocket() { return (socket_); }
    157 
    158     void headerRead(const asio::error_code& error,
    159                     size_t bytes_transferred)
    160     {
    161         if (!error) {
    162             InputBuffer dnsbuffer(data_, bytes_transferred);
    163 
    164             uint16_t msglen = dnsbuffer.readUint16();
    165             async_read(socket_, asio::buffer(data_, msglen),
    166 
    167                        boost::bind(&TCPClient::requestRead, this,
    168                                    placeholders::error,
    169                                    placeholders::bytes_transferred));
    170         } else {
    171             delete this;
    172         }
    173     }
    174 
    175     void requestRead(const asio::error_code& error,
    176                      size_t bytes_transferred)
    177     {
    178         if (!error) {
    179             InputBuffer dnsbuffer(data_, bytes_transferred);
    180 #ifdef USE_XFROUT
    181             if (check_axfr_query(data_, bytes_transferred)) {
    182                 dispatch_axfr_query(socket_.native(), data_, bytes_transferred);
    183                 // start to get new query ?
    184                 start();
    185             } else {
    186 #endif         
    187                 if (auth_server->processMessage(dnsbuffer, dns_message_,
    188                                                 response_renderer_, false)) {
    189                     responselen_buffer_.writeUint16(response_buffer_.getLength());
    190                     async_write(socket_,
    191                                 asio::buffer(
    192                                     responselen_buffer_.getData(),
    193                                     responselen_buffer_.getLength()),
    194                                 boost::bind(&TCPClient::responseWrite, this,
    195                                             placeholders::error));
    196                 } else {
    197                     delete this;
    198                 }
    199 #ifdef USE_XFROUT
    200             }
    201 #endif
    202         } else {
    203             delete this;
    204         }
    205     }
    206 
    207     void responseWrite(const asio::error_code& error) {
    208         if (!error) {
    209                 async_write(socket_,
    210                             asio::buffer(response_buffer_.getData(),
    211                                                 response_buffer_.getLength()),
    212                         boost::bind(&TCPClient::handleWrite, this,
    213                                     placeholders::error));
    214         } else {
    215             delete this;
    216         }
    217     }
    218 
    219     void handleWrite(const asio::error_code& error) {
    220         if (!error) {
    221             start();            // handle next request, if any.
    222       } else {
    223             delete this;
    224       }
    225     }
    226 
    227 private:
    228     tcp::socket socket_;
    229     OutputBuffer response_buffer_;
    230     OutputBuffer responselen_buffer_;
    231     MessageRenderer response_renderer_;
    232     Message dns_message_;
    233     enum { MAX_LENGTH = 65535 };
    234     static const size_t TCP_MESSAGE_LENGTHSIZE = 2;
    235     char data_[MAX_LENGTH];
    236 };
    237 
    238 class TCPServer {
    239 public:
    240     TCPServer(io_service& io_service, int af, short port) :
    241         io_service_(io_service), acceptor_(io_service_),
    242         listening_(new TCPClient(io_service_))
    243     {
    244         tcp::endpoint endpoint(af == AF_INET6 ? tcp::v6() : tcp::v4(), port);
    245         acceptor_.open(endpoint.protocol());
    246         // Set v6-only (we use a different instantiation for v4,
    247         // otherwise asio will bind to both v4 and v6
    248         if (af == AF_INET6) {
    249             acceptor_.set_option(ip::v6_only(true));
    250         }
    251         acceptor_.set_option(tcp::acceptor::reuse_address(true));
    252         acceptor_.bind(endpoint);
    253         acceptor_.listen();
    254         acceptor_.async_accept(listening_->getSocket(),
    255                                boost::bind(&TCPServer::handleAccept, this,
    256                                            listening_, placeholders::error));
    257     }
    258 
    259     ~TCPServer() { delete listening_; }
    260 
    261     void handleAccept(TCPClient* new_client,
    262                       const asio::error_code& error)
    263     {
    264         if (!error) {
    265             assert(new_client == listening_);
    266             new_client->start();
    267             listening_ = new TCPClient(io_service_);
    268             acceptor_.async_accept(listening_->getSocket(),
    269                                    boost::bind(&TCPServer::handleAccept,
    270                                                this, listening_,
    271                                                placeholders::error));
    272         } else {
    273             delete new_client;
    274         }
    275     }
    276 
    277 private:
    278     io_service& io_service_;
    279     tcp::acceptor acceptor_;
    280     TCPClient* listening_;
    281 };
    282 
    283 class UDPServer {
    284 public:
    285     UDPServer(io_service& io_service, int af, short port) :
    286         io_service_(io_service),
    287         socket_(io_service, af == AF_INET6 ? udp::v6() : udp::v4()),
    288         response_buffer_(0),
    289         response_renderer_(response_buffer_),
    290         dns_message_(Message::PARSE)
    291     {
    292         // Set v6-only (we use a different instantiation for v4,
    293         // otherwise asio will bind to both v4 and v6
    294         if (af == AF_INET6) {
    295             socket_.set_option(asio::ip::v6_only(true));
    296             socket_.bind(udp::endpoint(udp::v6(), port));
    297         } else {
    298             socket_.bind(udp::endpoint(udp::v4(), port));
    299         }
    300         startReceive();
    301     }
    302 
    303     void handleRequest(const asio::error_code& error,
    304                        size_t bytes_recvd)
    305     {
    306         if (!error && bytes_recvd > 0) {
    307             InputBuffer request_buffer(data_, bytes_recvd);
    308 
    309             dns_message_.clear(Message::PARSE);
    310             response_renderer_.clear();
    311             if (auth_server->processMessage(request_buffer, dns_message_,
    312                                             response_renderer_, true)) {
    313                 socket_.async_send_to(
    314                     asio::buffer(response_buffer_.getData(),
    315                                         response_buffer_.getLength()),
    316                     sender_endpoint_,
    317                     boost::bind(&UDPServer::sendCompleted,
    318                                 this,
    319                                 placeholders::error,
    320                                 placeholders::bytes_transferred));
    321             } else {
    322                 startReceive();
    323             }
    324         } else {
    325             startReceive();
    326         }
    327     }
    328 
    329     void sendCompleted(const asio::error_code& error UNUSED_PARAM,
    330                        size_t bytes_sent UNUSED_PARAM)
    331     {
    332         // Even if error occurred there's nothing to do.  Simply handle
    333         // the next request.
    334         startReceive();
    335     }
    336 private:
    337     void startReceive() {
    338         socket_.async_receive_from(
    339             asio::buffer(data_, MAX_LENGTH), sender_endpoint_,
    340             boost::bind(&UDPServer::handleRequest, this,
    341                         placeholders::error,
    342                         placeholders::bytes_transferred));
    343     }
    344 
    345 private:
    346     io_service& io_service_;
    347     udp::socket socket_;
    348     OutputBuffer response_buffer_;
    349     MessageRenderer response_renderer_;
    350     Message dns_message_;
    351     udp::endpoint sender_endpoint_;
    352     enum { MAX_LENGTH = 4096 };
    353     char data_[MAX_LENGTH];
    354 };
    355 
    356 struct ServerSet {
    357     ServerSet() : udp4_server(NULL), udp6_server(NULL),
    358                   tcp4_server(NULL), tcp6_server(NULL)
    359     {}
    360     ~ServerSet() {
    361         delete udp4_server;
    362         delete udp6_server;
    363         delete tcp4_server;
    364         delete tcp6_server;
    365     }
    366     UDPServer* udp4_server;
    367     UDPServer* udp6_server;
    368     TCPServer* tcp4_server;
    369     TCPServer* tcp6_server;
    370 };
    371 
    372129void
    373 run_server(const char* port, const bool use_ipv4, const bool use_ipv6,
    374            AuthSrv* srv UNUSED_PARAM)
    375 {
    376     ServerSet servers;
    377     short portnum = atoi(port);
    378 
    379     if (use_ipv4) {
    380         servers.udp4_server = new UDPServer(io_service_, AF_INET, portnum);
    381         servers.tcp4_server = new TCPServer(io_service_, AF_INET, portnum);
    382     }
    383     if (use_ipv6) {
    384         servers.udp6_server = new UDPServer(io_service_, AF_INET6, portnum);
    385         servers.tcp6_server = new TCPServer(io_service_, AF_INET6, portnum);
    386     }
    387 
    388     cout << "Server started." << endl;
    389     io_service_.run();
    390 }
    391 
    392 void
    393130usage() {
    394131    cerr << "Usage: b10-auth [-p port] [-4|-6]" << endl;
    395132    exit(1);
     
    455192        auth_server->setConfigSession(&cs);
    456193        auth_server->updateConfig(ElementPtr());
    457194
    458         run_server(port, use_ipv4, use_ipv6, auth_server);
     195        io_service = new asio_link::IOService(auth_server, port, use_ipv4,
     196                                              use_ipv6);
     197        io_service->run();
    459198    } catch (const std::exception& ex) {
    460199        cerr << ex.what() << endl;
    461200        ret = 1;
    462201    }
    463202
     203    delete io_service;
    464204    delete auth_server;
    465205    return (ret);
    466206}
  • Makefile.am

     
    44AM_CPPFLAGS += -I$(top_srcdir)/src/lib/dns -I$(top_builddir)/src/lib/dns
    55if GCC_WERROR_OK
    66AM_CPPFLAGS += -Werror
     7AM_CPPFLAGS += -Werror
    78endif
    89
    910pkglibexecdir = $(libexecdir)/@PACKAGE@
     
    2627spec_config.h: spec_config.h.pre
    2728        $(SED) -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" spec_config.h.pre >$@
    2829
     30lib_LIBRARIES = libasio_link.a
     31libasio_link_a_SOURCES = asio_link.cc asio_link.h
     32libasio_link_a_CPPFLAGS = $(AM_CPPFLAGS) -Wno-error=unused-parameter
     33
    2934BUILT_SOURCES = spec_config.h
    3035pkglibexec_PROGRAMS = b10-auth
    3136b10_auth_SOURCES = auth_srv.cc auth_srv.h
     
    3641b10_auth_LDADD += $(top_builddir)/src/lib/config/.libs/libcfgclient.a
    3742b10_auth_LDADD += $(top_builddir)/src/lib/cc/libcc.a
    3843b10_auth_LDADD += $(top_builddir)/src/lib/exceptions/.libs/libexceptions.a
     44b10_auth_LDADD += $(top_builddir)/src/bin/auth/libasio_link.a
    3945b10_auth_LDADD += $(SQLITE_LIBS)
    4046if HAVE_BOOST_PYTHON
    4147b10_auth_LDADD += $(top_builddir)/src/lib/xfr/.libs/libxfr.a