00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #ifdef HAVE_CONFIG_H
00017 # include <dtn-config.h>
00018 #endif
00019
00020 #include <typeinfo>
00021
00022 #if defined(XERCES_C_ENABLED) && defined(EXTERNAL_CL_ENABLED)
00023
00024 #include <oasys/io/NetUtils.h>
00025 #include <oasys/io/FileUtils.h>
00026 #include <oasys/io/TCPClient.h>
00027 #include <oasys/io/IO.h>
00028 #include <oasys/thread/Lock.h>
00029 #include <oasys/util/OptParser.h>
00030 #include <oasys/util/StringBuffer.h>
00031 #include <oasys/serialize/XMLSerialize.h>
00032 #include <oasys/thread/SpinLock.h>
00033
00034 #include <xercesc/framework/MemBufFormatTarget.hpp>
00035
00036 #include "ECLModule.h"
00037 #include "bundling/BundleDaemon.h"
00038 #include "storage/BundleStore.h"
00039 #include "storage/GlobalStore.h"
00040 #include "contacts/ContactManager.h"
00041
00042
00043 namespace dtn {
00044
00045 const size_t ECLModule::READ_BUFFER_SIZE;
00046 const size_t ECLModule::MAX_BUNDLE_IN_MEMORY;
00047
00048 ECLModule::ECLModule(int fd,
00049 in_addr_t remote_addr,
00050 u_int16_t remote_port,
00051 ExternalConvergenceLayer& cl) :
00052 CLEventHandler("ECLModule", "/dtn/cl/module"),
00053 Thread("/dtn/cl/module", Thread::CREATE_JOINABLE),
00054 cl_(cl),
00055 iface_list_lock_("/dtn/cl/parts/iface_list_lock"),
00056 socket_(fd, remote_addr, remote_port, logpath_),
00057 message_queue_("/dtn/cl/parts/module"),
00058 parser_( true, cl.schema_.c_str() )
00059 {
00060 name_ = "(unknown)";
00061 was_shutdown_ = false;
00062 sem_init(&link_list_sem_, 0, 2);
00063 }
00064
00065 ECLModule::~ECLModule()
00066 {
00067 while (message_queue_.size() > 0)
00068 delete message_queue_.pop_blocking();
00069 }
00070
00071 void
00072 ECLModule::run()
00073 {
00074 struct pollfd pollfds[2];
00075
00076 struct pollfd* message_poll = &pollfds[0];
00077 message_poll->fd = message_queue_.read_fd();
00078 message_poll->events = POLLIN;
00079
00080 struct pollfd* sock_poll = &pollfds[1];
00081 sock_poll->fd = socket_.fd();
00082 sock_poll->events = POLLIN;
00083
00084 while ( !should_stop() ) {
00085
00086 int ret = oasys::IO::poll_multiple(pollfds, 2, -1);
00087
00088 if (ret == oasys::IOINTR) {
00089 log_err("Module server interrupted");
00090 set_should_stop();
00091 continue;
00092 }
00093
00094 if (ret == oasys::IOERROR) {
00095 log_err("Module server error");
00096 set_should_stop();
00097 continue;
00098 }
00099
00100 if (message_poll->revents & POLLIN) {
00101 cl_message* message;
00102 if ( message_queue_.try_pop(&message) ) {
00103 ASSERT(message != NULL);
00104 int result;
00105
00106
00107
00108 if ( message->bundle_send_request().present() )
00109 result = prepare_bundle_to_send(message);
00110
00111 else
00112 result = send_message(message);
00113
00114 delete message;
00115
00116 if (result < 0) {
00117 set_should_stop();
00118 continue;
00119 }
00120 }
00121 }
00122
00123
00124 if (sock_poll->revents & POLLIN)
00125 read_cycle();
00126 }
00127
00128 log_info( "CL %s is shutting down", name_.c_str() );
00129
00130 oasys::ScopeLock lock(&cl_.global_resource_lock_, "ECLModule::run");
00131
00132 if (!was_shutdown_) {
00133 set_flag(Thread::DELETE_ON_EXIT);
00134 cl_.remove_module(this);
00135 }
00136
00137 socket_.close();
00138 cleanup();
00139 }
00140
00141 void
00142 ECLModule::post_message(cl_message* message)
00143 {
00144 message_queue_.push_back(message);
00145 }
00146
00147 ECLInterfaceResource*
00148 ECLModule::remove_interface(const std::string& name)
00149 {
00150 oasys::ScopeLock lock(&iface_list_lock_, "remove_interface");
00151 std::list<ECLInterfaceResource*>::iterator iface_i;
00152
00153 for (iface_i = iface_list_.begin(); iface_i != iface_list_.end(); ++iface_i) {
00154 if ( (*iface_i)->interface_->name() == name) {
00155 iface_list_.erase(iface_i);
00156 return *iface_i;
00157 }
00158 }
00159
00160 return NULL;
00161 }
00162
00163 void
00164 ECLModule::shutdown()
00165 {
00166 oasys::ScopeLock lock(&cl_.global_resource_lock_, "ECLModule::run");
00167 log_debug("ECLModule::shutdown() for CLA '%s'", name_.c_str());
00168 was_shutdown_ = true;
00169 set_should_stop();
00170
00171
00172 message_queue_.notify();
00173 }
00174
00175 void
00176 ECLModule::handle(const cla_add_request& message)
00177 {
00178 if (cl_.get_module( message.name() ) != NULL) {
00179 log_err("A CLA with name '%s' already exists", message.name().c_str());
00180 set_should_stop();
00181 return;
00182 }
00183
00184 name_ = message.name();
00185 logpathf( "/dtn/cl/%s", name_.c_str() );
00186 message_queue_.logpathf( "/dtn/cl/parts/%s/message_queue", name_.c_str() );
00187 socket_.logpathf( "/dtn/cl/parts/%s/socket", name_.c_str() );
00188
00189 log_info( "New external CL: %s", name_.c_str() );
00190
00191
00192 BundleStore* bs = BundleStore::instance();
00193 std::string payload_dir = bs->payload_dir();
00194 oasys::FileUtils::abspath(&payload_dir);
00195
00196 oasys::StringBuffer in_dir( "%s/%s-in", payload_dir.c_str(),
00197 name_.c_str() );
00198 oasys::StringBuffer out_dir( "%s/%s-out", payload_dir.c_str(),
00199 name_.c_str() );
00200
00201
00202 bundle_in_path_ = std::string( in_dir.c_str() );
00203 bundle_out_path_ = std::string( out_dir.c_str() );
00204
00205
00206
00207 if (oasys::FileUtils::rm_all_from_dir(bundle_in_path_.c_str(), true) != 0) {
00208 log_warn( "Unable to clean incoming bundle directory %s: %s",
00209 bundle_in_path_.c_str(), strerror(errno) );
00210 }
00211 ::rmdir( bundle_in_path_.c_str() );
00212
00213 if (oasys::FileUtils::rm_all_from_dir(bundle_out_path_.c_str(), true) != 0) {
00214 log_warn( "Unable to clean outgoing bundle directory %s: %s",
00215 bundle_out_path_.c_str(), strerror(errno) );
00216 }
00217 ::rmdir( bundle_out_path_.c_str() );
00218
00219
00220 if (oasys::IO::mkdir(in_dir.c_str(), 0777) < 0) {
00221 log_err( "Unable to create incoming bundle directory %s: %s",
00222 in_dir.c_str(), strerror(errno) );
00223
00224 set_should_stop();
00225 return;
00226 }
00227
00228
00229 if (oasys::IO::mkdir(out_dir.c_str(), 0777) < 0) {
00230 log_err( "Unable to create outgoing bundle directory %s: %s",
00231 out_dir.c_str(), strerror(errno) );
00232
00233 set_should_stop();
00234 return;
00235 }
00236
00237 cla_set_params_request request;
00238 request.create_discovered_links(
00239 ExternalConvergenceLayer::create_discovered_links_);
00240
00241 request.local_eid( BundleDaemon::instance()->local_eid().str() );
00242 request.bundle_pass_method(bundlePassMethodType::filesystem);
00243 request.reactive_fragment_enabled(
00244 BundleDaemon::params_.reactive_frag_enabled_);
00245
00246 KeyValueSequence params;
00247 params.push_back( key_value_pair("incoming_bundle_dir", in_dir.c_str() ) );
00248 params.push_back( key_value_pair("outgoing_bundle_dir", out_dir.c_str() ) );
00249 request.key_value_pair(params);
00250
00251 POST_MESSAGE(this, cla_set_params_request, request);
00252
00253
00254 take_resources();
00255 }
00256
00257 void
00258 ECLModule::take_resource(ECLResource* resource)
00259 {
00260 oasys::ScopeLock lock(&resource->lock_, "ECLModule::take_resource()");
00261 resource->module_ = this;
00262 resource->should_delete_ = false;
00263
00264
00265 if ( typeid(*resource) == typeid(ECLInterfaceResource) ) {
00266 ECLInterfaceResource* iface = (ECLInterfaceResource*)resource;
00267
00268 iface_list_lock_.lock("take_resource");
00269 iface_list_.push_back(iface);
00270 iface_list_lock_.unlock();
00271
00272 log_info( "Module %s acquiring interface %s", name_.c_str(),
00273 iface->interface_->name().c_str() );
00274
00275 cl_message* message = new cl_message(*iface->create_message_);
00276 post_message(message);
00277 }
00278
00279
00280 else if ( typeid(*resource) == typeid(ECLLinkResource) ) {
00281 ECLLinkResource* link = (ECLLinkResource*)resource;
00282
00283 sem_wait(&link_list_sem_);
00284 sem_wait(&link_list_sem_);
00285
00286 link_list_.insert( LinkHashMap::value_type(link->link_->name_str(),
00287 link) );
00288
00289 sem_post(&link_list_sem_);
00290 sem_post(&link_list_sem_);
00291
00292 log_info( "Module %s acquiring link %s", name_.c_str(),
00293 link->link_->name() );
00294 cl_message* message = new cl_message(*link->create_message_);
00295 post_message(message);
00296 }
00297
00298 else {
00299 log_err( "Cannot take unknown resource type %s",
00300 typeid(*resource).name() );
00301 }
00302 }
00303
00304 void
00305 ECLModule::handle(const cla_delete_request& message)
00306 {
00307 (void)message;
00308 set_should_stop();
00309 }
00310
00311 void
00312 ECLModule::take_resources()
00313 {
00314 log_info("Module %s is acquiring appropriate CL resources", name_.c_str());
00315
00316 std::list<ECLResource*> resource_list_ = cl_.take_resources(name_);
00317 std::list<ECLResource*>::iterator resource_i;
00318
00319 for ( resource_i = resource_list_.begin();
00320 resource_i != resource_list_.end();
00321 ++resource_i ) {
00322 ECLResource* resource = (*resource_i);
00323 take_resource(resource);
00324 }
00325 }
00326
00327 void
00328 ECLModule::handle(const cla_params_set_event& message)
00329 {
00330 (void)message;
00331 BundleDaemon::post( new CLAParamsSetEvent(&cl_, name_) );
00332 }
00333
00334 void
00335 ECLModule::handle(const interface_created_event& message)
00336 {
00337 ECLInterfaceResource* resource = get_interface( message.interface_name() );
00338
00339 if (!resource) {
00340 log_warn( "Got interface_created_event for unknown interface %s",
00341 message.interface_name().c_str() );
00342 return;
00343 }
00344 }
00345
00346 void
00347 ECLModule::handle(const interface_reconfigured_event& message)
00348 {
00349 (void)message;
00350 }
00351
00352 void
00353 ECLModule::handle(const eid_reachable_event& message)
00354 {
00355 ECLInterfaceResource* resource = get_interface( message.interface_name() );
00356
00357 if (!resource) {
00358 log_warn( "Got eid_reachable_event for unknown interface %s",
00359 message.interface_name().c_str() );
00360 return;
00361 }
00362
00363 BundleDaemon::post(
00364 new NewEIDReachableEvent( resource->interface_, message.peer_eid() ) );
00365 }
00366
00367 void
00368 ECLModule::handle(const link_created_event& message)
00369 {
00370 ECLLinkResource* resource;
00371 LinkRef link("handle(link_created_event) temporary");
00372
00373 resource = get_link( message.link_name() );
00374 if (!resource) {
00375 log_err( "Got link_created_event for unknown link %s",
00376 message.link_name().c_str() );
00377 return;
00378 }
00379
00380 link = resource->link_;
00381 oasys::ScopeLock l(link->lock(), "handle(link_created_event)");
00382 if ( link->isdeleted() ) {
00383
00384
00385 log_info( "Link %s has already been deleted",
00386 message.link_name().c_str());
00387 return;
00388 }
00389
00390
00391 std::string outgoing_dir = bundle_out_path_ + "/" + message.link_name();
00392 if (oasys::IO::mkdir(outgoing_dir.c_str(), 0777) < 0) {
00393 log_err( "Unable to create outgoing bundle directory %s: %s",
00394 outgoing_dir.c_str(), strerror(errno) );
00395
00396 set_should_stop();
00397 return;
00398 }
00399
00400 link->set_create_pending(false);
00401
00402 if (link->state() == Link::UNAVAILABLE)
00403 link->set_state(Link::AVAILABLE);
00404
00405
00406 if ( message.link_attributes().high_water_mark().present() ) {
00407 resource->set_high_water_mark(
00408 message.link_attributes().high_water_mark().get() );
00409 }
00410
00411
00412 if ( message.link_attributes().low_water_mark().present() ) {
00413 resource->set_low_water_mark(
00414 message.link_attributes().low_water_mark().get() );
00415 }
00416
00417 BundleDaemon::post(new LinkCreatedEvent(link));
00418
00419 if (link->type() == Link::OPPORTUNISTIC) {
00420 BundleDaemon::post(
00421 new LinkAvailableEvent(link, ContactEvent::NO_INFO) );
00422 }
00423 }
00424
00425 void
00426 ECLModule::handle(const link_opened_event& message)
00427 {
00428 ECLLinkResource* resource = get_link( message.link_name() );
00429
00430
00431
00432
00433 if (!resource) {
00434 oasys::Notifier* notifier = new oasys::Notifier("/dtn/cl/external");
00435 BundleDaemon::post_and_wait(new StatusRequest(), notifier);
00436 delete notifier;
00437 resource = get_link( message.link_name() );
00438 if (!resource) {
00439 log_err( "Got link_opened_event for unknown link %s",
00440 message.link_name().c_str() );
00441 return;
00442 }
00443 }
00444
00445 oasys::ScopeLock l(resource->link_->lock(), "ECLModule::link_opened_evet");
00446
00447 ContactRef contact = resource->link_->contact();
00448 if (contact == NULL) {
00449 contact = new Contact(resource->link_);
00450 resource->link_->set_contact( contact.object() );
00451 }
00452
00453 l.unlock();
00454
00455 update_contact_attributes(message.contact_attributes(), contact);
00456 BundleDaemon::post( new ContactUpEvent(contact) );
00457 }
00458
00459 void
00460 ECLModule::handle(const link_closed_event& message)
00461 {
00462 ECLLinkResource* resource = get_link( message.link_name() );
00463
00464 if (!resource) {
00465 log_err( "Got link_closed_event for unknown link %s",
00466 message.link_name().c_str() );
00467 return;
00468 }
00469
00470 resource->known_state_ = Link::CLOSED;
00471
00472 if (resource->link_->contact() != NULL) {
00473 update_contact_attributes( message.contact_attributes(),
00474 resource->link_->contact() );
00475
00476
00477
00478 BundleDaemon::post( new LinkStateChangeRequest(resource->link_,
00479 Link::CLOSED,
00480 ContactEvent::NO_INFO) );
00481 }
00482 }
00483
00484 void
00485 ECLModule::handle(const link_state_changed_event& message)
00486 {
00487 Link::state_t new_state;
00488 ECLLinkResource* resource = get_link( message.link_name() );
00489
00490 if (!resource) {
00491 log_err( "Got link_state_changed_event for unknown link %s",
00492 message.link_name().c_str() );
00493 return;
00494 }
00495
00496 new_state = XMLConvert::convert_link_state( message.new_state() );
00497
00498 resource->known_state_ = new_state;
00499 BundleDaemon::post( new LinkStateChangeRequest( resource->link_,
00500 new_state, XMLConvert::convert_link_reason( message.reason() ) ) );
00501 }
00502
00503 void
00504 ECLModule::handle(const link_deleted_event& message)
00505 {
00506 ECLLinkResource* resource = get_link( message.link_name() );
00507 if (!resource) {
00508 log_err( "Got link_deleted_event for unknown link %s",
00509 message.link_name().c_str() );
00510 return;
00511 }
00512
00513
00514
00515 resource->lock_.lock("handle(link_deleted_event)");
00516 resource->module_ = NULL;
00517 resource->lock_.unlock();
00518
00519
00520
00521 sem_wait(&link_list_sem_);
00522 sem_wait(&link_list_sem_);
00523
00524 link_list_.erase( message.link_name() );
00525
00526
00527 sem_post(&link_list_sem_);
00528 sem_post(&link_list_sem_);
00529
00530
00531
00532
00533
00534
00535
00536 if (resource->link_->cl_info() != NULL) {
00537
00538 BundleDaemon::instance()->contactmgr()->del_link(resource->link_, true);
00539 }
00540
00541 cl_.delete_resource(resource);
00542
00543
00544
00545 }
00546
00547 void
00548 ECLModule::handle(const link_attribute_changed_event& message)
00549 {
00550 ECLLinkResource* resource = get_link( message.link_name() );
00551 if (!resource) {
00552 log_err( "Got link_attribute_changed_event for unknown link %s",
00553 message.link_name().c_str() );
00554 return;
00555 }
00556
00557
00558 if ( message.link_attributes().high_water_mark().present() ) {
00559 resource->set_high_water_mark(
00560 message.link_attributes().high_water_mark().get() );
00561 }
00562
00563
00564 if ( message.link_attributes().low_water_mark().present() ) {
00565 resource->set_low_water_mark(
00566 message.link_attributes().low_water_mark().get() );
00567 }
00568
00569 ContactEvent::reason_t reason =
00570 XMLConvert::convert_link_reason( message.reason() );
00571
00572 AttributeVector params;
00573 const clmessage::link_attributes& attributes = message.link_attributes();
00574
00575
00576
00577 if ( attributes.peer_eid().present() ) {
00578 resource->link_->set_remote_eid(
00579 EndpointID( attributes.peer_eid().get() ) );
00580 }
00581
00582 if (attributes.nexthop().present()) {
00583 params.push_back(
00584 NamedAttribute("nexthop", attributes.nexthop().get()) );
00585 }
00586 if (attributes.is_reachable().present()) {
00587 params.push_back(
00588 NamedAttribute("is_reachable", attributes.is_reachable().get()) );
00589 }
00590 if (attributes.how_reliable().present()) {
00591 params.push_back(
00592 NamedAttribute("how_reliable", static_cast<int>(attributes.how_reliable().get())) );
00593 }
00594 if (attributes.how_available().present()) {
00595 params.push_back(
00596 NamedAttribute("how_available", static_cast<int>(attributes.how_available().get())) );
00597 }
00598
00599 KeyValueSequence::const_iterator iter;
00600 for (iter = attributes.key_value_pair().begin();
00601 iter != attributes.key_value_pair().end();
00602 iter++) {
00603 params.push_back( NamedAttribute(iter->name(), iter->value()) );
00604 }
00605
00606 BundleDaemon::post(
00607 new LinkAttributeChangedEvent(resource->link_, params, reason) );
00608 }
00609
00610 void
00611 ECLModule::handle(const contact_attribute_changed_event& message)
00612 {
00613 ECLLinkResource* resource = get_link( message.link_name() );
00614 if (!resource) {
00615 log_err( "Got link_attribute_changed_event for unknown link %s",
00616 message.link_name().c_str() );
00617 return;
00618 }
00619
00620 update_contact_attributes( message.contact_attributes(),
00621 resource->link_->contact() );
00622
00623 ContactEvent::reason_t reason =
00624 XMLConvert::convert_link_reason( message.reason() );
00625
00626 BundleDaemon::post(
00627 new ContactAttributeChangedEvent(resource->link_->contact(), reason) );
00628 }
00629
00630 void
00631 ECLModule::handle(const link_add_reachable_event& message)
00632 {
00633
00634
00635
00636
00637 LinkRef link = BundleDaemon::instance()->contactmgr()->find_link(message.link_name().c_str());
00638 if (link != NULL){
00639 oasys::Notifier* notifier = new oasys::Notifier("/dtn/cl/external");
00640 BundleDaemon::post_and_wait(new StatusRequest(), notifier);
00641 delete notifier;
00642 link = BundleDaemon::instance()->contactmgr()->find_link(message.link_name().c_str());
00643 if (link != NULL){
00644 log_err( "Got link_add_reachable_event for link '%s' that already exists",
00645 message.link_name().c_str() );
00646 return;
00647 }
00648 }
00649
00650 ECLLinkResource* resource;
00651 const clmessage::link_config_parameters& params =
00652 message.link_config_parameters();
00653 resource = get_link( message.link_name() );
00654 if (resource) {
00655 log_err( "Got link_add_reachable_event for link '%s' that already exists",
00656 message.link_name().c_str() );
00657 return;
00658 }
00659
00660 if ( !params.nexthop().present() ) {
00661 log_err("Got link_add_reachable_event with no nexthop field");
00662 return;
00663 }
00664
00665 resource = create_discovered_link( message.peer_eid(),
00666 params.nexthop().get(),
00667 message.link_name() );
00668 }
00669
00670 void
00671 ECLModule::handle(const bundle_transmitted_event& message)
00672 {
00673
00674 ECLLinkResource* resource = get_link( message.link_name() );
00675 if (!resource) {
00676 log_err( "Got bundle_transmitted_event for unknown link %s",
00677 message.link_name().c_str() );
00678 return;
00679 }
00680
00681 BundleRef bundle =
00682 resource->get_outgoing_bundle( message.bundle_attributes() );
00683 if ( !bundle.object() ) {
00684 log_err("Got bundle_transmitted_event for unknown bundle");
00685 return;
00686 }
00687
00688
00689 if ( !resource->erase_outgoing_bundle( bundle.object() ) ) {
00690 log_err("Unable to remove bundle %d from the link's outgoing bundle list",
00691 bundle->bundleid());
00692 }
00693
00694
00695 oasys::StringBuffer filename("bundle%d", bundle->bundleid());
00696 std::string abs_path = bundle_out_path_ + "/" + resource->link_->name_str() +
00697 "/" + filename.c_str();
00698
00699
00700 ::remove( abs_path.c_str() );
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727 BundleTransmittedEvent* b_event =
00728 new BundleTransmittedEvent(bundle.object(),
00729 resource->link_->contact(),
00730 resource->link_,
00731 message.bytes_sent(),
00732 message.reliably_sent());
00733 BundleDaemon::post(b_event);
00734 }
00735
00736 void
00737 ECLModule::handle(const bundle_canceled_event& message)
00738 {
00739
00740 ECLLinkResource* resource = get_link( message.link_name() );
00741 if (!resource) {
00742 log_err( "Got bundle_canceled_event for unknown link %s",
00743 message.link_name().c_str() );
00744 return;
00745 }
00746
00747
00748 BundleRef bundle =
00749 resource->get_outgoing_bundle( message.bundle_attributes() );
00750 if ( !bundle.object() ) {
00751 log_err("Got bundle_canceled_event for unknown bundle");
00752 return;
00753 }
00754
00755
00756 bundle_send_failed(resource, bundle.object(), true);
00757 BundleDaemon::post( new BundleSendCancelledEvent(bundle.object(),
00758 resource->link_) );
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783 }
00784
00785 void
00786 ECLModule::handle(const bundle_receive_started_event& message)
00787 {
00788 IncomingBundleRecord record;
00789 record.location = message.location();
00790 if ( message.peer_eid().present() )
00791 record.peer_eid = message.peer_eid().get();
00792
00793 incoming_bundle_list_.push_back(record);
00794 }
00795
00796 void
00797 ECLModule::handle(const bundle_received_event& message)
00798 {
00799
00800
00801 if (message.bytes_received() == 0) {
00802 std::string file_path = bundle_in_path_ + "/" + message.location();
00803
00804
00805 if (oasys::FileUtils::size( file_path.c_str() ) >= 0) {
00806 if (::remove( file_path.c_str() ) < 0) {
00807 log_err( "Unable to remove bundle file %s: %s",
00808 file_path.c_str(), strerror(errno) );
00809 }
00810 }
00811 }
00812
00813 else {
00814 std::string peer_eid = EndpointID::NULL_EID().c_str();
00815 if ( message.peer_eid().present() )
00816 peer_eid = message.peer_eid().get();
00817
00818 read_bundle_file(message.location(), peer_eid);
00819 }
00820
00821
00822
00823 std::list<IncomingBundleRecord>::iterator incoming_i;
00824 for (incoming_i = incoming_bundle_list_.begin();
00825 incoming_i != incoming_bundle_list_.end(); ++incoming_i) {
00826 if ( incoming_i->location == message.location() ) {
00827 incoming_bundle_list_.erase(incoming_i);
00828 break;
00829 }
00830 }
00831 }
00832
00833 void
00834 ECLModule::handle(const report_eid_reachable& message)
00835 {
00836 BundleDaemon::post(
00837 new EIDReachableReportEvent( message.query_id(),
00838 message.is_reachable() ) );
00839 }
00840
00841 void
00842 ECLModule::handle(const report_link_attributes& message)
00843 {
00844 AttributeVector attrib_vector;
00845
00846 KeyValueSequence::const_iterator iter;
00847 for (iter = message.key_value_pair().begin();
00848 iter != message.key_value_pair().end();
00849 iter++) {
00850 attrib_vector.push_back( NamedAttribute(iter->name(), iter->value()) );
00851 }
00852
00853 BundleDaemon::post( new LinkAttributesReportEvent( message.query_id(),
00854 attrib_vector) );
00855 }
00856
00857 void
00858 ECLModule::handle(const report_interface_attributes& message)
00859 {
00860 AttributeVector attrib_vector;
00861
00862 KeyValueSequence::const_iterator iter;
00863 for (iter = message.key_value_pair().begin();
00864 iter != message.key_value_pair().end();
00865 iter++) {
00866 attrib_vector.push_back( NamedAttribute(iter->name(), iter->value()) );
00867 }
00868
00869 BundleDaemon::post( new IfaceAttributesReportEvent( message.query_id(),
00870 attrib_vector) );
00871 }
00872
00873 void
00874 ECLModule::handle(const report_cla_parameters& message)
00875 {
00876 AttributeVector attrib_vector;
00877
00878 KeyValueSequence::const_iterator iter;
00879 for (iter = message.key_value_pair().begin();
00880 iter != message.key_value_pair().end();
00881 iter++) {
00882 attrib_vector.push_back( NamedAttribute(iter->name(), iter->value()) );
00883 }
00884
00885 BundleDaemon::post( new CLAParametersReportEvent( message.query_id(),
00886 attrib_vector) );
00887 }
00888
00889
00890 void
00891 ECLModule::read_bundle_file(const std::string& location,
00892 const std::string& peer_eid)
00893 {
00894 int bundle_fd;
00895 bool finished = false;
00896 off_t file_offset = 0;
00897 struct stat file_stat;
00898
00899 std::string file_path = bundle_in_path_ + "/" + location;
00900
00901
00902 bundle_fd = oasys::IO::open(file_path.c_str(), O_RDONLY);
00903 if (bundle_fd < 0) {
00904 log_err( "Unable to read bundle file %s: %s", file_path.c_str(),
00905 strerror(errno) );
00906 return;
00907 }
00908
00909
00910 if (oasys::IO::stat(file_path.c_str(), &file_stat) < 0) {
00911 log_err( "Unable to stat bundle file %s: %s", file_path.c_str(),
00912 strerror(errno) );
00913 oasys::IO::close(bundle_fd);
00914 return;
00915 }
00916
00917 Bundle* bundle = new Bundle();
00918
00919
00920
00921
00922
00923 while (!finished && file_offset < file_stat.st_size) {
00924 size_t map_size = std::min(file_stat.st_size - file_offset,
00925 (off_t)MAX_BUNDLE_IN_MEMORY);
00926
00927
00928 void* bundle_ptr = oasys::IO::mmap(bundle_fd, file_offset, map_size,
00929 oasys::IO::MMAP_RO);
00930 if (bundle_ptr == NULL) {
00931 log_err( "Unable to map bundle file %s: %s", file_path.c_str(),
00932 strerror(errno) );
00933 oasys::IO::close(bundle_fd);
00934 delete bundle;
00935 return;
00936 }
00937
00938
00939 int result = BundleProtocol::consume(bundle, (u_char*)bundle_ptr,
00940 map_size, &finished);
00941
00942
00943 if (oasys::IO::munmap(bundle_ptr, map_size) < 0) {
00944 log_err("Unable to unmap bundle file");
00945 oasys::IO::close(bundle_fd);
00946 delete bundle;
00947 return;
00948 }
00949
00950
00951 if (result < 0) {
00952 log_err("Unable to process bundle");
00953 oasys::IO::close(bundle_fd);
00954 delete bundle;
00955 return;
00956 }
00957
00958
00959 file_offset += map_size;
00960 }
00961
00962
00963 oasys::IO::close(bundle_fd);
00964 if (::remove( file_path.c_str() ) < 0) {
00965 log_err( "Unable to remove bundle file %s: %s", file_path.c_str(),
00966 strerror(errno) );
00967 }
00968
00969 if (bundle->recv_blocks().size() < 1) {
00970 log_err("Received bundle does not contain enough information");
00971 delete bundle;
00972 return;
00973 }
00974
00975
00976
00977 if (file_offset < file_stat.st_size) {
00978 log_warn("Used only %llu of %llu bytes for the bundle",
00979 U64FMT(file_offset), U64FMT(file_stat.st_size));
00980 }
00981
00982
00983 BundleReceivedEvent* b_event =
00984 new BundleReceivedEvent(bundle, EVENTSRC_PEER, file_stat.st_size, peer_eid);
00985 BundleDaemon::post(b_event);
00986 }
00987
00988 void
00989 ECLModule::read_cycle() {
00990 size_t buffer_i = 0;
00991
00992
00993 int result = socket_.recv(read_buffer_, READ_BUFFER_SIZE, MSG_PEEK);
00994
00995 if (result <= 0) {
00996 log_err("Connection to CL %s lost: %s", name_.c_str(),
00997 (result == 0 ? "Closed by other side" : strerror(errno)));
00998
00999 set_should_stop();
01000 return;
01001 }
01002
01003
01004
01005 if (msg_buffer_.capacity() < msg_buffer_.size() + (size_t)result + 1)
01006 msg_buffer_.reserve(msg_buffer_.size() + (size_t)result + 1);
01007
01008
01009
01010 while (buffer_i < (size_t)result) {
01011 msg_buffer_.push_back(read_buffer_[buffer_i++]);
01012
01013
01014 if (msg_buffer_.size() > 12 &&
01015 strncmp(&msg_buffer_[msg_buffer_.size() - 13], "</cl_message>", 13) == 0) {
01016
01017
01018 msg_buffer_.push_back('\0');
01019 process_cl_event(&msg_buffer_[0], parser_);
01020
01021 msg_buffer_.clear();
01022 break;
01023 }
01024 }
01025
01026
01027 socket_.recv(read_buffer_, buffer_i, 0);
01028 }
01029
01030 int
01031 ECLModule::send_message(const cl_message* message)
01032 {
01033 xercesc::MemBufFormatTarget buf;
01034
01035 try {
01036
01037 cl_message_(buf, *message, ExternalConvergenceLayer::namespace_map_,
01038 "UTF-8", xml_schema::flags::dont_initialize);
01039 }
01040
01041 catch (xml_schema::serialization& e) {
01042 xml_schema::errors::const_iterator err_i;
01043 for (err_i = e.errors().begin(); err_i != e.errors().end(); ++err_i)
01044 log_err( "XML serialize error: %s", err_i->message().c_str() );
01045
01046 return 0;
01047 }
01048
01049 catch (std::exception& e) {
01050 log_err( "XML serialize error: %s", e.what() );
01051 return 0;
01052 }
01053
01054 std::string msg_string( (char*)buf.getRawBuffer(), buf.getLen() );
01055 log_debug_p("/dtn/cl/XML", "Sending message to module %s:\n%s",
01056 name_.c_str(), msg_string.c_str() );
01057
01058
01059 int err = socket_.send( (char*)buf.getRawBuffer(), buf.getLen(), 0 );
01060
01061 if (err < 0) {
01062 log_err("Socket error: %s", strerror(err));
01063 log_err("Connection with CL %s lost", name_.c_str());
01064
01065 return -1;
01066 }
01067
01068 return 0;
01069 }
01070
01071 int
01072 ECLModule::prepare_bundle_to_send(cl_message* message)
01073 {
01074 bundle_send_request request = message->bundle_send_request().get();
01075
01076
01077 ECLLinkResource* link_resource = get_link( request.link_name() );
01078 if (!link_resource) {
01079 log_err( "Got bundle_send_request for unknown link %s",
01080 request.link_name().c_str() );
01081 return 0;
01082 }
01083
01084
01085 BundleRef bundle =
01086 link_resource->get_outgoing_bundle( request.bundle_attributes() );
01087 if ( !bundle.object() ) {
01088 log_err( "Got bundle_send_request for unknown bundle");
01089 return 0;
01090 }
01091
01092
01093 BlockInfoVec* blocks =
01094 bundle->xmit_blocks()->find_blocks(link_resource->link_);
01095 if (!blocks) {
01096 log_err( "Bundle id %d on link %s has no block vectors",
01097 bundle->bundleid(), request.link_name().c_str() );
01098 return 0;
01099 }
01100
01101
01102 off_t total_length = BundleProtocol::total_length(blocks);
01103
01104
01105 std::string abs_path = bundle_out_path_ + "/" + request.location();
01106
01107
01108 int bundle_fd = oasys::IO::open(abs_path.c_str(), O_RDWR | O_CREAT | O_EXCL,
01109 0644);
01110 if (bundle_fd < 0) {
01111 log_err( "Unable to create bundle file %s: %s",
01112 request.location().c_str(), strerror(errno) );
01113 return 0;
01114 }
01115
01116
01117 if (oasys::IO::truncate(bundle_fd, total_length) < 0) {
01118 log_err( "Unable to resize bundle file %s: %s",
01119 request.location().c_str(), strerror(errno) );
01120 oasys::IO::close(bundle_fd);
01121 bundle_send_failed(link_resource, bundle.object(), true);
01122 return 0;
01123 }
01124
01125 off_t offset = 0;
01126 bool done = false;
01127
01128 while (offset < total_length) {
01129
01130 off_t map_size = std::min(total_length - offset,
01131 (off_t)MAX_BUNDLE_IN_MEMORY);
01132 void* bundle_ptr = oasys::IO::mmap(bundle_fd, offset, map_size,
01133 oasys::IO::MMAP_RW);
01134 if (bundle_ptr == NULL) {
01135 log_err( "Unable to map output file %s: %s",
01136 request.location().c_str(), strerror(errno) );
01137 oasys::IO::close(bundle_fd);
01138 bundle_send_failed(link_resource, bundle.object(), true);
01139 return -1;
01140 }
01141
01142
01143 BundleProtocol::produce(bundle.object(), blocks, (u_char*)bundle_ptr,
01144 offset, map_size, &done);
01145
01146
01147 oasys::IO::munmap(bundle_ptr, map_size);
01148 offset += map_size;
01149 }
01150
01151 oasys::IO::close(bundle_fd);
01152
01153
01154 return send_message(message);
01155 }
01156
01157 void
01158 ECLModule::bundle_send_failed(ECLLinkResource* link_resource,
01159 Bundle* bundle,
01160 bool erase_from_list)
01161 {
01162 ContactRef contact = link_resource->link_->contact();
01163
01164
01165 if (erase_from_list)
01166 link_resource->erase_outgoing_bundle(bundle);
01167
01168
01169 oasys::StringBuffer filename_buf("bundle%d", bundle->bundleid());
01170
01171
01172 ::remove( (bundle_out_path_ + "/" + link_resource->link_->name_str() + "/" +
01173 filename_buf.c_str()).c_str() );
01174 }
01175
01176 ECLInterfaceResource*
01177 ECLModule::get_interface(const std::string& name) const
01178 {
01179 oasys::ScopeLock l(&iface_list_lock_, "get_interface");
01180 std::list<ECLInterfaceResource*>::const_iterator iface_i;
01181
01182 for (iface_i = iface_list_.begin(); iface_i != iface_list_.end();
01183 ++iface_i) {
01184 if ( (*iface_i)->interface_->name() == name)
01185 return *iface_i;
01186 }
01187
01188 return NULL;
01189 }
01190
01191 ECLLinkResource*
01192 ECLModule::get_link(const std::string& name) const
01193 {
01194 sem_wait(&link_list_sem_);
01195
01196
01197 LinkHashMap::const_iterator link_i = link_list_.find(name);
01198 if ( link_i == link_list_.end() ) {
01199 sem_post(&link_list_sem_);
01200 return NULL;
01201 }
01202
01203 sem_post(&link_list_sem_);
01204 return link_i->second;
01205 }
01206
01207 bool
01208 ECLModule::link_exists(const std::string& name) const
01209 {
01210 sem_wait(&link_list_sem_);
01211
01212
01213 LinkHashMap::const_iterator link_i = link_list_.find(name);
01214 if ( link_i == link_list_.end() ) {
01215 sem_post(&link_list_sem_);
01216 return false;
01217 }
01218
01219 sem_post(&link_list_sem_);
01220 return true;
01221 }
01222
01223 ECLLinkResource*
01224 ECLModule::create_discovered_link(const std::string& peer_eid,
01225 const std::string& nexthop,
01226 const std::string& link_name)
01227 {
01228 ContactManager* cm = BundleDaemon::instance()->contactmgr();
01229
01230
01231 oasys::ScopeLock l(cm->lock(), "ECLModule::create_discovered_link");
01232
01233 if (cm->has_link(link_name.c_str())) {
01234 log_err("A link with name %s already exists; can't create duplicate",
01235 link_name.c_str());
01236 return NULL;
01237 }
01238
01239 LinkRef link = Link::create_link(link_name, Link::OPPORTUNISTIC, &cl_,
01240 nexthop.c_str(), 0, NULL);
01241 if (link == NULL) {
01242 log_err("Unexpected error creating opportunistic link");
01243 return NULL;
01244 }
01245
01246 LinkRef new_link(link.object(),
01247 "ECLModule::create_discovered_link: the new link");
01248
01249 new_link->set_remote_eid(peer_eid);
01250
01251
01252 new_link->set_create_pending(true);
01253
01254 if (ExternalConvergenceLayer::discovered_prev_hop_header_)
01255 new_link->params().prevhop_hdr_ = true;
01256
01257 if (!cm->add_new_link(new_link)) {
01258 new_link->delete_link();
01259 log_err( "Failed to add new opportunistic link %s", new_link->name() );
01260 new_link = NULL;
01261 return NULL;
01262 }
01263
01264
01265 ECLLinkResource* resource =
01266 new ECLLinkResource(name_, NULL, new_link, true);
01267 oasys::ScopeLock res_lock(&resource->lock_, "create_discovered_link");
01268 new_link->set_cl_info(resource);
01269 new_link->set_state(Link::AVAILABLE);
01270 resource->module_ = this;
01271 resource->should_delete_ = false;
01272
01273
01274 l.unlock();
01275
01276
01277 sem_wait(&link_list_sem_);
01278 sem_wait(&link_list_sem_);
01279
01280
01281 link_list_.insert( LinkHashMap::value_type(link_name.c_str(), resource) );
01282
01283
01284 sem_post(&link_list_sem_);
01285 sem_post(&link_list_sem_);
01286
01287
01288 new_link->set_create_pending(false);
01289
01290
01291 return resource;
01292 }
01293
01294 void
01295 ECLModule::cleanup() {
01296 LinkHashMap::const_iterator link_i;
01297
01298
01299 for (link_i = link_list_.begin(); link_i != link_list_.end();
01300 ++link_i) {
01301 ECLLinkResource* resource = link_i->second;
01302
01303 oasys::ScopeLock res_lock(&resource->lock_, "ECLModule::cleanup");
01304 resource->module_ = NULL;
01305 resource->known_state_ = Link::CLOSED;
01306
01307
01308
01309 Link::state_t current_state = resource->link_->state();
01310 if (current_state == Link::OPEN || current_state == Link::OPENING) {
01311 BundleDaemon::post( new LinkStateChangeRequest(resource->link_,
01312 Link::CLOSED, ContactEvent::NO_INFO) );
01313 }
01314
01315
01316 BundleList& bundle_set = resource->get_bundle_set();
01317 oasys::ScopeLock bundle_lock(bundle_set.lock(), "ECLModule::cleanup");
01318
01319
01320
01321 BundleList::iterator bundle_i;
01322 for (bundle_i = bundle_set.begin(); bundle_i != bundle_set.end();
01323 ++bundle_i) {
01324 bundle_send_failed(resource, *bundle_i, false);
01325 }
01326
01327
01328 bundle_set.clear();
01329
01330
01331 std::string outgoing_dir = bundle_out_path_ + "/" +
01332 resource->link_->name_str();
01333 ::remove( outgoing_dir.c_str() );
01334 }
01335
01336
01337
01338
01339 std::list<IncomingBundleRecord>::iterator incoming_i;
01340 for (incoming_i = incoming_bundle_list_.begin();
01341 incoming_i != incoming_bundle_list_.end(); ++incoming_i)
01342 read_bundle_file( incoming_i->location, EndpointID::NULL_EID().c_str() );
01343
01344
01345
01346 cl_message* message;
01347 while ( message_queue_.try_pop(&message) )
01348 delete message;
01349
01350
01351 cl_.give_resources(iface_list_);
01352 cl_.give_resources(link_list_);
01353
01354
01355 ::remove( bundle_in_path_.c_str() );
01356 ::remove( bundle_out_path_.c_str() );
01357 }
01358
01359 void
01360 ECLModule::update_contact_attributes(const contact_attributes& attributes,
01361 const ContactRef& contact)
01362 {
01363
01364
01365 contact->set_start_time(oasys::Time(attributes.start_time() / 1000,
01366 attributes.start_time() * 1000));
01367 contact->set_duration(attributes.duration());
01368 contact->set_bps(attributes.bps());
01369 contact->set_latency(attributes.latency());
01370 }
01371
01372 }
01373
01374 #endif // XERCES_C_ENABLED && EXTERNAL_CL_ENABLED