00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifdef HAVE_CONFIG_H
00016 # include <dtn-config.h>
00017 #endif
00018
00019 #if defined(XERCES_C_ENABLED) && defined(EXTERNAL_CL_ENABLED)
00020
00021 #include <stdio.h>
00022 #include <sys/types.h>
00023 #include <sys/socket.h>
00024 #include <netinet/in.h>
00025 #include <arpa/inet.h>
00026
00027 #include <oasys/io/NetUtils.h>
00028 #include <oasys/io/FileUtils.h>
00029 #include <oasys/thread/Lock.h>
00030 #include <oasys/util/OptParser.h>
00031
00032 #include "ExternalConvergenceLayer.h"
00033 #include "ECLModule.h"
00034 #include "bundling/BundleDaemon.h"
00035 #include "contacts/ContactManager.h"
00036
00037 namespace dtn {
00038
00039 bool ExternalConvergenceLayer::client_validation_ = true;
00040 std::string ExternalConvergenceLayer::schema_ = "";
00041 in_addr_t ExternalConvergenceLayer::server_addr_ = inet_addr("127.0.0.1");
00042 u_int16_t ExternalConvergenceLayer::server_port_ = 5070;
00043 bool ExternalConvergenceLayer::create_discovered_links_ = false;
00044 bool ExternalConvergenceLayer::discovered_prev_hop_header_ = false;
00045 xml_schema::namespace_infomap ExternalConvergenceLayer::namespace_map_;
00046
00047 ExternalConvergenceLayer::ExternalConvergenceLayer() :
00048 ConvergenceLayer("ExternalConvergenceLayer", "extcl"),
00049 global_resource_lock_("/dtn/cl/parts/global_resource_lock"),
00050 module_mutex_("/dtn/cl/parts/module_mutex"),
00051 resource_mutex_("/dtn/cl/parts/resource_mutex"),
00052 listener_(*this)
00053 {
00054
00055 }
00056
00057 ExternalConvergenceLayer::~ExternalConvergenceLayer()
00058 {
00059
00060 }
00061
00062 void
00063 ExternalConvergenceLayer::start()
00064 {
00065 log_info("ExternalConvergenceLayer started");
00066
00067
00068 if ( schema_ == std::string("") ) {
00069 log_err("No XML schema file specified."
00070 " ExternalConvergenceLayer is disabled.");
00071 return;
00072 }
00073
00074 log_info( "Using XML schema file %s", schema_.c_str() );
00075 if (client_validation_)
00076 namespace_map_[""].schema = schema_.c_str();
00077
00078
00079 listener_.start();
00080 }
00081
00082 bool
00083 ExternalConvergenceLayer::set_cla_parameters(AttributeVector ¶ms)
00084 {
00085 ECLModule* module = NULL;
00086 KeyValueSequence param_sequence;
00087 cla_set_params_request request;
00088
00089 AttributeVector::iterator iter;
00090 for (iter = params.begin(); iter != params.end(); iter++) {
00091 if (iter->name() == "create_discovered_links") {
00092 request.create_discovered_links(iter->bool_val());
00093 }
00094 else if (iter->name() == "protocol") {
00095
00096 module = get_module(iter->string_val());
00097 if (!module) {
00098 log_err(
00099 "No CLA for protocol '%s' exists; not setting CLA parameters",
00100 iter->string_val().c_str() );
00101 return false;
00102 }
00103 }
00104 else {
00105 param_sequence.push_back( clmessage::key_value_pair(iter->name(),
00106 iter->string_val()) );
00107 }
00108 }
00109
00110 if (!module) {
00111 log_err( "No protocol provided; not setting CLA parameters" );
00112 return false;
00113 }
00114
00115 request.key_value_pair(param_sequence);
00116
00117
00118 cl_message* message = new cl_message();
00119 message->cla_set_params_request(request);
00120 POST_MESSAGE(module, cla_set_params_request, request);
00121
00122 return true;
00123 }
00124
00125 bool
00126 ExternalConvergenceLayer::set_interface_defaults(int argc, const char* argv[],
00127 const char** invalidp)
00128 {
00129 std::string proto_option;
00130 oasys::OptParser parser;
00131
00132
00133 parser.addopt(new oasys::StringOpt("protocol", &proto_option));
00134 int parse_result = parser.parse_and_shift(argc, argv, invalidp);
00135
00136 if (parse_result < 0) {
00137 log_err("Error parsing interface option '%s'", *invalidp);
00138 return false;
00139 }
00140
00141 if (proto_option.size() == 0) {
00142 log_err("Error parsing interface options: no 'protocol' option given");
00143 return false;
00144 }
00145
00146
00147 ECLModule* module = get_module(proto_option);
00148 if (!module) {
00149 log_err( "No CLA for protocol '%s' exists; not setting interface defaults",
00150 proto_option.c_str() );
00151 return false;
00152 }
00153
00154 argc -= parse_result;
00155
00156
00157 KeyValueSequence param_sequence;
00158 build_param_sequence(argc, argv, param_sequence);
00159
00160
00161 interface_set_defaults_request request;
00162 request.key_value_pair(param_sequence);
00163
00164
00165 cl_message* message = new cl_message();
00166 message->interface_set_defaults_request(request);
00167 POST_MESSAGE(module, interface_set_defaults_request, request);
00168
00169 return true;
00170 }
00171
00172 bool
00173 ExternalConvergenceLayer::interface_up(Interface* iface, int argc,
00174 const char* argv[])
00175 {
00176 std::string proto_option;
00177 oasys::OptParser parser;
00178 const char* invalidp;
00179
00180
00181 parser.addopt(new oasys::StringOpt("protocol", &proto_option));
00182 int parse_result = parser.parse_and_shift(argc, argv, &invalidp);
00183
00184 if (parse_result < 0) {
00185 log_err("Error parsing interface option '%s'", invalidp);
00186 return false;
00187 }
00188
00189 if (proto_option.size() == 0) {
00190 log_err("Error parsing interface options: no 'protocol' option given");
00191 return false;
00192 }
00193
00194 argc -= parse_result;
00195
00196 log_debug( "Adding interface %s for protocol %s", iface->name().c_str(),
00197 proto_option.c_str() );
00198
00199
00200 KeyValueSequence param_sequence;
00201 build_param_sequence(argc, argv, param_sequence);
00202
00203
00204 interface_create_request request;
00205 request.interface_name( iface->name() );
00206 request.key_value_pair(param_sequence);
00207
00208 cl_message* message = new cl_message();
00209 message->interface_create_request(request);
00210
00211
00212 ECLModule* module = get_module(proto_option);
00213 ECLInterfaceResource* resource =
00214 new ECLInterfaceResource(proto_option, message, iface);
00215
00216 iface->set_cl_info(resource);
00217
00218 oasys::ScopeLock l(&resource->lock_, "interface_up");
00219
00220
00221 if (module) {
00222 module->take_resource(resource);
00223 }
00224
00225
00226 else {
00227 log_warn("No CLA for protocol '%s' exists. "
00228 "Deferring initialization of interface %s",
00229 proto_option.c_str(), iface->name().c_str());
00230
00231 add_resource(resource);
00232 }
00233
00234 return true;
00235 }
00236
00237 bool
00238 ExternalConvergenceLayer::interface_down(Interface* iface)
00239 {
00240 ECLInterfaceResource* resource =
00241 dynamic_cast<ECLInterfaceResource*>( iface->cl_info() );
00242 ASSERT(resource);
00243
00244
00245 interface_destroy_request request;
00246 request.interface_name( iface->name() );
00247 cl_message* message = new cl_message();
00248 message->interface_destroy_request(request);
00249 POST_MESSAGE(resource->module_, interface_destroy_request, request);
00250
00251
00252 resource->module_->remove_interface( iface->name() );
00253 delete resource;
00254
00255 return true;
00256 }
00257
00258 void
00259 ExternalConvergenceLayer::dump_interface(Interface* iface,
00260 oasys::StringBuffer* buf)
00261 {
00262 ECLInterfaceResource* resource =
00263 dynamic_cast<ECLInterfaceResource*>( iface->cl_info() );
00264 ASSERT(resource);
00265
00266 buf->appendf( "\tprotocol: %s\n", resource->protocol_.c_str() );
00267
00268 const KeyValueSequence& param_sequence = resource->create_message_->
00269 interface_create_request().get().key_value_pair();
00270
00271 KeyValueSequence::const_iterator param_i;
00272 for (param_i = param_sequence.begin(); param_i != param_sequence.end();
00273 ++param_i) {
00274 buf->appendf( "\t%s=%s\n", param_i->name().c_str(),
00275 param_i->value().c_str() );
00276 }
00277 }
00278
00279 bool
00280 ExternalConvergenceLayer::set_link_defaults(int argc, const char* argv[],
00281 const char** invalidp)
00282 {
00283 bool reactive_fragment;
00284 bool reactive_fragment_set = false;
00285 bool is_usable;
00286 bool is_usable_set = false;
00287 std::string nexthop;
00288 std::string proto_option;
00289 oasys::OptParser parser;
00290
00291
00292 parser.addopt(new oasys::StringOpt("protocol", &proto_option));
00293 parser.addopt(new oasys::BoolOpt("is_usable", &is_usable, "", &is_usable_set));
00294 parser.addopt(new oasys::BoolOpt("reactive_fragment", &reactive_fragment, "",
00295 &reactive_fragment_set));
00296 parser.addopt(new oasys::StringOpt("nexthop", &nexthop));
00297 int parse_result = parser.parse_and_shift(argc, argv, invalidp);
00298
00299 if (parse_result < 0) {
00300 log_err("Error parsing link option '%s'", *invalidp);
00301 return false;
00302 }
00303
00304
00305 if (proto_option.size() == 0) {
00306 log_err("Error parsing link options: no 'protocol' option given");
00307 return false;
00308 }
00309
00310
00311 ECLModule* module = get_module(proto_option);
00312 if (!module) {
00313 log_err( "No CLA for protocol '%s' exists; not setting link defaults",
00314 proto_option.c_str() );
00315 return false;
00316 }
00317
00318 argc -= parse_result;
00319
00320
00321 link_config_parameters config_params;
00322 if (is_usable_set)
00323 config_params.is_usable(is_usable);
00324 if (reactive_fragment_set)
00325 config_params.reactive_fragment(reactive_fragment);
00326 if (nexthop.size() > 0)
00327 config_params.nexthop(nexthop);
00328
00329
00330 KeyValueSequence param_sequence;
00331 build_param_sequence(argc, argv, param_sequence);
00332 config_params.key_value_pair(param_sequence);
00333
00334
00335 link_set_defaults_request request;
00336 request.link_config_parameters(config_params);
00337
00338
00339 cl_message* message = new cl_message();
00340 message->link_set_defaults_request(request);
00341 POST_MESSAGE(module, link_set_defaults_request, request);
00342
00343 return true;
00344 }
00345
00346 bool
00347 ExternalConvergenceLayer::init_link(const LinkRef& link,
00348 int argc, const char* argv[])
00349 {
00350 bool reactive_fragment;
00351 bool reactive_fragment_set = false;
00352 bool is_usable;
00353 bool is_usable_set = false;
00354 std::string nexthop;
00355 std::string proto_option;
00356 oasys::OptParser parser;
00357 const char* invalidp;
00358
00359
00360
00361
00362 if (argc == 0 && argv == NULL)
00363 return true;
00364
00365 ASSERT(link != NULL);
00366 ASSERT(!link->isdeleted());
00367 ASSERT(link->cl_info() == NULL);
00368
00369
00370 parser.addopt(new oasys::StringOpt("protocol", &proto_option));
00371 parser.addopt(new oasys::BoolOpt("is_usable", &is_usable, "", &is_usable_set));
00372 parser.addopt(new oasys::BoolOpt("reactive_fragment", &reactive_fragment, "",
00373 &reactive_fragment_set));
00374 int parse_result = parser.parse_and_shift(argc, argv, &invalidp);
00375
00376 if (parse_result < 0) {
00377 log_err("Error parsing link option '%s'", invalidp);
00378 return false;
00379 }
00380
00381 if (proto_option.size() == 0) {
00382 log_err("Error parsing link options: no 'protocol' option given");
00383 return false;
00384 }
00385
00386 log_debug( "Adding link %s for protocol %s", link->name(),
00387 proto_option.c_str() );
00388
00389 ECLModule* module = get_module(proto_option);
00390
00391 if ( module && module->link_exists( link->name() ) ) {
00392 log_err( "Link %s already exists on CL %s", link->name(),
00393 proto_option.c_str() );
00394 return false;
00395 }
00396
00397 argc -= parse_result;
00398
00399
00400 KeyValueSequence param_sequence;
00401 build_param_sequence(argc, argv, param_sequence);
00402
00403
00404 link_create_request request;
00405 request.link_name( link->name_str() );
00406 request.type( XMLConvert::convert_link_type( link->type() ) );
00407 request.peer_eid( link->remote_eid().str() );
00408
00409
00410 link_config_parameters config_params;
00411 config_params.nexthop( link->nexthop() );
00412
00413
00414 if (is_usable_set)
00415 config_params.is_usable(is_usable);
00416 if (reactive_fragment_set)
00417 config_params.reactive_fragment(reactive_fragment);
00418
00419
00420 config_params.key_value_pair(param_sequence);
00421 request.link_config_parameters(config_params);
00422
00423 cl_message* message = new cl_message();
00424 message->link_create_request(request);
00425
00426 ECLLinkResource* resource =
00427 new ECLLinkResource(proto_option, message, link, false);
00428
00429 oasys::ScopeLock l(&resource->lock_, "init_link");
00430
00431 link->set_cl_info(resource);
00432 link->set_create_pending(true);
00433
00434 if (module) {
00435 module->take_resource(resource);
00436 }
00437
00438 else {
00439 log_warn( "No CL with protocol '%s' exists. "
00440 "Deferring initialization of link %s",
00441 proto_option.c_str(), link->name() );
00442 add_resource(resource);
00443 }
00444
00445 return true;
00446 }
00447
00448 void
00449 ExternalConvergenceLayer::delete_link(const LinkRef& link)
00450 {
00451 ASSERT(link != NULL);
00452
00453 log_debug("ExternalConvergenceLayer::delete_link: "
00454 "deleting link %s", link->name());
00455
00456 oasys::ScopeLock link_lock(link->lock(),
00457 "ExternalConvergenceLayer::delete_link");
00458
00459 ASSERT(!link->isdeleted());
00460
00461 ECLLinkResource* resource =
00462 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473 oasys::ScopeLock lock(&resource->lock_, "delete_link");
00474
00475
00476 link->set_cl_info(NULL);
00477
00478
00479 if (resource->module_ == NULL) {
00480 lock.unlock();
00481 if (resource->should_delete_)
00482 delete_resource(resource);
00483
00484
00485
00486 return;
00487 }
00488
00489
00490 link_delete_request request;
00491 request.link_name( link->name_str() );
00492 POST_MESSAGE(resource->module_, link_delete_request, request);
00493 }
00494
00495 void
00496 ExternalConvergenceLayer::dump_link(const LinkRef& link,
00497 oasys::StringBuffer* buf)
00498 {
00499 ASSERT(link != NULL);
00500
00501 oasys::ScopeLock link_lock(link->lock(),
00502 "ExternalConvergenceLayer::reconfigure_link");
00503
00504
00505
00506
00507 if ( link->isdeleted() ) {
00508 log_err( "Cannot dump deleted link %s", link->name() );
00509 return;
00510 }
00511
00512 if (link->cl_info() == NULL) {
00513 log_err( "Cannot dump deleted link %s", link->name() );
00514 return;
00515 }
00516
00517 ECLLinkResource* resource =
00518 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00519 ASSERT(resource);
00520
00521 buf->appendf( "\nprotocol: %s\n", resource->protocol_.c_str() );
00522
00523 const KeyValueSequence& param_sequence = resource->create_message_->
00524 link_create_request().get().link_config_parameters().key_value_pair();
00525
00526 KeyValueSequence::const_iterator param_i;
00527 for (param_i = param_sequence.begin(); param_i != param_sequence.end();
00528 ++param_i) {
00529 buf->appendf( "%s=%s\n", param_i->name().c_str(),
00530 param_i->value().c_str() );
00531 }
00532 }
00533
00534 bool ExternalConvergenceLayer::reconfigure_link(const LinkRef& link, int argc,
00535 const char* argv[])
00536 {
00537 bool reactive_fragment;
00538 bool reactive_fragment_set = false;
00539 bool is_usable;
00540 bool is_usable_set = false;
00541 std::string nexthop;
00542 oasys::OptParser parser;
00543 const char* invalidp;
00544
00545
00546
00547
00548 if (argc == 0 && argv == NULL)
00549 return true;
00550
00551 ASSERT(link != NULL);
00552
00553 oasys::ScopeLock link_lock(link->lock(),
00554 "ExternalConvergenceLayer::reconfigure_link");
00555
00556
00557
00558
00559 if ( link->isdeleted() ) {
00560 log_err( "Cannot reconfigure deleted link %s", link->name() );
00561 return false;
00562 }
00563
00564 if (link->cl_info() == NULL) {
00565 log_err( "Cannot reconfigure deleted link %s", link->name() );
00566 return false;
00567 }
00568
00569
00570 parser.addopt(new oasys::BoolOpt("is_usable", &is_usable, "", &is_usable_set));
00571 parser.addopt(new oasys::BoolOpt("reactive_fragment", &reactive_fragment, "",
00572 &reactive_fragment_set));
00573 parser.addopt(new oasys::StringOpt("nexthop", &nexthop));
00574 int parse_result = parser.parse_and_shift(argc, argv, &invalidp);
00575
00576 if (parse_result < 0) {
00577 log_err("Error parsing link option '%s'", invalidp);
00578 return false;
00579 }
00580
00581 cl_message* old_create_message_;
00582 cl_message* new_create_message_;
00583 KeyValueSequence::iterator new_param_i;
00584
00585 ECLLinkResource* resource =
00586 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00587 ASSERT(resource);
00588
00589 KeyValueSequence param_sequence;
00590 build_param_sequence(argc, argv, param_sequence);
00591
00592 oasys::ScopeLock l(&resource->lock_, "reconfigure_link");
00593
00594
00595 old_create_message_ = resource->create_message_;
00596
00597
00598 link_create_request new_create_request =
00599 old_create_message_->link_create_request().get();
00600
00601
00602 if (is_usable_set)
00603 new_create_request.link_config_parameters().is_usable(is_usable);
00604 if (reactive_fragment_set)
00605 new_create_request.link_config_parameters().reactive_fragment(
00606 reactive_fragment);
00607 if (nexthop.size() > 0)
00608 new_create_request.link_config_parameters().nexthop(nexthop);
00609
00610
00611 KeyValueSequence& existing_params =
00612 new_create_request.link_config_parameters().key_value_pair();
00613
00614
00615 for (new_param_i = param_sequence.begin();
00616 new_param_i != param_sequence.end(); ++new_param_i) {
00617 KeyValueSequence::iterator existing_param_i;
00618
00619 for (existing_param_i = existing_params.begin();
00620 existing_param_i != existing_params.end(); ++existing_param_i) {
00621 if ( new_param_i->name() == existing_param_i->name() ) {
00622 *existing_param_i = *new_param_i;
00623 break;
00624 }
00625 }
00626 }
00627
00628
00629 new_create_message_ = new cl_message();
00630 new_create_message_->link_create_request(new_create_request);
00631 resource->create_message_ = new_create_message_;
00632 delete old_create_message_;
00633
00634 if (!resource->module_) {
00635 log_warn( "Link %s is unclaimed; new settings will apply when it is claimed",
00636 link->name() );
00637 return true;
00638 }
00639
00640
00641 link_reconfigure_request request;
00642 request.link_name( link->name_str() );
00643
00644
00645 link_config_parameters config_params;
00646 config_params.nexthop( link->nexthop() );
00647
00648
00649 if (is_usable_set)
00650 config_params.is_usable(is_usable);
00651 if (reactive_fragment_set)
00652 config_params.reactive_fragment(reactive_fragment);
00653
00654
00655 config_params.key_value_pair(param_sequence);
00656
00657
00658 request.link_config_parameters(config_params);
00659 cl_message* message = new cl_message();
00660 message->link_reconfigure_request(request);
00661 POST_MESSAGE(resource->module_, link_reconfigure_request, request);
00662
00663 return true;
00664 }
00665
00666 void
00667 ExternalConvergenceLayer::reconfigure_link(const LinkRef& link,
00668 AttributeVector& params)
00669 {
00670 ASSERT(link != NULL);
00671
00672 oasys::ScopeLock link_lock(link->lock(),
00673 "ExternalConvergenceLayer::reconfigure_link");
00674
00675
00676
00677
00678 if ( link->isdeleted() ) {
00679 log_err( "Cannot reconfigure deleted link %s", link->name() );
00680 return;
00681 }
00682
00683 if (link->cl_info() == NULL) {
00684 log_err( "Cannot reconfigure deleted link %s", link->name() );
00685 return;
00686 }
00687
00688 ECLLinkResource* resource =
00689 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00690 if (!resource) {
00691 log_err( "Cannot reconfigure link that does not exist" );
00692 return;
00693 }
00694
00695 link_reconfigure_request request;
00696 request.link_name( link->name_str() );
00697 link_config_parameters config_params;
00698 KeyValueSequence param_sequence;
00699
00700 AttributeVector::iterator iter;
00701 for (iter = params.begin(); iter != params.end(); iter++) {
00702 if (iter->name() == "is_usable") {
00703 config_params.is_usable( iter->bool_val() );
00704 }
00705 else if (iter->name() == "reactive_fragment") {
00706 config_params.reactive_fragment( iter->bool_val() );
00707 }
00708 else if (iter->name() == "nexthop") {
00709 config_params.nexthop( iter->string_val() );
00710 }
00711 else {
00712 param_sequence.push_back( clmessage::key_value_pair(iter->name(),
00713 iter->string_val()) );
00714 }
00715 }
00716 config_params.key_value_pair(param_sequence);
00717 request.link_config_parameters(config_params);
00718
00719 POST_MESSAGE(resource->module_, link_reconfigure_request, request);
00720 }
00721
00722 bool
00723 ExternalConvergenceLayer::open_contact(const ContactRef& contact)
00724 {
00725 oasys::ScopeLock grl(&global_resource_lock_, "open_contact");
00726 LinkRef link = contact->link();
00727
00728 ASSERT(link != NULL);
00729
00730 oasys::ScopeLock link_lock(link->lock(),
00731 "ExternalConvergenceLayer::open_contact");
00732
00733
00734
00735
00736 if ( link->isdeleted() ) {
00737 log_err( "Cannot open contact on deleted link %s", link->name() );
00738 return false;
00739 }
00740
00741 ASSERT(link->cl_info() != NULL);
00742
00743 ECLLinkResource* resource =
00744 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00745 if (!resource) {
00746 log_err( "Cannot open contact on link that does not exist" );
00747 BundleDaemon::post(
00748 new LinkStateChangeRequest(link, Link::CLOSED,
00749 ContactEvent::NO_INFO));
00750 return false;
00751 }
00752
00753 oasys::ScopeLock l(&resource->lock_, "open_contact");
00754
00755 if (!resource->module_) {
00756 log_err( "Cannot open contact on unclaimed link %s", link->name() );
00757 BundleDaemon::post(
00758 new LinkStateChangeRequest(link, Link::CLOSED,
00759 ContactEvent::NO_INFO));
00760 return false;
00761 }
00762
00763 if (resource->known_state_ != Link::OPEN) {
00764 resource->known_state_ = Link::OPEN;
00765 link_open_request request;
00766 request.link_name( link->name_str() );
00767 POST_MESSAGE(resource->module_, link_open_request, request);
00768 return true;
00769 }
00770
00771 return true;
00772 }
00773
00774 bool
00775 ExternalConvergenceLayer::close_contact(const ContactRef& contact)
00776 {
00777 oasys::ScopeLock grl(&global_resource_lock_, "open_contact");
00778 LinkRef link = contact->link();
00779
00780 ASSERT(link != NULL);
00781
00782 oasys::ScopeLock link_lock(link->lock(),
00783 "ExternalConvergenceLayer::close_contact");
00784
00785
00786 if (link->cl_info() == NULL) {
00787 log_warn( "Ignoring close contact request for destroyed link %s",
00788 link->name() );
00789 return true;
00790 }
00791
00792 ECLLinkResource* resource =
00793 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00794 ASSERT(resource);
00795
00796 oasys::ScopeLock l(&resource->lock_, "close_contact");
00797
00798
00799 if (resource->known_state_ == Link::CLOSED)
00800 return true;
00801
00802 if (resource->module_ == NULL) {
00803 log_err( "close_contact called for unclaimed link %s", link->name() );
00804 return true;
00805 }
00806
00807 resource->known_state_ = Link::CLOSED;
00808
00809
00810 link_close_request request;
00811 request.link_name( link->name_str() );
00812 POST_MESSAGE(resource->module_, link_close_request, request);
00813
00814 return true;
00815 }
00816
00817 void
00818 ExternalConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
00819 {
00820 oasys::ScopeLock grl(&global_resource_lock_, "send_bundle");
00821
00822 ASSERT(link != NULL);
00823
00824 oasys::ScopeLock link_lock(link->lock(),
00825 "ExternalConvergenceLayer::send_bundle");
00826
00827
00828
00829
00830 if ( link->isdeleted() )
00831 return;
00832
00833 ASSERT(link->cl_info() != NULL);
00834
00835 ECLLinkResource* resource =
00836 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00837 ASSERT(resource);
00838
00839 oasys::ScopeLock l(&resource->lock_, "send_bundle");
00840
00841
00842
00843 if (!resource->module_) {
00844 log_err("Cannot send bundle on nonexistent CL %s through link %s",
00845 resource->protocol_.c_str(), link->name());
00846 return;
00847 }
00848
00849 log_debug( "Sending bundle %d on link %s", bundle->bundleid(),
00850 link->name() );
00851
00852
00853 oasys::StringBuffer filename_buf("bundle%d", bundle->bundleid());
00854 std::string filename = link->name_str() + "/" +
00855 std::string( filename_buf.c_str() );
00856
00857
00858 bundle_send_request request;
00859 request.location(filename);
00860 request.link_name( link->name_str() );
00861
00862
00863 bundle_attributes bundle_attribs;
00864 fill_bundle_attributes(bundle, bundle_attribs);
00865 request.bundle_attributes(bundle_attribs);
00866
00867
00868 resource->add_outgoing_bundle(bundle.object());
00869 POST_MESSAGE(resource->module_, bundle_send_request, request);
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879 }
00880
00881 void
00882 ExternalConvergenceLayer::cancel_bundle(const LinkRef& link, const BundleRef& bundle)
00883 {
00884 oasys::ScopeLock grl(&global_resource_lock_, "cancel_bundle");
00885
00886 ASSERT(link != NULL);
00887 oasys::ScopeLock link_lock(link->lock(),
00888 "ExternalConvergenceLayer::cancel_bundle");
00889
00890
00891
00892
00893 if ( link->isdeleted() )
00894 return;
00895
00896 if ( link->cl_info() == NULL)
00897 return;
00898
00899 ECLLinkResource* resource =
00900 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00901 ASSERT(resource);
00902
00903 oasys::ScopeLock l(&resource->lock_, "cancel_bundle");
00904
00905
00906 if (!resource->module_) {
00907 log_err("Cannot cancel bundle on nonexistent CL %s through link %s",
00908 resource->protocol_.c_str(), link->name());
00909 return;
00910 }
00911
00912 log_info( "Cancelling bundle %d on link %s", bundle->bundleid(),
00913 link->name() );
00914
00915
00916 bundle_cancel_request request;
00917 request.link_name( link->name_str() );
00918
00919
00920 bundle_attributes bundle_attribs;
00921 fill_bundle_attributes(bundle, bundle_attribs);
00922 request.bundle_attributes(bundle_attribs);
00923
00924 POST_MESSAGE(resource->module_, bundle_cancel_request, request);
00925 return;
00926 }
00927
00928 bool
00929 ExternalConvergenceLayer::is_queued(const LinkRef& link, Bundle* bundle)
00930 {
00931 oasys::ScopeLock grl(&global_resource_lock_, "is_queued");
00932
00933 ASSERT(link != NULL);
00934 oasys::ScopeLock link_lock(link->lock(),
00935 "ExternalConvergenceLayer::is_queued");
00936 if (link->isdeleted()) {
00937 log_debug("ExternalConvergenceLayer::is_queued: "
00938 "cannot check bundle queue on deleted link %s", link->name());
00939 return false;
00940 }
00941
00942 if (link->cl_info() == NULL)
00943 return false;
00944
00945 ECLLinkResource* resource =
00946 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00947 ASSERT(resource);
00948
00949 oasys::ScopeLock l(&resource->lock_, "is_queued");
00950
00951
00952 return resource->has_outgoing_bundle(bundle);
00953 }
00954
00955 void
00956 ExternalConvergenceLayer::is_eid_reachable(const std::string& query_id,
00957 Interface* iface,
00958 const std::string& endpoint)
00959 {
00960 ECLInterfaceResource* resource =
00961 dynamic_cast<ECLInterfaceResource*>( iface->cl_info() );
00962 ASSERT(resource);
00963
00964 oasys::ScopeLock l(&resource->lock_, "is_eid_reachable");
00965
00966
00967 if (!resource->module_) {
00968 log_err( "Cannot query unclaimed interface %s", iface->name().c_str() );
00969 BundleDaemon::post(new EIDReachableReportEvent(query_id, false));
00970 return;
00971 }
00972
00973 query_eid_reachable query;
00974 query.query_id(query_id);
00975 query.interface_name( iface->name() );
00976 query.peer_eid(endpoint);
00977
00978 POST_MESSAGE(resource->module_, query_eid_reachable, query);
00979 }
00980
00981 void
00982 ExternalConvergenceLayer::query_link_attributes(const std::string& query_id,
00983 const LinkRef& link,
00984 const AttributeNameVector& attributes)
00985 {
00986 ASSERT(link != NULL);
00987
00988 oasys::ScopeLock link_lock(link->lock(),
00989 "ExternalConvergenceLayer::query_link_attributes");
00990
00991 if (link->isdeleted()) {
00992 log_debug("ConvergenceLayer::query_link_attributes: "
00993 "link %s already deleted", link->name());
00994 return;
00995 }
00996
00997 ECLLinkResource* resource =
00998 dynamic_cast<ECLLinkResource*>( link->cl_info() );
00999 ASSERT(resource);
01000
01001 oasys::ScopeLock l(&resource->lock_, "query_link_attributes");
01002 if (resource->module_ == NULL) {
01003 log_err( "query_link_attributes called for unclaimed link %s",
01004 link->name() );
01005 BundleDaemon::post( new LinkAttributesReportEvent( query_id,
01006 AttributeVector() ) );
01007
01008 return;
01009 }
01010
01011 clmessage::query_link_attributes query;
01012 query.query_id(query_id);
01013 query.link_name( link->name_str() );
01014
01015 AttributeNameVector::const_iterator attrib_i;
01016 for (attrib_i = attributes.begin(); attrib_i != attributes.end(); ++attrib_i)
01017 query.attribute_name().push_back( attribute_name( attrib_i->name() ) );
01018
01019 POST_MESSAGE(resource->module_, query_link_attributes, query);
01020 }
01021
01022 void
01023 ExternalConvergenceLayer::query_iface_attributes(const std::string& query_id,
01024 Interface* iface,
01025 const AttributeNameVector& attributes)
01026 {
01027 ECLInterfaceResource* resource =
01028 dynamic_cast<ECLInterfaceResource*>( iface->cl_info() );
01029 ASSERT(resource);
01030
01031 oasys::ScopeLock l(&resource->lock_, "query_iface_attributes");
01032 if (resource->module_ == NULL) {
01033 log_err( "query_iface_attributes called for unclaimed interface %s",
01034 iface->name().c_str() );
01035 BundleDaemon::post( new IfaceAttributesReportEvent( query_id,
01036 AttributeVector() ) );
01037 return;
01038 }
01039
01040 query_interface_attributes query;
01041 query.query_id(query_id);
01042 query.interface_name( iface->name() );
01043
01044 AttributeNameVector::const_iterator attrib_i;
01045 for (attrib_i = attributes.begin(); attrib_i != attributes.end(); ++attrib_i)
01046 query.attribute_name().push_back( attribute_name( attrib_i->name() ) );
01047
01048 POST_MESSAGE(resource->module_, query_interface_attributes, query);
01049 }
01050
01051 void
01052 ExternalConvergenceLayer::query_cla_parameters(const std::string& query_id,
01053 const AttributeNameVector& parameters)
01054 {
01055 clmessage::query_cla_parameters query;
01056 std::string protocol;
01057
01058 AttributeNameVector::const_iterator param_i;
01059 for (param_i = parameters.begin();
01060 param_i != parameters.end();
01061 ++param_i) {
01062
01063
01064 if (param_i->name().find("protocol=") != std::string::npos) {
01065 protocol = param_i->name().substr(9);
01066 }
01067
01068
01069 else {
01070 query.attribute_name().push_back(
01071 attribute_name( param_i->name() ) );
01072 }
01073 }
01074
01075
01076 if (protocol.size() == 0) {
01077 log_err("No protocol given to query_cla_parameters; don't know where "
01078 "to send the query");
01079 BundleDaemon::post( new CLAParametersReportEvent( query_id,
01080 AttributeVector() ) );
01081 return;
01082 }
01083
01084
01085 ECLModule* module = get_module(protocol);
01086 if (!module) {
01087 log_err("No external CLA with protocol '%s' exists", protocol.c_str());
01088 BundleDaemon::post( new CLAParametersReportEvent( query_id,
01089 AttributeVector() ) );
01090 return;
01091 }
01092
01093
01094 query.query_id(query_id);
01095 POST_MESSAGE(module, query_cla_parameters, query);
01096 }
01097
01098 void
01099 ExternalConvergenceLayer::shutdown()
01100 {
01101 std::list<ECLModule*>::iterator list_i;
01102 for (list_i = module_list_.begin(); list_i != module_list_.end(); ++list_i) {
01103 log_info("Shutting down external CLA '%s'", (*list_i)->name().c_str());
01104 (*list_i)->shutdown();
01105 (*list_i)->join();
01106 delete *list_i;
01107 }
01108 }
01109
01110 void
01111 ExternalConvergenceLayer::add_module(ECLModule* module)
01112 {
01113 oasys::ScopeLock lock(&module_mutex_, "add_module");
01114 module_list_.push_back(module);
01115 }
01116
01117 void
01118 ExternalConvergenceLayer::remove_module(ECLModule* module)
01119 {
01120 oasys::ScopeLock lock(&module_mutex_, "remove_module");
01121 std::list<ECLModule*>::iterator list_i = module_list_.begin();
01122
01123 while ( list_i != module_list_.end() ) {
01124 if (*list_i == module) {
01125 module_list_.erase(list_i);
01126 return;
01127 }
01128
01129 ++list_i;
01130 }
01131 }
01132
01133 ECLModule*
01134 ExternalConvergenceLayer::get_module(const std::string& name)
01135 {
01136 std::string proto_option;
01137 std::list<ECLModule*>::iterator list_i;
01138
01139 oasys::ScopeLock lock(&module_mutex_, "get_module");
01140
01141 for (list_i = module_list_.begin(); list_i != module_list_.end();
01142 ++list_i) {
01143 if ((*list_i)->name() == name)
01144 return *list_i;
01145 }
01146
01147 return NULL;
01148 }
01149
01150 void
01151 ExternalConvergenceLayer::add_resource(ECLResource* resource)
01152 {
01153 oasys::ScopeLock lock(&resource_mutex_, "add_interface");
01154 resource_list_.push_back(resource);
01155 }
01156
01157 void
01158 ExternalConvergenceLayer::build_param_sequence(int argc, const char* argv[],
01159 KeyValueSequence& param_sequence)
01160 {
01161 for (int arg_i = 0; arg_i < argc; ++arg_i) {
01162 std::string arg_string(argv[arg_i]);
01163
01164
01165 size_t index = arg_string.find('=');
01166 if (index == std::string::npos) {
01167 log_warn("Invalid parameter: %s", argv[arg_i]);
01168 continue;
01169 }
01170
01171
01172 std::string lhs = arg_string.substr(0, index);
01173 std::string rhs = arg_string.substr(index + 1);
01174 param_sequence.push_back( clmessage::key_value_pair(lhs, rhs) );
01175 }
01176 }
01177
01178 void
01179 ExternalConvergenceLayer::fill_bundle_attributes(const BundleRef& bundle,
01180 bundle_attributes& attribs)
01181 {
01182 attribs.source_eid( bundle->source().str() );
01183 attribs.is_fragment(bundle->is_fragment());
01184
01185
01186
01187 attribs.timestamp_seconds(bundle->creation_ts().seconds_);
01188 attribs.timestamp_sequence(bundle->creation_ts().seqno_);
01189
01190
01191
01192 if (bundle->is_fragment()) {
01193 attribs.fragment_offset(bundle->frag_offset());
01194 attribs.fragment_length(bundle->payload().length());
01195 }
01196 }
01197
01198 std::list<ECLResource*>
01199 ExternalConvergenceLayer::take_resources(std::string name)
01200 {
01201 oasys::ScopeLock lock(&resource_mutex_, "take_resources");
01202 std::list<ECLResource*> new_list;
01203 std::list<ECLResource*>::iterator list_i = resource_list_.begin();
01204
01205 while ( list_i != resource_list_.end() ) {
01206 if ( (*list_i)->protocol_ == name ) {
01207 new_list.push_back(*list_i);
01208
01209
01210 std::list<ECLResource*>::iterator erase_i = list_i;
01211 ++list_i;
01212 resource_list_.erase(erase_i);
01213 }
01214
01215 else {
01216 ++list_i;
01217 }
01218 }
01219
01220 return new_list;
01221 }
01222
01223 void
01224 ExternalConvergenceLayer::delete_resource(ECLResource* resource)
01225 {
01226 oasys::ScopeLock lock(&resource_mutex_, "delete_resource");
01227 resource_list_.remove(resource);
01228 delete resource;
01229 }
01230
01231 void
01232 ExternalConvergenceLayer::give_resources(std::list<ECLInterfaceResource*>& list)
01233 {
01234 oasys::ScopeLock l(&resource_mutex_, "give_resources(ECLInterfaceResource)");
01235
01236 for ( std::list<ECLInterfaceResource*>::iterator list_i = list.begin();
01237 list_i != list.end();
01238 ++list_i) {
01239 ECLInterfaceResource* resource = *list_i;
01240
01241 oasys::ScopeLock res_lock(&resource->lock_, "give_resources");
01242 resource->module_ = NULL;
01243 resource->should_delete_ = true;
01244
01245 resource->module_ = NULL;
01246 resource_list_.push_back(resource);
01247 }
01248
01249 log_debug( "Got %zu ECLInterfaceResources", list.size() );
01250 list.clear();
01251 }
01252
01253 void
01254 ExternalConvergenceLayer::give_resources(LinkHashMap& list)
01255 {
01256 int actual_taken = 0;
01257 oasys::ScopeLock l(&resource_mutex_, "give_resources(ECLLinkResource)");
01258
01259 for ( LinkHashMap::iterator list_i = list.begin();
01260 list_i != list.end();
01261 ++list_i) {
01262 ECLLinkResource* resource = list_i->second;
01263
01264 oasys::ScopeLock res_lock(&resource->lock_, "give_resources");
01265 resource->module_ = NULL;
01266 resource->should_delete_ = true;
01267
01268
01269
01270 if (resource->is_discovered_)
01271 BundleDaemon::post( new LinkDeleteRequest(resource->link_) );
01272
01273 else {
01274 resource_list_.push_back(resource);
01275 ++actual_taken;
01276 }
01277 }
01278
01279 log_debug("Got %d ECLLinkResources", actual_taken);
01280 list.clear();
01281 }
01282
01283 ExternalConvergenceLayer::Listener::Listener(ExternalConvergenceLayer& cl)
01284 : TCPServerThread("ExternalConvergenceLayer::Listener",
01285 "/dtn/cl/Listener"),
01286 cl_(cl)
01287 {
01288 Thread::set_flag(Thread::DELETE_ON_EXIT);
01289 set_logfd(false);
01290 }
01291
01292 ExternalConvergenceLayer::Listener::~Listener()
01293 {
01294
01295 }
01296
01297 void
01298 ExternalConvergenceLayer::Listener::start()
01299 {
01300 log_info("Listening for external CLAs on %s:%d", intoa(server_addr_),
01301 server_port_);
01302
01303 if (bind_listen_start(server_addr_, server_port_) < 0)
01304 log_err("Listener thread failed to start");
01305 }
01306
01307 void
01308 ExternalConvergenceLayer::Listener::accepted(int fd, in_addr_t addr,
01309 u_int16_t port)
01310 {
01311 if ( schema_ == std::string() ) {
01312 log_err("ECLA module is connecting before the XSD file is specified"
01313 "Closing the socket");
01314 ::close(fd);
01315 return;
01316 }
01317
01318 log_debug("Accepted connection from a new CLA module");
01319 ECLModule* module = new ECLModule(fd, addr, port, cl_);
01320
01321
01322 cl_.add_module(module);
01323 module->start();
01324 }
01325
01326
01327 ECLLinkResource::ECLLinkResource(std::string p, clmessage::cl_message* create,
01328 const LinkRef& l, bool disc) :
01329 ECLResource(p, create),
01330 link_(l.object(), "ECLLinkResource"),
01331 outgoing_bundles_("outgoing_bundles")
01332 {
01333 known_state_ = Link::UNAVAILABLE;
01334 is_discovered_ = disc;
01335 high_water_mark_ = -1;
01336 low_water_mark_ = 0;
01337 }
01338
01339 void
01340 ECLLinkResource::add_outgoing_bundle(Bundle* bundle) {
01341 outgoing_bundles_.push_back(bundle);
01342 }
01343
01344 BundleRef
01345 ECLLinkResource::get_outgoing_bundle(clmessage::bundle_attributes bundle_attribs)
01346 {
01347 GbofId gbof_id;
01348 gbof_id.source_ = EndpointID( bundle_attribs.source_eid() );
01349 gbof_id.creation_ts_.seconds_ =
01350 (u_int32_t)bundle_attribs.timestamp_seconds();
01351 gbof_id.creation_ts_.seqno_ =
01352 (u_int32_t)bundle_attribs.timestamp_sequence();
01353 gbof_id.is_fragment_ = bundle_attribs.is_fragment();
01354
01355 if ( bundle_attribs.fragment_length().present() )
01356 gbof_id.frag_length_ = (u_int32_t)bundle_attribs.fragment_length().get();
01357 else
01358 gbof_id.frag_length_ = 0;
01359
01360 if ( bundle_attribs.fragment_length().present() )
01361 gbof_id.frag_offset_ = (u_int32_t)bundle_attribs.fragment_offset().get();
01362 else
01363 gbof_id.frag_offset_ = 0;
01364
01365 return outgoing_bundles_.find(gbof_id);
01366 }
01367
01368 bool
01369 ECLLinkResource::has_outgoing_bundle(Bundle* bundle)
01370 {
01371 return outgoing_bundles_.contains(bundle);
01372 }
01373
01374 bool
01375 ECLLinkResource::erase_outgoing_bundle(Bundle* bundle)
01376 {
01377 return outgoing_bundles_.erase(bundle);
01378 }
01379
01380 BundleList&
01381 ECLLinkResource::get_bundle_set()
01382 {
01383 return outgoing_bundles_;
01384 }
01385
01386
01387 clmessage::linkTypeType
01388 XMLConvert::convert_link_type(Link::link_type_t type)
01389 {
01390 switch (type) {
01391 case Link::ALWAYSON: return linkTypeType(linkTypeType::alwayson);
01392 case Link::ONDEMAND: return linkTypeType(linkTypeType::ondemand);
01393 case Link::SCHEDULED: return linkTypeType(linkTypeType::scheduled);
01394 case Link::OPPORTUNISTIC:
01395 return linkTypeType(linkTypeType::opportunistic);
01396 default: break;
01397 }
01398
01399 return linkTypeType();
01400 }
01401
01402 Link::link_type_t
01403 XMLConvert::convert_link_type(clmessage::linkTypeType type)
01404 {
01405 switch (type) {
01406 case linkTypeType::alwayson: return Link::ALWAYSON;
01407 case linkTypeType::ondemand: return Link::ONDEMAND;
01408 case linkTypeType::scheduled: return Link::SCHEDULED;
01409 case linkTypeType::opportunistic: return Link::OPPORTUNISTIC;
01410 default: break;
01411 }
01412
01413 return Link::LINK_INVALID;
01414 }
01415
01416 Link::state_t
01417 XMLConvert::convert_link_state(clmessage::linkStateType state)
01418 {
01419 switch (state) {
01420 case linkStateType::unavailable: return Link::UNAVAILABLE;
01421 case linkStateType::available: return Link::AVAILABLE;
01422 case linkStateType::opening: return Link::OPENING;
01423 case linkStateType::open: return Link::OPEN;
01424
01425 case linkStateType::closed: return Link::CLOSED;
01426 default: break;
01427 }
01428
01429 ASSERT(false);
01430 return Link::CLOSED;
01431 }
01432
01433 ContactEvent::reason_t
01434 XMLConvert::convert_link_reason(clmessage::linkReasonType reason)
01435 {
01436 switch (reason) {
01437 case linkReasonType::no_info: return ContactEvent::NO_INFO;
01438 case linkReasonType::user: return ContactEvent::USER;
01439 case linkReasonType::broken: return ContactEvent::BROKEN;
01440 case linkReasonType::shutdown: return ContactEvent::SHUTDOWN;
01441 case linkReasonType::reconnect: return ContactEvent::RECONNECT;
01442 case linkReasonType::idle: return ContactEvent::IDLE;
01443 case linkReasonType::timeout: return ContactEvent::TIMEOUT;
01444
01445 default: break;
01446 }
01447
01448 return ContactEvent::INVALID;
01449 }
01450
01451 }
01452
01453 #endif // XERCES_C_ENABLED && EXTERNAL_CL_ENABLED