/* Copyright 2009 Virginia Polytechnic Institute and State University Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ /* Inter-component communication handled by sockets and FD's. * Server support has been completely implemented and tested. * * Services are stored in a SQLite DB by the ID of the CE that registered them. Service * support has been completely implemented and tested. * * Missions are loaded from an XML file, connected with services provided by components, * and run. See the documentation for the "PerformActiveMission" below for important * info. */ #include #include #include #include #include #include "vtcross/common.h" #include "components.h" #include "vtcross/containers.h" #include "vtcross/debug.h" #include "vtcross/error.h" #include "vtcross/socketcomm.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tinyxml/tinyxml.h" #include "tinyxml/tinystr.h" #include "sqlite3.h" typedef struct services_s *services_DB; typedef struct data_s *data_DB; using namespace std; struct services_s { char filename[64]; char tablename[64]; char command[2048]; sqlite3 *db; unsigned int num_columns; }; struct data_s { char filename[64]; char tablename[64]; char command[2048]; sqlite3 *db; unsigned int num_columns; }; services_DB _services_DB; data_DB _data_DB; const char *_SML_Config; bool shellFound; //Callback function used internally by some of the SQLite3 commands int callback(void *notUsed, int argc, char **argv, char **azColName){ int i; for(i=0; icommand, "drop table "); strcat(_services_DB->command, _services_DB->tablename); int rc = sqlite3_exec(_services_DB->db, _services_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg); strcpy(_services_DB->command, "vacuum"); rc = sqlite3_exec(_services_DB->db, _services_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg); free(_services_DB); strcpy(_data_DB->command, "drop table "); strcat(_data_DB->command, _data_DB->tablename); rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg); strcpy(_data_DB->command, "vacuum"); rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg); free(_data_DB); } //Note that sizes of CE_List, miss, and service are hardcoded for now. //Also, their sizes are hardcoded into the code in various places; a fix for a future version. ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \ const char* serverName, const char* serverPort, int16_t clientPort) { LOG("Creating Service Management Layer.\n"); _SML_Config = SML_Config; SMLport = clientPort; ConnectToShell(serverName, serverPort); CE_List = (CE_Reg *) malloc(10*sizeof(struct CE_Reg)); CE_List = new CE_Reg[10]; miss = new Mission[10]; for(int i = 0; i < 10; i++) miss[i].services = new Service[30]; Current_ID = 0; LoadConfiguration(SML_Config, miss); CreateServicesDB(); CreateDataDB(); } /* CALLED BY: constructor * INPUTS: * OUTPUTS: * * DESCRIPTION: Create and initialize a DB to hold the services registered by components */ void ServiceManagementLayer::CreateServicesDB() { sqlite3_stmt *ppStmt; /* OUT: Statement handle */ const char *pzTail; /* OUT: Pointer to unused portion of zSql */ _services_DB = (services_DB) malloc(sizeof(struct services_s)); // char *errorMsg; // create database // copy filename strcpy(_services_DB->filename, "Services_Table"); // execute create database command // database handle //_services_DB->db = NULL; sqlite3_open(_services_DB->filename, &(_services_DB->db)); char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; // copy tablename strcpy(_services_DB->tablename, "Services"); //If program execution ends in anything other than a ordered shutdown, DB's will still be there for next run //Need to get rid of it so that old data isn't inadvertantly used in the next execution cycle sprintf(_services_DB->command, "DROP TABLE IF EXISTS Services;"); int rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command, 128, &ppStmt, &pzTail); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateServicesDB 'step' error\n"); // number of columns in the table _services_DB->num_columns = 2; // generate command strcpy(_services_DB->command, "CREATE TABLE "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, "("); strcat(_services_DB->command, cols[0]); strcat(_services_DB->command, " INT, "); strcat(_services_DB->command, cols[1]); strcat(_services_DB->command, " TEXT"); strcat(_services_DB->command, ");"); // execute create table command rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command, 128, &ppStmt, &pzTail); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateServicesDB 'step' error\n"); } /* CALLED BY: constructor * INPUTS: * OUTPUTS: * * DESCRIPTION: Create and initialize a DB to hold the data sent by components */ void ServiceManagementLayer::CreateDataDB() { _data_DB = (data_DB) malloc(sizeof(struct data_s)); //char *errorMsg; sqlite3_stmt *ppStmt; /* OUT: Statement handle */ const char *pzTail; /* OUT: Pointer to unused portion of zSql */ // create database // copy filename strcpy(_data_DB->filename, "Data_Table"); // execute create database command // database handle //_services_DB->db = NULL; sqlite3_open(_data_DB->filename, &(_data_DB->db)); char* cols[] = {(char *)"Tag", (char *)"Data"}; // create table // copy tablename strcpy(_data_DB->tablename, "Data"); sprintf(_data_DB->command, "DROP TABLE IF EXISTS Data;"); int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, 128, &ppStmt, &pzTail); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateServicesDB 'step' error\n"); // number of columns in the table _data_DB->num_columns = 2; // generate command strcpy(_data_DB->command, "CREATE TABLE "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, "("); strcat(_data_DB->command, cols[0]); //First column is the name of the data (coresponding to the name of the output/input pair) //It is the primary key so any subsequent data with the same name will replace the row strcat(_data_DB->command, " TEXT PRIMARY KEY ON CONFLICT REPLACE, "); strcat(_data_DB->command, cols[1]); strcat(_data_DB->command, " TEXT"); strcat(_data_DB->command, ");"); // execute create table command rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, 128, &ppStmt, &pzTail); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc); rc = sqlite3_step(ppStmt); if( rc!=SQLITE_OK && rc!=101 ) printf("ServiceManagementLayer::CreateDataDB 'step' error\n"); } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Sends a message identifying this component as an SML to the Shell */ void ServiceManagementLayer::SendComponentType() { SendMessage(shellSocketFD, "response_sml"); LOG("SML responded to GetRemoteComponentType query.\n"); } /* CALLED BY: constructor * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost) * |serverPort| the port on the server to connect to * OUTPUTS: * * DESCRIPTION: Connecting to the shell takes 2 steps * 1) Establish a client socket for communication * 2) Run the initial Registration/handshake routine */ void ServiceManagementLayer::ConnectToShell(const char* serverName, \ const char* serverPort) { shellSocketFD = ClientSocket(serverName, serverPort); RegisterComponent(); } /* CALLED BY: StartSMLServer * INPUTS: |ID| The ID number of the CE that has a message wating * OUTPUTS: * * DESCRIPTION: Called whenever a socket is identified as being ready for communication * This funciton reads the message and calls the appropriate helper */ void ServiceManagementLayer::MessageHandler(int32_t ID) { char buffer[256]; memset(buffer, 0, 256); int32_t _FD; if(ID != -1) _FD = CE_List[ID].FD; else _FD = shellSocketFD; ReadMessage(_FD, buffer); //printf("MH_buffer = %s\n", buffer); //--------Policy Engine Stuff - no policy engine support in this version-------// //printf("********* %s **********\n", buffer); // TODO // If we send integer op codes rather than strings, this process will be // MUCH faster since instead of donig string compares we can simply // switch on the integer value... /*if(strcmp(buffer, "register_service") == 0) { if(strcmp(buffer, "policy_geo") == 0) { } else if(strcmp(buffer, "policy_time") == 0) { } else if(strcmp(buffer, "policy_spectrum") == 0) { } else if(strcmp(buffer, "policy_spacial") == 0) { } } else if(strcmp(buffer, "deregister_service") == 0) { if(strcmp(buffer, "policy_geo") == 0) { } else if(strcmp(buffer, "policy_time") == 0) { } else if(strcmp(buffer, "policy_spectrum") == 0) { } else if(strcmp(buffer, "policy_spacial") == 0) { } }*/ //Go down the list to call the appropriate function if(strcmp(buffer, "query_component_type") == 0) { SendComponentType(); } else if(strcmp(buffer, "reset_sml") == 0) { Reset(); } else if(strcmp(buffer, "shutdown_sml") == 0) { Shutdown(); } else if(strcmp(buffer, "register_engine_cognitive") == 0) { RegisterCognitiveEngine(ID); } else if(strcmp(buffer, "register_service") == 0) { ReceiveServices(ID); } else if(strcmp(buffer, "send_component_type") == 0) { SendComponentType(); } else if(strcmp(buffer, "list_services") == 0) { ListServices(); } else if(strcmp(buffer, "set_active_mission") == 0) { SetActiveMission(); } else if(strcmp(buffer, "request_optimization") == 0) { PerformActiveMission(); } else if(strcmp(buffer, "deregister_engine_cognitive") == 0) { DeregisterCognitiveEngine(ID); } else if(strcmp(buffer, "deregister_service") == 0) { DeregisterServices(ID); } } //TODO Finish /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Deregisters the component from the Shell. */ void ServiceManagementLayer::Shutdown() { DeregisterComponent(); } //TODO Finish /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Deregisters the component from the Shell */ void ServiceManagementLayer::Reset() { DeregisterComponent(); ReloadConfiguration(); } /* CALLED BY: ConnectToShell * INPUTS: * OUTPUTS: * * DESCRIPTION: Sends the registration message to the Shell */ void ServiceManagementLayer::RegisterComponent() { SendMessage(shellSocketFD, "register_sml"); LOG("ServiceManagementLayer:: Registration message sent.\n"); //printf("SSFD = %d\n", shellSocketFD); } /* CALLED BY: Shutdown * INPUTS: * OUTPUTS: * * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message */ void ServiceManagementLayer::DeregisterComponent() { SendMessage(shellSocketFD, "deregister_sml"); LOG("ServiceManagementLayer:: Deregistration message sent.\n"); shutdown(shellSocketFD, 2); close(shellSocketFD); shellSocketFD = -1; LOG("ServiceManagementLayer:: Shell socket closed.\n"); } /* CALLED BY: RegisterCognitiveEngine * INPUTS: |ID| The ID number of the component where the data is to be transfered to * OUTPUTS: * * DESCRIPTION: Streams config data directly from the shell to the CE, and checks * for an "ack" message from the CE after every sent message * to know when to stop communication. */ //Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay //May change this again to handle data more inteligently, taking advantage of it's properties. void ServiceManagementLayer::TransferRadioConfiguration(int32_t ID) { //printf("transRadConfig\n"); struct timeval selTimeout; fd_set sockSet; int32_t rc = 1; char buffer[256]; //Send data until the CE sends an ACK message back while(rc!=0){ memset(buffer, 0, 256); //Receive data from Shell ReadMessage(shellSocketFD, buffer); //printf("buffer = %s\n", buffer); //Send data to CE SendMessage(CE_List[ID].FD, buffer); FD_ZERO(&sockSet); FD_SET(shellSocketFD, &sockSet); selTimeout.tv_sec = 0; selTimeout.tv_usec = 5000; //Check if there is a message on the shell ready to be processed rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); } memset(buffer, 0, 256); ReadMessage(CE_List[ID].FD, buffer); SendMessage(shellSocketFD, buffer); //printf("transfer done!\n"); } /* CALLED BY: RegisterCognitiveEngine * INPUTS: |ID| The ID number of the component where the data is to be transfered to * OUTPUTS: * * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data */ //Modified to check the incoming message buffer rather than the outgoing message buffer to avoid a portion of the delay //May change this again to handle data more inteligently, taking advantage of it's properties. void ServiceManagementLayer::TransferExperience(int32_t ID) { struct timeval selTimeout; fd_set sockSet; int32_t rc = 1; char buffer[256]; //Send data until the CE sends an ACK message back while(rc!=0){ //printf("transfering...\n"); memset(buffer, 0, 256); //Receive data from Shell ReadMessage(shellSocketFD, buffer); //printf("buffer = %s\n", buffer); //Send data to CE SendMessage(CE_List[ID].FD, buffer); FD_ZERO(&sockSet); FD_SET(shellSocketFD, &sockSet); selTimeout.tv_sec = 0; selTimeout.tv_usec = 5000; //Check if there is a message on the shell ready to be processed rc=select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout); } memset(buffer, 0, 256); //printf("done trans exp!\n"); ReadMessage(CE_List[ID].FD, buffer); SendMessage(shellSocketFD, buffer); } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the component where service is located * OUTPUTS: * * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists */ void ServiceManagementLayer::ReceiveServices(int32_t ID) { char buffer[256]; memset(buffer, 0, 256); ReadMessage(CE_List[ID].FD, buffer); char* cols[] = {(char *)"ID_Num", (char *)"Service_Name"}; //printf("RS_buffer = %s\n", buffer); // generate command strcpy(_services_DB->command, "insert into "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, " ("); strcat(_services_DB->command, cols[0]); strcat(_services_DB->command, ", "); strcat(_services_DB->command, cols[1]); strcat(_services_DB->command, ") "); strcat(_services_DB->command, " values("); sprintf(_services_DB->command, "%s%d", _services_DB->command, ID); strcat(_services_DB->command, ", '"); strcat(_services_DB->command, buffer); strcat(_services_DB->command, "');"); //printf("search command: %s\n", _services_DB->command); // execute add command char *errorMsg; int rc = sqlite3_exec(_services_DB->db, _services_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg); /*sprintf(outBuffer, "SML: Registering service '%s' from component number '%d'", buffer, ID); LOG(outBuffer);*/ } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: This method associates the services that components provide with the services that are requested in the mission * Each service in the mission is given the ID and FD of a component that has registered to provide that service * Deregistration is okay until this method is called without a reload, but if deregistration occurs after this * method is called it needs to be called again even if other engines also provide the services */ void ServiceManagementLayer::SetActiveMission() { char buffer[256]; memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); uint32_t missID = atoi(buffer); for(activeMission = 0; activeMission < 10; activeMission++) { //Find the active mission by comparing mission ID's if(miss[activeMission].missionID == missID) break; } LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n",missID); //For each service in the mission for(uint16_t i = 0; i < miss[activeMission].numServices; i++) { //Check whether the current service is an actual service or a conditional if(miss[activeMission].services[i].name.compare("if") && miss[activeMission].services[i].name.compare("dowhile") && \ miss[activeMission].services[i].name.compare("shell")){ //If it is a service, search the database of registered services to find the ID of the component that registered it strcpy(_services_DB->command, "select "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, ".* from "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, " where Service_Name=="); sprintf(_services_DB->command, "%s'%s';", _services_DB->command, miss[activeMission].services[i].name.c_str()); sqlite3_stmt * pStatement; int rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0); else { printf("services_DB:: Mission requires service %s not provided by any connected component.\n",miss[activeMission].services[i].name.c_str()); rc=31337; } } else { printf("services_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_services_DB->command); } //printf("s_name=%s\n",miss[activeMission].services[i].name.c_str()); sqlite3_finalize(pStatement); miss[activeMission].services[i].socketFD = CE_List[miss[activeMission].services[i].componentID].FD; //Set the FD and ID of the service to refer to the component where the service exists } //Nothing to be done for conditionals at this stage } SendMessage(shellSocketFD, "ack"); LOG("ServiceManagementLayer:: Done setting active mission.\n"); //printf("\nhere ---%d, %d---\n", miss[activeMission].services[0].componentID, miss[activeMission].services[1].componentID); } /* CALLED BY: PerformActiveMission * INPUTS: |sourceID| ID of the service that is being processed * OUTPUTS: * * DESCRIPTION: This is a helper method for the "PerformActiveMission" function * NOTE: This function has changed durrastically from the previous implementation * Takes an ID of a service * For that service, finds inputs in DB and forwords those on to the engine after sending comm-starting messages * Afterwords, listenes for the outputs so that it can store those in the database for future services or the overall output */ void ServiceManagementLayer::TransactData(int32_t sourceID) { // LOG("ServiceManagementLayer:: Data transaction occuring.\n"); char buffer[256]; std::string data; char* cols[] = {(char *)"Tag", (char *)"Data"}; int i = 0; char *token; //Send a message directly to the shell if(miss[activeMission].services[sourceID].name.find("shell")!=string::npos) { int k = 0; shellFound=true; while(k<10 && !miss[activeMission].input[k].empty()){ k++; } sprintf(buffer, "%d", k); SendMessage(shellSocketFD, buffer); for(int t = 0; t < k; t++){ memset(buffer, 0 , 256); strcpy(_data_DB->command, "select "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, miss[activeMission].input[t].c_str()); sqlite3_stmt * pStatement; int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data=((const char*) sqlite3_column_text(pStatement, 1)); else { printf("data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); SendMessage(shellSocketFD, token); token = strtok(NULL, "@"); SendMessage(shellSocketFD, token); } return; } //If this is a service command and not a shell command... //Transmission starting messages SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service"); SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].name.c_str()); //If the service takes a parameter, feed that parameter in if(!miss[activeMission].services[sourceID].parameter.empty()){ //printf("sending parameter!\n"); SendMessage(miss[activeMission].services[sourceID].socketFD, "1"); SendMessage(miss[activeMission].services[sourceID].socketFD, "parameter"); SendMessage(miss[activeMission].services[sourceID].socketFD, miss[activeMission].services[sourceID].parameter.c_str()); } //Load and transmit the input data while(i < 10 && !miss[activeMission].services[sourceID].input[i].empty()){ strcpy(_data_DB->command, "select "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, miss[activeMission].services[sourceID].input[i].c_str()); sqlite3_stmt * pStatement; int rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data.append((const char*) sqlite3_column_text(pStatement, 1)); else { printf("data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); token = strtok((char *)data.c_str(), "@"); while(token){ SendMessage(miss[activeMission].services[sourceID].socketFD, token); token = strtok(NULL, "@"); } i++; data.clear(); } int32_t j = 0; //Receive and store the output data while(j < 10 && !miss[activeMission].services[sourceID].output[j].empty()){ int rc; memset(buffer, 0, 256); //Read the number of datapairs for this output ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer); data.append(buffer); data.append("@"); int t = atoi(buffer); for(int k = 0; k < t; k++){ //Read the datapairs incrementally and deliminate it with the "@" symbol memset(buffer, 0, 256); ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer); data.append(buffer); data.append("@"); memset(buffer, 0, 256); ReadMessage(miss[activeMission].services[sourceID].socketFD, buffer); data.append(buffer); data.append("@"); } strcpy(_data_DB->command, "insert or replace into "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " ("); strcat(_data_DB->command, cols[0]); strcat(_data_DB->command, ", "); strcat(_data_DB->command, cols[1]); strcat(_data_DB->command, ") "); strcat(_data_DB->command, " values('"); strcat(_data_DB->command, miss[activeMission].services[sourceID].output[j].c_str()); strcat(_data_DB->command, "', '"); strcat(_data_DB->command, data.c_str()); strcat(_data_DB->command, "');"); char *errorMsg; rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); //printf("S: done putting ouptut data into DB for ID#='%d', data=%s\n", sourceID, data.c_str()); j++; data.clear(); } //printf("done transact data!\n"); // LOG("ServiceManagementLayer:: Finished with data transaction.\n"); /*printf("\n\n\n"); // generate commandi strcpy(_data_DB->command, "select "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ";"); // execute print (select all) command char *errorMsg; int rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename); printf("\n\n\n");*/ } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: This function works by first sending the inputs from the shell to the appropriate components * The first service should begin immeadiately, as should any others who have all of their input parameters * When they complete, the output path is found and the data is transfered as it becomes available * Presumably at this point the second function has all of it's parameters, so it begins to compute, and the cycle repeats * * * Rules for active missions (currently) * -Five inputs/outputs per service and per mission * -All ordering constraints have been relaxed in this version; all data is stored locally and only sent when requested * -If and while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot) * -For dowhiles, assumes loop condition determined on last line * -IMPORTANT: DB uses '@' to seperate individual statements; using '@' in the data stream will result in incorrect behavior */ //WHILE void ServiceManagementLayer::PerformActiveMission() { //printf("start PAM\n"); uint16_t i = 0, t; shellFound = false; std::string data_param, data_obsv, data; std::string input; std::string check; char buffer[256]; char buffer1[256]; char *token, *token2; std::string data2; int rc; char *errorMsg; char* cols[] = {(char *)"Tag", (char *)"Data"}; LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n"); //Get the inputs memset(buffer, 0, 256); //Needed to read the zero passed in by GOP as the number of observables ReadMessage(shellSocketFD, buffer); /* Receive Set of Parameters */ memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); t=atoi(buffer); for(int m = 0; m < t; m++) { //printf("data=%s\n", data_obsv.c_str()); memset(buffer1, 0, 256); ReadMessage(shellSocketFD, buffer1); strcpy(_data_DB->command, "insert into "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " ("); strcat(_data_DB->command, cols[0]); strcat(_data_DB->command, ", "); strcat(_data_DB->command, cols[1]); strcat(_data_DB->command, ") "); memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); sprintf(_data_DB->command, "%s values('%s', '1@%s@%s');", _data_DB->command, buffer1, buffer1, buffer); rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); } //Useful for spotchecking what's in the database /*printf("\n\n\n"); // generate commandi strcpy(_data_DB->command, "select "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ";"); // execute print (select all) command rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename); printf("\n\n\n");*/ i=0; int32_t numstatements[3] = {0,0,0}; while(i < miss[activeMission].numServices) { if(miss[activeMission].services[i].name.compare("if")==0) { //printf("L0:if detected\n"); input.clear(); check.clear(); int t; for(t = 0; t < 10; t++){ if(!miss[activeMission].services[i].output[t].empty()){ //printf("i-numstmts-1 = %d\n", i-numstatements[0]-1); input=miss[activeMission].services[i-numstatements[0]-1].output[t]; strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, input.c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); //printf("data=%s\n", data.c_str()); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); token = strtok(NULL, "@"); //printf("data=%s\n", token); data.clear(); break; } } //printf("L0:--- %s %s---\n", miss[activeMission].services[i].output[t].c_str(), token); bool doit = false; if(miss[activeMission].services[i].output[t].find(">") != string::npos){ std::string data2; strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); miss[activeMission].services[i].output[t].erase(0, 1); //printf("here! %s\n", miss[activeMission].services[i].output[t].c_str()); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, miss[activeMission].services[i].output[t].c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data2 = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); char* token2 = strtok((char *)data2.c_str(), "@"); token2 = strtok(NULL, "@"); token2 = strtok(NULL, "@"); if(atof(token) > atof(token2)) doit=true; //printf("%s %s\n", buffer, token); } else if (strstr(miss[activeMission].services[i].output[t].c_str(), token)) doit=true; if(doit){ //if(strstr(miss[activeMission].services[i].output[t].c_str(), token)){ //printf("L0:if taken\n"); for(uint16_t k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){ //printf("transacting data for k=%d\n", k); //printf("%s---%d\n", miss[activeMission].services[k].name.c_str(), k); if(miss[activeMission].services[k].name.compare("if")==0){ //printf("L1:if detected\n"); input.clear(); check.clear(); for(t = 0; t < 10; t++){ if(!miss[activeMission].services[k].output[t].empty()){ //printf("i-numstmts = %d\n", i-numstatements-1); input=miss[activeMission].services[k-numstatements[1]-1].output[t]; strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, input.c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); //printf("data=%s\n", data.c_str()); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); token = strtok(NULL, "@"); //printf("data=%s\n", token); break; } } //printf("L1:--- %s %s---\n", miss[activeMission].services[k].output[t].c_str(), token); bool doit = false; //printf("here! %s\n", miss[activeMission].services[k].output[t].c_str()); if(miss[activeMission].services[k].output[t].find(">") != string::npos){ std::string data2; strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); miss[activeMission].services[k].output[t].erase(0, 1); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, miss[activeMission].services[k].output[t].c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data2 = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); //printf("data=%s token=%s\n", data2.c_str(), token); token2 = strtok((char *)data2.c_str(), "@"); token2 = strtok(NULL, "@"); token2 = strtok(NULL, "@"); //printf("token2 %s token %s\n", token2, token); if(atof(token) > atof(token2)) doit=true; } else if (strstr(miss[activeMission].services[k].output[t].c_str(), token)) doit=true; if(doit){ //printf("L1:if taken\n"); for(uint16_t j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){ //printf("transacting data for k=%d\n", k); //printf("%s---%d\n", miss[activeMission].services[j].name.c_str(), j); if(miss[activeMission].services[j].name.compare("if")==0){ //printf("L2:if detected\n"); input.clear(); check.clear(); for(t = 0; t < 10; t++){ if(!miss[activeMission].services[j].output[t].empty()){ //printf("i-numstmts = %d\n", i-numstatements-1); input=miss[activeMission].services[j-numstatements[2]-1].output[t]; strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, input.c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); //printf("data=%s\n", data.c_str()); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); token = strtok(NULL, "@"); //printf("data=%s\n", token); data.clear(); break; } } //printf("L2:---%s||%s---\n", miss[activeMission].services[j].output[t].c_str(), token); bool doit = false; //printf("here! %s\n", miss[activeMission].services[j].output[t].c_str()); if(miss[activeMission].services[j].output[t].find(">") != string::npos){ strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); miss[activeMission].services[k].output[t].erase(0, 1); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, miss[activeMission].services[j].output[t].c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data2 = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); token2 = strtok((char *)data2.c_str(), "@"); token2 = strtok(NULL, "@"); token2 = strtok(NULL, "@"); if(atof(token) > atof(token2)) doit=true; data.clear(); } else if (strstr(miss[activeMission].services[j].output[t].c_str(), token)) doit=true; if(doit){ //printf("L1:if taken\n"); for(uint16_t l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){ //printf("transacting data for k=%d\n", k); TransactData(l); } } else //printf("L2: if not taken\n"); numstatements[2] +=miss[activeMission].services[j].num_conds+1; j+=miss[activeMission].services[j].num_conds; //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i); }else if(miss[activeMission].services[j].name.compare("dowhile")==0){ numstatements[0]=0; //printf("L2 while detected\n"); while(true){ uint16_t l; for(l = j+1; l <= j+miss[activeMission].services[j].num_conds; l++){ TransactData(l); //printf("l=%d\n", l); } data.clear(); //printf("L2:while detected %d, %d\n", k, miss[activeMission].services[i].num_conds); input.clear(); check.clear(); int t; for(t = 0; t < 10; t++){ if(!miss[activeMission].services[j].output[t].empty()){ input=miss[activeMission].services[l-1].output[t]; strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, input.c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); //printf("data=%s\n", data.c_str()); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); token = strtok(NULL, "@"); //printf("data=%s\n", token); break; } } //printf("L2:--- %s %s---\n", miss[activeMission].services[j].output[t].c_str(), token); if(strstr(miss[activeMission].services[j].output[t].c_str(), token)){ //printf("L2:while taken again!\n"); } else { //printf("no more while\n"); break;} } j+=miss[activeMission].services[j].num_conds; //printf("doneif\n"); } else{ //printf("NO L2 COND!\n"); numstatements[2]=0; TransactData(j); } } } else{ //printf("L1: if not taken\n"); } numstatements[1] +=miss[activeMission].services[k].num_conds+1; k+=miss[activeMission].services[k].num_conds; //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i); }else if(miss[activeMission].services[k].name.compare("dowhile")==0){ numstatements[0]=0; //printf("L1 while detected\n"); while(true){ uint16_t j; for(j = k+1; j <= k+miss[activeMission].services[k].num_conds; j++){ TransactData(j); } data.clear(); //printf("L1:while detected %d, %d\n", k, miss[activeMission].services[i].num_conds); input.clear(); check.clear(); int t; for(t = 0; t < 10; t++){ if(!miss[activeMission].services[k].output[t].empty()){ input=miss[activeMission].services[j-1].output[t]; //printf("input=%s\n", input.c_str()); strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, input.c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); //printf("data=%s\n", data.c_str()); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); token = strtok(NULL, "@"); //printf("data=%s\n", token); break; } } //printf("L1:--- %s %s---\n", miss[activeMission].services[k].output[t].c_str(), token); if(strstr(miss[activeMission].services[k].output[t].c_str(), token)){ //printf("L1:while taken again!\n"); } else { //printf("no more while\n"); break;} } k+=miss[activeMission].services[k].num_conds; //printf("doneif\n"); } else{ //printf("NO L1 COND!\n"); numstatements[1]=0; TransactData(k); } //numstatements[0] +=miss[activeMission].services[i].num_conds+1; //i+=miss[activeMission].services[i].num_conds; //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i); } } // else //printf("LO if not taken\n"); numstatements[0] +=miss[activeMission].services[i].num_conds+1; i+=miss[activeMission].services[i].num_conds; //printf("doneif %d, %d, %d\n", numstatements, miss[activeMission].services[i].num_conds, i); } else if(miss[activeMission].services[i].name.compare("dowhile")==0) { numstatements[0]=0; //printf("L0 while detected\n"); while(true){ uint16_t k; for(k = i+1; k <= i+miss[activeMission].services[i].num_conds; k++){ TransactData(k); } data.clear(); input.clear(); check.clear(); int t; for(t = 0; t < 10; t++){ if(!miss[activeMission].services[i].output[t].empty()){ input=miss[activeMission].services[k-1].output[t]; //printf("input=%s\n", input.c_str()); strcpy(_data_DB->command, "SELECT "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, " where Tag=="); sprintf(_data_DB->command, "%s'%s';", _data_DB->command, input.c_str()); sqlite3_stmt * pStatement; rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command, -1, &pStatement, NULL); if (rc == SQLITE_OK){ if (sqlite3_step(pStatement) == SQLITE_ROW) data = (const char *) sqlite3_column_text(pStatement, 1); else { printf("1 data_DB:: Data not yet in DB.\n"); rc=31337; } } else { printf("data_DB:: Error executing SQL statement. rc = %i\n%s\n",rc,_data_DB->command); } sqlite3_finalize(pStatement); token = strtok((char *)data.c_str(), "@"); token = strtok(NULL, "@"); token = strtok(NULL, "@"); break; } } //printf("L0:--- %s %s---\n", miss[activeMission].services[i].output[t].c_str(), token); if(strstr(miss[activeMission].services[i].output[t].c_str(), token)){ //printf("L0:while taken again!\n"); } else { //printf("no more while\n"); break;} } i+=miss[activeMission].services[i].num_conds; //printf("doneif\n"); } else{ numstatements[0]=0; //printf("L0 Neither if nor while\n"); TransactData(i);} i++; } i=0; data.clear(); if(!shellFound) { int k = 0; while(k<10 && !miss[activeMission].input[k].empty()){ k++; } sprintf(buffer, "%d", k); SendMessage(shellSocketFD, buffer); for(int t = 0; t < k; t++){ //printf("---%s---\n", miss[activeMission].input[t].c_str()); SendMessage(shellSocketFD, miss[activeMission].input[t].c_str()); SendMessage(shellSocketFD, "0"); } } LOG("ServiceManagementLayer:: Done performing active mission.\n"); /* strcpy(_data_DB->command, "select "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ".* from "); strcat(_data_DB->command, _data_DB->tablename); strcat(_data_DB->command, ";"); // execute print (select all) command rc = sqlite3_exec(_data_DB->db, _data_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); printf("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename); printf("\n\n\n");*/ } /* CALLED BY: MessageHandler * INPUTS: * OUTPUTS: * * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them */ void ServiceManagementLayer::ListServices() { strcpy(_services_DB->command, "select "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, ".* from "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, ";"); // execute print (select all) command char *errorMsg; int rc = sqlite3_exec(_services_DB->db, _services_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); printf("database %s, table %s:\n", _services_DB->filename, _services_DB->tablename); } /* CALLED BY: Reset * INPUTS: * OUTPUTS: * * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file */ void ServiceManagementLayer::ReloadConfiguration() { LOG("ServiceManagementLayer:: Reloading Configuration.\n"); free(miss); miss = new Mission[10]; for(int i = 0; i < 10; i++) miss[i].services = new Service[30]; LoadConfiguration(_SML_Config, miss); } /* CALLED BY: constructor * INPUTS: |SML_Config| Address (either relitive or full) of the XML file containing mission data * |mList| Mission array to be modified * OUTPUTS: * * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data * Can currently handle 10 inputs and 10 outputs per service, but easily expandable * Also, can handle two layer of nested conditional statements, but could * be expanded to meet additional needs. * * Components assigned to mission during "set active mission" stage so that * components can still continue to register after the configuration is loaded */ void ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList) { TiXmlElement *pMission; TiXmlElement *pService; TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4; TiXmlHandle hRoot(0); printf("ServiceManagementLayer:: Loading Configuration.\n"); TiXmlDocument doc("."); doc.LoadFile(SML_Config); bool loadOkay = doc.LoadFile(); if(!loadOkay) printf("Loading SML configuration failed: %s\n", SML_Config); TiXmlHandle hDoc(&doc); pMission = hDoc.FirstChildElement().Element(); if(!pMission) printf("No valid root!"); hRoot = TiXmlHandle(pMission); pService = pMission->FirstChildElement(); int32_t mission_num = 0; //Iterate through the missions for(pChild0 = pMission->FirstChildElement(); pChild0 ; \ pChild0 = pChild0->NextSiblingElement()) { int32_t service_num = 0; uint16_t cond_array[] = {0, 0, 0}; for(pChild1 = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \ pChild1 = pChild1->NextSiblingElement()) { int32_t conditional_0 = service_num; for(pChild2 = pChild1->FirstChildElement(); \ pChild2; pChild2 = pChild2->NextSiblingElement()) { service_num++; int32_t conditional_1 = service_num; for(pChild3 = pChild2->FirstChildElement(); \ pChild3; pChild3 = pChild3->NextSiblingElement()) { service_num++; int32_t conditional_2 = service_num; for(pChild4 = pChild3->FirstChildElement(); \ pChild4; pChild4 = pChild4->NextSiblingElement()) { service_num++; if(pChild4->Attribute("name")) mList[mission_num].services[service_num].name = pChild4->Attribute("name"); else mList[mission_num].services[service_num].name = pChild4->Value(); for(int i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); //printf("buffer=%s\n", buffer); if(pChild4->Attribute(buffer)) mList[mission_num].services[service_num].input[i-1] = pChild4->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild4->Attribute(buffer2)) mList[mission_num].services[service_num].output[i-1] = pChild4->Attribute(buffer2); } if(pChild4->Attribute("parameter")) mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter"); cond_array[2]++; } if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) { mList[mission_num].services[conditional_2].name = pChild3->Value(); } else{ mList[mission_num].services[service_num].name = pChild3->Attribute("name"); } for(int i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild3->Attribute(buffer)) mList[mission_num].services[conditional_2].input[i-1] = pChild3->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild3->Attribute(buffer2)) mList[mission_num].services[conditional_2].output[i-1] = pChild3->Attribute(buffer2); } if(pChild3->Attribute("parameter")) mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter"); mList[mission_num].services[conditional_2].num_conds = cond_array[2]; cond_array[1]+=cond_array[2]+1; cond_array[2] = 0; } if(!strcmp(pChild2->Value(), "shell") || conditional_1 != service_num) { mList[mission_num].services[conditional_1].name = pChild2->Value(); } else{ mList[mission_num].services[service_num].name = pChild2->Attribute("name"); } for(int i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild2->Attribute(buffer)) mList[mission_num].services[conditional_1].input[i-1] = pChild2->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild2->Attribute(buffer2)) mList[mission_num].services[conditional_1].output[i-1] = pChild2->Attribute(buffer2); } if(pChild2->Attribute("parameter")) mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter"); mList[mission_num].services[conditional_1].num_conds = cond_array[1]; cond_array[0]+=cond_array[1]+1; cond_array[1] = 0; } if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) { mList[mission_num].services[conditional_0].name = pChild1->Value(); } else{ mList[mission_num].services[conditional_0].name = pChild1->Attribute("name"); } for(int i = 1; i <= 10; i++) { char buffer[9]="input"; sprintf(buffer, "%s%d", buffer, i); if(pChild1->Attribute(buffer)) mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer); char buffer2[9]="output"; sprintf(buffer2, "%s%d", buffer2, i); if(pChild1->Attribute(buffer2)) mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2); } if(pChild1->Attribute("parameter")) mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter"); mList[mission_num].services[conditional_0].num_conds = cond_array[0]; cond_array[0] = 0; service_num++; } mList[mission_num].numServices = service_num; mList[mission_num].name = pChild0->Attribute("name"); mList[mission_num].missionID = atoi(pChild0->Attribute("id")); for(int i = 1; i <= 10; i++) { char buffer[9]="param"; sprintf(buffer, "%s%d", buffer, i); if(pChild0->Attribute(buffer)){ mList[mission_num].input[i-1] = pChild0->Attribute(buffer); } } mission_num++; } LOG("ServiceManagementLayer:: Done Loading Configuration\n"); } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the engine to be registered * OUTPUTS: * * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component */ void ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID) { SendMessage(shellSocketFD, "register_engine_cognitive"); LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n"); char buffer[256]; memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); SendMessage(CE_List[ID].FD, buffer); TransferRadioConfiguration(ID); memset(buffer, 0, 256); TransferExperience(ID); memset(buffer, 0, 256); numberOfCognitiveEngines++; CE_Present = true; } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the engine to have it's services deregistered * OUTPUTS: * * DESCRIPTION: Deletes individual services from the DB * NOTE THAT this function only needs to be called if service deregistration is going * to be done at a different time than component deregistration; it is handled * more efficiently and directly during that deregistration process. */ void ServiceManagementLayer::DeregisterServices(int32_t ID) { char buffer[256]; memset(buffer, 0, 256); ReadMessage(CE_List[ID].FD, buffer); strcpy(_services_DB->command, "DELETE FROM "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, " WHERE ID_Num IN (SELECT"); sprintf(_services_DB->command, " %s %d",_services_DB->command, ID); strcat(_services_DB->command, " FROM "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, " WHERE Service_Name"); strcat(_services_DB->command, "=="); sprintf(_services_DB->command, "%s'%s');", _services_DB->command, buffer); char *errorMsg; int rc = sqlite3_exec(_services_DB->db, _services_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); } /* CALLED BY: MessageHandler * INPUTS: |ID| The ID number of the engine to have it's services deregistered * OUTPUTS: * * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell * Also, deletes the services from the DB */ void ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID) { LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n"); numberOfCognitiveEngines--; if(numberOfCognitiveEngines == 0) CE_Present = false; SendMessage(shellSocketFD, "deregister_engine_cognitive"); char buffer[256]; memset(buffer, 0, 256); ReadMessage(shellSocketFD, buffer); SendMessage(CE_List[ID].FD, buffer); if(strcmp("deregister_ack", buffer) != 0) { ERROR(1, "SML:: Failed to close CE socket\n"); } //Deregister the services strcpy(_services_DB->command, "DELETE FROM "); strcat(_services_DB->command, _services_DB->tablename); strcat(_services_DB->command, " WHERE "); strcat(_services_DB->command, "ID_Num"); strcat(_services_DB->command, "=="); sprintf(_services_DB->command, "%s%d;", _services_DB->command, ID); char *errorMsg; int rc = sqlite3_exec(_services_DB->db, _services_DB->command, callback, 0, &errorMsg); if( rc!=SQLITE_OK && rc!=101 ) fprintf(stderr, "SQL error: %s\n", errorMsg); CE_List[ID].FD = -1; CE_List[ID].ID_num = -1; LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID); } /* CALLED BY: test class * INPUTS: * OUTPUTS: * * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket */ void ServiceManagementLayer::StartSMLServer() { //printf("Ready for CE Signal! (registration done)\n"); struct timeval selTimeout; int32_t running = 1; int32_t port, rc, new_sd = 1; int32_t desc_ready = 1; //If there is, call the MessageHandler with the Shell_Msg code of -1 fd_set sockSet, shellSet; cogEngSrv = CreateTCPServerSocket(SMLport); int32_t maxDescriptor = cogEngSrv; if(InitializeTCPServerPort(cogEngSrv) == -1) ERROR(1,"Error initializing primary port\n"); while (running) { /* Zero socket descriptor vector and set for server sockets */ /* This must be reset every time select() is called */ FD_ZERO(&sockSet); FD_SET(cogEngSrv, &sockSet); for(uint16_t k = 0; k < Current_ID; k++){ if(CE_List[k].ID_num != -1) FD_SET(CE_List[k].FD, &sockSet); } //printf("k=%d, CID=%d\n", k, CE_List[k].FD); /* Timeout specification */ /* This must be reset every time select() is called */ selTimeout.tv_sec = 0; /* timeout (secs.) */ selTimeout.tv_usec = 0; /* 0 microseconds */ //Changed both to zero so that select will check messages from the shell instead of blocking //when there is no command from the CE's to be processed //Check if there is a message on the socket waiting to be read rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout); //printf("rc=%d\n", rc); if(rc == 0){ //LOG("No echo requests for %i secs...Server still alive\n", timeout); FD_ZERO(&shellSet); FD_SET(shellSocketFD, &shellSet); selTimeout.tv_sec = 0; selTimeout.tv_usec = 0; //Check if there is a message on the shell socket ready to be processed select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout); //printf("rc2=%d\n", rc2); //If there is, call the MessageHandler with the Shell_Msg code of -1 if(FD_ISSET(shellSocketFD, &shellSet)){ //printf("shell_msg, %d\n", rc2); MessageHandler(-1);} } else { desc_ready = rc; for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) { if(FD_ISSET(port, &sockSet)) { desc_ready -= 1; //Check if request is new or on an existing open descriptor if(port == cogEngSrv) { //If new, assign it a descriptor and give it an ID new_sd = AcceptTCPConnection(port); if(new_sd < 0) break; CE_List[Current_ID].FD = new_sd; CE_List[Current_ID].ID_num = Current_ID; MessageHandler(Current_ID); Current_ID++; FD_SET(new_sd,&sockSet); if(new_sd > maxDescriptor) maxDescriptor = new_sd; } else { //If old, figure out which ID it coresponds to and handle it accordingly for(uint16_t z = 0; z < Current_ID; z++) { if(CE_List[z].FD == port){ MessageHandler(z);} } } } } } } /* Close sockets */ close(cogEngSrv); //delete &cogEngSrv; return; }