00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/io/NetUtils.h>
00022 #include <oasys/util/Time.h>
00023 #include "DTNTunnel.h"
00024 #include "TCPTunnel.h"
00025
00026 namespace dtntunnel {
00027
00028
00029 TCPTunnel::TCPTunnel()
00030 : IPTunnel("TCPTunnel", "/dtntunnel/tcp"),
00031 next_connection_id_(0)
00032 {
00033 }
00034
00035
00036 void
00037 TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
00038 in_addr_t remote_addr, u_int16_t remote_port)
00039 {
00040 new Listener(this, listen_addr, listen_port,
00041 remote_addr, remote_port);
00042 }
00043
00044
00045 u_int32_t
00046 TCPTunnel::next_connection_id()
00047 {
00048 oasys::ScopeLock l(&lock_, "TCPTunnel::next_connection_id");
00049 return ++next_connection_id_;
00050 }
00051
00052
00053 void
00054 TCPTunnel::new_connection(Connection* c)
00055 {
00056 oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");
00057
00058 ConnTable::iterator i;
00059 ConnKey key(c->dest_eid_,
00060 c->client_addr_,
00061 c->client_port_,
00062 c->remote_addr_,
00063 c->remote_port_,
00064 c->connection_id_);
00065
00066 i = connections_.find(key);
00067
00068 if (i != connections_.end()) {
00069 log_err("got duplicate connection *%p", c);
00070 return;
00071 }
00072
00073 log_debug("added new connection to table *%p", c);
00074
00075 connections_[key] = c;
00076
00077 ASSERT(connections_.find(key) != connections_.end());
00078 }
00079
00080
00081 void
00082 TCPTunnel::kill_connection(Connection* c)
00083 {
00084 oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");
00085
00086 ConnTable::iterator i;
00087 ConnKey key(c->dest_eid_,
00088 c->client_addr_,
00089 c->client_port_,
00090 c->remote_addr_,
00091 c->remote_port_,
00092 c->connection_id_);
00093
00094 i = connections_.find(key);
00095
00096 if (i == connections_.end()) {
00097 log_err("can't find connection *%p in table", c);
00098 return;
00099 }
00100
00101
00102
00103
00104 if (i->second == c) {
00105 connections_.erase(i);
00106 } else {
00107 log_notice("not erasing connection in table since already replaced");
00108 }
00109
00110 }
00111
00112
00113 void
00114 TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
00115 {
00116 oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");
00117
00118 DTNTunnel::BundleHeader hdr;
00119 memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
00120 hdr.connection_id_ = ntohl(hdr.connection_id_);
00121 hdr.seqno_ = ntohl(hdr.seqno_);
00122 hdr.client_port_ = ntohs(hdr.client_port_);
00123 hdr.remote_port_ = ntohs(hdr.remote_port_);
00124
00125 log_debug("handle_bundle got %zu byte bundle from %s for %s:%d -> %s:%d (id %u seqno %u)",
00126 bundle->payload_.len(),
00127 bundle->spec_.source.uri,
00128 intoa(hdr.client_addr_),
00129 hdr.client_port_,
00130 intoa(hdr.remote_addr_),
00131 hdr.remote_port_,
00132 hdr.connection_id_,
00133 hdr.seqno_);
00134
00135 Connection* conn = NULL;
00136 ConnTable::iterator i;
00137 ConnKey key(bundle->spec_.source,
00138 hdr.client_addr_,
00139 hdr.client_port_,
00140 hdr.remote_addr_,
00141 hdr.remote_port_,
00142 hdr.connection_id_);
00143
00144 i = connections_.find(key);
00145
00146 if (i == connections_.end()) {
00147 if (hdr.seqno_ == 0) {
00148 conn = new Connection(this, &bundle->spec_.source,
00149 hdr.client_addr_, hdr.client_port_,
00150 hdr.remote_addr_, hdr.remote_port_,
00151 hdr.connection_id_);
00152
00153 log_info("new connection *%p", conn);
00154 conn->start();
00155 connections_[key] = conn;
00156
00157 } else {
00158
00159 log_warn("got bundle with seqno %u but no connection... "
00160 "postponing delivery",
00161 hdr.seqno_);
00162
00163 dtn::APIBundleVector* vec;
00164 NoConnBundleTable::iterator j = no_conn_bundles_.find(key);
00165 if (j == no_conn_bundles_.end()) {
00166 vec = new dtn::APIBundleVector();
00167 no_conn_bundles_[key] = vec;
00168 } else {
00169 vec = j->second;
00170 }
00171 vec->push_back(bundle);
00172 return;
00173 }
00174
00175 } else {
00176 conn = i->second;
00177 }
00178
00179 ASSERT(conn != NULL);
00180 conn->handle_bundle(bundle);
00181
00182 NoConnBundleTable::iterator j = no_conn_bundles_.find(key);
00183 if (j != no_conn_bundles_.end()) {
00184 dtn::APIBundleVector* vec = j->second;
00185 no_conn_bundles_.erase(j);
00186 for (dtn::APIBundleVector::iterator k = vec->begin(); k != vec->end(); ++k) {
00187 log_debug("conn *%p handling postponed bundle", conn);
00188 conn->handle_bundle(*k);
00189 }
00190 delete vec;
00191 }
00192 }
00193
00194
00195 TCPTunnel::Listener::Listener(TCPTunnel* t,
00196 in_addr_t listen_addr, u_int16_t listen_port,
00197 in_addr_t remote_addr, u_int16_t remote_port)
00198 : TCPServerThread("TCPTunnel::Listener",
00199 "/dtntunnel/tcp/listener",
00200 Thread::DELETE_ON_EXIT),
00201 tcptun_(t),
00202 listen_addr_(listen_addr),
00203 listen_port_(listen_port),
00204 remote_addr_(remote_addr),
00205 remote_port_(remote_port)
00206 {
00207 if (bind_listen_start(listen_addr, listen_port) != 0) {
00208 log_err("can't initialize listener socket, bailing");
00209 exit(1);
00210 }
00211 }
00212
00213
00214 void
00215 TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00216 {
00217 Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
00218 fd, addr, port, remote_addr_, remote_port_,
00219 tcptun_->next_connection_id());
00220 tcptun_->new_connection(c);
00221 c->start();
00222 }
00223
00224
00225 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00226 in_addr_t client_addr, u_int16_t client_port,
00227 in_addr_t remote_addr, u_int16_t remote_port,
00228 u_int32_t connection_id)
00229 : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00230 Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00231 tcptun_(t),
00232 sock_("/dtntunnel/tcp/conn/sock"),
00233 queue_("/dtntunnel/tcp/conn"),
00234 next_seqno_(0),
00235 client_addr_(client_addr),
00236 client_port_(client_port),
00237 remote_addr_(remote_addr),
00238 remote_port_(remote_port),
00239 connection_id_(connection_id)
00240 {
00241 dtn_copy_eid(&dest_eid_, dest_eid);
00242 }
00243
00244
00245 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00246 int fd,
00247 in_addr_t client_addr, u_int16_t client_port,
00248 in_addr_t remote_addr, u_int16_t remote_port,
00249 u_int32_t connection_id)
00250 : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00251 Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00252 tcptun_(t),
00253 sock_(fd, client_addr, client_port, "/dtntunnel/tcp/conn/sock"),
00254 queue_("/dtntunnel/tcp/conn"),
00255 next_seqno_(0),
00256 client_addr_(client_addr),
00257 client_port_(client_port),
00258 remote_addr_(remote_addr),
00259 remote_port_(remote_port),
00260 connection_id_(connection_id)
00261 {
00262 dtn_copy_eid(&dest_eid_, dest_eid);
00263 }
00264
00265
00266 TCPTunnel::Connection::~Connection()
00267 {
00268 dtn::APIBundle* b;
00269 while(queue_.try_pop(&b)) {
00270 delete b;
00271 }
00272 }
00273
00274
00275 int
00276 TCPTunnel::Connection::format(char* buf, size_t sz) const
00277 {
00278 return snprintf(buf, sz, "[%s %s:%d -> %s:%d (id %u)]",
00279 dest_eid_.uri,
00280 intoa(client_addr_),
00281 client_port_,
00282 intoa(remote_addr_),
00283 remote_port_,
00284 connection_id_);
00285 }
00286
00287
00288 void
00289 TCPTunnel::Connection::run()
00290 {
00291 DTNTunnel* tunnel = DTNTunnel::instance();
00292 u_int32_t send_seqno = 0;
00293 u_int32_t next_recv_seqno = 0;
00294 u_int32_t total_sent = 0;
00295 bool sock_eof = false;
00296 bool dtn_blocked = false;
00297 bool first = true;
00298
00299
00300 dtn::APIBundle* b_xmit = NULL;
00301 dtn::APIBundle* b_recv = NULL;
00302
00303
00304 oasys::Time tbegin, tnow;
00305 ASSERT(tbegin.sec_ == 0);
00306
00307
00308 DTNTunnel::BundleHeader hdr;
00309 hdr.eof_ = 0;
00310 hdr.protocol_ = IPPROTO_TCP;
00311 hdr.connection_id_ = htonl(connection_id_);
00312 hdr.seqno_ = 0;
00313 hdr.client_addr_ = client_addr_;
00314 hdr.client_port_ = htons(client_port_);
00315 hdr.remote_addr_ = remote_addr_;
00316 hdr.remote_port_ = htons(remote_port_);
00317
00318 if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
00319 int err = sock_.connect(remote_addr_, remote_port_);
00320 if (err != 0) {
00321 log_err("error connecting to %s:%d",
00322 intoa(remote_addr_), remote_port_);
00323
00324
00325 dtn::APIBundle* b = new dtn::APIBundle();
00326 hdr.eof_ = 1;
00327 memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
00328 b->payload_.set_len(sizeof(hdr));
00329 int err;
00330 if ((err = tunnel->send_bundle(b, &dest_eid_)) != DTN_SUCCESS) {
00331 log_err("error sending connect reply bundle: %s",
00332 dtn_strerror(err));
00333 tcptun_->kill_connection(this);
00334 exit(1);
00335 }
00336 goto done;
00337 }
00338 }
00339
00340 while (1) {
00341 struct pollfd pollfds[2];
00342
00343 struct pollfd* msg_poll = &pollfds[0];
00344 msg_poll->fd = queue_.read_fd();
00345 msg_poll->events = POLLIN;
00346 msg_poll->revents = 0;
00347
00348 struct pollfd* sock_poll = &pollfds[1];
00349 sock_poll->fd = sock_.fd();
00350 sock_poll->events = POLLIN | POLLERR;
00351 sock_poll->revents = 0;
00352
00353
00354
00355
00356 int nfds = (sock_eof || dtn_blocked) ? 1 : 2;
00357
00358 int timeout = -1;
00359 if (first || dtn_blocked) {
00360 timeout = 1000;
00361 } else if (tbegin.sec_ != 0) {
00362 timeout = tunnel->delay();
00363 }
00364
00365 log_debug("blocking in poll... (timeout %d)", timeout);
00366 int nready = oasys::IO::poll_multiple(pollfds, nfds, timeout,
00367 NULL, logpath_);
00368 if (nready == oasys::IOERROR) {
00369 log_err("unexpected error in poll: %s", strerror(errno));
00370 goto done;
00371 }
00372
00373
00374
00375
00376
00377 if ((first || sock_poll->revents != 0) && (b_xmit == NULL)) {
00378 first = false;
00379 b_xmit = new dtn::APIBundle();
00380 b_xmit->payload_.reserve(tunnel->max_size());
00381 hdr.seqno_ = ntohl(send_seqno++);
00382 memcpy(b_xmit->payload_.buf(), &hdr, sizeof(hdr));
00383 b_xmit->payload_.set_len(sizeof(hdr));
00384 }
00385
00386
00387 if (sock_poll->revents != 0) {
00388 u_int payload_todo = tunnel->max_size() - b_xmit->payload_.len();
00389
00390 if (payload_todo != 0) {
00391 tbegin.get_time();
00392
00393 char* bp = b_xmit->payload_.end();
00394 int ret = sock_.read(bp, payload_todo);
00395 if (ret < 0) {
00396 log_err("error reading from socket: %s", strerror(errno));
00397 delete b_xmit;
00398 goto done;
00399 }
00400
00401 b_xmit->payload_.set_len(b_xmit->payload_.len() + ret);
00402
00403 if (ret == 0) {
00404 DTNTunnel::BundleHeader* hdrp =
00405 (DTNTunnel::BundleHeader*)b_xmit->payload_.buf();
00406 hdrp->eof_ = 1;
00407 sock_eof = true;
00408 }
00409 }
00410 }
00411
00412
00413 tnow.get_time();
00414 if ((b_xmit != NULL) &&
00415 ((sock_eof == true) ||
00416 (b_xmit->payload_.len() == tunnel->max_size()) ||
00417 ((tnow - tbegin).in_milliseconds() >= tunnel->delay())))
00418 {
00419 size_t len = b_xmit->payload_.len();
00420 int err = tunnel->send_bundle(b_xmit, &dest_eid_);
00421 if (err == DTN_SUCCESS) {
00422 total_sent += len;
00423 log_debug("sent %zu byte payload #%u to dtn (%u total)",
00424 len, send_seqno, total_sent);
00425 b_xmit = NULL;
00426 tbegin.sec_ = 0;
00427 tbegin.usec_ = 0;
00428 dtn_blocked = false;
00429
00430 } else if (err == DTN_ENOSPACE) {
00431 log_debug("no space for %zu byte payload... "
00432 "setting dtn_blocked", len);
00433 dtn_blocked = true;
00434 continue;
00435 } else {
00436 log_err("error sending bundle: %s", dtn_strerror(err));
00437 exit(1);
00438 }
00439 }
00440
00441
00442 if (msg_poll->revents != 0) {
00443 b_recv = queue_.pop_blocking();
00444
00445
00446
00447
00448 if (b_recv == NULL)
00449 {
00450 log_debug("got signal to abort connection");
00451 goto done;
00452 }
00453
00454 DTNTunnel::BundleHeader* recv_hdr =
00455 (DTNTunnel::BundleHeader*)b_recv->payload_.buf();
00456
00457 u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);
00458
00459
00460
00461
00462 if (recv_seqno != next_recv_seqno) {
00463 log_err("got out of order bundle: seqno %d, expected %d",
00464 recv_seqno, next_recv_seqno);
00465 delete b_recv;
00466 goto done;
00467 }
00468 ++next_recv_seqno;
00469
00470 u_int len = b_recv->payload_.len() - sizeof(hdr);
00471
00472 if (len != 0) {
00473 int cc = sock_.writeall(b_recv->payload_.buf() + sizeof(hdr), len);
00474 if (cc != (int)len) {
00475 log_err("error writing payload to socket: %s", strerror(errno));
00476 delete b_recv;
00477 goto done;
00478 }
00479
00480 log_debug("sent %d byte payload to client", len);
00481 }
00482
00483
00484 if (recv_hdr->eof_) {
00485 log_info("bundle had eof bit set... closing connection");
00486 sock_.close();
00487 }
00488
00489 delete b_recv;
00490 }
00491 }
00492
00493 done:
00494 tcptun_->kill_connection(this);
00495 }
00496
00497
00498 void
00499 TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
00500 {
00501 DTNTunnel::BundleHeader* hdr =
00502 (DTNTunnel::BundleHeader*)bundle->payload_.buf();
00503
00504 u_int32_t recv_seqno = ntohl(hdr->seqno_);
00505
00506
00507
00508 if (recv_seqno < next_seqno_)
00509 {
00510 log_warn("got seqno %u, but already delivered up to %u: "
00511 "ignoring bundle",
00512 recv_seqno, next_seqno_);
00513 delete bundle;
00514 return;
00515 }
00516
00517
00518
00519 else if (recv_seqno != next_seqno_)
00520 {
00521 log_debug("got out of order bundle: expected seqno %d, got %d",
00522 next_seqno_, recv_seqno);
00523
00524 reorder_table_[recv_seqno] = bundle;
00525 return;
00526 }
00527
00528
00529 log_debug("delivering %zu byte bundle with seqno %d",
00530 bundle->payload_.len(), recv_seqno);
00531 queue_.push_back(bundle);
00532 next_seqno_++;
00533
00534
00535
00536 ReorderTable::iterator iter;
00537 while (1) {
00538 iter = reorder_table_.find(next_seqno_);
00539 if (iter == reorder_table_.end()) {
00540 break;
00541 }
00542
00543 bundle = iter->second;
00544 log_debug("delivering %zu byte bundle with seqno %d (from reorder table)",
00545 bundle->payload_.len(), next_seqno_);
00546
00547 reorder_table_.erase(iter);
00548 next_seqno_++;
00549
00550 queue_.push_back(bundle);
00551 }
00552 }
00553
00554 }
00555