/* Copyright 2009 Virginia Polytechnic Institute and State University Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ /* Inter-component communication handled by sockets and FD's. * Server support has been completely implemented and tested. * * Services are stored in a SQLite DB by the ID of the CE that registered them. Service * support has been completely implemented and tested. * * Missions are loaded from an XML file, connected with services provided by components, * and run. See the documentation for the "PerformActiveMission" below for important * info. 
*/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tinyxml/tinyxml.h" #include "tinyxml/tinystr.h" #include "vtcross/debug.h" #include "vtcross/error.h" #include "vtcross/common.h" #include "vtcross/components.h" #include "vtcross/containers.h" #include "vtcross/socketcomm.h" typedef struct services_s *services_DB; typedef struct data_s *data_DB; using namespace std; struct services_s { string filename; string tablename; string command; sqlite3 *db; unsigned int num_columns; }; struct data_s { string filename; string tablename; string command; sqlite3 *db; unsigned int num_columns; }; services_DB _services_DB; data_DB _data_DB; string _SML_Config; bool shellFound; /* Callback function used internally by some of the SQLite3 commands */ int32_t callback(void *notUsed, int32_t argc, char **argv, char **azColName) { for(size_t i = 0; i < argc; i++) { LOG("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL"); } LOG("\n"); return 0; } /* Useful for spotchecking what's in the database */ void printDatabase() { LOG("\n\n\n"); _data_DB->command = "select "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(";"); char *errorMsg; int32_t rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("SQL error: %s\n", errorMsg); LOG("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename); LOG("\n\n\n"); } ServiceManagementLayer::ServiceManagementLayer() { LOG("Creating Service Management Layer.\n"); shellSocketFD = -1; numberOfCognitiveEngines = 0; CE_Present = false; cogEngSrv = 1; } /* Free and clear the DB's associated with this SML in the destructor. 
* * Note that exiting with an error condition will cause SML to not be destructed, * resulting in the DB's staying in memory until the destructor is encountered in * future executions. */ ServiceManagementLayer::~ServiceManagementLayer() { char *errorMsg; int32_t rc; /* sqlite command return code */ _services_DB->command = "drop table "; _services_DB->command.append(_services_DB->tablename); rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg); _services_DB->command = "vacuum"; rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg); free(_services_DB); _data_DB->command = "drop table "; _data_DB->command.append(_data_DB->tablename); rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg); _data_DB->command = "vacuum"; rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg); free(_data_DB); } /* Note that sizes of CE_List, miss, and service are hardcoded for now. * Also, their sizes are hardcoded into the code in various places; a fix * for a future version. 
*/ ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \ const char* serverName, const char* serverPort, int16_t clientPort) { LOG("Creating Service Management Layer.\n"); _SML_Config = string(SML_Config); SMLport = clientPort; ConnectToShell(serverName, serverPort); CE_List = new CE_Reg[10]; miss = new Mission[10]; for(size_t i = 0; i < 10; i++) { miss[i].services = new Service[30]; } Current_ID = 0; LoadConfiguration(SML_Config, miss); CreateServicesDB(); CreateDataDB(); } /* CALLED BY: constructor * INPUTS: * OUTPUTS: * * DESCRIPTION: Create and initialize a DB to hold the services registered by components */ void ServiceManagementLayer::CreateServicesDB() { sqlite3_stmt *ppStmt; /* OUT: Statement handle */ const char *pzTail; /* OUT: Pointer to unused portion of zSql */ int32_t rc; /* sqlite command return code */ _services_DB = new services_s; _services_DB->filename="Services_Table"; sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db)); char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; _services_DB->tablename="Services"; /* ifprogram execution ends in anything other than a ordered shutdown, DB's will still * be there for next run. Need to get rid of it so that old data isn't inadvertantly * used in the next execution cycle. 
*/ _services_DB->command = "DROP TABLE ifEXISTS Services;"; rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n"); _services_DB->num_columns = 2; /* Generate command */ _services_DB->command="CREATE TABLE "; _services_DB->command.append(_services_DB->tablename); _services_DB->command.append("("); _services_DB->command.append(cols[0]); _services_DB->command.append(" INT, "); _services_DB->command.append(cols[1]); _services_DB->command.append(" TEXT);"); /* Execute create table command */ rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n"); } /* CALLED BY: constructor * INPUTS: * OUTPUTS: * * DESCRIPTION: Create and initialize a DB to hold the data sent by components */ void ServiceManagementLayer::CreateDataDB() { sqlite3_stmt *ppStmt; /* OUT: Statement handle */ const char *pzTail; /* OUT: Pointer to unused portion of zSql */ int32_t rc; /* sqlite command return code */ _data_DB = new data_s; _data_DB->filename="Data_Table"; sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db)); char *cols[] = {(char *)"Tag", (char *)"Data"}; _data_DB->tablename = "Data"; _data_DB->command = "DROP TABLE ifEXISTS Data;"; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if((rc != SQLITE_OK) && (rc != 101)) 
WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n"); _data_DB->num_columns = 2; /* Generate command */ _data_DB->command = "CREATE TABLE "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append("("); _data_DB->command.append(cols[0]); /* First column is the name of the data (corresponding to the name of the output/input pair) * It is the primary key so any subsequent data with the same name will replace the row. */ _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, "); _data_DB->command.append(cols[1]); _data_DB->command.append(" TEXT);"); rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n"); } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Sends a message identifying this component as an SML to the Shell */ void ServiceManagementLayer::SendComponentType() { SendMessage(shellSocketFD, "response_sml"); LOG("SML responded to GetRemoteComponentType query.\n"); } /* CALLED BY: constructor * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost) * |serverPort| the port on the server to connect to * OUTPUTS: * * DESCRIPTION: Connecting to the shell takes 2 steps * 1) Establish a client socket for communication * 2) Run the initial Registration/handshake routine */ void ServiceManagementLayer::ConnectToShell(const char* serverName, \ const char* serverPort) { shellSocketFD = ClientSocket(serverName, serverPort); RegisterComponent(); } /* CALLED BY: StartSMLServer * INPUTS: |ID| The ID number of the CE that has a message wating * OUTPUTS: * * DESCRIPTION: Called whenever a socket is identified as being ready for communication * This funciton reads the message and calls the appropriate helper */ void 
ServiceManagementLayer::MessageHandler(int32_t ID) { char buffer[256]; memset(buffer, 0, 256); int32_t _FD; if(ID != -1) _FD = CE_List[ID].FD; else _FD = shellSocketFD; ReadMessage(_FD, buffer); //--------Policy Engine Stuff - no policy engine support in this version-------// //printf("********* %s **********\n", buffer); // TODO // ifwe send integer op codes rather than strings, this process will be // MUCH faster since instead of donig string compares we can simply // switch on the integer value... /*if(strcmp(buffer, "register_service") == 0) { if(strcmp(buffer, "policy_geo") == 0) { } else if(strcmp(buffer, "policy_time") == 0) { } else if(strcmp(buffer, "policy_spectrum") == 0) { } else if(strcmp(buffer, "policy_spacial") == 0) { } } else if(strcmp(buffer, "deregister_service") == 0) { if(strcmp(buffer, "policy_geo") == 0) { } else if(strcmp(buffer, "policy_time") == 0) { } else if(strcmp(buffer, "policy_spectrum") == 0) { } else if(strcmp(buffer, "policy_spacial") == 0) { } }*/ //Go down the list to call the appropriate function if(strcmp(buffer, "query_component_type") == 0) { SendComponentType(); } else if(strcmp(buffer, "reset_sml") == 0) { Reset(); } else if(strcmp(buffer, "shutdown_sml") == 0) { Shutdown(); } else if(strcmp(buffer, "register_engine_cognitive") == 0) { RegisterCognitiveEngine(ID); } else if(strcmp(buffer, "register_service") == 0) { ReceiveServices(ID); } else if(strcmp(buffer, "send_component_type") == 0) { SendComponentType(); } else if(strcmp(buffer, "list_services") == 0) { ListServices(); } else if(strcmp(buffer, "set_active_mission") == 0) { SetActiveMission(); } else if(strcmp(buffer, "request_optimization") == 0) { PerformActiveMission(); } else if(strcmp(buffer, "deregister_engine_cognitive") == 0) { DeregisterCognitiveEngine(ID); } else if(strcmp(buffer, "deregister_service") == 0) { DeregisterServices(ID); } } //TODO Finish /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Deregisters the component from the 
Shell. */ void ServiceManagementLayer::Shutdown() { DeregisterComponent(); } //TODO Finish /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Deregisters the component from the Shell */ void ServiceManagementLayer::Reset() { DeregisterComponent(); ReloadConfiguration(); } /* CALLED BY: ConnectToShell * INPUTS: * OUTPUTS: * * DESCRIPTION: Sends the registration message to the Shell */ void ServiceManagementLayer::RegisterComponent() { SendMessage(shellSocketFD, "register_sml"); LOG("ServiceManagementLayer:: Registration message sent.\n"); } /* CALLED BY: Shutdown * INPUTS: * OUTPUTS: * * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message */ void ServiceManagementLayer::DeregisterComponent() { SendMessage(shellSocketFD, "deregister_sml"); LOG("ServiceManagementLayer:: Deregistration message sent.\n"); shutdown(shellSocketFD, 2); close(shellSocketFD); shellSocketFD = -1; LOG("ServiceManagementLayer:: Shell socket closed.\n"); } /* CALLED BY: RegisterCognitiveEngine * INPUTS: |ID| The ID number of the component where the data is to be transfered to * OUTPUTS: * * DESCRIPTION: Streams config data directly from the shell to the CE, and checks * for an "ack" message from the CE after every sent message * to know when to stop communication. * * NOTE: Modified to check the incoming message buffer rather than the outgoing * message buffer to avoid a portion of the delay. May change this again to handle * data more inteligently, taking advantage of it's properties. 
*/
void
ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
{
    char message[256];
    int32_t pending;

    /* Relay shell -> CE one message at a time. At least one message is always
     * relayed; the loop ends once select() reports nothing waiting on the
     * shell socket within the timeout. */
    do {
        memset(message, 0, sizeof(message));

        /* Receive data from Shell and pass it straight on to the CE */
        ReadMessage(shellSocketFD, message);
        SendMessage(CE_List[ID].FD, message);

        fd_set readable;
        FD_ZERO(&readable);
        FD_SET(shellSocketFD, &readable);

        /* Wait at most 5 ms for the next shell message */
        struct timeval wait;
        wait.tv_sec = 0;
        wait.tv_usec = 5000;

        pending = select(shellSocketFD + 1, &readable, NULL, NULL, &wait);
    } while(pending != 0);

    /* Hand the CE's reply back to the shell */
    memset(message, 0, sizeof(message));
    ReadMessage(CE_List[ID].FD, message);
    SendMessage(shellSocketFD, message);
}


/* CALLED BY: RegisterCognitiveEngine
 * INPUTS: |ID| The ID number of the component where the data is to be transferred to
 * OUTPUTS:
 *
 * DESCRIPTION: Similar to TransferRadioConfig, just with Experience data
 *
 * NOTE: Modified to check the incoming message buffer rather than the outgoing
 * message buffer to avoid a portion of the delay. May change this again to handle
 * data more intelligently, taking advantage of its properties.
*/ void ServiceManagementLayer::TransferExperience(int32_t ID) { struct timeval selTimeout; fd_set sockSet; int32_t rc = 1; char buffer[256]; /* Send data until the CE sends an ACK message back */ while(rc != 0) { memset(buffer, 0, 256); /* Receive data from Shell */ ReadMessage(shellSocketFD, buffer); /* Send data to CE */ SendMessage(CE_List[ID].FD, buffer); FD_ZERO(&sockSet); FD_SET(shellSocketFD, &sockSet); selTimeout.tv_sec = 0; selTimeout.tv_usec = 5000; /* Check ifthere is a message on the shell ready to be processed */ rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); } memset(buffer, 0, 256); ReadMessage(CE_List[ID].FD, buffer); SendMessage(shellSocketFD, buffer); } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the component where service is located * OUTPUTS: * * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists */ void ServiceManagementLayer::ReceiveServices(int32_t ID) { char buffer[256]; memset(buffer, 0, 256); ReadMessage(CE_List[ID].FD, buffer); char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"}; /* Generate command */ _services_DB->command = "insert into "; _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(" ("); _services_DB->command.append(cols[0]); _services_DB->command.append(", "); _services_DB->command.append(cols[1]); _services_DB->command.append(") "); _services_DB->command.append(" values("); char temp[3]; memset(temp, 0, 3); sprintf(temp, "%d", ID); _services_DB->command.append(temp); _services_DB->command.append(", '"); _services_DB->command.append(buffer); _services_DB->command.append("');"); /* Execute add command */ char *errorMsg; int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg); } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: This method 
associates the services that components provide with the * services that are requested in the mission. Each service in the mission is given * the ID and FD of a component that has registered to provide that service. Deregistration * is okay until this method is called without a reload, but ifderegistration occurs after this * method is called it needs to be called again even ifother engines also provide the services */ void ServiceManagementLayer::SetActiveMission() { char buffer[256]; memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); uint32_t missID = atoi(buffer); for(activeMission = 0; activeMission < 10; activeMission++) { /* Find the active mission by comparing mission ID's */ if(miss[activeMission].missionID == missID) break; } LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID); /* For each service in the mission */ for(size_t i = 0; i < miss[activeMission].numServices; i++) { /* Check whether the current service is an actual service or a conditional */ if(miss[activeMission].services[i].name.compare("if") && \ miss[activeMission].services[i].name.compare("dowhile") && \ miss[activeMission].services[i].name.compare("shell")) { /* ifit is a service, search the database of registered services to find * the ID of the component that registered it */ _services_DB->command="select "; _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(".* from "); _services_DB->command.append( _services_DB->tablename); _services_DB->command.append(" where Service_Name=='"); _services_DB->command.append(miss[activeMission].services[i].name); _services_DB->command.append("';"); sqlite3_stmt * pStatement; int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \ -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0); else { WARNING("services_DB:: Mission requires service %s 
", \ miss[activeMission].services[i].name.c_str()); WARNING("not provided by any connected component.\n"); rc = 31337; } } else { WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \ rc, _services_DB->command.c_str()); } sqlite3_finalize(pStatement); miss[activeMission].services[i].socketFD = \ CE_List[miss[activeMission].services[i].componentID].FD; } /* TODO Nothing to be done for conditionals at this stage */ } SendMessage(shellSocketFD, "ack"); LOG("ServiceManagementLayer:: Done setting active mission.\n"); } /* CALLED BY: PerformActiveMission * INPUTS: |sourceID| ID of the service that is being processed * OUTPUTS: * * DESCRIPTION: This is a helper method for the "PerformActiveMission" function * NOTE: This function has changed drastically from the previous implementation * * Takes an ID of a service. For that service, finds inputs in DB and forwords * those on to the engine after sending comm-starting messages. Afterwords, listenes * for the outputs so that it can store those in the database for future services or * the overall output */ void ServiceManagementLayer::TransactData(int32_t sourceID) { char buffer[256]; std::string data; char *cols[] = {(char *) "Tag", (char *) "Data"}; char *token; /* Send a message directly to the shell */ if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) { shellFound=true; int32_t k = 0; while((k < 10) && (!miss[activeMission].input[k].empty())) { k++; } sprintf(buffer, "%d", k); SendMessage(shellSocketFD, buffer); for(int32_t t = 0; t < k; t++) { memset(buffer, 0 , 256); _data_DB->command="select "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(miss[activeMission].input[t]); _data_DB->command.append("';"); sqlite3_stmt * pStatement; int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ -1, &pStatement, 
NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data=((const char*) sqlite3_column_text(pStatement, 1)); else { LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str()); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \ rc,_data_DB->command.c_str()); } sqlite3_finalize(pStatement); token = strtok((char *) data.c_str(), "@"); token = strtok(NULL, "@"); SendMessage(shellSocketFD, token); token = strtok(NULL, "@"); SendMessage(shellSocketFD, token); } return; } /* ifthis is a service command and not a shell command... */ /* Transmission starting messages */ SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service"); SendMessage(miss[activeMission].services[sourceID].socketFD, \ miss[activeMission].services[sourceID].name.c_str()); } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: This function works by first sending the inputs from the shell to * the appropriate components. The first service should begin immeadiately, as * should any others who have all of their input parameters. When they complete, * the output path is found and the data is transfered as it becomes available * Presumably at this point the second function has all of it's parameters, so it * begins to compute, and the cycle repeats. 
* * Rules for active missions (currently) * -Five inputs/outputs per service and per mission * -All ordering constraints have been relaxed in this version; all data is stored * locally and only sent when requested * -ifand while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot) * -For dowhiles, assumes loop condition determined on last line * * -IMPORTANT: DB uses '@' to seperate individual statements; using '@' in the data * stream will result in incorrect behavior */ void ServiceManagementLayer::PerformActiveMission() { shellFound = false; std::string data_param; std::string data_obsv; std::string data; std::string input; std::string check; char buffer[256]; char buffer1[256]; std::string token, token2; std::string data2; int32_t rc; char *errorMsg; char *cols[] = {(char *) "Tag", (char *) "Data"}; LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n"); /* Get the inputs */ memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); /* Receive Set of Parameters */ memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); int32_t t = atoi(buffer); for(size_t m = 0; m < t; m++) { memset(buffer1, 0, 256); ReadMessage(shellSocketFD, buffer1); _data_DB->command="insert into "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" ("); _data_DB->command.append(cols[0]); _data_DB->command.append(", "); _data_DB->command.append(cols[1]); _data_DB->command.append(") "); memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); _data_DB->command.append(" values('"); _data_DB->command.append(buffer1); _data_DB->command.append("', '1@"); _data_DB->command.append(buffer1); _data_DB->command.append("@"); _data_DB->command.append(buffer); _data_DB->command.append("');"); rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("SQL error: %s\n", errorMsg); } int32_t numstatements[3] = {0 ,0 ,0}; for(size_t i; i < 
miss[activeMission].numServices; i++) { if(miss[activeMission].services[i].name.compare("if") == 0) { input.clear(); check.clear(); for(size_t t = 0; t < 10; t++) { if(!miss[activeMission].services[i].output[t].empty()) { input = miss[activeMission].services[i - numstatements[0] - 1].output[t]; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(input); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \ rc,_data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data.find_last_of("@", data.length() - 2); token = data.substr(pos + 1); token.erase(token.length() - 1); data.clear(); break; } } bool doit = false; if(miss[activeMission].services[i].output[t].find(">") != string::npos) { std::string data2; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1)); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data2 = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("2 data_DB:: Data not yet in DB.\n"); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. 
rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data2.find_last_of("@", data2.length() - 2); token2 = data2.substr(pos + 1); token2.erase(token2.length() - 1); if(atof(token.c_str()) > atof(token2.c_str())) doit = true; } else if(miss[activeMission].services[i].output[t].find(token) != string::npos) doit = true; if(doit) { for(size_t k = i + 1; k <= i+miss[activeMission].services[i].num_conds; k++) { if(miss[activeMission].services[k].name.compare("if") == 0) { input.clear(); check.clear(); for(size_t t = 0; t < 10; t++) { if(!miss[activeMission].services[k].output[t].empty()) { input = miss[activeMission].services[k - numstatements[1] - 1].output[t]; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(input); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("3 data_DB:: Data not yet in DB.\n"); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. 
rc = %i\n%s\n", \ rc,_data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data.find_last_of("@", data.length() - 2); token = data.substr(pos + 1); token.erase(token.length() - 1); break; } } bool doit = false; if(miss[activeMission].services[k].output[t].find(">") != string::npos) { std::string data2; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1)); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \ -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data2 = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("4 data_DB:: Data not yet in DB.\n"); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. 
rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data2.find_last_of("@", data2.length() - 2); token2 = data2.substr(pos + 1); token2.erase(token2.length() - 1); if(atof(token.c_str()) > atof(token2.c_str())) doit = true; } else if(miss[activeMission].services[k].output[t].find(token) != string::npos) doit=true; if(doit) { for(size_t j = k + 1; j <= k+miss[activeMission].services[k].num_conds; j++) { if(miss[activeMission].services[j].name.compare("if") == 0) { input.clear(); check.clear(); for(t = 0; t < 10; t++) { if(!miss[activeMission].services[j].output[t].empty()) { input = miss[activeMission].services[j - numstatements[2] - 1].output[t]; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(input); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("5 data_DB:: Data not yet in DB.\n"); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. 
rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data.find_last_of("@", data.length()-2); token = data.substr(pos+1); token.erase(token.length()-1); data.clear(); break; } } bool doit = false; if(miss[activeMission].services[j].output[t].find(">") != string::npos) { _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1)); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data2 = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("6 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data2.find_last_of("@", data2.length()-2); token2 = data2.substr(pos + 1); token2.erase(token2.length()-1); if(atof(token.c_str()) > atof(token2.c_str())) doit=true; data.clear(); } else if(miss[activeMission].services[j].output[t].find(token) != string::npos) doit = true; if(doit) { for(size_t l = j + 1; l <= j + miss[activeMission].services[j].num_conds; l++) { TransactData(l); } } numstatements[2] += miss[activeMission].services[j].num_conds + 1; j += miss[activeMission].services[j].num_conds; } else if(miss[activeMission].services[j].name.compare("dowhile") == 0) { numstatements[0]=0; while(true) { uint32_t l; for(l = j + 1; l <= j+miss[activeMission].services[j].num_conds; l++) { TransactData(l); } data.clear(); input.clear(); check.clear(); int32_t t; for(t = 0; t < 10; t++) { if(!miss[activeMission].services[j].output[t].empty()){ input=miss[activeMission].services[l-2].output[t]; 
_data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(input); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); if(rc == SQLITE_OK){ if(sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str()); rc = 31337; } } else { WARNING("data_DB:: SQL statement error. rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data.find_last_of("@", data.length() - 2); token = data.substr(pos + 1); token.erase(token.length() - 1); break; } } if(miss[activeMission].services[j].output[t].find(token) == string::npos) { break; } } j += miss[activeMission].services[j].num_conds; } else { numstatements[2] = 0; TransactData(j); } } } numstatements[1] += miss[activeMission].services[k].num_conds + 1; k += miss[activeMission].services[k].num_conds; } else if(miss[activeMission].services[k].name.compare("dowhile") == 0) { numstatements[0] = 0; while(true) { int32_t j; for(j = k + 1; j <= k + miss[activeMission].services[k].num_conds; j++) { TransactData(j); } data.clear(); input.clear(); check.clear(); int32_t t; for(t = 0; t < 10; t++) { if(!miss[activeMission].services[k].output[t].empty()) { input = miss[activeMission].services[j - 1].output[t]; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(input); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); if(rc == SQLITE_OK) { 
if(sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("8 data_DB:: Data not yet in DB.\n"); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data.find_last_of("@", data.length() - 2); token = data.substr(pos + 1); token.erase(token.length() - 1); break; } } if(miss[activeMission].services[k].output[t].find(token) == string::npos) { break; } } k += miss[activeMission].services[k].num_conds; } else{ numstatements[1] = 0; TransactData(k); } } } numstatements[0] += miss[activeMission].services[i].num_conds + 1; i += miss[activeMission].services[i].num_conds; } else if(miss[activeMission].services[i].name.compare("dowhile") == 0) { numstatements[0] = 0; while(true) { uint32_t k; for(k = i + 1; k <= i + miss[activeMission].services[i].num_conds; k++){ TransactData(k); } data.clear(); input.clear(); check.clear(); int32_t t; for(t = 0; t < 10; t++){ if(!miss[activeMission].services[i].output[t].empty()) { input = miss[activeMission].services[k - 1].output[t]; _data_DB->command="SELECT "; _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(".* from "); _data_DB->command.append(_data_DB->tablename); _data_DB->command.append(" where Tag=='"); _data_DB->command.append(input); _data_DB->command.append("';"); sqlite3_stmt *pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL); if(rc == SQLITE_OK) { if(sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { WARNING("10data_DB:: Data not yet in DB.\n"); rc = 31337; } } else { WARNING("data_DB:: Error executing SQL statement. 
rc = %i\n%s\n", \ rc, _data_DB->command.c_str()); } sqlite3_finalize(pStatement); int32_t pos = data.find_last_of("@", data.length()-2); token = data.substr(pos + 1); token.erase(token.length() - 1); break; } } if(miss[activeMission].services[i].output[t].find(token) == string::npos) { break; } } i += miss[activeMission].services[i].num_conds; } else{ numstatements[0] = 0; TransactData(i); } } int32_t i = 0; data.clear(); if(!shellFound) { int k = 0; while(k < 10 && !miss[activeMission].input[k].empty()) { k++; } sprintf(buffer, "%d", k); SendMessage(shellSocketFD, buffer); for(size_t t = 0; t < k; t++) { SendMessage(shellSocketFD, miss[activeMission].input[t].c_str()); SendMessage(shellSocketFD, "0"); } } LOG("ServiceManagementLayer:: Done performing active mission.\n"); } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them */ void ServiceManagementLayer::ListServices() { _services_DB->command="select "; _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(".* from "); _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(";"); // Execute print (select all) command char *errorMsg; int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("SQL error: %s\n", errorMsg); LOG("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str()); } /* CALLED BY: Reset * INPUTS: * OUTPUTS: * * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file */ void ServiceManagementLayer::ReloadConfiguration() { LOG("ServiceManagementLayer:: Reloading Configuration.\n"); free(miss); miss = new Mission[10]; for(size_t i = 0; i < 10; i++) miss[i].services = new Service[30]; LoadConfiguration(_SML_Config.c_str(), miss); } /* CALLED BY: constructor * INPUTS: 
|SML_Config| Address (either relitive or full) of the XML file containing mission data * |mList| Mission array to be modified * OUTPUTS: * * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data * Can currently handle 10 inputs and 10 outputs per service, but easily expandable * Also, can handle two layer of nested conditional statements, but could * be expanded to meet additional needs. * * Components assigned to mission during "set active mission" stage so that * components can still continue to register after the configuration is loaded */ void ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList) { TiXmlElement *pMission; TiXmlElement *pService; TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4; TiXmlHandle hRoot(0); LOG("ServiceManagementLayer:: Loading Configuration.\n"); TiXmlDocument doc("."); doc.LoadFile(SML_Config); bool loadOkay = doc.LoadFile(); if(!loadOkay) WARNING("Loading SML configuration failed: %s\n", SML_Config); TiXmlHandle hDoc(&doc); pMission = hDoc.FirstChildElement().Element(); if(!pMission) WARNING("No valid root!"); hRoot = TiXmlHandle(pMission); pService = pMission->FirstChildElement(); int32_t mission_num = 0; /* Iterate through the missions */ for(pChild0 = pMission->FirstChildElement(); pChild0 ; pChild0 = pChild0->NextSiblingElement()) { int32_t service_num = 0; uint16_t cond_array[] = {0, 0, 0}; for(pChild1 = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \ pChild1 = pChild1->NextSiblingElement()) { int32_t conditional_0 = service_num; for(pChild2 = pChild1->FirstChildElement(); pChild2; pChild2 = pChild2->NextSiblingElement()) { service_num++; int32_t conditional_1 = service_num; for(pChild3 = pChild2->FirstChildElement(); pChild3; pChild3 = pChild3->NextSiblingElement()) { service_num++; int32_t conditional_2 = service_num; for(pChild4 = pChild3->FirstChildElement(); pChild4; pChild4 = pChild4->NextSiblingElement()) { service_num++; 
if(pChild4->Attribute("name")) mList[mission_num].services[service_num].name = pChild4->Attribute("name"); else mList[mission_num].services[service_num].name = pChild4->Value(); for(size_t i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild4->Attribute(buffer)) mList[mission_num].services[service_num].input[i - 1] = pChild4->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild4->Attribute(buffer2)) mList[mission_num].services[service_num].output[i - 1] = pChild4->Attribute(buffer2); } if(pChild4->Attribute("parameter")) mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter"); cond_array[2]++; } if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) { mList[mission_num].services[conditional_2].name = pChild3->Value(); } else { mList[mission_num].services[service_num].name = pChild3->Attribute("name"); } for(size_t i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild3->Attribute(buffer)) mList[mission_num].services[conditional_2].input[i - 1] = pChild3->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild3->Attribute(buffer2)) mList[mission_num].services[conditional_2].output[i - 1] = pChild3->Attribute(buffer2); } if(pChild3->Attribute("parameter")) mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter"); mList[mission_num].services[conditional_2].num_conds = cond_array[2]; cond_array[1] += cond_array[2] + 1; cond_array[2] = 0; } if(!strcmp(pChild2->Value(), "shell") || (conditional_1 != service_num)) { mList[mission_num].services[conditional_1].name = pChild2->Value(); } else{ mList[mission_num].services[service_num].name = pChild2->Attribute("name"); } for(int i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild2->Attribute(buffer)) mList[mission_num].services[conditional_1].input[i - 1] = 
pChild2->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild2->Attribute(buffer2)) mList[mission_num].services[conditional_1].output[i - 1] = pChild2->Attribute(buffer2); } if(pChild2->Attribute("parameter")) mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter"); mList[mission_num].services[conditional_1].num_conds = cond_array[1]; cond_array[0] += cond_array[1] + 1; cond_array[1] = 0; } if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) { mList[mission_num].services[conditional_0].name = pChild1->Value(); } else{ mList[mission_num].services[conditional_0].name = pChild1->Attribute("name"); } for(size_t i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild1->Attribute(buffer)) mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild1->Attribute(buffer2)) mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2); } if(pChild1->Attribute("parameter")) mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter"); mList[mission_num].services[conditional_0].num_conds = cond_array[0]; cond_array[0] = 0; service_num++; } mList[mission_num].numServices = service_num; mList[mission_num].name = pChild0->Attribute("name"); mList[mission_num].missionID = atoi(pChild0->Attribute("id")); for(size_t i = 1; i <= 10; i++) { char buffer[9]="param"; sprintf(buffer, "%s%d", buffer, i); if(pChild0->Attribute(buffer)){ mList[mission_num].input[i-1] = pChild0->Attribute(buffer); } } mission_num++; } LOG("ServiceManagementLayer:: Done Loading Configuration\n"); } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the engine to be registered * OUTPUTS: * * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component */ void 
ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID) { SendMessage(shellSocketFD, "register_engine_cognitive"); LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n"); char buffer[256]; memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); SendMessage(CE_List[ID].FD, buffer); TransferRadioConfiguration(ID); memset(buffer, 0, 256); TransferExperience(ID); memset(buffer, 0, 256); numberOfCognitiveEngines++; CE_Present = true; } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the engine to have it's services deregistered * OUTPUTS: * * DESCRIPTION: Deletes individual services from the DB * NOTE THAT this function only needs to be called ifservice deregistration is going * to be done at a different time than component deregistration; it is handled * more efficiently and directly during that deregistration process. */ void ServiceManagementLayer::DeregisterServices(int32_t ID) { char buffer[256]; memset(buffer, 0, 256); ReadMessage(CE_List[ID].FD, buffer); _services_DB->command="DELETE FROM "; _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(" WHERE ID_Num IN (SELECT "); char tmp[3]; memset(tmp,0,3); sprintf(tmp, "%d", ID); _services_DB->command.append(tmp); _services_DB->command.append(" FROM "); _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(" WHERE Service_Name"); _services_DB->command.append("=='"); _services_DB->command.append(buffer); _services_DB->command.append("');"); char *errorMsg; int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("SQL error: %s\n", errorMsg); } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the engine to have it's services deregistered * OUTPUTS: * * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell * Also, deletes the services from the DB */ void 
ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID) { LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n"); numberOfCognitiveEngines--; if(numberOfCognitiveEngines == 0) CE_Present = false; SendMessage(shellSocketFD, "deregister_engine_cognitive"); char buffer[256]; memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); SendMessage(CE_List[ID].FD, buffer); if(strcmp("deregister_ack", buffer) != 0) { ERROR(1, "SML:: Failed to close CE socket\n"); } //Deregister the services _services_DB->command="DELETE FROM "; _services_DB->command.append(_services_DB->tablename); _services_DB->command.append(" WHERE "); _services_DB->command.append("ID_Num"); _services_DB->command.append("=="); char tmp[3]; memset(tmp,0,3); sprintf(tmp, "%d", ID); _services_DB->command.append(tmp); _services_DB->command.append(";"); char *errorMsg; int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg); if((rc != SQLITE_OK) && (rc != 101)) WARNING("SQL error: %s\n", errorMsg); CE_List[ID].FD = -1; CE_List[ID].ID_num = -1; LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID); } /* CALLED BY: test class * INPUTS: * OUTPUTS: * * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket */ void ServiceManagementLayer::StartSMLServer() { struct timeval selTimeout; int32_t running = 1; int32_t port, rc, new_sd = 1; int32_t desc_ready = 1; fd_set sockSet, shellSet; cogEngSrv = CreateTCPServerSocket(SMLport); int32_t maxDescriptor = cogEngSrv; if(InitializeTCPServerPort(cogEngSrv) == -1) ERROR(1,"Error initializing primary port\n"); while (running) { /* Zero socket descriptor vector and set for server sockets */ /* This must be reset every time select() is called */ FD_ZERO(&sockSet); FD_SET(cogEngSrv, &sockSet); for(uint16_t k = 0; k < Current_ID; k++){ if(CE_List[k].ID_num != -1) FD_SET(CE_List[k].FD, &sockSet); } /* Timeout specification */ /* This must be 
reset every time select() is called */ selTimeout.tv_sec = 0; /* timeout (secs.) */ selTimeout.tv_usec = 0; /* 0 microseconds */ /* Changed both to zero so that select will check messages from the shell * instead of blocking when there is no command from the CE's to be processed */ /* Check ifthere is a message on the socket waiting to be read */ rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout); if(rc == 0) { FD_ZERO(&shellSet); FD_SET(shellSocketFD, &shellSet); selTimeout.tv_sec = 0; selTimeout.tv_usec = 0; /* Check if there is a message on the shell socket ready to be processed */ select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout); if(FD_ISSET(shellSocketFD, &shellSet)){ MessageHandler(-1);} } else { desc_ready = rc; for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) { if(FD_ISSET(port, &sockSet)) { desc_ready -= 1; /* Check ifrequest is new or on an existing open descriptor */ if(port == cogEngSrv) { /* If new, assign it a descriptor and give it an ID */ new_sd = AcceptTCPConnection(port); if(new_sd < 0) break; CE_List[Current_ID].FD = new_sd; CE_List[Current_ID].ID_num = Current_ID; MessageHandler(Current_ID); Current_ID++; FD_SET(new_sd,&sockSet); if(new_sd > maxDescriptor) maxDescriptor = new_sd; } else { /* If old, figure out which ID it coresponds to and handle it accordingly */ for(size_t z = 0; z < Current_ID; z++) { if(CE_List[z].FD == port) { MessageHandler(z); } } } } } } } /* Close sockets */ close(cogEngSrv); return; }